grafeo_core/execution/operators/
scan.rs1use super::{Operator, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::GraphStoreSearch;
6use grafeo_common::types::{EpochId, LogicalType, NodeId, TransactionId};
7use std::sync::Arc;
8
9pub struct ScanOperator {
11 store: Arc<dyn GraphStoreSearch>,
13 label: Option<String>,
15 position: usize,
17 batch: Vec<NodeId>,
19 exhausted: bool,
21 chunk_capacity: usize,
23 transaction_id: Option<TransactionId>,
25 viewing_epoch: Option<EpochId>,
27}
28
29impl ScanOperator {
30 pub fn new(store: Arc<dyn GraphStoreSearch>) -> Self {
32 Self {
33 store,
34 label: None,
35 position: 0,
36 batch: Vec::new(),
37 exhausted: false,
38 chunk_capacity: 2048,
39 transaction_id: None,
40 viewing_epoch: None,
41 }
42 }
43
44 pub fn with_label(store: Arc<dyn GraphStoreSearch>, label: impl Into<String>) -> Self {
46 Self {
47 store,
48 label: Some(label.into()),
49 position: 0,
50 batch: Vec::new(),
51 exhausted: false,
52 chunk_capacity: 2048,
53 transaction_id: None,
54 viewing_epoch: None,
55 }
56 }
57
58 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
60 self.chunk_capacity = capacity;
61 self
62 }
63
64 pub fn with_transaction_context(
68 mut self,
69 epoch: EpochId,
70 transaction_id: Option<TransactionId>,
71 ) -> Self {
72 self.viewing_epoch = Some(epoch);
73 self.transaction_id = transaction_id;
74 self
75 }
76
77 fn load_batch(&mut self) {
78 if !self.batch.is_empty() || self.exhausted {
79 return;
80 }
81
82 let all_ids = match &self.label {
87 Some(label) => self.store.nodes_by_label(label),
88 None if self.viewing_epoch.is_some() => self.store.all_node_ids(),
89 None => self.store.node_ids(),
90 };
91
92 self.batch = if let Some(epoch) = self.viewing_epoch {
96 if let Some(tx) = self.transaction_id {
97 self.store
98 .filter_visible_node_ids_versioned(&all_ids, epoch, tx)
99 } else {
100 self.store.filter_visible_node_ids(&all_ids, epoch)
101 }
102 } else {
103 all_ids
104 };
105
106 if self.batch.is_empty() {
107 self.exhausted = true;
108 }
109 }
110}
111
112impl Operator for ScanOperator {
113 fn next(&mut self) -> OperatorResult {
114 self.load_batch();
115
116 if self.exhausted || self.position >= self.batch.len() {
117 return Ok(None);
118 }
119
120 let schema = [LogicalType::Node];
122 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
123
124 let end = (self.position + self.chunk_capacity).min(self.batch.len());
125 let count = end - self.position;
126
127 {
128 let col = chunk
130 .column_mut(0)
131 .expect("column 0 exists: chunk created with single-column schema");
132 for i in self.position..end {
133 col.push_node_id(self.batch[i]);
134 }
135 }
136
137 chunk.set_count(count);
138 self.position = end;
139
140 Ok(Some(chunk))
141 }
142
143 fn reset(&mut self) {
144 self.position = 0;
145 self.batch.clear();
146 self.exhausted = false;
147 }
148
149 fn name(&self) -> &'static str {
150 "Scan"
151 }
152
153 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
154 self
155 }
156}
157
#[cfg(all(test, feature = "lpg"))]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    /// A label scan returns only the nodes carrying that label, then ends.
    #[test]
    fn test_scan_by_label() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Animal"]);

        let mut scan =
            ScanOperator::with_label(store.clone() as Arc<dyn GraphStoreSearch>, "Person");

        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2);

        let next = scan.next().unwrap();
        assert!(next.is_none());
    }

    /// After `reset`, the scan produces its rows again.
    #[test]
    fn test_scan_reset() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let mut scan =
            ScanOperator::with_label(store.clone() as Arc<dyn GraphStoreSearch>, "Person");

        let chunk1 = scan.next().unwrap().unwrap();
        assert_eq!(chunk1.row_count(), 1);

        scan.reset();

        let chunk2 = scan.next().unwrap().unwrap();
        assert_eq!(chunk2.row_count(), 1);
    }

    /// A scan without a label returns every node regardless of label.
    #[test]
    fn test_full_scan() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Animal"]);
        store.create_node(&["Place"]);

        let mut scan = ScanOperator::new(store.clone() as Arc<dyn GraphStoreSearch>);

        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 4, "Full scan should return all 4 nodes");

        let next = scan.next().unwrap();
        assert!(next.is_none());
    }

    /// With an epoch set, only nodes created at or before it are returned.
    #[test]
    fn test_scan_with_mvcc_context() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());

        let epoch1 = EpochId::new(1);
        store.create_node_versioned(&["Person"], epoch1, TransactionId::SYSTEM);
        store.create_node_versioned(&["Person"], epoch1, TransactionId::SYSTEM);

        let epoch5 = EpochId::new(5);
        store.create_node_versioned(&["Person"], epoch5, TransactionId::SYSTEM);

        let mut scan =
            ScanOperator::with_label(store.clone() as Arc<dyn GraphStoreSearch>, "Person")
                .with_transaction_context(EpochId::new(3), None);

        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2, "Should see 2 nodes at epoch 3");

        let mut scan_all =
            ScanOperator::with_label(store.clone() as Arc<dyn GraphStoreSearch>, "Person")
                .with_transaction_context(EpochId::new(5), None);

        let chunk_all = scan_all.next().unwrap().unwrap();
        assert_eq!(chunk_all.row_count(), 3, "Should see 3 nodes at epoch 5");
    }

    /// The boxed operator can be downcast back to `ScanOperator`.
    #[test]
    fn test_scan_into_any() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
        let op = ScanOperator::with_label(store.clone() as Arc<dyn GraphStoreSearch>, "Person");
        let any = Box::new(op).into_any();
        assert!(any.downcast::<ScanOperator>().is_ok());
    }

    /// Results larger than the chunk capacity are split across several
    /// chunks, with the remainder in the final one.
    #[test]
    fn test_scan_respects_chunk_capacity() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
        for _ in 0..5 {
            store.create_node(&["Person"]);
        }

        let mut scan =
            ScanOperator::with_label(store.clone() as Arc<dyn GraphStoreSearch>, "Person")
                .with_chunk_capacity(2);

        let mut sizes = Vec::new();
        while let Some(chunk) = scan.next().unwrap() {
            sizes.push(chunk.row_count());
        }

        assert_eq!(sizes, vec![2, 2, 1]);
    }

    /// A scan that matches no nodes ends immediately and stays ended.
    #[test]
    fn test_scan_without_matches_is_empty() {
        let store: Arc<dyn GraphStoreMut> = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let mut scan =
            ScanOperator::with_label(store.clone() as Arc<dyn GraphStoreSearch>, "Missing");

        assert!(scan.next().unwrap().is_none());
        assert!(scan.next().unwrap().is_none());
    }
}