grafeo_core/execution/operators/
map_collect.rs1use super::{Operator, OperatorResult};
7use crate::execution::DataChunk;
8use grafeo_common::types::{LogicalType, PropertyKey, Value};
9use std::collections::BTreeMap;
10use std::sync::Arc;
11
12pub struct MapCollectOperator {
14 child: Box<dyn Operator>,
15 key_col: usize,
16 value_col: usize,
17 done: bool,
18}
19
20impl MapCollectOperator {
21 #[must_use]
23 pub fn new(child: Box<dyn Operator>, key_col: usize, value_col: usize) -> Self {
24 Self {
25 child,
26 key_col,
27 value_col,
28 done: false,
29 }
30 }
31}
32
33impl Operator for MapCollectOperator {
34 fn next(&mut self) -> OperatorResult {
35 if self.done {
36 return Ok(None);
37 }
38 self.done = true;
39
40 let mut map = BTreeMap::new();
41 while let Some(chunk) = self.child.next()? {
42 for row in chunk.selected_indices() {
43 let key = chunk.column(self.key_col).and_then(|c| c.get_value(row));
44 let value = chunk.column(self.value_col).and_then(|c| c.get_value(row));
45 if let (Some(k), Some(v)) = (key, value) {
46 let key_str: PropertyKey = match &k {
47 Value::String(s) => PropertyKey::from(s.as_str()),
48 other => PropertyKey::from(format!("{other}").as_str()),
49 };
50 map.insert(key_str, v);
51 }
52 }
53 }
54
55 let mut output = DataChunk::with_capacity(&[LogicalType::Any], 1);
56 output
57 .column_mut(0)
58 .expect("column 0 exists: single-column schema")
59 .push_value(Value::Map(Arc::new(map)));
60 output.set_count(1);
61
62 Ok(Some(output))
63 }
64
65 fn reset(&mut self) {
66 self.done = false;
67 self.child.reset();
68 }
69
70 fn name(&self) -> &'static str {
71 "MapCollect"
72 }
73
74 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
75 self
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;

    /// Test double that yields a fixed sequence of chunks.
    ///
    /// The chunks come from a factory instead of being stored once. Handing a
    /// chunk out moves it, so a stored-once mock could only replay empty
    /// chunks after `reset`. Rebuilding from `make` lets reset tests observe
    /// a genuine replay of the input.
    struct MockOperator {
        make: fn() -> Vec<DataChunk>,
        pending: std::vec::IntoIter<DataChunk>,
    }

    impl MockOperator {
        fn new(make: fn() -> Vec<DataChunk>) -> Self {
            Self {
                make,
                pending: make().into_iter(),
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            Ok(self.pending.next())
        }

        fn reset(&mut self) {
            self.pending = (self.make)().into_iter();
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a two-column chunk (`String` keys in column 0, `Int64` values
    /// in column 1) with one row per key/value pair.
    fn build_chunk(keys: &[&str], values: &[i64]) -> DataChunk {
        assert_eq!(keys.len(), values.len());
        let mut chunk =
            DataChunk::with_capacity(&[LogicalType::String, LogicalType::Int64], keys.len());
        for key in keys {
            chunk
                .column_mut(0)
                .unwrap()
                .push_value(Value::String((*key).into()));
        }
        for val in values {
            chunk.column_mut(1).unwrap().push_value(Value::Int64(*val));
        }
        chunk.set_count(keys.len());
        chunk
    }

    #[test]
    fn test_basic_map_collection() {
        let mock = MockOperator::new(|| vec![build_chunk(&["NYC", "LA"], &[2, 1])]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap();
        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        match value {
            Value::Map(map) => {
                assert_eq!(map.len(), 2);
                assert_eq!(map.get(&PropertyKey::new("NYC")), Some(&Value::Int64(2)));
                assert_eq!(map.get(&PropertyKey::new("LA")), Some(&Value::Int64(1)));
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }

        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_empty_input_produces_empty_map() {
        let mock = MockOperator::new(Vec::new);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap();
        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(result.row_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        match value {
            Value::Map(map) => {
                assert!(map.is_empty(), "Expected empty map, got {map:?}");
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }

        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_reset_allows_reprocessing() {
        let mock = MockOperator::new(|| vec![build_chunk(&["a"], &[10])]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap();
        assert!(result.is_some());
        assert!(op.next().unwrap().is_none());

        op.reset();

        let result = op.next().unwrap();
        assert!(
            result.is_some(),
            "After reset, next() should produce a result"
        );

        let result = result.unwrap();
        assert_eq!(result.row_count(), 1);
        let value = result.column(0).unwrap().get_value(0).unwrap();
        match value {
            Value::Map(map) => {
                // The child replays its input from the start, so the same
                // entry must be collected again rather than an empty map.
                assert_eq!(map.len(), 1);
                assert_eq!(map.get(&PropertyKey::new("a")), Some(&Value::Int64(10)));
            }
            other => panic!("Expected Value::Map after reset, got {:?}", other),
        }

        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_name_returns_map_collect() {
        let mock = MockOperator::new(Vec::new);
        let op = MapCollectOperator::new(Box::new(mock), 0, 1);
        assert_eq!(op.name(), "MapCollect");
    }

    #[test]
    fn test_multiple_chunks_merged_into_single_map() {
        let mock = MockOperator::new(|| {
            vec![
                build_chunk(&["x", "y"], &[1, 2]),
                build_chunk(&["z"], &[3]),
            ]
        });

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap().unwrap();
        assert_eq!(result.row_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        match value {
            Value::Map(map) => {
                assert_eq!(map.len(), 3);
                assert_eq!(map.get(&PropertyKey::new("x")), Some(&Value::Int64(1)));
                assert_eq!(map.get(&PropertyKey::new("y")), Some(&Value::Int64(2)));
                assert_eq!(map.get(&PropertyKey::new("z")), Some(&Value::Int64(3)));
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_duplicate_keys_last_value_wins() {
        let mock = MockOperator::new(|| vec![build_chunk(&["k", "k"], &[1, 2])]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        match value {
            Value::Map(map) => {
                assert_eq!(map.len(), 1);
                assert_eq!(
                    map.get(&PropertyKey::new("k")),
                    Some(&Value::Int64(2)),
                    "Last value should win for duplicate keys"
                );
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_non_string_keys_converted_via_display() {
        let mock = MockOperator::new(|| {
            let mut chunk =
                DataChunk::with_capacity(&[LogicalType::Int64, LogicalType::String], 2);
            chunk.column_mut(0).unwrap().push_value(Value::Int64(42));
            chunk.column_mut(0).unwrap().push_value(Value::Int64(99));
            chunk
                .column_mut(1)
                .unwrap()
                .push_value(Value::String("val_a".into()));
            chunk
                .column_mut(1)
                .unwrap()
                .push_value(Value::String("val_b".into()));
            chunk.set_count(2);
            vec![chunk]
        });

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        match value {
            Value::Map(map) => {
                assert_eq!(map.len(), 2);
                assert_eq!(
                    map.get(&PropertyKey::new("42")),
                    Some(&Value::String("val_a".into()))
                );
                assert_eq!(
                    map.get(&PropertyKey::new("99")),
                    Some(&Value::String("val_b".into()))
                );
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_missing_value_column_yields_empty_map() {
        // The input chunk only has columns 0 and 1, so column 7 is absent and
        // every row is skipped; the operator still emits one (empty) map.
        let mock = MockOperator::new(|| vec![build_chunk(&["a"], &[1])]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 7);

        let result = op.next().unwrap().unwrap();
        assert_eq!(result.row_count(), 1);
        let value = result.column(0).unwrap().get_value(0).unwrap();
        match value {
            Value::Map(map) => {
                assert!(map.is_empty(), "Expected empty map, got {map:?}");
            }
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_map_collect_into_any() {
        let mock = MockOperator::new(Vec::new);
        let op = MapCollectOperator::new(Box::new(mock), 0, 1);
        let any = Box::new(op).into_any();
        assert!(any.downcast::<MapCollectOperator>().is_ok());
    }
}