grafeo_core/execution/operators/
scan.rs1use super::{Operator, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::GraphStore;
6use grafeo_common::types::{EpochId, LogicalType, NodeId, TransactionId};
7use std::sync::Arc;
8
9pub struct ScanOperator {
11 store: Arc<dyn GraphStore>,
13 label: Option<String>,
15 position: usize,
17 batch: Vec<NodeId>,
19 exhausted: bool,
21 chunk_capacity: usize,
23 transaction_id: Option<TransactionId>,
25 viewing_epoch: Option<EpochId>,
27}
28
29impl ScanOperator {
30 pub fn new(store: Arc<dyn GraphStore>) -> Self {
32 Self {
33 store,
34 label: None,
35 position: 0,
36 batch: Vec::new(),
37 exhausted: false,
38 chunk_capacity: 2048,
39 transaction_id: None,
40 viewing_epoch: None,
41 }
42 }
43
44 pub fn with_label(store: Arc<dyn GraphStore>, label: impl Into<String>) -> Self {
46 Self {
47 store,
48 label: Some(label.into()),
49 position: 0,
50 batch: Vec::new(),
51 exhausted: false,
52 chunk_capacity: 2048,
53 transaction_id: None,
54 viewing_epoch: None,
55 }
56 }
57
58 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
60 self.chunk_capacity = capacity;
61 self
62 }
63
64 pub fn with_transaction_context(
68 mut self,
69 epoch: EpochId,
70 transaction_id: Option<TransactionId>,
71 ) -> Self {
72 self.viewing_epoch = Some(epoch);
73 self.transaction_id = transaction_id;
74 self
75 }
76
77 fn load_batch(&mut self) {
78 if !self.batch.is_empty() || self.exhausted {
79 return;
80 }
81
82 let all_ids = match &self.label {
84 Some(label) => self.store.nodes_by_label(label),
85 None => self.store.node_ids(),
86 };
87
88 self.batch = if let Some(epoch) = self.viewing_epoch {
90 if let Some(tx) = self.transaction_id {
91 all_ids
93 .into_iter()
94 .filter(|id| self.store.get_node_versioned(*id, epoch, tx).is_some())
95 .collect()
96 } else {
97 all_ids
99 .into_iter()
100 .filter(|id| self.store.get_node_at_epoch(*id, epoch).is_some())
101 .collect()
102 }
103 } else {
104 all_ids
105 };
106
107 if self.batch.is_empty() {
108 self.exhausted = true;
109 }
110 }
111}
112
113impl Operator for ScanOperator {
114 fn next(&mut self) -> OperatorResult {
115 self.load_batch();
116
117 if self.exhausted || self.position >= self.batch.len() {
118 return Ok(None);
119 }
120
121 let schema = [LogicalType::Node];
123 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
124
125 let end = (self.position + self.chunk_capacity).min(self.batch.len());
126 let count = end - self.position;
127
128 {
129 let col = chunk
131 .column_mut(0)
132 .expect("column 0 exists: chunk created with single-column schema");
133 for i in self.position..end {
134 col.push_node_id(self.batch[i]);
135 }
136 }
137
138 chunk.set_count(count);
139 self.position = end;
140
141 Ok(Some(chunk))
142 }
143
144 fn reset(&mut self) {
145 self.position = 0;
146 self.batch.clear();
147 self.exhausted = false;
148 }
149
150 fn name(&self) -> &'static str {
151 "Scan"
152 }
153}
154
155#[cfg(test)]
156mod tests {
157 use super::*;
158 use crate::graph::GraphStoreMut;
159 use crate::graph::lpg::LpgStore;
160
161 #[test]
162 fn test_scan_by_label() {
163 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
164
165 store.create_node(&["Person"]);
166 store.create_node(&["Person"]);
167 store.create_node(&["Animal"]);
168
169 let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person");
170
171 let chunk = scan.next().unwrap().unwrap();
172 assert_eq!(chunk.row_count(), 2);
173
174 let next = scan.next().unwrap();
176 assert!(next.is_none());
177 }
178
179 #[test]
180 fn test_scan_reset() {
181 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
182 store.create_node(&["Person"]);
183
184 let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person");
185
186 let chunk1 = scan.next().unwrap().unwrap();
188 assert_eq!(chunk1.row_count(), 1);
189
190 scan.reset();
192
193 let chunk2 = scan.next().unwrap().unwrap();
195 assert_eq!(chunk2.row_count(), 1);
196 }
197
198 #[test]
199 fn test_full_scan() {
200 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
201
202 store.create_node(&["Person"]);
204 store.create_node(&["Person"]);
205 store.create_node(&["Animal"]);
206 store.create_node(&["Place"]);
207
208 let mut scan = ScanOperator::new(store.clone() as Arc<dyn GraphStore>);
210
211 let chunk = scan.next().unwrap().unwrap();
212 assert_eq!(chunk.row_count(), 4, "Full scan should return all 4 nodes");
213
214 let next = scan.next().unwrap();
216 assert!(next.is_none());
217 }
218
219 #[test]
220 fn test_scan_with_mvcc_context() {
221 let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
222
223 let epoch1 = EpochId::new(1);
225 let tx1 = TransactionId::new(1);
226 store.create_node_versioned(&["Person"], epoch1, tx1);
227 store.create_node_versioned(&["Person"], epoch1, tx1);
228
229 let epoch5 = EpochId::new(5);
231 let tx2 = TransactionId::new(2);
232 store.create_node_versioned(&["Person"], epoch5, tx2);
233
234 let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person")
236 .with_transaction_context(EpochId::new(3), None);
237
238 let chunk = scan.next().unwrap().unwrap();
239 assert_eq!(chunk.row_count(), 2, "Should see 2 nodes at epoch 3");
240
241 let mut scan_all = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person")
243 .with_transaction_context(EpochId::new(5), None);
244
245 let chunk_all = scan_all.next().unwrap().unwrap();
246 assert_eq!(chunk_all.row_count(), 3, "Should see 3 nodes at epoch 5");
247 }
248}