Skip to main content

kyu_executor/
context.rs

1//! Execution context — storage trait and execution context.
2
3use hashbrown::HashMap;
4use kyu_catalog::CatalogContent;
5use kyu_common::id::TableId;
6use kyu_types::TypedValue;
7
8use crate::data_chunk::DataChunk;
9
10/// Abstraction over table storage backends.
11///
12/// kyu-executor depends only on this trait. Concrete implementations
13/// (MockStorage for tests, NodeGroupStorage for real storage) live
14/// in their respective crates.
15pub trait Storage: std::fmt::Debug {
16    /// Iterate DataChunk batches for a table. Returns empty iterator if table missing.
17    fn scan_table(&self, table_id: TableId) -> Box<dyn Iterator<Item = DataChunk> + '_>;
18}
19
20/// Mock in-memory storage: table_id → rows (each row is a Vec<TypedValue>).
21#[derive(Clone, Debug)]
22pub struct MockStorage {
23    tables: HashMap<TableId, Vec<Vec<TypedValue>>>,
24}
25
26impl MockStorage {
27    pub fn new() -> Self {
28        Self {
29            tables: HashMap::new(),
30        }
31    }
32
33    /// Insert rows for a table.
34    pub fn insert_table(&mut self, table_id: TableId, rows: Vec<Vec<TypedValue>>) {
35        self.tables.insert(table_id, rows);
36    }
37}
38
39impl Default for MockStorage {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl Storage for MockStorage {
46    fn scan_table(&self, table_id: TableId) -> Box<dyn Iterator<Item = DataChunk> + '_> {
47        match self.tables.get(&table_id) {
48            Some(rows) if !rows.is_empty() => {
49                let num_cols = rows[0].len();
50                Box::new(std::iter::once(DataChunk::from_rows(rows, num_cols)))
51            }
52            _ => Box::new(std::iter::empty()),
53        }
54    }
55}
56
57/// Execution context holding catalog and storage references.
58pub struct ExecutionContext<'a> {
59    pub catalog: CatalogContent,
60    pub storage: &'a dyn Storage,
61    #[cfg(feature = "jit")]
62    jit_cache: Option<std::sync::Arc<crate::jit::ExpressionCache>>,
63}
64
65impl std::fmt::Debug for ExecutionContext<'_> {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("ExecutionContext")
68            .field("catalog", &self.catalog)
69            .field("storage", &self.storage)
70            .finish()
71    }
72}
73
74impl<'a> ExecutionContext<'a> {
75    pub fn new(catalog: CatalogContent, storage: &'a dyn Storage) -> Self {
76        Self {
77            catalog,
78            storage,
79            #[cfg(feature = "jit")]
80            jit_cache: None,
81        }
82    }
83
84    /// Create an execution context with a JIT expression cache.
85    #[cfg(feature = "jit")]
86    pub fn with_jit_cache(
87        catalog: CatalogContent,
88        storage: &'a dyn Storage,
89        cache: std::sync::Arc<crate::jit::ExpressionCache>,
90    ) -> Self {
91        Self {
92            catalog,
93            storage,
94            jit_cache: Some(cache),
95        }
96    }
97
98    /// Get the JIT expression cache, if available.
99    #[cfg(feature = "jit")]
100    pub fn jit_cache(&self) -> Option<&std::sync::Arc<crate::jit::ExpressionCache>> {
101        self.jit_cache.as_ref()
102    }
103}
104
105#[cfg(test)]
106mod tests {
107    use super::*;
108    use smol_str::SmolStr;
109
110    #[test]
111    fn mock_storage_insert_and_scan() {
112        let mut storage = MockStorage::new();
113        let rows = vec![
114            vec![
115                TypedValue::Int64(1),
116                TypedValue::String(SmolStr::new("Alice")),
117            ],
118            vec![
119                TypedValue::Int64(2),
120                TypedValue::String(SmolStr::new("Bob")),
121            ],
122        ];
123        storage.insert_table(TableId(0), rows);
124        let chunks: Vec<DataChunk> = storage.scan_table(TableId(0)).collect();
125        assert_eq!(chunks.len(), 1);
126        assert_eq!(chunks[0].num_rows(), 2);
127    }
128
129    #[test]
130    fn mock_storage_missing_table() {
131        let storage = MockStorage::new();
132        let chunks: Vec<DataChunk> = storage.scan_table(TableId(99)).collect();
133        assert!(chunks.is_empty());
134    }
135}