Skip to main content

grafeo_core/execution/operators/
scan.rs

1//! Scan operator for reading data from storage.
2
3use super::{Operator, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::GraphStore;
6use grafeo_common::types::{EpochId, LogicalType, NodeId, TransactionId};
7use std::sync::Arc;
8
9/// A scan operator that reads nodes from storage.
10pub struct ScanOperator {
11    /// The store to scan from.
12    store: Arc<dyn GraphStore>,
13    /// Label filter (None = all nodes).
14    label: Option<String>,
15    /// Current position in the scan.
16    position: usize,
17    /// Batch of node IDs to scan.
18    batch: Vec<NodeId>,
19    /// Whether the scan is exhausted.
20    exhausted: bool,
21    /// Chunk capacity.
22    chunk_capacity: usize,
23    /// Transaction ID for MVCC visibility (None = use current epoch).
24    transaction_id: Option<TransactionId>,
25    /// Epoch for version visibility.
26    viewing_epoch: Option<EpochId>,
27}
28
29impl ScanOperator {
30    /// Creates a new scan operator for all nodes.
31    pub fn new(store: Arc<dyn GraphStore>) -> Self {
32        Self {
33            store,
34            label: None,
35            position: 0,
36            batch: Vec::new(),
37            exhausted: false,
38            chunk_capacity: 2048,
39            transaction_id: None,
40            viewing_epoch: None,
41        }
42    }
43
44    /// Creates a new scan operator for nodes with a specific label.
45    pub fn with_label(store: Arc<dyn GraphStore>, label: impl Into<String>) -> Self {
46        Self {
47            store,
48            label: Some(label.into()),
49            position: 0,
50            batch: Vec::new(),
51            exhausted: false,
52            chunk_capacity: 2048,
53            transaction_id: None,
54            viewing_epoch: None,
55        }
56    }
57
58    /// Sets the chunk capacity.
59    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
60        self.chunk_capacity = capacity;
61        self
62    }
63
64    /// Sets the transaction context for MVCC visibility.
65    ///
66    /// When set, the scan will only return nodes visible to this transaction.
67    pub fn with_transaction_context(
68        mut self,
69        epoch: EpochId,
70        transaction_id: Option<TransactionId>,
71    ) -> Self {
72        self.viewing_epoch = Some(epoch);
73        self.transaction_id = transaction_id;
74        self
75    }
76
77    fn load_batch(&mut self) {
78        if !self.batch.is_empty() || self.exhausted {
79            return;
80        }
81
82        // Get nodes, using versioned method if tx context is set
83        let all_ids = match &self.label {
84            Some(label) => self.store.nodes_by_label(label),
85            None => self.store.node_ids(),
86        };
87
88        // Filter by visibility if we have tx context
89        self.batch = if let Some(epoch) = self.viewing_epoch {
90            if let Some(tx) = self.transaction_id {
91                // Transaction-aware visibility (sees own uncommitted changes)
92                all_ids
93                    .into_iter()
94                    .filter(|id| self.store.get_node_versioned(*id, epoch, tx).is_some())
95                    .collect()
96            } else {
97                // Pure epoch-based visibility (time-travel, no tx)
98                all_ids
99                    .into_iter()
100                    .filter(|id| self.store.get_node_at_epoch(*id, epoch).is_some())
101                    .collect()
102            }
103        } else {
104            all_ids
105        };
106
107        if self.batch.is_empty() {
108            self.exhausted = true;
109        }
110    }
111}
112
113impl Operator for ScanOperator {
114    fn next(&mut self) -> OperatorResult {
115        self.load_batch();
116
117        if self.exhausted || self.position >= self.batch.len() {
118            return Ok(None);
119        }
120
121        // Create output chunk with node IDs
122        let schema = [LogicalType::Node];
123        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
124
125        let end = (self.position + self.chunk_capacity).min(self.batch.len());
126        let count = end - self.position;
127
128        {
129            // Column 0 guaranteed to exist: chunk created with single-column schema above
130            let col = chunk
131                .column_mut(0)
132                .expect("column 0 exists: chunk created with single-column schema");
133            for i in self.position..end {
134                col.push_node_id(self.batch[i]);
135            }
136        }
137
138        chunk.set_count(count);
139        self.position = end;
140
141        Ok(Some(chunk))
142    }
143
144    fn reset(&mut self) {
145        self.position = 0;
146        self.batch.clear();
147        self.exhausted = false;
148    }
149
150    fn name(&self) -> &'static str {
151        "Scan"
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    /// Creates an empty, writable in-memory store for a test.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Hands out a read-only handle onto the same store for the operator.
    fn read_view(store: &Arc<dyn GraphStoreMut>) -> Arc<dyn GraphStore> {
        Arc::clone(store) as Arc<dyn GraphStore>
    }

    /// A label scan returns only the nodes with that label, then ends.
    #[test]
    fn test_scan_by_label() {
        let store = new_store();

        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Animal"]);

        let mut scan = ScanOperator::with_label(read_view(&store), "Person");

        let first = scan.next().unwrap().unwrap();
        assert_eq!(first.row_count(), 2);

        // Should be exhausted
        assert!(scan.next().unwrap().is_none());
    }

    /// After `reset`, the operator yields its rows again.
    #[test]
    fn test_scan_reset() {
        let store = new_store();
        store.create_node(&["Person"]);

        let mut scan = ScanOperator::with_label(read_view(&store), "Person");

        // First pass
        let before_reset = scan.next().unwrap().unwrap();
        assert_eq!(before_reset.row_count(), 1);

        scan.reset();

        // Second pass should work just like the first
        let after_reset = scan.next().unwrap().unwrap();
        assert_eq!(after_reset.row_count(), 1);
    }

    /// Without a label filter every node is returned, whatever its label.
    #[test]
    fn test_full_scan() {
        let store = new_store();

        // Create nodes with different labels
        for label in ["Person", "Person", "Animal", "Place"] {
            store.create_node(&[label]);
        }

        let mut scan = ScanOperator::new(read_view(&store));

        let everything = scan.next().unwrap().unwrap();
        assert_eq!(
            everything.row_count(),
            4,
            "Full scan should return all 4 nodes"
        );

        // Should be exhausted
        assert!(scan.next().unwrap().is_none());
    }

    /// Epoch-only visibility hides nodes created after the viewing epoch.
    #[test]
    fn test_scan_with_mvcc_context() {
        let store = new_store();

        // Two nodes created at epoch 1 by transaction 1
        let early_epoch = EpochId::new(1);
        let early_tx = TransactionId::new(1);
        store.create_node_versioned(&["Person"], early_epoch, early_tx);
        store.create_node_versioned(&["Person"], early_epoch, early_tx);

        // One node created at epoch 5 by transaction 2
        let late_epoch = EpochId::new(5);
        let late_tx = TransactionId::new(2);
        store.create_node_versioned(&["Person"], late_epoch, late_tx);

        // Viewing at epoch 3: only the two epoch-1 nodes are visible
        let mut scan_at_3 = ScanOperator::with_label(read_view(&store), "Person")
            .with_transaction_context(EpochId::new(3), None);
        let seen_at_3 = scan_at_3.next().unwrap().unwrap();
        assert_eq!(seen_at_3.row_count(), 2, "Should see 2 nodes at epoch 3");

        // Viewing at epoch 5: all three nodes are visible
        let mut scan_at_5 = ScanOperator::with_label(read_view(&store), "Person")
            .with_transaction_context(EpochId::new(5), None);
        let seen_at_5 = scan_at_5.next().unwrap().unwrap();
        assert_eq!(seen_at_5.row_count(), 3, "Should see 3 nodes at epoch 5");
    }
}