grafeo_core/execution/operators/
single_row.rs1use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use grafeo_common::types::{LogicalType, NodeId};
13
/// Source operator that yields exactly one row carrying no columns.
///
/// The first call to `next` returns a chunk built from an empty schema whose
/// row count is 1; every later call returns `None` until `reset` is invoked.
#[derive(Debug)]
pub struct SingleRowOperator {
    /// `true` once the single row has been handed out.
    produced: bool,
}
22
23impl SingleRowOperator {
24 #[must_use]
26 pub fn new() -> Self {
27 Self { produced: false }
28 }
29}
30
31impl Default for SingleRowOperator {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl Operator for SingleRowOperator {
38 fn next(&mut self) -> OperatorResult {
39 if self.produced {
40 return Ok(None);
41 }
42
43 self.produced = true;
44
45 let mut chunk = DataChunk::with_capacity(&[], 1);
47 chunk.set_count(1);
48
49 Ok(Some(chunk))
50 }
51
52 fn reset(&mut self) {
53 self.produced = false;
54 }
55
56 fn name(&self) -> &'static str {
57 "SingleRow"
58 }
59
60 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
61 self
62 }
63}
64
/// Source operator that never yields any rows.
///
/// It holds no state: `next` always reports exhaustion and `reset` has
/// nothing to restore.
#[derive(Debug)]
pub struct EmptyOperator;
71
72impl EmptyOperator {
73 #[must_use]
75 pub fn new(_schema: Vec<LogicalType>) -> Self {
76 Self
77 }
78}
79
80impl Operator for EmptyOperator {
81 fn next(&mut self) -> OperatorResult {
82 Ok(None)
84 }
85
86 fn reset(&mut self) {
87 }
89
90 fn name(&self) -> &'static str {
91 "Empty"
92 }
93
94 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
95 self
96 }
97}
98
99pub struct NodeListOperator {
119 nodes: Vec<NodeId>,
121 position: usize,
123 chunk_size: usize,
125}
126
127impl NodeListOperator {
128 #[must_use]
130 pub fn new(nodes: Vec<NodeId>, chunk_size: usize) -> Self {
131 Self {
132 nodes,
133 position: 0,
134 chunk_size,
135 }
136 }
137}
138
139impl Operator for NodeListOperator {
140 fn next(&mut self) -> OperatorResult {
141 if self.position >= self.nodes.len() {
142 return Ok(None);
143 }
144
145 let end = (self.position + self.chunk_size).min(self.nodes.len());
146 let count = end - self.position;
147
148 let schema = [LogicalType::Node];
149 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_size);
150
151 {
152 let col = chunk
153 .column_mut(0)
154 .expect("column 0 exists: chunk created with single-column schema");
155 for i in self.position..end {
156 col.push_node_id(self.nodes[i]);
157 }
158 }
159
160 chunk.set_count(count);
161 self.position = end;
162
163 Ok(Some(chunk))
164 }
165
166 fn reset(&mut self) {
167 self.position = 0;
168 }
169
170 fn name(&self) -> &'static str {
171 "NodeList"
172 }
173
174 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
175 self
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// One chunk with a single row, then exhaustion; `reset` re-arms the operator.
    #[test]
    fn test_single_row_operator() {
        let mut op = SingleRowOperator::new();

        let first = op
            .next()
            .unwrap()
            .expect("first call should yield a chunk");
        assert_eq!(first.row_count(), 1);

        assert!(op.next().unwrap().is_none());

        op.reset();
        assert!(op.next().unwrap().is_some());
    }

    /// The empty operator reports exhaustion both before and after a reset.
    #[test]
    fn test_empty_operator() {
        let mut op = EmptyOperator::new(vec![LogicalType::Int64]);

        assert!(op.next().unwrap().is_none());

        op.reset();
        assert!(op.next().unwrap().is_none());
    }

    /// Three nodes with chunk size two arrive as a chunk of two, then a chunk of one.
    #[test]
    fn test_node_list_operator() {
        let ids = vec![NodeId::new(1), NodeId::new(5), NodeId::new(10)];
        let mut op = NodeListOperator::new(ids, 2);

        let first = op
            .next()
            .unwrap()
            .expect("first call should yield a chunk");
        assert_eq!(first.row_count(), 2);

        let second = op
            .next()
            .unwrap()
            .expect("second call should yield the remainder");
        assert_eq!(second.row_count(), 1);

        assert!(op.next().unwrap().is_none());

        op.reset();
        let replayed = op
            .next()
            .unwrap()
            .expect("reset should restart the stream");
        assert_eq!(replayed.row_count(), 2);
    }

    /// An empty node list is exhausted immediately.
    #[test]
    fn test_node_list_operator_empty() {
        let mut op = NodeListOperator::new(vec![], 10);

        assert!(op.next().unwrap().is_none());
    }

    /// `into_any` round-trips back to the concrete `SingleRowOperator`.
    #[test]
    fn test_single_row_into_any() {
        let boxed = Box::new(SingleRowOperator::new());
        let erased = boxed.into_any();
        assert!(erased.downcast::<SingleRowOperator>().is_ok());
    }

    /// `into_any` round-trips back to the concrete `EmptyOperator`.
    #[test]
    fn test_empty_operator_into_any() {
        let boxed = Box::new(EmptyOperator::new(vec![LogicalType::Int64]));
        let erased = boxed.into_any();
        assert!(erased.downcast::<EmptyOperator>().is_ok());
    }

    /// `into_any` round-trips back to the concrete `NodeListOperator`.
    #[test]
    fn test_node_list_operator_into_any() {
        let boxed = Box::new(NodeListOperator::new(vec![NodeId::new(1)], 10));
        let erased = boxed.into_any();
        assert!(erased.downcast::<NodeListOperator>().is_ok());
    }
}