// graphos_core/execution/operators/scan.rs

//! Scan operator for reading data from storage.
2
3use super::{Operator, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::lpg::LpgStore;
6use graphos_common::types::{EpochId, LogicalType, NodeId, TxId};
7use std::sync::Arc;
8
9/// A scan operator that reads nodes from storage.
10pub struct ScanOperator {
11    /// The store to scan from.
12    store: Arc<LpgStore>,
13    /// Label filter (None = all nodes).
14    label: Option<String>,
15    /// Current position in the scan.
16    position: usize,
17    /// Batch of node IDs to scan.
18    batch: Vec<NodeId>,
19    /// Whether the scan is exhausted.
20    exhausted: bool,
21    /// Chunk capacity.
22    chunk_capacity: usize,
23    /// Transaction ID for MVCC visibility (None = use current epoch).
24    tx_id: Option<TxId>,
25    /// Epoch for version visibility.
26    viewing_epoch: Option<EpochId>,
27}
28
29impl ScanOperator {
30    /// Creates a new scan operator for all nodes.
31    pub fn new(store: Arc<LpgStore>) -> Self {
32        Self {
33            store,
34            label: None,
35            position: 0,
36            batch: Vec::new(),
37            exhausted: false,
38            chunk_capacity: 2048,
39            tx_id: None,
40            viewing_epoch: None,
41        }
42    }
43
44    /// Creates a new scan operator for nodes with a specific label.
45    pub fn with_label(store: Arc<LpgStore>, label: impl Into<String>) -> Self {
46        Self {
47            store,
48            label: Some(label.into()),
49            position: 0,
50            batch: Vec::new(),
51            exhausted: false,
52            chunk_capacity: 2048,
53            tx_id: None,
54            viewing_epoch: None,
55        }
56    }
57
58    /// Sets the chunk capacity.
59    pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
60        self.chunk_capacity = capacity;
61        self
62    }
63
64    /// Sets the transaction context for MVCC visibility.
65    ///
66    /// When set, the scan will only return nodes visible to this transaction.
67    pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
68        self.viewing_epoch = Some(epoch);
69        self.tx_id = tx_id;
70        self
71    }
72
73    fn load_batch(&mut self) {
74        if !self.batch.is_empty() || self.exhausted {
75            return;
76        }
77
78        // Get nodes, using versioned method if tx context is set
79        let all_ids = match &self.label {
80            Some(label) => self.store.nodes_by_label(label),
81            None => self.store.node_ids(),
82        };
83
84        // Filter by visibility if we have tx context
85        self.batch = if let Some(epoch) = self.viewing_epoch {
86            let tx = self.tx_id.unwrap_or(TxId::SYSTEM);
87            all_ids
88                .into_iter()
89                .filter(|id| self.store.get_node_versioned(*id, epoch, tx).is_some())
90                .collect()
91        } else {
92            all_ids
93        };
94
95        if self.batch.is_empty() {
96            self.exhausted = true;
97        }
98    }
99}
100
101impl Operator for ScanOperator {
102    fn next(&mut self) -> OperatorResult {
103        self.load_batch();
104
105        if self.exhausted || self.position >= self.batch.len() {
106            return Ok(None);
107        }
108
109        // Create output chunk with node IDs
110        let schema = [LogicalType::Node];
111        let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
112
113        let end = (self.position + self.chunk_capacity).min(self.batch.len());
114        let count = end - self.position;
115
116        {
117            // Column 0 guaranteed to exist: chunk created with single-column schema above
118            let col = chunk
119                .column_mut(0)
120                .expect("column 0 exists: chunk created with single-column schema");
121            for i in self.position..end {
122                col.push_node_id(self.batch[i]);
123            }
124        }
125
126        chunk.set_count(count);
127        self.position = end;
128
129        Ok(Some(chunk))
130    }
131
132    fn reset(&mut self) {
133        self.position = 0;
134        self.batch.clear();
135        self.exhausted = false;
136    }
137
138    fn name(&self) -> &'static str {
139        "Scan"
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a store holding one node per entry in `labels`, created in order.
    fn store_with(labels: &[&'static str]) -> Arc<LpgStore> {
        let store = Arc::new(LpgStore::new());
        for label in labels {
            store.create_node(&[*label]);
        }
        store
    }

    #[test]
    fn test_scan_by_label() {
        let store = store_with(&["Person", "Person", "Animal"]);
        let mut scan = ScanOperator::with_label(Arc::clone(&store), "Person");

        // Only the two "Person" nodes match the label filter.
        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2);

        // Nothing left after the single chunk.
        let next = scan.next().unwrap();
        assert!(next.is_none());
    }

    #[test]
    fn test_scan_reset() {
        let store = store_with(&["Person"]);
        let mut scan = ScanOperator::with_label(Arc::clone(&store), "Person");

        // First pass over the data.
        let chunk1 = scan.next().unwrap().unwrap();
        assert_eq!(chunk1.row_count(), 1);

        // After a reset the same row must be produced again.
        scan.reset();
        let chunk2 = scan.next().unwrap().unwrap();
        assert_eq!(chunk2.row_count(), 1);
    }

    #[test]
    fn test_full_scan() {
        // Nodes with a mix of labels; an unfiltered scan must return all of them.
        let store = store_with(&["Person", "Person", "Animal", "Place"]);
        let mut scan = ScanOperator::new(Arc::clone(&store));

        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 4, "Full scan should return all 4 nodes");

        // Nothing left after the single chunk.
        let next = scan.next().unwrap();
        assert!(next.is_none());
    }

    #[test]
    fn test_scan_with_mvcc_context() {
        let store = Arc::new(LpgStore::new());

        // Two nodes created at epoch 1 ...
        let epoch1 = EpochId::new(1);
        let tx1 = TxId::new(1);
        store.create_node_versioned(&["Person"], epoch1, tx1);
        store.create_node_versioned(&["Person"], epoch1, tx1);

        // ... and a third one created at epoch 5.
        let epoch5 = EpochId::new(5);
        let tx2 = TxId::new(2);
        store.create_node_versioned(&["Person"], epoch5, tx2);

        // Viewing at epoch 3: only the two epoch-1 nodes are visible.
        let mut scan = ScanOperator::with_label(Arc::clone(&store), "Person")
            .with_tx_context(EpochId::new(3), None);
        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2, "Should see 2 nodes at epoch 3");

        // Viewing at epoch 5: all three nodes are visible.
        let mut scan_all = ScanOperator::with_label(Arc::clone(&store), "Person")
            .with_tx_context(EpochId::new(5), None);
        let chunk_all = scan_all.next().unwrap().unwrap();
        assert_eq!(chunk_all.row_count(), 3, "Should see 3 nodes at epoch 5");
    }
}