Skip to main content

kyu_api/
storage.rs

1//! NodeGroupStorage — bridges kyu-storage's columnar engine with kyu-executor's DataChunk.
2
3use hashbrown::HashMap;
4use kyu_common::id::TableId;
5use kyu_common::{KyuError, KyuResult};
6use kyu_executor::value_vector::{BoolVector, FlatVector, StringVector};
7use kyu_executor::{DataChunk, SelectionVector, Storage, ValueVector};
8use kyu_storage::{ChunkedNodeGroup, ColumnChunk, NodeGroup, NodeGroupIdx, NullMask};
9use kyu_types::{LogicalType, TypedValue};
10
/// Everything the storage keeps for one table: its column types, its row data,
/// and the soft-delete marks for those rows.
struct TableData {
    /// Logical type of each column; zipped position-by-position with the values
    /// of an inserted row.
    schema: Vec<LogicalType>,
    /// Columnar row data for this table.
    node_group: NodeGroup,
    /// Soft-delete bitset: bit=1 at position i means row i is deleted.
    /// Uses the same NullMask from kyu-storage (packed u64, O(1) set/check).
    deleted: NullMask,
}
18
/// Real columnar storage backed by NodeGroup/ColumnChunk.
///
/// Owns the per-table state for every table created through it and exposes it
/// to the executor through the `Storage` trait.
#[derive(Debug)]
pub struct NodeGroupStorage {
    /// Per-table state, keyed by table id.
    tables: HashMap<TableId, TableData>,
}
24
25impl std::fmt::Debug for TableData {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("TableData")
28            .field("schema", &self.schema)
29            .field("num_rows", &self.node_group.num_rows())
30            .finish()
31    }
32}
33
34impl Default for NodeGroupStorage {
35    fn default() -> Self {
36        Self {
37            tables: HashMap::new(),
38        }
39    }
40}
41
42impl NodeGroupStorage {
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Create a table with the given column types.
48    pub fn create_table(&mut self, table_id: TableId, schema: Vec<LogicalType>) {
49        self.tables.insert(
50            table_id,
51            TableData {
52                node_group: NodeGroup::new(NodeGroupIdx(table_id.0), schema.clone()),
53                schema,
54                deleted: NullMask::new(0),
55            },
56        );
57    }
58
59    /// Drop a table.
60    pub fn drop_table(&mut self, table_id: TableId) {
61        self.tables.remove(&table_id);
62    }
63
64    /// Check if a table exists.
65    pub fn has_table(&self, table_id: TableId) -> bool {
66        self.tables.contains_key(&table_id)
67    }
68
69    /// Get the schema (column types) for a table.
70    pub fn table_schema(&self, table_id: TableId) -> Option<&[LogicalType]> {
71        self.tables.get(&table_id).map(|t| t.schema.as_slice())
72    }
73
74    /// Get the number of rows in a table.
75    pub fn num_rows(&self, table_id: TableId) -> u64 {
76        self.tables
77            .get(&table_id)
78            .map_or(0, |t| t.node_group.num_rows())
79    }
80
81    /// Insert a row of TypedValues into a table's NodeGroup.
82    pub fn insert_row(&mut self, table_id: TableId, values: &[TypedValue]) -> KyuResult<()> {
83        let table = self
84            .tables
85            .get_mut(&table_id)
86            .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
87
88        let raw_values: Vec<Option<Vec<u8>>> = values
89            .iter()
90            .zip(&table.schema)
91            .map(|(val, ty)| typed_value_to_bytes(val, ty))
92            .collect();
93
94        let refs: Vec<Option<&[u8]>> = raw_values.iter().map(|opt| opt.as_deref()).collect();
95        table.node_group.append_row(&refs);
96
97        // Grow the deleted mask to cover the new row.
98        let num_rows = table.node_group.num_rows();
99        table.deleted = NullMask::new(num_rows);
100
101        Ok(())
102    }
103
104    /// Update a single cell in-place.
105    pub fn update_cell(
106        &mut self,
107        table_id: TableId,
108        row_idx: u64,
109        col_idx: usize,
110        value: &TypedValue,
111    ) -> KyuResult<()> {
112        let table = self
113            .tables
114            .get_mut(&table_id)
115            .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
116
117        let (chunk_idx, local_row) = table.node_group.global_row_to_chunked_group(row_idx);
118        let chunk = table.node_group.chunked_group_mut(chunk_idx);
119        let col = chunk.column_mut(col_idx);
120
121        match (col, value) {
122            (ColumnChunk::Fixed(c), TypedValue::Null) => {
123                c.set_null(local_row, true);
124            }
125            (ColumnChunk::Fixed(c), TypedValue::Int8(v)) => {
126                c.set_value::<i8>(local_row, *v);
127                c.set_null(local_row, false);
128            }
129            (ColumnChunk::Fixed(c), TypedValue::Int16(v)) => {
130                c.set_value::<i16>(local_row, *v);
131                c.set_null(local_row, false);
132            }
133            (ColumnChunk::Fixed(c), TypedValue::Int32(v)) => {
134                c.set_value::<i32>(local_row, *v);
135                c.set_null(local_row, false);
136            }
137            (ColumnChunk::Fixed(c), TypedValue::Int64(v)) => {
138                c.set_value::<i64>(local_row, *v);
139                c.set_null(local_row, false);
140            }
141            (ColumnChunk::Fixed(c), TypedValue::Float(v)) => {
142                c.set_value::<f32>(local_row, *v);
143                c.set_null(local_row, false);
144            }
145            (ColumnChunk::Fixed(c), TypedValue::Double(v)) => {
146                c.set_value::<f64>(local_row, *v);
147                c.set_null(local_row, false);
148            }
149            (ColumnChunk::Bool(c), TypedValue::Null) => {
150                c.set_null(local_row, true);
151            }
152            (ColumnChunk::Bool(c), TypedValue::Bool(v)) => {
153                c.set_bool(local_row, *v);
154            }
155            (ColumnChunk::String(c), TypedValue::Null) => {
156                c.set_null(local_row, true);
157            }
158            (ColumnChunk::String(c), TypedValue::String(s)) => {
159                c.set_string(local_row, s.clone());
160            }
161            _ => {
162                return Err(KyuError::Storage(format!(
163                    "type mismatch: cannot write {:?} to column {}",
164                    value, col_idx
165                )));
166            }
167        }
168        Ok(())
169    }
170
171    /// Scan all non-deleted rows with their global row indices.
172    /// Returns (global_row_idx, row_values) for each live row.
173    pub fn scan_rows(&self, table_id: TableId) -> KyuResult<Vec<(u64, Vec<TypedValue>)>> {
174        let table = self
175            .tables
176            .get(&table_id)
177            .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
178
179        let num_chunks = table.node_group.num_chunked_groups();
180        let has_deletions = !table.deleted.has_no_nulls_guarantee();
181        let mut rows = Vec::new();
182
183        for chunk_idx in 0..num_chunks {
184            let cng = table.node_group.chunked_group(chunk_idx);
185            let base_row = chunk_idx as u64 * kyu_storage::CHUNKED_NODE_GROUP_CAPACITY;
186            let num_rows = cng.num_rows() as usize;
187
188            let columns: Vec<ValueVector> = (0..cng.num_columns())
189                .map(|col| column_chunk_to_value_vector(cng.column(col), num_rows))
190                .collect();
191
192            for local_row in 0..num_rows {
193                let global_row = base_row + local_row as u64;
194                if has_deletions && table.deleted.is_null(global_row) {
195                    continue;
196                }
197                let row: Vec<TypedValue> =
198                    columns.iter().map(|col| col.get_value(local_row)).collect();
199                rows.push((global_row, row));
200            }
201        }
202
203        Ok(rows)
204    }
205
206    /// Soft-delete a row (mark as deleted; skipped during scans).
207    pub fn delete_row(&mut self, table_id: TableId, row_idx: u64) -> KyuResult<()> {
208        let table = self
209            .tables
210            .get_mut(&table_id)
211            .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
212
213        table.deleted.set_null(row_idx, true);
214        Ok(())
215    }
216}
217
218impl Storage for NodeGroupStorage {
219    fn scan_table(&self, table_id: TableId) -> Box<dyn Iterator<Item = DataChunk> + '_> {
220        let table = match self.tables.get(&table_id) {
221            Some(t) if t.node_group.num_rows() > 0 => t,
222            _ => return Box::new(std::iter::empty()),
223        };
224        let num_chunks = table.node_group.num_chunked_groups();
225        let has_deletions = !table.deleted.has_no_nulls_guarantee();
226        Box::new((0..num_chunks).filter_map(move |idx| {
227            let cng = table.node_group.chunked_group(idx);
228            let base_row = idx as u64 * kyu_storage::CHUNKED_NODE_GROUP_CAPACITY;
229            let chunk = if has_deletions {
230                chunked_group_to_data_chunk_filtered(cng, &table.deleted, base_row)
231            } else {
232                chunked_group_to_data_chunk(cng)
233            };
234            if chunk.num_rows() == 0 {
235                None
236            } else {
237                Some(chunk)
238            }
239        }))
240    }
241}
242
243/// Convert a ChunkedNodeGroup to a DataChunk, skipping rows marked deleted.
244fn chunked_group_to_data_chunk_filtered(
245    cng: &ChunkedNodeGroup,
246    deleted: &NullMask,
247    base_row: u64,
248) -> DataChunk {
249    let num_rows = cng.num_rows() as usize;
250    let columns: Vec<ValueVector> = (0..cng.num_columns())
251        .map(|i| column_chunk_to_value_vector(cng.column(i), num_rows))
252        .collect();
253    let live_indices: Vec<u32> = (0..num_rows)
254        .filter(|&i| !deleted.is_null(base_row + i as u64))
255        .map(|i| i as u32)
256        .collect();
257    let sel = if live_indices.len() == num_rows {
258        SelectionVector::identity(num_rows)
259    } else {
260        SelectionVector::from_indices(live_indices)
261    };
262    DataChunk::from_vectors(columns, sel)
263}
264
265/// Convert a ChunkedNodeGroup to a DataChunk.
266fn chunked_group_to_data_chunk(cng: &ChunkedNodeGroup) -> DataChunk {
267    let num_rows = cng.num_rows() as usize;
268    let columns: Vec<ValueVector> = (0..cng.num_columns())
269        .map(|i| column_chunk_to_value_vector(cng.column(i), num_rows))
270        .collect();
271    DataChunk::from_vectors(columns, SelectionVector::identity(num_rows))
272}
273
274/// Convert a ColumnChunk to a ValueVector.
275fn column_chunk_to_value_vector(chunk: &ColumnChunk, num_rows: usize) -> ValueVector {
276    match chunk {
277        ColumnChunk::Fixed(c) => ValueVector::Flat(FlatVector::from_column_chunk(c, num_rows)),
278        ColumnChunk::Bool(c) => ValueVector::Bool(BoolVector::from_bool_chunk(c, num_rows)),
279        ColumnChunk::String(c) => ValueVector::String(StringVector::from_string_chunk(c, num_rows)),
280    }
281}
282
283/// Convert a TypedValue to raw bytes for storage in a NodeGroup.
284fn typed_value_to_bytes(val: &TypedValue, _ty: &LogicalType) -> Option<Vec<u8>> {
285    match val {
286        TypedValue::Null => None,
287        TypedValue::Bool(b) => Some(vec![if *b { 1u8 } else { 0u8 }]),
288        TypedValue::Int8(v) => Some(v.to_ne_bytes().to_vec()),
289        TypedValue::Int16(v) => Some(v.to_ne_bytes().to_vec()),
290        TypedValue::Int32(v) => Some(v.to_ne_bytes().to_vec()),
291        TypedValue::Int64(v) => Some(v.to_ne_bytes().to_vec()),
292        TypedValue::Float(v) => Some(v.to_ne_bytes().to_vec()),
293        TypedValue::Double(v) => Some(v.to_ne_bytes().to_vec()),
294        TypedValue::String(s) => Some(s.as_bytes().to_vec()),
295        _ => None, // unsupported types stored as null
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use smol_str::SmolStr;

    /// Table id shared by every test that needs an existing table.
    const TABLE: TableId = TableId(0);

    /// Storage containing one empty table with the given schema.
    fn storage_with(schema: Vec<LogicalType>) -> NodeGroupStorage {
        let mut storage = NodeGroupStorage::new();
        storage.create_table(TABLE, schema);
        storage
    }

    /// Storage containing one single-column Int64 table filled with `values`.
    fn int64_table(values: &[i64]) -> NodeGroupStorage {
        let mut storage = storage_with(vec![LogicalType::Int64]);
        for &v in values {
            storage.insert_row(TABLE, &[TypedValue::Int64(v)]).unwrap();
        }
        storage
    }

    /// Collect every chunk produced by scanning `table_id`.
    fn scan(storage: &NodeGroupStorage, table_id: TableId) -> Vec<DataChunk> {
        storage.scan_table(table_id).collect()
    }

    /// Shorthand for a string value.
    fn text(s: &str) -> TypedValue {
        TypedValue::String(SmolStr::new(s))
    }

    #[test]
    fn create_and_scan_empty_table() {
        let storage = storage_with(vec![LogicalType::Int64]);
        assert!(storage.has_table(TABLE));

        // No rows were inserted, so the scan yields nothing.
        assert!(scan(&storage, TABLE).is_empty());
    }

    #[test]
    fn insert_and_scan_int64() {
        let storage = int64_table(&[42, 100]);

        let chunks = scan(&storage, TABLE);
        assert_eq!(chunks.len(), 1);
        let chunk = &chunks[0];
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(42));
        assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(100));
    }

    #[test]
    fn insert_and_scan_string() {
        let mut storage = storage_with(vec![LogicalType::Int64, LogicalType::String]);
        let people = [(1, "Alice"), (2, "Bob")];
        for (id, name) in people {
            storage
                .insert_row(TABLE, &[TypedValue::Int64(id), text(name)])
                .unwrap();
        }

        let chunks = scan(&storage, TABLE);
        assert_eq!(chunks.len(), 1);
        let chunk = &chunks[0];
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(1));
        assert_eq!(chunk.get_value(0, 1), text("Alice"));
        assert_eq!(chunk.get_value(1, 1), text("Bob"));
    }

    #[test]
    fn insert_and_scan_bool() {
        let mut storage = storage_with(vec![LogicalType::Bool]);
        let inputs = [
            TypedValue::Bool(true),
            TypedValue::Bool(false),
            TypedValue::Null,
        ];
        for value in inputs {
            storage.insert_row(TABLE, &[value]).unwrap();
        }

        let chunks = scan(&storage, TABLE);
        let chunk = &chunks[0];
        assert_eq!(chunk.num_rows(), 3);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Bool(true));
        assert_eq!(chunk.get_value(1, 0), TypedValue::Bool(false));
        assert_eq!(chunk.get_value(2, 0), TypedValue::Null);
    }

    #[test]
    fn drop_table() {
        let mut storage = storage_with(vec![LogicalType::Int64]);
        assert!(storage.has_table(TABLE));

        storage.drop_table(TABLE);
        assert!(!storage.has_table(TABLE));
    }

    #[test]
    fn insert_into_missing_table_errors() {
        let mut storage = NodeGroupStorage::new();
        let outcome = storage.insert_row(TableId(99), &[TypedValue::Int64(1)]);
        assert!(outcome.is_err());
    }

    #[test]
    fn scan_missing_table_returns_empty() {
        let storage = NodeGroupStorage::new();
        assert!(scan(&storage, TableId(99)).is_empty());
    }

    #[test]
    fn update_cell_int64() {
        let mut storage = int64_table(&[42]);

        storage
            .update_cell(TABLE, 0, 0, &TypedValue::Int64(99))
            .unwrap();

        let chunks = scan(&storage, TABLE);
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Int64(99));
    }

    #[test]
    fn update_cell_string() {
        let mut storage = storage_with(vec![LogicalType::String]);
        storage.insert_row(TABLE, &[text("old")]).unwrap();

        storage.update_cell(TABLE, 0, 0, &text("new")).unwrap();

        let chunks = scan(&storage, TABLE);
        assert_eq!(chunks[0].get_value(0, 0), text("new"));
    }

    #[test]
    fn update_cell_to_null() {
        let mut storage = int64_table(&[42]);

        storage.update_cell(TABLE, 0, 0, &TypedValue::Null).unwrap();

        let chunks = scan(&storage, TABLE);
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Null);
    }

    #[test]
    fn delete_row_skips_in_scan() {
        let mut storage = int64_table(&[1, 2, 3]);

        // Row 1 holds the value 2.
        storage.delete_row(TABLE, 1).unwrap();

        let chunks = scan(&storage, TABLE);
        let chunk = &chunks[0];
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(1));
        assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(3));
    }

    #[test]
    fn delete_all_rows_returns_empty_scan() {
        let mut storage = int64_table(&[1, 2]);

        for row in 0..2 {
            storage.delete_row(TABLE, row).unwrap();
        }

        assert!(scan(&storage, TABLE).is_empty());
    }
}