grafeo_core/execution/operators/
map_collect.rs1use super::{Operator, OperatorResult};
7use crate::execution::DataChunk;
8use grafeo_common::types::{LogicalType, PropertyKey, Value};
9use std::collections::BTreeMap;
10use std::sync::Arc;
11
12pub struct MapCollectOperator {
14 child: Box<dyn Operator>,
15 key_col: usize,
16 value_col: usize,
17 done: bool,
18}
19
20impl MapCollectOperator {
21 #[must_use]
23 pub fn new(child: Box<dyn Operator>, key_col: usize, value_col: usize) -> Self {
24 Self {
25 child,
26 key_col,
27 value_col,
28 done: false,
29 }
30 }
31}
32
33impl Operator for MapCollectOperator {
34 fn next(&mut self) -> OperatorResult {
35 if self.done {
36 return Ok(None);
37 }
38 self.done = true;
39
40 let mut map = BTreeMap::new();
41 while let Some(chunk) = self.child.next()? {
42 for row in chunk.selected_indices() {
43 let key = chunk.column(self.key_col).and_then(|c| c.get_value(row));
44 let value = chunk.column(self.value_col).and_then(|c| c.get_value(row));
45 if let (Some(k), Some(v)) = (key, value) {
46 let key_str: PropertyKey = match &k {
47 Value::String(s) => PropertyKey::from(s.as_str()),
48 other => PropertyKey::from(format!("{other}").as_str()),
49 };
50 map.insert(key_str, v);
51 }
52 }
53 }
54
55 let mut output = DataChunk::with_capacity(&[LogicalType::Any], 1);
56 output
57 .column_mut(0)
58 .expect("column 0 exists: single-column schema")
59 .push_value(Value::Map(Arc::new(map)));
60 output.set_count(1);
61
62 Ok(Some(output))
63 }
64
65 fn reset(&mut self) {
66 self.done = false;
67 self.child.reset();
68 }
69
70 fn name(&self) -> &'static str {
71 "MapCollect"
72 }
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;

    /// Test double that hands out a fixed list of chunks, one per `next()` call.
    struct MockOperator {
        /// Chunks to emit; a slot is swapped for `DataChunk::empty()` once handed out.
        chunks: Vec<DataChunk>,
        /// Index of the next slot to emit.
        position: usize,
    }

    impl MockOperator {
        /// Creates a mock positioned at the first chunk.
        fn new(chunks: Vec<DataChunk>) -> Self {
            let position = 0;
            Self {
                chunks,
                position,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    // Move the chunk out, leaving an empty placeholder in its slot.
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        /// Rewinds to the first slot. Slots that were already handed out keep their
        /// `DataChunk::empty()` placeholder, so a replay does not restore the data.
        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a two-column chunk: string keys in column 0, `i64` values in column 1.
    fn build_chunk(keys: &[&str], values: &[i64]) -> DataChunk {
        assert_eq!(keys.len(), values.len());
        let rows = keys.len();
        let mut chunk = DataChunk::with_capacity(&[LogicalType::String, LogicalType::Int64], rows);
        for (key, val) in keys.iter().zip(values) {
            chunk
                .column_mut(0)
                .unwrap()
                .push_value(Value::String((*key).into()));
            chunk.column_mut(1).unwrap().push_value(Value::Int64(*val));
        }
        chunk.set_count(rows);
        chunk
    }

    /// Wraps `chunks` in a mock child and collects column 0 as keys, column 1 as values.
    fn collector_over(chunks: Vec<DataChunk>) -> MapCollectOperator {
        MapCollectOperator::new(Box::new(MockOperator::new(chunks)), 0, 1)
    }

    /// Reads row 0 of column 0 from `output` and returns the map stored there.
    fn collected_map(output: &DataChunk) -> Arc<BTreeMap<PropertyKey, Value>> {
        match output.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => map,
            other => panic!("Expected Value::Map, got {:?}", other),
        }
    }

    #[test]
    fn test_basic_map_collection() {
        let mut op = collector_over(vec![build_chunk(&["NYC", "LA"], &[2, 1])]);

        let result = op.next().unwrap();
        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column_count(), 1);

        let map = collected_map(&result);
        assert_eq!(map.len(), 2);
        assert_eq!(map.get(&PropertyKey::new("NYC")), Some(&Value::Int64(2)));
        assert_eq!(map.get(&PropertyKey::new("LA")), Some(&Value::Int64(1)));

        // The operator emits exactly once.
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_empty_input_produces_empty_map() {
        let mut op = collector_over(vec![]);

        let result = op.next().unwrap();
        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(result.row_count(), 1);

        let map = collected_map(&result);
        assert!(map.is_empty(), "Expected empty map, got {map:?}");

        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_reset_allows_reprocessing() {
        let mut op = collector_over(vec![build_chunk(&["a"], &[10])]);

        let first = op.next().unwrap();
        assert!(first.is_some());
        assert!(op.next().unwrap().is_none());

        op.reset();
        // Only the shape of the second result is checked: the mock replays an
        // empty placeholder chunk rather than the original data.
        let second = op.next().unwrap();
        assert!(
            second.is_some(),
            "After reset, next() should produce a result"
        );

        let second = second.unwrap();
        assert_eq!(second.row_count(), 1);
        let value = second.column(0).unwrap().get_value(0).unwrap();
        assert!(
            matches!(value, Value::Map(_)),
            "Expected Value::Map after reset"
        );
    }

    #[test]
    fn test_name_returns_map_collect() {
        let op = collector_over(vec![]);
        assert_eq!(op.name(), "MapCollect");
    }

    #[test]
    fn test_multiple_chunks_merged_into_single_map() {
        let first_chunk = build_chunk(&["x", "y"], &[1, 2]);
        let second_chunk = build_chunk(&["z"], &[3]);
        let mut op = collector_over(vec![first_chunk, second_chunk]);

        let result = op.next().unwrap().unwrap();
        assert_eq!(result.row_count(), 1);

        let map = collected_map(&result);
        assert_eq!(map.len(), 3);
        assert_eq!(map.get(&PropertyKey::new("x")), Some(&Value::Int64(1)));
        assert_eq!(map.get(&PropertyKey::new("y")), Some(&Value::Int64(2)));
        assert_eq!(map.get(&PropertyKey::new("z")), Some(&Value::Int64(3)));
    }

    #[test]
    fn test_duplicate_keys_last_value_wins() {
        let mut op = collector_over(vec![build_chunk(&["k", "k"], &[1, 2])]);

        let result = op.next().unwrap().unwrap();
        let map = collected_map(&result);
        assert_eq!(map.len(), 1);
        assert_eq!(
            map.get(&PropertyKey::new("k")),
            Some(&Value::Int64(2)),
            "Last value should win for duplicate keys"
        );
    }

    #[test]
    fn test_non_string_keys_converted_via_display() {
        // Integer keys in column 0, string values in column 1.
        let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64, LogicalType::String], 2);
        for (key, label) in [(42, "val_a"), (99, "val_b")] {
            chunk.column_mut(0).unwrap().push_value(Value::Int64(key));
            chunk
                .column_mut(1)
                .unwrap()
                .push_value(Value::String(label.into()));
        }
        chunk.set_count(2);

        let mut op = collector_over(vec![chunk]);

        let result = op.next().unwrap().unwrap();
        let map = collected_map(&result);
        assert_eq!(map.len(), 2);
        assert_eq!(
            map.get(&PropertyKey::new("42")),
            Some(&Value::String("val_a".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("99")),
            Some(&Value::String("val_b".into()))
        );
    }
}