Skip to main content

grafeo_core/execution/operators/
map_collect.rs

1//! Map collect operator.
2//!
3//! Reads all rows from a child operator and collects key-value pairs
4//! into a single `Value::Map`. Used for Gremlin `groupCount()` semantics.
5
6use super::{Operator, OperatorResult};
7use crate::execution::DataChunk;
8use grafeo_common::types::{LogicalType, PropertyKey, Value};
9use std::collections::BTreeMap;
10use std::sync::Arc;
11
12/// Collects key-value pairs from child rows into a single Map value.
13pub struct MapCollectOperator {
14    child: Box<dyn Operator>,
15    key_col: usize,
16    value_col: usize,
17    done: bool,
18}
19
20impl MapCollectOperator {
21    /// Creates a new map collect operator.
22    #[must_use]
23    pub fn new(child: Box<dyn Operator>, key_col: usize, value_col: usize) -> Self {
24        Self {
25            child,
26            key_col,
27            value_col,
28            done: false,
29        }
30    }
31}
32
33impl Operator for MapCollectOperator {
34    fn next(&mut self) -> OperatorResult {
35        if self.done {
36            return Ok(None);
37        }
38        self.done = true;
39
40        let mut map = BTreeMap::new();
41        while let Some(chunk) = self.child.next()? {
42            for row in chunk.selected_indices() {
43                let key = chunk.column(self.key_col).and_then(|c| c.get_value(row));
44                let value = chunk.column(self.value_col).and_then(|c| c.get_value(row));
45                if let (Some(k), Some(v)) = (key, value) {
46                    let key_str: PropertyKey = match &k {
47                        Value::String(s) => PropertyKey::from(s.as_str()),
48                        other => PropertyKey::from(format!("{other}").as_str()),
49                    };
50                    map.insert(key_str, v);
51                }
52            }
53        }
54
55        let mut output = DataChunk::with_capacity(&[LogicalType::Any], 1);
56        output
57            .column_mut(0)
58            .expect("column 0 exists: single-column schema")
59            .push_value(Value::Map(Arc::new(map)));
60        output.set_count(1);
61
62        Ok(Some(output))
63    }
64
65    fn reset(&mut self) {
66        self.done = false;
67        self.child.reset();
68    }
69
70    fn name(&self) -> &'static str {
71        "MapCollect"
72    }
73
74    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
75        self
76    }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;

    /// A simple mock operator that yields pre-built chunks one at a time.
    ///
    /// Each chunk is moved out as it is yielded (its slot is replaced with
    /// `DataChunk::empty()`), so after `reset()` the mock replays empty chunks.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position >= self.chunks.len() {
                return Ok(None);
            }
            let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Helper: builds a two-column chunk (String key, Int64 value) from slices.
    fn build_chunk(keys: &[&str], values: &[i64]) -> DataChunk {
        assert_eq!(keys.len(), values.len());
        let mut chunk =
            DataChunk::with_capacity(&[LogicalType::String, LogicalType::Int64], keys.len());
        for key in keys {
            chunk
                .column_mut(0)
                .unwrap()
                .push_value(Value::String((*key).into()));
        }
        for val in values {
            chunk.column_mut(1).unwrap().push_value(Value::Int64(*val));
        }
        chunk.set_count(keys.len());
        chunk
    }

    /// Helper: extracts the map stored in row 0, column 0 of a `MapCollect`
    /// output chunk, panicking if that cell is not a `Value::Map`.
    fn expect_map(result: &DataChunk) -> Arc<BTreeMap<PropertyKey, Value>> {
        match result.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => map,
            other => panic!("Expected Value::Map, got {other:?}"),
        }
    }

    #[test]
    fn test_basic_map_collection() {
        let chunk = build_chunk(&["NYC", "LA"], &[2, 1]);
        let mock = MockOperator::new(vec![chunk]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        // First call: single row with a Map value
        let result = op
            .next()
            .unwrap()
            .expect("first call should yield a chunk");
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column_count(), 1);

        let map = expect_map(&result);
        assert_eq!(map.len(), 2);
        assert_eq!(map.get(&PropertyKey::new("NYC")), Some(&Value::Int64(2)));
        assert_eq!(map.get(&PropertyKey::new("LA")), Some(&Value::Int64(1)));

        // Second call: exhausted
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_empty_input_produces_empty_map() {
        let mock = MockOperator::new(vec![]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op
            .next()
            .unwrap()
            .expect("empty input should still yield a chunk");
        assert_eq!(result.row_count(), 1);

        let map = expect_map(&result);
        assert!(map.is_empty(), "Expected empty map, got {map:?}");

        // Second call: exhausted
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_reset_allows_reprocessing() {
        let chunk = build_chunk(&["a"], &[10]);
        let mock = MockOperator::new(vec![chunk]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        // Consume the first result
        let result = op.next().unwrap();
        assert!(result.is_some());
        assert!(op.next().unwrap().is_none());

        // After reset the child mock is reset too, but its chunks were already
        // moved out via mem::replace (leaving DataChunk::empty()), so the
        // contents of the new map are not meaningful here. What matters is that
        // reset() clears the `done` flag, so next() produces a new result
        // instead of returning None.
        op.reset();
        let result = op.next().unwrap();
        assert!(
            result.is_some(),
            "After reset, next() should produce a result"
        );

        // The result is a single-row chunk with a Map
        let result = result.unwrap();
        assert_eq!(result.row_count(), 1);
        let value = result.column(0).unwrap().get_value(0).unwrap();
        assert!(
            matches!(value, Value::Map(_)),
            "Expected Value::Map after reset"
        );
    }

    #[test]
    fn test_name_returns_map_collect() {
        let mock = MockOperator::new(vec![]);
        let op = MapCollectOperator::new(Box::new(mock), 0, 1);
        assert_eq!(op.name(), "MapCollect");
    }

    #[test]
    fn test_multiple_chunks_merged_into_single_map() {
        let chunk1 = build_chunk(&["x", "y"], &[1, 2]);
        let chunk2 = build_chunk(&["z"], &[3]);
        let mock = MockOperator::new(vec![chunk1, chunk2]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap().unwrap();
        assert_eq!(result.row_count(), 1);

        let map = expect_map(&result);
        assert_eq!(map.len(), 3);
        assert_eq!(map.get(&PropertyKey::new("x")), Some(&Value::Int64(1)));
        assert_eq!(map.get(&PropertyKey::new("y")), Some(&Value::Int64(2)));
        assert_eq!(map.get(&PropertyKey::new("z")), Some(&Value::Int64(3)));
    }

    #[test]
    fn test_duplicate_keys_last_value_wins() {
        // When the same key appears multiple times, the last value should win
        // because BTreeMap::insert overwrites.
        let chunk = build_chunk(&["k", "k"], &[1, 2]);
        let mock = MockOperator::new(vec![chunk]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap().unwrap();
        let map = expect_map(&result);
        assert_eq!(map.len(), 1);
        assert_eq!(
            map.get(&PropertyKey::new("k")),
            Some(&Value::Int64(2)),
            "Last value should win for duplicate keys"
        );
    }

    #[test]
    fn test_non_string_keys_converted_via_display() {
        // When the key column contains non-string values (e.g. Int64),
        // they should be converted to strings via Display formatting.
        let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64, LogicalType::String], 2);
        chunk.column_mut(0).unwrap().push_value(Value::Int64(42));
        chunk.column_mut(0).unwrap().push_value(Value::Int64(99));
        chunk
            .column_mut(1)
            .unwrap()
            .push_value(Value::String("val_a".into()));
        chunk
            .column_mut(1)
            .unwrap()
            .push_value(Value::String("val_b".into()));
        chunk.set_count(2);

        let mock = MockOperator::new(vec![chunk]);
        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap().unwrap();
        let map = expect_map(&result);
        assert_eq!(map.len(), 2);
        assert_eq!(
            map.get(&PropertyKey::new("42")),
            Some(&Value::String("val_a".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("99")),
            Some(&Value::String("val_b".into()))
        );
    }

    #[test]
    fn test_out_of_range_column_yields_empty_map() {
        // A value column index beyond the child's two-column schema means no
        // row has a usable (key, value) pair: the result is an empty map, not
        // a panic.
        let chunk = build_chunk(&["a", "b"], &[1, 2]);
        let mock = MockOperator::new(vec![chunk]);
        let mut op = MapCollectOperator::new(Box::new(mock), 0, 5);

        let result = op.next().unwrap().unwrap();
        assert_eq!(result.row_count(), 1);
        let map = expect_map(&result);
        assert!(map.is_empty(), "Expected empty map, got {map:?}");

        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_map_collect_into_any() {
        let mock = MockOperator::new(vec![]);
        let op = MapCollectOperator::new(Box::new(mock), 0, 1);
        let any = Box::new(op).into_any();
        assert!(any.downcast::<MapCollectOperator>().is_ok());
    }
}