Skip to main content

grafeo_core/execution/operators/
map_collect.rs

1//! Map collect operator.
2//!
3//! Reads all rows from a child operator and collects key-value pairs
4//! into a single `Value::Map`. Used for Gremlin `groupCount()` semantics.
5
6use super::{Operator, OperatorResult};
7use crate::execution::DataChunk;
8use grafeo_common::types::{LogicalType, PropertyKey, Value};
9use std::collections::BTreeMap;
10use std::sync::Arc;
11
12/// Collects key-value pairs from child rows into a single Map value.
13pub struct MapCollectOperator {
14    child: Box<dyn Operator>,
15    key_col: usize,
16    value_col: usize,
17    done: bool,
18}
19
20impl MapCollectOperator {
21    /// Creates a new map collect operator.
22    #[must_use]
23    pub fn new(child: Box<dyn Operator>, key_col: usize, value_col: usize) -> Self {
24        Self {
25            child,
26            key_col,
27            value_col,
28            done: false,
29        }
30    }
31}
32
33impl Operator for MapCollectOperator {
34    fn next(&mut self) -> OperatorResult {
35        if self.done {
36            return Ok(None);
37        }
38        self.done = true;
39
40        let mut map = BTreeMap::new();
41        while let Some(chunk) = self.child.next()? {
42            for row in chunk.selected_indices() {
43                let key = chunk.column(self.key_col).and_then(|c| c.get_value(row));
44                let value = chunk.column(self.value_col).and_then(|c| c.get_value(row));
45                if let (Some(k), Some(v)) = (key, value) {
46                    let key_str: PropertyKey = match &k {
47                        Value::String(s) => PropertyKey::from(s.as_str()),
48                        other => PropertyKey::from(format!("{other}").as_str()),
49                    };
50                    map.insert(key_str, v);
51                }
52            }
53        }
54
55        let mut output = DataChunk::with_capacity(&[LogicalType::Any], 1);
56        output
57            .column_mut(0)
58            .expect("column 0 exists: single-column schema")
59            .push_value(Value::Map(Arc::new(map)));
60        output.set_count(1);
61
62        Ok(Some(output))
63    }
64
65    fn reset(&mut self) {
66        self.done = false;
67        self.child.reset();
68    }
69
70    fn name(&self) -> &'static str {
71        "MapCollect"
72    }
73}
74
#[cfg(test)]
mod tests {
    // `super::*` also brings in the parent's private imports (`DataChunk`,
    // `LogicalType`, `PropertyKey`, `Value`, `BTreeMap`, `Arc`).
    use super::*;

    /// A simple mock operator that yields pre-built chunks one at a time.
    ///
    /// Chunks are moved out as they are yielded (replaced with
    /// `DataChunk::empty()`), so after `reset()` it replays empty chunks.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position >= self.chunks.len() {
                return Ok(None);
            }
            let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Helper: builds a two-column chunk (String key, Int64 value) from slices.
    fn build_chunk(keys: &[&str], values: &[i64]) -> DataChunk {
        assert_eq!(keys.len(), values.len());
        let mut chunk =
            DataChunk::with_capacity(&[LogicalType::String, LogicalType::Int64], keys.len());
        for key in keys {
            chunk
                .column_mut(0)
                .unwrap()
                .push_value(Value::String((*key).into()));
        }
        for val in values {
            chunk.column_mut(1).unwrap().push_value(Value::Int64(*val));
        }
        chunk.set_count(keys.len());
        chunk
    }

    /// Helper: asserts that `chunk` has exactly one row and returns the map held
    /// in its first cell, panicking if that cell is not a `Value::Map`.
    fn single_map(chunk: &DataChunk) -> Arc<BTreeMap<PropertyKey, Value>> {
        assert_eq!(chunk.row_count(), 1);
        match chunk.column(0).unwrap().get_value(0).unwrap() {
            Value::Map(map) => map,
            other => panic!("Expected Value::Map, got {other:?}"),
        }
    }

    #[test]
    fn test_basic_map_collection() {
        let chunk = build_chunk(&["NYC", "LA"], &[2, 1]);
        let mock = MockOperator::new(vec![chunk]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        // First call: single row with a Map value
        let result = op.next().unwrap();
        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(result.column_count(), 1);

        let map = single_map(&result);
        assert_eq!(map.len(), 2);
        assert_eq!(map.get(&PropertyKey::new("NYC")), Some(&Value::Int64(2)));
        assert_eq!(map.get(&PropertyKey::new("LA")), Some(&Value::Int64(1)));

        // Second call: exhausted
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_empty_input_produces_empty_map() {
        let mock = MockOperator::new(vec![]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let result = op.next().unwrap();
        assert!(result.is_some());

        let map = single_map(&result.unwrap());
        assert!(map.is_empty(), "Expected empty map, got {map:?}");

        // Second call: exhausted
        assert!(op.next().unwrap().is_none());
    }

    #[test]
    fn test_reset_allows_reprocessing() {
        let chunk = build_chunk(&["a"], &[10]);
        let mock = MockOperator::new(vec![chunk]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        // Consume the first result
        let result = op.next().unwrap();
        assert!(result.is_some());
        assert!(op.next().unwrap().is_none());

        // After reset, the child mock is also reset but its chunks were
        // consumed via mem::replace, so we get an empty map (the mock's
        // chunks are replaced with DataChunk::empty()). The important
        // thing is that reset() clears the `done` flag and produces a
        // new result instead of returning None.
        op.reset();
        let result = op.next().unwrap();
        assert!(
            result.is_some(),
            "After reset, next() should produce a result"
        );

        // The result is a single-row chunk with a Map
        single_map(&result.unwrap());
    }

    #[test]
    fn test_name_returns_map_collect() {
        let mock = MockOperator::new(vec![]);
        let op = MapCollectOperator::new(Box::new(mock), 0, 1);
        assert_eq!(op.name(), "MapCollect");
    }

    #[test]
    fn test_multiple_chunks_merged_into_single_map() {
        let chunk1 = build_chunk(&["x", "y"], &[1, 2]);
        let chunk2 = build_chunk(&["z"], &[3]);
        let mock = MockOperator::new(vec![chunk1, chunk2]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let map = single_map(&op.next().unwrap().unwrap());
        assert_eq!(map.len(), 3);
        assert_eq!(map.get(&PropertyKey::new("x")), Some(&Value::Int64(1)));
        assert_eq!(map.get(&PropertyKey::new("y")), Some(&Value::Int64(2)));
        assert_eq!(map.get(&PropertyKey::new("z")), Some(&Value::Int64(3)));
    }

    #[test]
    fn test_duplicate_keys_last_value_wins() {
        // When the same key appears multiple times, the last value should win
        // because BTreeMap::insert overwrites.
        let chunk = build_chunk(&["k", "k"], &[1, 2]);
        let mock = MockOperator::new(vec![chunk]);

        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let map = single_map(&op.next().unwrap().unwrap());
        assert_eq!(map.len(), 1);
        assert_eq!(
            map.get(&PropertyKey::new("k")),
            Some(&Value::Int64(2)),
            "Last value should win for duplicate keys"
        );
    }

    #[test]
    fn test_non_string_keys_converted_via_display() {
        // When the key column contains non-string values (e.g. Int64),
        // they should be converted to strings via Display formatting.
        let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64, LogicalType::String], 2);
        chunk.column_mut(0).unwrap().push_value(Value::Int64(42));
        chunk.column_mut(0).unwrap().push_value(Value::Int64(99));
        chunk
            .column_mut(1)
            .unwrap()
            .push_value(Value::String("val_a".into()));
        chunk
            .column_mut(1)
            .unwrap()
            .push_value(Value::String("val_b".into()));
        chunk.set_count(2);

        let mock = MockOperator::new(vec![chunk]);
        let mut op = MapCollectOperator::new(Box::new(mock), 0, 1);

        let map = single_map(&op.next().unwrap().unwrap());
        assert_eq!(map.len(), 2);
        assert_eq!(
            map.get(&PropertyKey::new("42")),
            Some(&Value::String("val_a".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("99")),
            Some(&Value::String("val_b".into()))
        );
    }

    #[test]
    fn test_out_of_range_columns_yield_empty_map() {
        // A value column index that does not exist in the chunk means no row
        // can produce a complete pair; the rows are skipped rather than panicking.
        let chunk = build_chunk(&["a", "b"], &[1, 2]);
        let mock = MockOperator::new(vec![chunk]);
        let mut op = MapCollectOperator::new(Box::new(mock), 0, 5);

        let map = single_map(&op.next().unwrap().unwrap());
        assert!(map.is_empty(), "Expected empty map, got {map:?}");
        assert!(op.next().unwrap().is_none());
    }
}