grafeo_core/execution/operators/
single_row.rs1use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use grafeo_common::types::{LogicalType, NodeId};
13
/// Source operator that hands out one chunk holding a single row, built from
/// an empty schema, and then reports exhaustion.
///
/// `Debug` is derived so the operator can be printed when inspecting or
/// logging a plan.
#[derive(Debug)]
pub struct SingleRowOperator {
    /// `true` once the row has been emitted since construction or the last
    /// `reset`.
    produced: bool,
}
22
23impl SingleRowOperator {
24 #[must_use]
26 pub fn new() -> Self {
27 Self { produced: false }
28 }
29}
30
31impl Default for SingleRowOperator {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl Operator for SingleRowOperator {
38 fn next(&mut self) -> OperatorResult {
39 if self.produced {
40 return Ok(None);
41 }
42
43 self.produced = true;
44
45 let mut chunk = DataChunk::with_capacity(&[], 1);
47 chunk.set_count(1);
48
49 Ok(Some(chunk))
50 }
51
52 fn reset(&mut self) {
53 self.produced = false;
54 }
55
56 fn name(&self) -> &'static str {
57 "SingleRow"
58 }
59}
60
/// Stateless source operator that never yields a chunk.
///
/// `Debug` is derived so the operator can be printed when inspecting or
/// logging a plan.
#[derive(Debug)]
pub struct EmptyOperator;
67
68impl EmptyOperator {
69 #[must_use]
71 pub fn new(_schema: Vec<LogicalType>) -> Self {
72 Self
73 }
74}
75
76impl Operator for EmptyOperator {
77 fn next(&mut self) -> OperatorResult {
78 Ok(None)
80 }
81
82 fn reset(&mut self) {
83 }
85
86 fn name(&self) -> &'static str {
87 "Empty"
88 }
89}
90
91pub struct NodeListOperator {
111 nodes: Vec<NodeId>,
113 position: usize,
115 chunk_size: usize,
117}
118
119impl NodeListOperator {
120 #[must_use]
122 pub fn new(nodes: Vec<NodeId>, chunk_size: usize) -> Self {
123 Self {
124 nodes,
125 position: 0,
126 chunk_size,
127 }
128 }
129}
130
131impl Operator for NodeListOperator {
132 fn next(&mut self) -> OperatorResult {
133 if self.position >= self.nodes.len() {
134 return Ok(None);
135 }
136
137 let end = (self.position + self.chunk_size).min(self.nodes.len());
138 let count = end - self.position;
139
140 let schema = [LogicalType::Node];
141 let mut chunk = DataChunk::with_capacity(&schema, self.chunk_size);
142
143 {
144 let col = chunk
145 .column_mut(0)
146 .expect("column 0 exists: chunk created with single-column schema");
147 for i in self.position..end {
148 col.push_node_id(self.nodes[i]);
149 }
150 }
151
152 chunk.set_count(count);
153 self.position = end;
154
155 Ok(Some(chunk))
156 }
157
158 fn reset(&mut self) {
159 self.position = 0;
160 }
161
162 fn name(&self) -> &'static str {
163 "NodeList"
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Pulls once from `op`, panicking if the operator reports an error.
    fn pull(op: &mut impl Operator) -> Option<DataChunk> {
        op.next().unwrap()
    }

    #[test]
    fn test_single_row_operator() {
        let mut op = SingleRowOperator::new();

        // First pull hands out the one row.
        let rows = pull(&mut op).map(|chunk| chunk.row_count());
        assert_eq!(rows, Some(1));

        // Second pull is exhausted.
        assert!(pull(&mut op).is_none());

        // After a reset the row is available again.
        op.reset();
        assert!(pull(&mut op).is_some());
    }

    #[test]
    fn test_empty_operator() {
        let mut op = EmptyOperator::new(vec![LogicalType::Int64]);

        // Never yields anything, before or after a reset.
        assert!(pull(&mut op).is_none());

        op.reset();
        assert!(pull(&mut op).is_none());
    }

    #[test]
    fn test_node_list_operator() {
        let nodes = vec![NodeId::new(1), NodeId::new(5), NodeId::new(10)];
        let mut op = NodeListOperator::new(nodes, 2);

        // Three nodes with a chunk size of two: a full chunk, then a partial one.
        let first = pull(&mut op).map(|chunk| chunk.row_count());
        assert_eq!(first, Some(2));

        let second = pull(&mut op).map(|chunk| chunk.row_count());
        assert_eq!(second, Some(1));

        // Nothing left.
        assert!(pull(&mut op).is_none());

        // A reset starts over from the first node.
        op.reset();
        let replayed = pull(&mut op).map(|chunk| chunk.row_count());
        assert_eq!(replayed, Some(2));
    }

    #[test]
    fn test_node_list_operator_empty() {
        let mut op = NodeListOperator::new(vec![], 10);

        // An empty node list is exhausted immediately.
        assert!(pull(&mut op).is_none());
    }
}