1use hashbrown::HashMap;
4use kyu_common::id::TableId;
5use kyu_common::{KyuError, KyuResult};
6use kyu_executor::value_vector::{BoolVector, FlatVector, StringVector};
7use kyu_executor::{DataChunk, SelectionVector, Storage, ValueVector};
8use kyu_storage::{ChunkedNodeGroup, ColumnChunk, NodeGroup, NodeGroupIdx, NullMask};
9use kyu_types::{LogicalType, TypedValue};
10
11struct TableData {
12 schema: Vec<LogicalType>,
13 node_group: NodeGroup,
14 deleted: NullMask,
17}
18
19#[derive(Debug)]
21pub struct NodeGroupStorage {
22 tables: HashMap<TableId, TableData>,
23}
24
25impl std::fmt::Debug for TableData {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("TableData")
28 .field("schema", &self.schema)
29 .field("num_rows", &self.node_group.num_rows())
30 .finish()
31 }
32}
33
34impl Default for NodeGroupStorage {
35 fn default() -> Self {
36 Self {
37 tables: HashMap::new(),
38 }
39 }
40}
41
42impl NodeGroupStorage {
43 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn create_table(&mut self, table_id: TableId, schema: Vec<LogicalType>) {
49 self.tables.insert(
50 table_id,
51 TableData {
52 node_group: NodeGroup::new(NodeGroupIdx(table_id.0), schema.clone()),
53 schema,
54 deleted: NullMask::new(0),
55 },
56 );
57 }
58
59 pub fn drop_table(&mut self, table_id: TableId) {
61 self.tables.remove(&table_id);
62 }
63
64 pub fn has_table(&self, table_id: TableId) -> bool {
66 self.tables.contains_key(&table_id)
67 }
68
69 pub fn table_schema(&self, table_id: TableId) -> Option<&[LogicalType]> {
71 self.tables.get(&table_id).map(|t| t.schema.as_slice())
72 }
73
74 pub fn num_rows(&self, table_id: TableId) -> u64 {
76 self.tables
77 .get(&table_id)
78 .map_or(0, |t| t.node_group.num_rows())
79 }
80
81 pub fn insert_row(&mut self, table_id: TableId, values: &[TypedValue]) -> KyuResult<()> {
83 let table = self
84 .tables
85 .get_mut(&table_id)
86 .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
87
88 let raw_values: Vec<Option<Vec<u8>>> = values
89 .iter()
90 .zip(&table.schema)
91 .map(|(val, ty)| typed_value_to_bytes(val, ty))
92 .collect();
93
94 let refs: Vec<Option<&[u8]>> = raw_values.iter().map(|opt| opt.as_deref()).collect();
95 table.node_group.append_row(&refs);
96
97 let num_rows = table.node_group.num_rows();
99 table.deleted = NullMask::new(num_rows);
100
101 Ok(())
102 }
103
104 pub fn update_cell(
106 &mut self,
107 table_id: TableId,
108 row_idx: u64,
109 col_idx: usize,
110 value: &TypedValue,
111 ) -> KyuResult<()> {
112 let table = self
113 .tables
114 .get_mut(&table_id)
115 .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
116
117 let (chunk_idx, local_row) = table.node_group.global_row_to_chunked_group(row_idx);
118 let chunk = table.node_group.chunked_group_mut(chunk_idx);
119 let col = chunk.column_mut(col_idx);
120
121 match (col, value) {
122 (ColumnChunk::Fixed(c), TypedValue::Null) => {
123 c.set_null(local_row, true);
124 }
125 (ColumnChunk::Fixed(c), TypedValue::Int8(v)) => {
126 c.set_value::<i8>(local_row, *v);
127 c.set_null(local_row, false);
128 }
129 (ColumnChunk::Fixed(c), TypedValue::Int16(v)) => {
130 c.set_value::<i16>(local_row, *v);
131 c.set_null(local_row, false);
132 }
133 (ColumnChunk::Fixed(c), TypedValue::Int32(v)) => {
134 c.set_value::<i32>(local_row, *v);
135 c.set_null(local_row, false);
136 }
137 (ColumnChunk::Fixed(c), TypedValue::Int64(v)) => {
138 c.set_value::<i64>(local_row, *v);
139 c.set_null(local_row, false);
140 }
141 (ColumnChunk::Fixed(c), TypedValue::Float(v)) => {
142 c.set_value::<f32>(local_row, *v);
143 c.set_null(local_row, false);
144 }
145 (ColumnChunk::Fixed(c), TypedValue::Double(v)) => {
146 c.set_value::<f64>(local_row, *v);
147 c.set_null(local_row, false);
148 }
149 (ColumnChunk::Bool(c), TypedValue::Null) => {
150 c.set_null(local_row, true);
151 }
152 (ColumnChunk::Bool(c), TypedValue::Bool(v)) => {
153 c.set_bool(local_row, *v);
154 }
155 (ColumnChunk::String(c), TypedValue::Null) => {
156 c.set_null(local_row, true);
157 }
158 (ColumnChunk::String(c), TypedValue::String(s)) => {
159 c.set_string(local_row, s.clone());
160 }
161 _ => {
162 return Err(KyuError::Storage(format!(
163 "type mismatch: cannot write {:?} to column {}",
164 value, col_idx
165 )));
166 }
167 }
168 Ok(())
169 }
170
171 pub fn scan_rows(&self, table_id: TableId) -> KyuResult<Vec<(u64, Vec<TypedValue>)>> {
174 let table = self
175 .tables
176 .get(&table_id)
177 .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
178
179 let num_chunks = table.node_group.num_chunked_groups();
180 let has_deletions = !table.deleted.has_no_nulls_guarantee();
181 let mut rows = Vec::new();
182
183 for chunk_idx in 0..num_chunks {
184 let cng = table.node_group.chunked_group(chunk_idx);
185 let base_row = chunk_idx as u64 * kyu_storage::CHUNKED_NODE_GROUP_CAPACITY;
186 let num_rows = cng.num_rows() as usize;
187
188 let columns: Vec<ValueVector> = (0..cng.num_columns())
189 .map(|col| column_chunk_to_value_vector(cng.column(col), num_rows))
190 .collect();
191
192 for local_row in 0..num_rows {
193 let global_row = base_row + local_row as u64;
194 if has_deletions && table.deleted.is_null(global_row) {
195 continue;
196 }
197 let row: Vec<TypedValue> =
198 columns.iter().map(|col| col.get_value(local_row)).collect();
199 rows.push((global_row, row));
200 }
201 }
202
203 Ok(rows)
204 }
205
206 pub fn delete_row(&mut self, table_id: TableId, row_idx: u64) -> KyuResult<()> {
208 let table = self
209 .tables
210 .get_mut(&table_id)
211 .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
212
213 table.deleted.set_null(row_idx, true);
214 Ok(())
215 }
216}
217
218impl Storage for NodeGroupStorage {
219 fn scan_table(&self, table_id: TableId) -> Box<dyn Iterator<Item = DataChunk> + '_> {
220 let table = match self.tables.get(&table_id) {
221 Some(t) if t.node_group.num_rows() > 0 => t,
222 _ => return Box::new(std::iter::empty()),
223 };
224 let num_chunks = table.node_group.num_chunked_groups();
225 let has_deletions = !table.deleted.has_no_nulls_guarantee();
226 Box::new((0..num_chunks).filter_map(move |idx| {
227 let cng = table.node_group.chunked_group(idx);
228 let base_row = idx as u64 * kyu_storage::CHUNKED_NODE_GROUP_CAPACITY;
229 let chunk = if has_deletions {
230 chunked_group_to_data_chunk_filtered(cng, &table.deleted, base_row)
231 } else {
232 chunked_group_to_data_chunk(cng)
233 };
234 if chunk.num_rows() == 0 {
235 None
236 } else {
237 Some(chunk)
238 }
239 }))
240 }
241}
242
243fn chunked_group_to_data_chunk_filtered(
245 cng: &ChunkedNodeGroup,
246 deleted: &NullMask,
247 base_row: u64,
248) -> DataChunk {
249 let num_rows = cng.num_rows() as usize;
250 let columns: Vec<ValueVector> = (0..cng.num_columns())
251 .map(|i| column_chunk_to_value_vector(cng.column(i), num_rows))
252 .collect();
253 let live_indices: Vec<u32> = (0..num_rows)
254 .filter(|&i| !deleted.is_null(base_row + i as u64))
255 .map(|i| i as u32)
256 .collect();
257 let sel = if live_indices.len() == num_rows {
258 SelectionVector::identity(num_rows)
259 } else {
260 SelectionVector::from_indices(live_indices)
261 };
262 DataChunk::from_vectors(columns, sel)
263}
264
265fn chunked_group_to_data_chunk(cng: &ChunkedNodeGroup) -> DataChunk {
267 let num_rows = cng.num_rows() as usize;
268 let columns: Vec<ValueVector> = (0..cng.num_columns())
269 .map(|i| column_chunk_to_value_vector(cng.column(i), num_rows))
270 .collect();
271 DataChunk::from_vectors(columns, SelectionVector::identity(num_rows))
272}
273
274fn column_chunk_to_value_vector(chunk: &ColumnChunk, num_rows: usize) -> ValueVector {
276 match chunk {
277 ColumnChunk::Fixed(c) => ValueVector::Flat(FlatVector::from_column_chunk(c, num_rows)),
278 ColumnChunk::Bool(c) => ValueVector::Bool(BoolVector::from_bool_chunk(c, num_rows)),
279 ColumnChunk::String(c) => ValueVector::String(StringVector::from_string_chunk(c, num_rows)),
280 }
281}
282
283fn typed_value_to_bytes(val: &TypedValue, _ty: &LogicalType) -> Option<Vec<u8>> {
285 match val {
286 TypedValue::Null => None,
287 TypedValue::Bool(b) => Some(vec![if *b { 1u8 } else { 0u8 }]),
288 TypedValue::Int8(v) => Some(v.to_ne_bytes().to_vec()),
289 TypedValue::Int16(v) => Some(v.to_ne_bytes().to_vec()),
290 TypedValue::Int32(v) => Some(v.to_ne_bytes().to_vec()),
291 TypedValue::Int64(v) => Some(v.to_ne_bytes().to_vec()),
292 TypedValue::Float(v) => Some(v.to_ne_bytes().to_vec()),
293 TypedValue::Double(v) => Some(v.to_ne_bytes().to_vec()),
294 TypedValue::String(s) => Some(s.as_bytes().to_vec()),
295 _ => None, }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use smol_str::SmolStr;

    /// Storage holding a single empty table `TableId(0)` with `schema`.
    fn storage_with(schema: Vec<LogicalType>) -> NodeGroupStorage {
        let mut storage = NodeGroupStorage::new();
        storage.create_table(TableId(0), schema);
        storage
    }

    /// Appends each row to `TableId(0)`, panicking if any insert fails.
    fn insert_rows(storage: &mut NodeGroupStorage, rows: &[&[TypedValue]]) {
        for row in rows {
            storage.insert_row(TableId(0), row).unwrap();
        }
    }

    /// Collects every chunk produced by scanning `table_id`.
    fn scan(storage: &NodeGroupStorage, table_id: TableId) -> Vec<DataChunk> {
        storage.scan_table(table_id).collect()
    }

    /// Shorthand for a string value.
    fn string(s: &str) -> TypedValue {
        TypedValue::String(SmolStr::new(s))
    }

    #[test]
    fn create_and_scan_empty_table() {
        let storage = storage_with(vec![LogicalType::Int64]);
        assert!(storage.has_table(TableId(0)));
        assert!(scan(&storage, TableId(0)).is_empty());
    }

    #[test]
    fn insert_and_scan_int64() {
        let mut storage = storage_with(vec![LogicalType::Int64]);
        insert_rows(
            &mut storage,
            &[&[TypedValue::Int64(42)], &[TypedValue::Int64(100)]],
        );

        let chunks = scan(&storage, TableId(0));
        assert_eq!(chunks.len(), 1);
        let chunk = &chunks[0];
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(42));
        assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(100));
    }

    #[test]
    fn insert_and_scan_string() {
        let mut storage = storage_with(vec![LogicalType::Int64, LogicalType::String]);
        insert_rows(
            &mut storage,
            &[
                &[TypedValue::Int64(1), string("Alice")],
                &[TypedValue::Int64(2), string("Bob")],
            ],
        );

        let chunks = scan(&storage, TableId(0));
        assert_eq!(chunks.len(), 1);
        let chunk = &chunks[0];
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(1));
        assert_eq!(chunk.get_value(0, 1), string("Alice"));
        assert_eq!(chunk.get_value(1, 1), string("Bob"));
    }

    #[test]
    fn insert_and_scan_bool() {
        let mut storage = storage_with(vec![LogicalType::Bool]);
        insert_rows(
            &mut storage,
            &[
                &[TypedValue::Bool(true)],
                &[TypedValue::Bool(false)],
                &[TypedValue::Null],
            ],
        );

        let chunks = scan(&storage, TableId(0));
        let chunk = &chunks[0];
        assert_eq!(chunk.num_rows(), 3);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Bool(true));
        assert_eq!(chunk.get_value(1, 0), TypedValue::Bool(false));
        assert_eq!(chunk.get_value(2, 0), TypedValue::Null);
    }

    #[test]
    fn drop_table() {
        let mut storage = storage_with(vec![LogicalType::Int64]);
        assert!(storage.has_table(TableId(0)));
        storage.drop_table(TableId(0));
        assert!(!storage.has_table(TableId(0)));
    }

    #[test]
    fn insert_into_missing_table_errors() {
        let mut storage = NodeGroupStorage::new();
        let outcome = storage.insert_row(TableId(99), &[TypedValue::Int64(1)]);
        assert!(outcome.is_err());
    }

    #[test]
    fn scan_missing_table_returns_empty() {
        let storage = NodeGroupStorage::new();
        assert!(scan(&storage, TableId(99)).is_empty());
    }

    #[test]
    fn update_cell_int64() {
        let mut storage = storage_with(vec![LogicalType::Int64]);
        insert_rows(&mut storage, &[&[TypedValue::Int64(42)]]);

        storage
            .update_cell(TableId(0), 0, 0, &TypedValue::Int64(99))
            .unwrap();

        let chunks = scan(&storage, TableId(0));
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Int64(99));
    }

    #[test]
    fn update_cell_string() {
        let mut storage = storage_with(vec![LogicalType::String]);
        insert_rows(&mut storage, &[&[string("old")]]);

        storage
            .update_cell(TableId(0), 0, 0, &string("new"))
            .unwrap();

        let chunks = scan(&storage, TableId(0));
        assert_eq!(chunks[0].get_value(0, 0), string("new"));
    }

    #[test]
    fn update_cell_to_null() {
        let mut storage = storage_with(vec![LogicalType::Int64]);
        insert_rows(&mut storage, &[&[TypedValue::Int64(42)]]);

        storage
            .update_cell(TableId(0), 0, 0, &TypedValue::Null)
            .unwrap();

        let chunks = scan(&storage, TableId(0));
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Null);
    }

    #[test]
    fn delete_row_skips_in_scan() {
        let mut storage = storage_with(vec![LogicalType::Int64]);
        insert_rows(
            &mut storage,
            &[
                &[TypedValue::Int64(1)],
                &[TypedValue::Int64(2)],
                &[TypedValue::Int64(3)],
            ],
        );

        storage.delete_row(TableId(0), 1).unwrap();

        let chunks = scan(&storage, TableId(0));
        let chunk = &chunks[0];
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.get_value(0, 0), TypedValue::Int64(1));
        assert_eq!(chunk.get_value(1, 0), TypedValue::Int64(3));
    }

    #[test]
    fn delete_all_rows_returns_empty_scan() {
        let mut storage = storage_with(vec![LogicalType::Int64]);
        insert_rows(
            &mut storage,
            &[&[TypedValue::Int64(1)], &[TypedValue::Int64(2)]],
        );

        for row in [0, 1] {
            storage.delete_row(TableId(0), row).unwrap();
        }

        assert!(scan(&storage, TableId(0)).is_empty());
    }
}