kyu_executor/operators/
scan.rs1use std::collections::VecDeque;
7
8use kyu_common::id::TableId;
9use kyu_common::KyuResult;
10
11use crate::context::ExecutionContext;
12use crate::data_chunk::DataChunk;
13
14pub struct ScanNodeOp {
15 pub table_id: TableId,
16 pub column_indices: Option<Vec<usize>>,
19 chunks: Option<VecDeque<DataChunk>>,
20}
21
22impl ScanNodeOp {
23 pub fn new(table_id: TableId) -> Self {
24 Self {
25 table_id,
26 column_indices: None,
27 chunks: None,
28 }
29 }
30
31 pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
32 if self.chunks.is_none() {
33 let col_indices = self.column_indices.clone();
34 self.chunks = Some(
35 ctx.storage
36 .scan_table(self.table_id)
37 .map(move |chunk| {
38 if let Some(ref indices) = col_indices {
39 chunk.select_columns(indices)
40 } else {
41 chunk
42 }
43 })
44 .collect(),
45 );
46 }
47 Ok(self.chunks.as_mut().unwrap().pop_front())
48 }
49}
50
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_types::TypedValue;
    use smol_str::SmolStr;

    /// Builds a mock storage containing a single table, `TableId(0)`, with two
    /// rows of one string value and one 64-bit integer value each.
    fn make_storage() -> MockStorage {
        let row = |text: &str, number: i64| {
            vec![
                TypedValue::String(SmolStr::new(text)),
                TypedValue::Int64(number),
            ]
        };

        let mut storage = MockStorage::new();
        storage.insert_table(TableId(0), vec![row("Alice", 25), row("Bob", 30)]);
        storage
    }

    /// The first chunk of an unprojected scan exposes both rows and both columns.
    #[test]
    fn scan_returns_all_rows() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut scan = ScanNodeOp::new(TableId(0));

        let first = scan.next(&ctx).unwrap().unwrap();

        assert_eq!(first.num_rows(), 2);
        assert_eq!(first.num_columns(), 2);
    }

    /// After the only chunk has been returned, the scan reports exhaustion.
    #[test]
    fn scan_exhausts_after_one_call() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut scan = ScanNodeOp::new(TableId(0));

        let first = scan.next(&ctx).unwrap();
        assert!(first.is_some());

        let second = scan.next(&ctx).unwrap();
        assert!(second.is_none());
    }

    /// Scanning a table id that was never inserted yields no chunks.
    #[test]
    fn scan_missing_table_returns_none() {
        let storage = make_storage();
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);
        let mut scan = ScanNodeOp::new(TableId(99));

        let first = scan.next(&ctx).unwrap();
        assert!(first.is_none());
    }
}