graphos_core/execution/operators/
scan.rs1use super::{Operator, OperatorResult};
4use crate::execution::DataChunk;
5use crate::graph::lpg::LpgStore;
6use graphos_common::types::{LogicalType, NodeId};
7use std::sync::Arc;
8
9pub struct ScanOperator {
11 store: Arc<LpgStore>,
13 label: Option<String>,
15 position: usize,
17 batch: Vec<NodeId>,
19 exhausted: bool,
21 chunk_capacity: usize,
23}
24
25impl ScanOperator {
26 pub fn new(store: Arc<LpgStore>) -> Self {
28 Self {
29 store,
30 label: None,
31 position: 0,
32 batch: Vec::new(),
33 exhausted: false,
34 chunk_capacity: 2048,
35 }
36 }
37
38 pub fn with_label(store: Arc<LpgStore>, label: impl Into<String>) -> Self {
40 Self {
41 store,
42 label: Some(label.into()),
43 position: 0,
44 batch: Vec::new(),
45 exhausted: false,
46 chunk_capacity: 2048,
47 }
48 }
49
50 pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
52 self.chunk_capacity = capacity;
53 self
54 }
55
56 fn load_batch(&mut self) {
57 if !self.batch.is_empty() || self.exhausted {
58 return;
59 }
60
61 self.batch = match &self.label {
62 Some(label) => self.store.nodes_by_label(label),
63 None => {
64 Vec::new()
67 }
68 };
69
70 if self.batch.is_empty() {
71 self.exhausted = true;
72 }
73 }
74}
75
76impl Operator for ScanOperator {
77 fn next(&mut self) -> OperatorResult {
78 self.load_batch();
79
80 if self.exhausted || self.position >= self.batch.len() {
81 return Ok(None);
82 }
83
84 let schema = [LogicalType::Node];
86 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
87
88 let end = (self.position + self.chunk_capacity).min(self.batch.len());
89 let count = end - self.position;
90
91 {
92 let col = chunk.column_mut(0).unwrap();
93 for i in self.position..end {
94 col.push_node_id(self.batch[i]);
95 }
96 }
97
98 chunk.set_count(count);
99 self.position = end;
100
101 Ok(Some(chunk))
102 }
103
104 fn reset(&mut self) {
105 self.position = 0;
106 self.batch.clear();
107 self.exhausted = false;
108 }
109
110 fn name(&self) -> &'static str {
111 "Scan"
112 }
113}
114
115#[cfg(test)]
116mod tests {
117 use super::*;
118
119 #[test]
120 fn test_scan_by_label() {
121 let store = Arc::new(LpgStore::new());
122
123 store.create_node(&["Person"]);
124 store.create_node(&["Person"]);
125 store.create_node(&["Animal"]);
126
127 let mut scan = ScanOperator::with_label(Arc::clone(&store), "Person");
128
129 let chunk = scan.next().unwrap().unwrap();
130 assert_eq!(chunk.row_count(), 2);
131
132 let next = scan.next().unwrap();
134 assert!(next.is_none());
135 }
136
137 #[test]
138 fn test_scan_reset() {
139 let store = Arc::new(LpgStore::new());
140 store.create_node(&["Person"]);
141
142 let mut scan = ScanOperator::with_label(Arc::clone(&store), "Person");
143
144 let chunk1 = scan.next().unwrap().unwrap();
146 assert_eq!(chunk1.row_count(), 1);
147
148 scan.reset();
150
151 let chunk2 = scan.next().unwrap().unwrap();
153 assert_eq!(chunk2.row_count(), 1);
154 }
155}