grafeo_core/execution/operators/
single_row.rs1use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use grafeo_common::types::{LogicalType, NodeId};
13
/// Operator that hands out one chunk with a row count of one, then reports
/// exhaustion until it is reset.
///
/// The chunk is built from an empty schema; only its row count is set.
///
/// `Default` is derived: the derived value (`produced == false`) is exactly
/// what [`SingleRowOperator::new`] builds.
#[derive(Debug, Default)]
pub struct SingleRowOperator {
    /// `true` once the single chunk has been emitted.
    produced: bool,
}

impl SingleRowOperator {
    /// Creates an operator that has not yet emitted its chunk.
    #[must_use]
    pub fn new() -> Self {
        Self { produced: false }
    }
}
36
37impl Operator for SingleRowOperator {
38 fn next(&mut self) -> OperatorResult {
39 if self.produced {
40 return Ok(None);
41 }
42
43 self.produced = true;
44
45 let mut chunk = DataChunk::with_capacity(&[], 1);
47 chunk.set_count(1);
48
49 Ok(Some(chunk))
50 }
51
52 fn reset(&mut self) {
53 self.produced = false;
54 }
55
56 fn name(&self) -> &'static str {
57 "SingleRow"
58 }
59}
60
61pub struct EmptyOperator {
67 #[allow(dead_code)]
69 schema: Vec<LogicalType>,
70}
71
72impl EmptyOperator {
73 #[must_use]
75 pub fn new(schema: Vec<LogicalType>) -> Self {
76 Self { schema }
77 }
78}
79
80impl Operator for EmptyOperator {
81 fn next(&mut self) -> OperatorResult {
82 Ok(None)
84 }
85
86 fn reset(&mut self) {
87 }
89
90 fn name(&self) -> &'static str {
91 "Empty"
92 }
93}
94
95pub struct NodeListOperator {
115 nodes: Vec<NodeId>,
117 position: usize,
119 chunk_size: usize,
121}
122
123impl NodeListOperator {
124 #[must_use]
126 pub fn new(nodes: Vec<NodeId>, chunk_size: usize) -> Self {
127 Self {
128 nodes,
129 position: 0,
130 chunk_size,
131 }
132 }
133}
134
135impl Operator for NodeListOperator {
136 fn next(&mut self) -> OperatorResult {
137 if self.position >= self.nodes.len() {
138 return Ok(None);
139 }
140
141 let end = (self.position + self.chunk_size).min(self.nodes.len());
142 let count = end - self.position;
143
144 let schema = [LogicalType::Node];
145 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_size);
146
147 {
148 let col = chunk
149 .column_mut(0)
150 .expect("column 0 exists: chunk created with single-column schema");
151 for i in self.position..end {
152 col.push_node_id(self.nodes[i]);
153 }
154 }
155
156 chunk.set_count(count);
157 self.position = end;
158
159 Ok(Some(chunk))
160 }
161
162 fn reset(&mut self) {
163 self.position = 0;
164 }
165
166 fn name(&self) -> &'static str {
167 "NodeList"
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_single_row_operator() {
        let mut op = SingleRowOperator::new();

        // First pull: one chunk with exactly one row.
        let rows = op.next().unwrap().map(|chunk| chunk.row_count());
        assert_eq!(rows, Some(1));

        // Second pull: exhausted.
        assert!(op.next().unwrap().is_none());

        // After a reset the chunk is available again.
        op.reset();
        assert!(op.next().unwrap().is_some());
    }

    #[test]
    fn test_empty_operator() {
        let mut op = EmptyOperator::new(vec![LogicalType::Int64]);

        // Never yields anything.
        assert!(op.next().unwrap().is_none());

        // Resetting does not change that.
        op.reset();
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_node_list_operator() {
        let nodes = vec![NodeId::new(1), NodeId::new(5), NodeId::new(10)];
        let mut op = NodeListOperator::new(nodes, 2);

        // Three nodes with a chunk size of two: a full chunk first...
        let rows = op.next().unwrap().map(|chunk| chunk.row_count());
        assert_eq!(rows, Some(2));

        // ...then the single leftover node...
        let rows = op.next().unwrap().map(|chunk| chunk.row_count());
        assert_eq!(rows, Some(1));

        // ...then exhaustion.
        assert!(op.next().unwrap().is_none());

        // A reset starts over with a full chunk.
        op.reset();
        let rows = op.next().unwrap().map(|chunk| chunk.row_count());
        assert_eq!(rows, Some(2));
    }

    #[test]
    fn test_node_list_operator_empty() {
        let mut op = NodeListOperator::new(vec![], 10);

        // No nodes: exhausted on the first pull.
        assert!(op.next().unwrap().is_none());
    }
}