grafeo_core/execution/operators/
scan.rs1use super::{Operator, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::GraphStore;
6use grafeo_common::types::{EpochId, LogicalType, NodeId, TxId};
7use std::sync::Arc;
8
9pub struct ScanOperator {
11 store: Arc<dyn GraphStore>,
13 label: Option<String>,
15 position: usize,
17 batch: Vec<NodeId>,
19 exhausted: bool,
21 chunk_capacity: usize,
23 tx_id: Option<TxId>,
25 viewing_epoch: Option<EpochId>,
27}
28
29impl ScanOperator {
30 pub fn new(store: Arc<dyn GraphStore>) -> Self {
32 Self {
33 store,
34 label: None,
35 position: 0,
36 batch: Vec::new(),
37 exhausted: false,
38 chunk_capacity: 2048,
39 tx_id: None,
40 viewing_epoch: None,
41 }
42 }
43
44 pub fn with_label(store: Arc<dyn GraphStore>, label: impl Into<String>) -> Self {
46 Self {
47 store,
48 label: Some(label.into()),
49 position: 0,
50 batch: Vec::new(),
51 exhausted: false,
52 chunk_capacity: 2048,
53 tx_id: None,
54 viewing_epoch: None,
55 }
56 }
57
58 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
60 self.chunk_capacity = capacity;
61 self
62 }
63
64 pub fn with_tx_context(mut self, epoch: EpochId, tx_id: Option<TxId>) -> Self {
68 self.viewing_epoch = Some(epoch);
69 self.tx_id = tx_id;
70 self
71 }
72
73 fn load_batch(&mut self) {
74 if !self.batch.is_empty() || self.exhausted {
75 return;
76 }
77
78 let all_ids = match &self.label {
80 Some(label) => self.store.nodes_by_label(label),
81 None => self.store.node_ids(),
82 };
83
84 self.batch = if let Some(epoch) = self.viewing_epoch {
86 if let Some(tx) = self.tx_id {
87 all_ids
89 .into_iter()
90 .filter(|id| self.store.get_node_versioned(*id, epoch, tx).is_some())
91 .collect()
92 } else {
93 all_ids
95 .into_iter()
96 .filter(|id| self.store.get_node_at_epoch(*id, epoch).is_some())
97 .collect()
98 }
99 } else {
100 all_ids
101 };
102
103 if self.batch.is_empty() {
104 self.exhausted = true;
105 }
106 }
107}
108
109impl Operator for ScanOperator {
110 fn next(&mut self) -> OperatorResult {
111 self.load_batch();
112
113 if self.exhausted || self.position >= self.batch.len() {
114 return Ok(None);
115 }
116
117 let schema = [LogicalType::Node];
119 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
120
121 let end = (self.position + self.chunk_capacity).min(self.batch.len());
122 let count = end - self.position;
123
124 {
125 let col = chunk
127 .column_mut(0)
128 .expect("column 0 exists: chunk created with single-column schema");
129 for i in self.position..end {
130 col.push_node_id(self.batch[i]);
131 }
132 }
133
134 chunk.set_count(count);
135 self.position = end;
136
137 Ok(Some(chunk))
138 }
139
140 fn reset(&mut self) {
141 self.position = 0;
142 self.batch.clear();
143 self.exhausted = false;
144 }
145
146 fn name(&self) -> &'static str {
147 "Scan"
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::GraphStoreMut;
    use crate::graph::lpg::LpgStore;

    /// Builds an empty in-memory LPG store behind the mutable trait object.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    #[test]
    fn test_scan_by_label() {
        let store = new_store();

        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Animal"]);

        let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person");

        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2);

        let next = scan.next().unwrap();
        assert!(next.is_none());
    }

    #[test]
    fn test_scan_reset() {
        let store = new_store();
        store.create_node(&["Person"]);

        let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person");

        let chunk1 = scan.next().unwrap().unwrap();
        assert_eq!(chunk1.row_count(), 1);

        scan.reset();

        let chunk2 = scan.next().unwrap().unwrap();
        assert_eq!(chunk2.row_count(), 1);
    }

    #[test]
    fn test_full_scan() {
        let store = new_store();

        store.create_node(&["Person"]);
        store.create_node(&["Person"]);
        store.create_node(&["Animal"]);
        store.create_node(&["Place"]);

        let mut scan = ScanOperator::new(store.clone() as Arc<dyn GraphStore>);

        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 4, "Full scan should return all 4 nodes");

        let next = scan.next().unwrap();
        assert!(next.is_none());
    }

    /// Output is split into chunks of at most `chunk_capacity` rows, with the
    /// remainder in the final chunk.
    #[test]
    fn test_scan_splits_into_chunks() {
        let store = new_store();
        for _ in 0..3 {
            store.create_node(&["Person"]);
        }

        let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person")
            .with_chunk_capacity(2);

        let first = scan.next().unwrap().unwrap();
        assert_eq!(first.row_count(), 2);

        let second = scan.next().unwrap().unwrap();
        assert_eq!(second.row_count(), 1, "final chunk holds the remainder");

        assert!(scan.next().unwrap().is_none());
    }

    /// A label with no nodes yields no chunks at all.
    #[test]
    fn test_scan_missing_label_is_empty() {
        let store = new_store();
        store.create_node(&["Person"]);

        let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Robot");

        assert!(scan.next().unwrap().is_none());
    }

    #[test]
    fn test_scan_with_mvcc_context() {
        let store = new_store();

        let epoch1 = EpochId::new(1);
        let tx1 = TxId::new(1);
        store.create_node_versioned(&["Person"], epoch1, tx1);
        store.create_node_versioned(&["Person"], epoch1, tx1);

        let epoch5 = EpochId::new(5);
        let tx2 = TxId::new(2);
        store.create_node_versioned(&["Person"], epoch5, tx2);

        // Viewed at epoch 3, the node created at epoch 5 must not be visible.
        let mut scan = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person")
            .with_tx_context(EpochId::new(3), None);

        let chunk = scan.next().unwrap().unwrap();
        assert_eq!(chunk.row_count(), 2, "Should see 2 nodes at epoch 3");

        let mut scan_all = ScanOperator::with_label(store.clone() as Arc<dyn GraphStore>, "Person")
            .with_tx_context(EpochId::new(5), None);

        let chunk_all = scan_all.next().unwrap().unwrap();
        assert_eq!(chunk_all.row_count(), 3, "Should see 3 nodes at epoch 5");
    }
}