Skip to main content

kyu_executor/operators/
scan.rs

//! Scan operator — reads table data through the `Storage` trait.
//!
//! Hands chunks to the caller one at a time, preserving the native
//! FlatVector/BoolVector/StringVector column formats (no materialization).
//! Note: the whole table is pulled from storage and buffered on the first call.

6use std::collections::VecDeque;
7
8use kyu_common::id::TableId;
9use kyu_common::KyuResult;
10
11use crate::context::ExecutionContext;
12use crate::data_chunk::DataChunk;
13
14pub struct ScanNodeOp {
15    pub table_id: TableId,
16    /// Optional column indices to project during scan. When set, only these
17    /// columns are kept, avoiding copies of unused columns (especially strings).
18    pub column_indices: Option<Vec<usize>>,
19    chunks: Option<VecDeque<DataChunk>>,
20}
21
22impl ScanNodeOp {
23    pub fn new(table_id: TableId) -> Self {
24        Self {
25            table_id,
26            column_indices: None,
27            chunks: None,
28        }
29    }
30
31    pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
32        if self.chunks.is_none() {
33            let col_indices = self.column_indices.clone();
34            self.chunks = Some(
35                ctx.storage
36                    .scan_table(self.table_id)
37                    .map(move |chunk| {
38                        if let Some(ref indices) = col_indices {
39                            chunk.select_columns(indices)
40                        } else {
41                            chunk
42                        }
43                    })
44                    .collect(),
45            );
46        }
47        Ok(self.chunks.as_mut().unwrap().pop_front())
48    }
49}
50
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_types::TypedValue;
    use smol_str::SmolStr;

    /// Builds a `MockStorage` holding table 0 with two rows of (String, Int64).
    fn make_storage() -> MockStorage {
        let mut storage = MockStorage::new();
        storage.insert_table(
            TableId(0),
            vec![
                vec![TypedValue::String(SmolStr::new("Alice")), TypedValue::Int64(25)],
                vec![TypedValue::String(SmolStr::new("Bob")), TypedValue::Int64(30)],
            ],
        );
        storage
    }

    #[test]
    fn scan_returns_all_rows() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = ScanNodeOp::new(TableId(0));
        let chunk = op.next(&ctx).unwrap().unwrap();
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.num_columns(), 2);
    }

    #[test]
    fn scan_exhausts_after_one_call() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = ScanNodeOp::new(TableId(0));
        assert!(op.next(&ctx).unwrap().is_some());
        assert!(op.next(&ctx).unwrap().is_none());
    }

    #[test]
    fn scan_missing_table_returns_none() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = ScanNodeOp::new(TableId(99));
        assert!(op.next(&ctx).unwrap().is_none());
    }

    /// Setting `column_indices` keeps only the listed columns but every row.
    #[test]
    fn scan_projects_selected_columns() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut op = ScanNodeOp::new(TableId(0));
        op.column_indices = Some(vec![1]);
        let chunk = op.next(&ctx).unwrap().unwrap();
        assert_eq!(chunk.num_rows(), 2);
        assert_eq!(chunk.num_columns(), 1);
        assert!(op.next(&ctx).unwrap().is_none());
    }
}