1use hashbrown::HashMap;
4use kyu_common::id::TableId;
5use kyu_common::{KyuError, KyuResult};
6use kyu_executor::value_vector::{BoolVector, FlatVector, StringVector};
7use kyu_executor::{DataChunk, SelectionVector, Storage, ValueVector};
8use kyu_storage::{ChunkedNodeGroup, ColumnChunk, NodeGroup, NodeGroupIdx, NullMask};
9use kyu_types::{LogicalType, TypedValue};
10
/// Per-table state owned by `NodeGroupStorage`.
struct TableData {
    /// Column types in column order, as supplied to `create_table`.
    schema: Vec<LogicalType>,
    /// Row data for the table; built from a clone of `schema`.
    node_group: NodeGroup,
    /// Deletion bitmap indexed by global row index: a set ("null") bit marks
    /// the row as deleted, and scans skip such rows.
    deleted: NullMask,
}
18
/// A collection of tables keyed by `TableId`, each backed by a `NodeGroup`
/// plus a per-row deletion mask.
#[derive(Debug)]
pub struct NodeGroupStorage {
    /// One entry per table created via `create_table`.
    tables: HashMap<TableId, TableData>,
}
24
25impl std::fmt::Debug for TableData {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("TableData")
28 .field("schema", &self.schema)
29 .field("num_rows", &self.node_group.num_rows())
30 .finish()
31 }
32}
33
34impl Default for NodeGroupStorage {
35 fn default() -> Self {
36 Self {
37 tables: HashMap::new(),
38 }
39 }
40}
41
42impl NodeGroupStorage {
43 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn create_table(&mut self, table_id: TableId, schema: Vec<LogicalType>) {
49 self.tables.insert(
50 table_id,
51 TableData {
52 node_group: NodeGroup::new(NodeGroupIdx(table_id.0), schema.clone()),
53 schema,
54 deleted: NullMask::new(0),
55 },
56 );
57 }
58
59 pub fn drop_table(&mut self, table_id: TableId) {
61 self.tables.remove(&table_id);
62 }
63
64 pub fn has_table(&self, table_id: TableId) -> bool {
66 self.tables.contains_key(&table_id)
67 }
68
69 pub fn table_schema(&self, table_id: TableId) -> Option<&[LogicalType]> {
71 self.tables.get(&table_id).map(|t| t.schema.as_slice())
72 }
73
74 pub fn num_rows(&self, table_id: TableId) -> u64 {
76 self.tables
77 .get(&table_id)
78 .map_or(0, |t| t.node_group.num_rows())
79 }
80
81 pub fn insert_row(&mut self, table_id: TableId, values: &[TypedValue]) -> KyuResult<()> {
83 let table = self
84 .tables
85 .get_mut(&table_id)
86 .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
87
88 let raw_values: Vec<Option<Vec<u8>>> = values
89 .iter()
90 .zip(&table.schema)
91 .map(|(val, ty)| typed_value_to_bytes(val, ty))
92 .collect();
93
94 let refs: Vec<Option<&[u8]>> = raw_values.iter().map(|opt| opt.as_deref()).collect();
95 table.node_group.append_row(&refs);
96
97 let num_rows = table.node_group.num_rows();
99 table.deleted = NullMask::new(num_rows);
100
101 Ok(())
102 }
103
104 pub fn update_cell(
106 &mut self,
107 table_id: TableId,
108 row_idx: u64,
109 col_idx: usize,
110 value: &TypedValue,
111 ) -> KyuResult<()> {
112 let table = self
113 .tables
114 .get_mut(&table_id)
115 .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
116
117 let (chunk_idx, local_row) = table.node_group.global_row_to_chunked_group(row_idx);
118 let chunk = table.node_group.chunked_group_mut(chunk_idx);
119 let col = chunk.column_mut(col_idx);
120
121 match (col, value) {
122 (ColumnChunk::Fixed(c), TypedValue::Null) => {
123 c.set_null(local_row, true);
124 }
125 (ColumnChunk::Fixed(c), TypedValue::Int8(v)) => {
126 c.set_value::<i8>(local_row, *v);
127 c.set_null(local_row, false);
128 }
129 (ColumnChunk::Fixed(c), TypedValue::Int16(v)) => {
130 c.set_value::<i16>(local_row, *v);
131 c.set_null(local_row, false);
132 }
133 (ColumnChunk::Fixed(c), TypedValue::Int32(v)) => {
134 c.set_value::<i32>(local_row, *v);
135 c.set_null(local_row, false);
136 }
137 (ColumnChunk::Fixed(c), TypedValue::Int64(v)) => {
138 c.set_value::<i64>(local_row, *v);
139 c.set_null(local_row, false);
140 }
141 (ColumnChunk::Fixed(c), TypedValue::Float(v)) => {
142 c.set_value::<f32>(local_row, *v);
143 c.set_null(local_row, false);
144 }
145 (ColumnChunk::Fixed(c), TypedValue::Double(v)) => {
146 c.set_value::<f64>(local_row, *v);
147 c.set_null(local_row, false);
148 }
149 (ColumnChunk::Bool(c), TypedValue::Null) => {
150 c.set_null(local_row, true);
151 }
152 (ColumnChunk::Bool(c), TypedValue::Bool(v)) => {
153 c.set_bool(local_row, *v);
154 }
155 (ColumnChunk::String(c), TypedValue::Null) => {
156 c.set_null(local_row, true);
157 }
158 (ColumnChunk::String(c), TypedValue::String(s)) => {
159 c.set_string(local_row, s.clone());
160 }
161 _ => {
162 return Err(KyuError::Storage(format!(
163 "type mismatch: cannot write {:?} to column {}",
164 value, col_idx
165 )));
166 }
167 }
168 Ok(())
169 }
170
171 pub fn scan_rows(&self, table_id: TableId) -> KyuResult<Vec<(u64, Vec<TypedValue>)>> {
174 let table = self
175 .tables
176 .get(&table_id)
177 .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
178
179 let num_chunks = table.node_group.num_chunked_groups();
180 let has_deletions = !table.deleted.has_no_nulls_guarantee();
181 let mut rows = Vec::new();
182
183 for chunk_idx in 0..num_chunks {
184 let cng = table.node_group.chunked_group(chunk_idx);
185 let base_row = chunk_idx as u64 * kyu_storage::CHUNKED_NODE_GROUP_CAPACITY;
186 let num_rows = cng.num_rows() as usize;
187
188 let columns: Vec<ValueVector> = (0..cng.num_columns())
189 .map(|col| column_chunk_to_value_vector(cng.column(col), num_rows))
190 .collect();
191
192 for local_row in 0..num_rows {
193 let global_row = base_row + local_row as u64;
194 if has_deletions && table.deleted.is_null(global_row) {
195 continue;
196 }
197 let row: Vec<TypedValue> =
198 columns.iter().map(|col| col.get_value(local_row)).collect();
199 rows.push((global_row, row));
200 }
201 }
202
203 Ok(rows)
204 }
205
206 pub fn delete_row(&mut self, table_id: TableId, row_idx: u64) -> KyuResult<()> {
208 let table = self
209 .tables
210 .get_mut(&table_id)
211 .ok_or_else(|| KyuError::Storage(format!("table {:?} not found", table_id)))?;
212
213 table.deleted.set_null(row_idx, true);
214 Ok(())
215 }
216}
217
218impl Storage for NodeGroupStorage {
219 fn scan_table(&self, table_id: TableId) -> Box<dyn Iterator<Item = DataChunk> + '_> {
220 let table = match self.tables.get(&table_id) {
221 Some(t) if t.node_group.num_rows() > 0 => t,
222 _ => return Box::new(std::iter::empty()),
223 };
224 let num_chunks = table.node_group.num_chunked_groups();
225 let has_deletions = !table.deleted.has_no_nulls_guarantee();
226 Box::new((0..num_chunks).filter_map(move |idx| {
227 let cng = table.node_group.chunked_group(idx);
228 let base_row = idx as u64 * kyu_storage::CHUNKED_NODE_GROUP_CAPACITY;
229 let chunk = if has_deletions {
230 chunked_group_to_data_chunk_filtered(cng, &table.deleted, base_row)
231 } else {
232 chunked_group_to_data_chunk(cng)
233 };
234 if chunk.num_rows() == 0 {
235 None
236 } else {
237 Some(chunk)
238 }
239 }))
240 }
241}
242
243fn chunked_group_to_data_chunk_filtered(
245 cng: &ChunkedNodeGroup,
246 deleted: &NullMask,
247 base_row: u64,
248) -> DataChunk {
249 let num_rows = cng.num_rows() as usize;
250 let columns: Vec<ValueVector> = (0..cng.num_columns())
251 .map(|i| column_chunk_to_value_vector(cng.column(i), num_rows))
252 .collect();
253 let live_indices: Vec<u32> = (0..num_rows)
254 .filter(|&i| !deleted.is_null(base_row + i as u64))
255 .map(|i| i as u32)
256 .collect();
257 let sel = if live_indices.len() == num_rows {
258 SelectionVector::identity(num_rows)
259 } else {
260 SelectionVector::from_indices(live_indices)
261 };
262 DataChunk::from_vectors(columns, sel)
263}
264
265fn chunked_group_to_data_chunk(cng: &ChunkedNodeGroup) -> DataChunk {
267 let num_rows = cng.num_rows() as usize;
268 let columns: Vec<ValueVector> = (0..cng.num_columns())
269 .map(|i| column_chunk_to_value_vector(cng.column(i), num_rows))
270 .collect();
271 DataChunk::from_vectors(columns, SelectionVector::identity(num_rows))
272}
273
274fn column_chunk_to_value_vector(chunk: &ColumnChunk, num_rows: usize) -> ValueVector {
276 match chunk {
277 ColumnChunk::Fixed(c) => ValueVector::Flat(FlatVector::from_column_chunk(c, num_rows)),
278 ColumnChunk::Bool(c) => ValueVector::Bool(BoolVector::from_bool_chunk(c, num_rows)),
279 ColumnChunk::String(c) => ValueVector::String(StringVector::from_string_chunk(c, num_rows)),
280 }
281}
282
283fn typed_value_to_bytes(val: &TypedValue, _ty: &LogicalType) -> Option<Vec<u8>> {
285 match val {
286 TypedValue::Null => None,
287 TypedValue::Bool(b) => Some(vec![if *b { 1u8 } else { 0u8 }]),
288 TypedValue::Int8(v) => Some(v.to_ne_bytes().to_vec()),
289 TypedValue::Int16(v) => Some(v.to_ne_bytes().to_vec()),
290 TypedValue::Int32(v) => Some(v.to_ne_bytes().to_vec()),
291 TypedValue::Int64(v) => Some(v.to_ne_bytes().to_vec()),
292 TypedValue::Float(v) => Some(v.to_ne_bytes().to_vec()),
293 TypedValue::Double(v) => Some(v.to_ne_bytes().to_vec()),
294 TypedValue::String(s) => Some(s.as_bytes().to_vec()),
295 _ => None, }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use smol_str::SmolStr;

    /// Id of the table used by every test that needs an existing table.
    fn tid() -> TableId {
        TableId(0)
    }

    /// Builds a storage holding one empty table (`tid()`) with `schema`.
    fn storage_with_table(schema: Vec<LogicalType>) -> NodeGroupStorage {
        let mut storage = NodeGroupStorage::new();
        storage.create_table(tid(), schema);
        storage
    }

    /// Runs a full table scan and gathers the produced chunks.
    fn collect_chunks(storage: &NodeGroupStorage, table_id: TableId) -> Vec<DataChunk> {
        storage.scan_table(table_id).collect()
    }

    /// Shorthand for a string-typed value.
    fn text(s: &str) -> TypedValue {
        TypedValue::String(SmolStr::new(s))
    }

    #[test]
    fn create_and_scan_empty_table() {
        let storage = storage_with_table(vec![LogicalType::Int64]);
        assert!(storage.has_table(tid()));

        // A table without rows produces no chunks at all.
        let chunks = collect_chunks(&storage, tid());
        assert!(chunks.is_empty());
    }

    #[test]
    fn insert_and_scan_int64() {
        let mut storage = storage_with_table(vec![LogicalType::Int64]);
        for value in &[42i64, 100] {
            storage
                .insert_row(tid(), &[TypedValue::Int64(*value)])
                .unwrap();
        }

        let chunks = collect_chunks(&storage, tid());
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].num_rows(), 2);
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Int64(42));
        assert_eq!(chunks[0].get_value(1, 0), TypedValue::Int64(100));
    }

    #[test]
    fn insert_and_scan_string() {
        let mut storage = storage_with_table(vec![LogicalType::Int64, LogicalType::String]);

        let alice = [TypedValue::Int64(1), text("Alice")];
        storage.insert_row(tid(), &alice).unwrap();
        let bob = [TypedValue::Int64(2), text("Bob")];
        storage.insert_row(tid(), &bob).unwrap();

        let chunks = collect_chunks(&storage, tid());
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].num_rows(), 2);
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Int64(1));
        assert_eq!(chunks[0].get_value(0, 1), text("Alice"));
        assert_eq!(chunks[0].get_value(1, 1), text("Bob"));
    }

    #[test]
    fn insert_and_scan_bool() {
        let mut storage = storage_with_table(vec![LogicalType::Bool]);

        let inputs = [
            TypedValue::Bool(true),
            TypedValue::Bool(false),
            TypedValue::Null,
        ];
        for value in inputs.iter() {
            storage.insert_row(tid(), &[value.clone()]).unwrap();
        }

        let chunks = collect_chunks(&storage, tid());
        assert_eq!(chunks[0].num_rows(), 3);
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Bool(true));
        assert_eq!(chunks[0].get_value(1, 0), TypedValue::Bool(false));
        assert_eq!(chunks[0].get_value(2, 0), TypedValue::Null);
    }

    #[test]
    fn drop_table() {
        let mut storage = storage_with_table(vec![LogicalType::Int64]);
        assert!(storage.has_table(tid()));

        storage.drop_table(tid());
        assert!(!storage.has_table(tid()));
    }

    #[test]
    fn insert_into_missing_table_errors() {
        let mut storage = NodeGroupStorage::new();
        let outcome = storage.insert_row(TableId(99), &[TypedValue::Int64(1)]);
        assert!(outcome.is_err());
    }

    #[test]
    fn scan_missing_table_returns_empty() {
        let storage = NodeGroupStorage::new();
        let chunks = collect_chunks(&storage, TableId(99));
        assert!(chunks.is_empty());
    }

    #[test]
    fn update_cell_int64() {
        let mut storage = storage_with_table(vec![LogicalType::Int64]);
        storage
            .insert_row(tid(), &[TypedValue::Int64(42)])
            .unwrap();

        storage
            .update_cell(tid(), 0, 0, &TypedValue::Int64(99))
            .unwrap();

        let chunks = collect_chunks(&storage, tid());
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Int64(99));
    }

    #[test]
    fn update_cell_string() {
        let mut storage = storage_with_table(vec![LogicalType::String]);
        storage.insert_row(tid(), &[text("old")]).unwrap();

        storage.update_cell(tid(), 0, 0, &text("new")).unwrap();

        let chunks = collect_chunks(&storage, tid());
        assert_eq!(chunks[0].get_value(0, 0), text("new"));
    }

    #[test]
    fn update_cell_to_null() {
        let mut storage = storage_with_table(vec![LogicalType::Int64]);
        storage
            .insert_row(tid(), &[TypedValue::Int64(42)])
            .unwrap();

        storage.update_cell(tid(), 0, 0, &TypedValue::Null).unwrap();

        let chunks = collect_chunks(&storage, tid());
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Null);
    }

    #[test]
    fn delete_row_skips_in_scan() {
        let mut storage = storage_with_table(vec![LogicalType::Int64]);
        for value in &[1i64, 2, 3] {
            storage
                .insert_row(tid(), &[TypedValue::Int64(*value)])
                .unwrap();
        }

        // Remove the middle row; the scan must expose only the outer two.
        storage.delete_row(tid(), 1).unwrap();

        let chunks = collect_chunks(&storage, tid());
        assert_eq!(chunks[0].num_rows(), 2);
        assert_eq!(chunks[0].get_value(0, 0), TypedValue::Int64(1));
        assert_eq!(chunks[0].get_value(1, 0), TypedValue::Int64(3));
    }

    #[test]
    fn delete_all_rows_returns_empty_scan() {
        let mut storage = storage_with_table(vec![LogicalType::Int64]);
        for value in &[1i64, 2] {
            storage
                .insert_row(tid(), &[TypedValue::Int64(*value)])
                .unwrap();
        }

        storage.delete_row(tid(), 0).unwrap();
        storage.delete_row(tid(), 1).unwrap();

        let chunks = collect_chunks(&storage, tid());
        assert!(chunks.is_empty());
    }
}