Skip to main content

kyu_executor/operators/
scan.rs

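//! Scan operator — reads rows from Storage trait.
//!
//! On the first `next` call every chunk of the table is pulled from storage
//! and buffered; the buffered chunks are then handed out one at a time.
//! Chunks are passed through unchanged apart from the optional column
//! projection, preserving native FlatVector/BoolVector/StringVector column
//! formats (no materialization of column data).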
5
6use std::collections::VecDeque;
7
8use kyu_common::KyuResult;
9use kyu_common::id::TableId;
10
11use crate::context::ExecutionContext;
12use crate::data_chunk::DataChunk;
13
14pub struct ScanNodeOp {
15    pub table_id: TableId,
16    /// Optional column indices to project during scan. When set, only these
17    /// columns are kept, avoiding copies of unused columns (especially strings).
18    pub column_indices: Option<Vec<usize>>,
19    chunks: Option<VecDeque<DataChunk>>,
20}
21
22impl ScanNodeOp {
23    pub fn new(table_id: TableId) -> Self {
24        Self {
25            table_id,
26            column_indices: None,
27            chunks: None,
28        }
29    }
30
31    pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
32        if self.chunks.is_none() {
33            let col_indices = self.column_indices.clone();
34            self.chunks = Some(
35                ctx.storage
36                    .scan_table(self.table_id)
37                    .map(move |chunk| {
38                        if let Some(ref indices) = col_indices {
39                            chunk.select_columns(indices)
40                        } else {
41                            chunk
42                        }
43                    })
44                    .collect(),
45            );
46        }
47        Ok(self.chunks.as_mut().unwrap().pop_front())
48    }
49}
50
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_types::TypedValue;
    use smol_str::SmolStr;

    /// Builds one `(name, age)` row in the layout used by the fixture table.
    fn person_row(name: &str, age: i64) -> Vec<TypedValue> {
        vec![TypedValue::String(SmolStr::new(name)), TypedValue::Int64(age)]
    }

    /// Mock storage holding a single two-row, two-column table under `TableId(0)`.
    fn make_storage() -> MockStorage {
        let rows = vec![person_row("Alice", 25), person_row("Bob", 30)];
        let mut storage = MockStorage::new();
        storage.insert_table(TableId(0), rows);
        storage
    }

    /// The first chunk of the fixture table carries both rows and both columns.
    #[test]
    fn scan_returns_all_rows() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut scan = ScanNodeOp::new(TableId(0));

        let first = scan
            .next(&ctx)
            .expect("scan should succeed")
            .expect("first call should yield a chunk");

        assert_eq!(first.num_rows(), 2);
        assert_eq!(first.num_columns(), 2);
    }

    /// The fixture table yields a chunk on the first call and nothing afterwards.
    #[test]
    fn scan_exhausts_after_one_call() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut scan = ScanNodeOp::new(TableId(0));

        let first = scan.next(&ctx).unwrap();
        assert!(first.is_some());

        let second = scan.next(&ctx).unwrap();
        assert!(second.is_none());
    }

    /// Scanning a table id that was never inserted yields no chunk at all.
    #[test]
    fn scan_missing_table_returns_none() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut scan = ScanNodeOp::new(TableId(99));

        let result = scan.next(&ctx).unwrap();
        assert!(result.is_none());
    }
}