Skip to main content

grafeo_core/execution/operators/
scan.rs

1//! Scan operator for reading data from storage.
2
3use super::{Operator, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::GraphStoreSearch;
6use grafeo_common::types::{EpochId, LogicalType, NodeId, TransactionId};
7use std::sync::Arc;
8
9/// A scan operator that reads nodes from storage.
10pub struct ScanOperator {
11    /// The store to scan from.
12    store: Arc<dyn GraphStoreSearch>,
13    /// Label filter (None = all nodes).
14    label: Option<String>,
15    /// Current position in the scan.
16    position: usize,
17    /// Batch of node IDs to scan.
18    batch: Vec<NodeId>,
19    /// Whether the scan is exhausted.
20    exhausted: bool,
21    /// Chunk capacity.
22    chunk_capacity: usize,
23    /// Transaction ID for MVCC visibility (None = use current epoch).
24    transaction_id: Option<TransactionId>,
25    /// Epoch for version visibility.
26    viewing_epoch: Option<EpochId>,
27}
28
29impl ScanOperator {
30    /// Creates a new scan operator for all nodes.
31    pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
32        Self {
33            store,
34            label: None,
35            position: 0,
36            batch: Vec::new(),
37            exhausted: false,
38            chunk_capacity: 2048,
39            transaction_id: None,
40            viewing_epoch: None,
41        }
42    }
43
44    /// Creates a new scan operator for nodes with a specific label.
45    pub fn with_label(store: Arc<dyn GraphStoreSearch>, label: impl Into<String>) -> Self {
46        Self {
47            store,
48            label: Some(label.into()),
49            position: 0,
50            batch: Vec::new(),
51            exhausted: false,
52            chunk_capacity: 2048,
53            transaction_id: None,
54            viewing_epoch: None,
55        }
56    }
57
58    /// Sets the chunk capacity.
59    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
60        self.chunk_capacity = capacity;
61        self
62    }
63
64    /// Sets the transaction context for MVCC visibility.
65    ///
66    /// When set, the scan will only return nodes visible to this transaction.
67    pub fn with_transaction_context(
68        mut self,
69        epoch: EpochId,
70        transaction_id: Option<TransactionId>,
71    ) -> Self {
72        self.viewing_epoch = Some(epoch);
73        self.transaction_id = transaction_id;
74        self
75    }
76
77    fn load_batch(&mut self) {
78        if !self.batch.is_empty() || self.exhausted {
79            return;
80        }
81
82        // Get nodes. When we have transaction context, use all_node_ids()
83        // to include uncommitted/PENDING versions (nodes_by_label already
84        // returns unfiltered IDs from the label index, but node_ids()
85        // pre-filters by epoch which excludes PENDING nodes).
86        let all_ids = match &self.label {
87            Some(label) => self.store.nodes_by_label(label),
88            None if self.viewing_epoch.is_some() => self.store.all_node_ids(),
89            None => self.store.node_ids(),
90        };
91
92        // Filter by visibility if we have tx context.
93        // Uses batch methods that hold a single lock for all IDs instead of
94        // acquiring/releasing per node (avoids N+1 lock pattern).
95        self.batch = if let Some(epoch) = self.viewing_epoch {
96            if let Some(tx) = self.transaction_id {
97                self.store
98                    .filter_visible_node_ids_versioned(&all_ids, epoch, tx)
99            } else {
100                self.store.filter_visible_node_ids(&all_ids, epoch)
101            }
102        } else {
103            all_ids
104        };
105
106        if self.batch.is_empty() {
107            self.exhausted = true;
108        }
109    }
110}
111
112impl Operator for ScanOperator {
113    fn next(&mut self) -> OperatorResult {
114        self.load_batch();
115
116        if self.exhausted || self.position >= self.batch.len() {
117            return Ok(None);
118        }
119
120        // Create output chunk with node IDs
121        let schema = [LogicalType::Node];
122        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
123
124        let end = (self.position + self.chunk_capacity).min(self.batch.len());
125        let count = end - self.position;
126
127        {
128            // Column 0 guaranteed to exist: chunk created with single-column schema above
129            let col = chunk
130                .column_mut(0)
131                .expect("column 0 exists: chunk created with single-column schema");
132            for i in self.position..end {
133                col.push_node_id(self.batch[i]);
134            }
135        }
136
137        chunk.set_count(count);
138        self.position = end;
139
140        Ok(Some(chunk))
141    }
142
143    fn reset(&mut self) {
144        self.position = 0;
145        self.batch.clear();
146        self.exhausted = false;
147    }
148
149    fn name(&self) -> &'static str {
150        "Scan"
151    }
152
153    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
154        self
155    }
156}
157
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    /// Creates an empty LPG store behind the mutable store trait object.
    fn empty_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Returns a read-only (search) handle to the same store.
    fn search_handle(store: &Arc<dyn GraphStoreMut>) -> Arc<dyn GraphStoreSearch> {
        store.clone() as Arc<dyn GraphStoreSearch>
    }

    /// Builds a scan over `store` restricted to the "Person" label.
    fn person_scan(store: &Arc<dyn GraphStoreMut>) -> ScanOperator {
        ScanOperator::with_label(search_handle(store), "Person")
    }

    #[test]
    fn test_scan_by_label() {
        let store = empty_store();
        for label in ["Person", "Person", "Animal"] {
            store.create_node(&[label]);
        }

        let mut scan = person_scan(&store);

        // Only the two Person nodes come back, in a single chunk.
        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2);

        // Nothing left after that.
        assert!(scan.next().unwrap().is_none());
    }

    #[test]
    fn test_scan_reset() {
        let store = empty_store();
        store.create_node(&["Person"]);

        let mut scan = person_scan(&store);

        // Initial pass.
        let first_pass = scan.next().unwrap().unwrap();
        assert_eq!(first_pass.row_count(), 1);

        // After a reset the same row must be produced again.
        scan.reset();
        let second_pass = scan.next().unwrap().unwrap();
        assert_eq!(second_pass.row_count(), 1);
    }

    #[test]
    fn test_full_scan() {
        let store = empty_store();

        // A mix of labels; an unlabeled scan must see all of them.
        for label in ["Person", "Person", "Animal", "Place"] {
            store.create_node(&[label]);
        }

        let mut scan = ScanOperator::new(search_handle(&store));

        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 4, "Full scan should return all 4 nodes");

        // Nothing left after that.
        assert!(scan.next().unwrap().is_none());
    }

    #[test]
    fn test_scan_with_mvcc_context() {
        let store = empty_store();

        // Two nodes at epoch 1 and one at epoch 5. The SYSTEM transaction is
        // used so the nodes get real epochs rather than PENDING ones; this
        // test exercises epoch-based time-travel scanning.
        let early = EpochId::new(1);
        let late = EpochId::new(5);
        store.create_node_versioned(&["Person"], early, TransactionId::SYSTEM);
        store.create_node_versioned(&["Person"], early, TransactionId::SYSTEM);
        store.create_node_versioned(&["Person"], late, TransactionId::SYSTEM);

        // Reading at epoch 3 predates the third node.
        let mut scan = person_scan(&store).with_transaction_context(EpochId::new(3), None);
        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2, "Should see 2 nodes at epoch 3");

        // Reading at epoch 5 includes it.
        let mut scan_all = person_scan(&store).with_transaction_context(EpochId::new(5), None);
        let chunk_all = scan_all.next().unwrap().unwrap();
        assert_eq!(chunk_all.row_count(), 3, "Should see 3 nodes at epoch 5");
    }

    #[test]
    fn test_scan_into_any() {
        let store = empty_store();
        let boxed = Box::new(person_scan(&store));
        let any = boxed.into_any();
        assert!(any.downcast::<ScanOperator>().is_ok());
    }
}