Skip to main content

reddb_server/storage/query/step/
map.rs

//! Map Steps
//!
//! Steps that transform traversers 1:1 (one input produces one output).
//!
//! # Steps
//!
//! - `map()`: Apply transformation function
//! - `select()`: Select labeled values
//! - `project()`: Project specific properties
//! - `path()`: Get traversal path
//! - `valueMap()`: Get property map
//! - `id()`: Get element ID
//! - `label()`: Get element label
//! - `count()`: Count traversers (reports the bulk of each traverser it sees)
13
14use super::{Path, Step, StepResult, Traverser, TraverserRequirement, TraverserValue};
15use crate::json;
16use crate::serde_json::Value;
17use std::any::Any;
18use std::collections::HashMap;
19
20/// Trait for map steps (1:1 transformation)
21pub trait MapStep: Step {
22    /// Map a traverser to a new value
23    fn map(&self, traverser: &Traverser) -> TraverserValue;
24}
25
26/// Select step - selects labeled values from path
27#[derive(Debug, Clone)]
28pub struct SelectStep {
29    id: String,
30    labels: Vec<String>,
31    /// Labels to select
32    select_labels: Vec<String>,
33    /// Pop behavior (first, last, all, mixed)
34    pop: Pop,
35}
36
/// Chooses which occurrence of a label `select()` returns when the label
/// appears more than once in a path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pop {
    /// Take the first occurrence of the label.
    First,
    /// Take the last occurrence of the label.
    Last,
    /// Take every occurrence of the label.
    All,
    /// Mixed (decided per label).
    Mixed,
}
49
50impl SelectStep {
51    /// Create select() step with labels
52    pub fn new(select_labels: Vec<String>) -> Self {
53        Self {
54            id: format!("select_{}", select_labels.join("_")),
55            labels: Vec::new(),
56            select_labels,
57            pop: Pop::Last,
58        }
59    }
60
61    /// Create select with pop behavior
62    pub fn with_pop(select_labels: Vec<String>, pop: Pop) -> Self {
63        Self {
64            id: format!("select_{:?}_{}", pop, select_labels.join("_")),
65            labels: Vec::new(),
66            select_labels,
67            pop,
68        }
69    }
70
71    /// Get selected labels
72    pub fn select_labels(&self) -> &[String] {
73        &self.select_labels
74    }
75}
76
77impl Step for SelectStep {
78    fn id(&self) -> &str {
79        &self.id
80    }
81
82    fn name(&self) -> &str {
83        "SelectStep"
84    }
85
86    fn labels(&self) -> &[String] {
87        &self.labels
88    }
89
90    fn add_label(&mut self, label: String) {
91        if !self.labels.contains(&label) {
92            self.labels.push(label);
93        }
94    }
95
96    fn requirements(&self) -> &[TraverserRequirement] {
97        static REQS: &[TraverserRequirement] =
98            &[TraverserRequirement::Path, TraverserRequirement::Labels];
99        REQS
100    }
101
102    fn process_traverser(&self, traverser: Traverser) -> StepResult {
103        let new_value = self.map(&traverser);
104        if new_value.is_null() {
105            StepResult::Filter
106        } else {
107            StepResult::emit_one(traverser.clone_with_value(new_value))
108        }
109    }
110
111    fn reset(&mut self) {}
112
113    fn clone_step(&self) -> Box<dyn Step> {
114        Box::new(self.clone())
115    }
116
117    fn as_any(&self) -> &dyn Any {
118        self
119    }
120
121    fn as_any_mut(&mut self) -> &mut dyn Any {
122        self
123    }
124}
125
126impl MapStep for SelectStep {
127    fn map(&self, traverser: &Traverser) -> TraverserValue {
128        if let Some(path) = traverser.path() {
129            if self.select_labels.len() == 1 {
130                // Single label - return value directly
131                let label = &self.select_labels[0];
132                match self.pop {
133                    Pop::Last => path.get(label).cloned().unwrap_or(TraverserValue::Null),
134                    Pop::First => {
135                        let all = path.get_all(label);
136                        all.first()
137                            .cloned()
138                            .cloned()
139                            .unwrap_or(TraverserValue::Null)
140                    }
141                    Pop::All => {
142                        let all: Vec<Value> = path
143                            .get_all(label)
144                            .into_iter()
145                            .map(|v| v.to_json())
146                            .collect();
147                        TraverserValue::List(all)
148                    }
149                    Pop::Mixed => path.get(label).cloned().unwrap_or(TraverserValue::Null),
150                }
151            } else {
152                // Multiple labels - return map
153                let mut map = HashMap::new();
154                for label in &self.select_labels {
155                    if let Some(value) = path.get(label) {
156                        map.insert(label.clone(), value.to_json());
157                    }
158                }
159                TraverserValue::Map(map)
160            }
161        } else {
162            TraverserValue::Null
163        }
164    }
165}
166
/// Step backing `project()`: keeps only a fixed set of keys.
#[derive(Debug, Clone)]
pub struct ProjectStep {
    /// Identifier of this step instance.
    id: String,
    /// Labels attached to this step itself.
    labels: Vec<String>,
    /// Keys that appear in the projected output.
    keys: Vec<String>,
    // In real impl, would have by-modulator traversals
}

impl ProjectStep {
    /// Builds a `project()` step for `keys`.
    ///
    /// The step id is `project_` followed by the keys joined with `_`.
    pub fn new(keys: Vec<String>) -> Self {
        let id = format!("project_{}", keys.join("_"));
        Self {
            id,
            labels: Vec::new(),
            keys,
        }
    }

    /// Returns the projection keys, in construction order.
    pub fn keys(&self) -> &[String] {
        self.keys.as_slice()
    }
}
192
193impl Step for ProjectStep {
194    fn id(&self) -> &str {
195        &self.id
196    }
197
198    fn name(&self) -> &str {
199        "ProjectStep"
200    }
201
202    fn labels(&self) -> &[String] {
203        &self.labels
204    }
205
206    fn add_label(&mut self, label: String) {
207        if !self.labels.contains(&label) {
208            self.labels.push(label);
209        }
210    }
211
212    fn requirements(&self) -> &[TraverserRequirement] {
213        &[]
214    }
215
216    fn process_traverser(&self, traverser: Traverser) -> StepResult {
217        let new_value = self.map(&traverser);
218        StepResult::emit_one(traverser.clone_with_value(new_value))
219    }
220
221    fn reset(&mut self) {}
222
223    fn clone_step(&self) -> Box<dyn Step> {
224        Box::new(self.clone())
225    }
226
227    fn as_any(&self) -> &dyn Any {
228        self
229    }
230
231    fn as_any_mut(&mut self) -> &mut dyn Any {
232        self
233    }
234}
235
236impl MapStep for ProjectStep {
237    fn map(&self, traverser: &Traverser) -> TraverserValue {
238        let mut result = HashMap::new();
239
240        // For each key, apply the corresponding by-modulator
241        // In real impl, each key would have a child traversal
242        if let TraverserValue::Map(current) = traverser.value() {
243            for key in &self.keys {
244                if let Some(value) = current.get(key) {
245                    result.insert(key.clone(), value.clone());
246                } else {
247                    result.insert(key.clone(), Value::Null);
248                }
249            }
250        }
251
252        TraverserValue::Map(result)
253    }
254}
255
/// Step backing `path()`: replaces the value with the traversal path.
#[derive(Debug, Clone)]
pub struct PathStep {
    /// Identifier of this step instance.
    id: String,
    /// Labels attached to this step itself.
    labels: Vec<String>,
}

impl PathStep {
    /// Builds a `path()` step with the id `path_0` and no labels.
    pub fn new() -> Self {
        let id = String::from("path_0");
        Self {
            id,
            labels: Vec::new(),
        }
    }
}

impl Default for PathStep {
    /// Same as [`PathStep::new`].
    fn default() -> Self {
        PathStep::new()
    }
}
278
279impl Step for PathStep {
280    fn id(&self) -> &str {
281        &self.id
282    }
283
284    fn name(&self) -> &str {
285        "PathStep"
286    }
287
288    fn labels(&self) -> &[String] {
289        &self.labels
290    }
291
292    fn add_label(&mut self, label: String) {
293        if !self.labels.contains(&label) {
294            self.labels.push(label);
295        }
296    }
297
298    fn requirements(&self) -> &[TraverserRequirement] {
299        static REQS: &[TraverserRequirement] = &[TraverserRequirement::Path];
300        REQS
301    }
302
303    fn process_traverser(&self, traverser: Traverser) -> StepResult {
304        let new_value = self.map(&traverser);
305        StepResult::emit_one(traverser.clone_with_value(new_value))
306    }
307
308    fn reset(&mut self) {}
309
310    fn clone_step(&self) -> Box<dyn Step> {
311        Box::new(self.clone())
312    }
313
314    fn as_any(&self) -> &dyn Any {
315        self
316    }
317
318    fn as_any_mut(&mut self) -> &mut dyn Any {
319        self
320    }
321}
322
323impl MapStep for PathStep {
324    fn map(&self, traverser: &Traverser) -> TraverserValue {
325        if let Some(path) = traverser.path() {
326            TraverserValue::Path(path.clone())
327        } else {
328            TraverserValue::Path(Path::new())
329        }
330    }
331}
332
/// Step backing `valueMap()`: produces a map of properties.
#[derive(Debug, Clone)]
pub struct ValueMapStep {
    /// Identifier of this step instance.
    id: String,
    /// Labels attached to this step itself.
    labels: Vec<String>,
    /// Property keys to keep; an empty list keeps every key.
    keys: Vec<String>,
    /// Whether element tokens (id, type, ...) are added to the output.
    with_tokens: bool,
}

impl ValueMapStep {
    /// Builds a `valueMap()` step that keeps every key, without tokens.
    pub fn new() -> Self {
        Self {
            id: String::from("valueMap_0"),
            labels: Vec::new(),
            keys: Vec::new(),
            with_tokens: false,
        }
    }

    /// Builds a `valueMap(keys)` step restricted to `keys`, without tokens.
    ///
    /// The step id is `valueMap_` followed by the keys joined with `_`.
    pub fn with_keys(keys: Vec<String>) -> Self {
        let id = format!("valueMap_{}", keys.join("_"));
        Self {
            id,
            labels: Vec::new(),
            keys,
            with_tokens: false,
        }
    }

    /// Consumes the step and returns it with token output switched on.
    pub fn with_tokens(self) -> Self {
        Self {
            with_tokens: true,
            ..self
        }
    }
}

impl Default for ValueMapStep {
    /// Same as [`ValueMapStep::new`].
    fn default() -> Self {
        ValueMapStep::new()
    }
}
377
378impl Step for ValueMapStep {
379    fn id(&self) -> &str {
380        &self.id
381    }
382
383    fn name(&self) -> &str {
384        "ValueMapStep"
385    }
386
387    fn labels(&self) -> &[String] {
388        &self.labels
389    }
390
391    fn add_label(&mut self, label: String) {
392        if !self.labels.contains(&label) {
393            self.labels.push(label);
394        }
395    }
396
397    fn requirements(&self) -> &[TraverserRequirement] {
398        &[]
399    }
400
401    fn process_traverser(&self, traverser: Traverser) -> StepResult {
402        let new_value = self.map(&traverser);
403        StepResult::emit_one(traverser.clone_with_value(new_value))
404    }
405
406    fn reset(&mut self) {}
407
408    fn clone_step(&self) -> Box<dyn Step> {
409        Box::new(self.clone())
410    }
411
412    fn as_any(&self) -> &dyn Any {
413        self
414    }
415
416    fn as_any_mut(&mut self) -> &mut dyn Any {
417        self
418    }
419}
420
421impl MapStep for ValueMapStep {
422    fn map(&self, traverser: &Traverser) -> TraverserValue {
423        let mut result = HashMap::new();
424
425        match traverser.value() {
426            TraverserValue::Vertex(id) if self.with_tokens => {
427                result.insert("@id".to_string(), json!(id));
428                result.insert("@type".to_string(), json!("vertex"));
429            }
430            TraverserValue::Edge {
431                id,
432                source,
433                target,
434                label,
435            } if self.with_tokens => {
436                result.insert("@id".to_string(), json!(id));
437                result.insert("@type".to_string(), json!("edge"));
438                result.insert("@label".to_string(), json!(label));
439                result.insert("@source".to_string(), json!(source));
440                result.insert("@target".to_string(), json!(target));
441            }
442            TraverserValue::Map(map) => {
443                for (key, value) in map {
444                    if self.keys.is_empty() || self.keys.contains(key) {
445                        result.insert(key.clone(), value.clone());
446                    }
447                }
448            }
449            _ => {}
450        }
451
452        TraverserValue::Map(result)
453    }
454}
455
/// Step backing `id()`: replaces an element with its identifier.
#[derive(Debug, Clone)]
pub struct IdStep {
    /// Identifier of this step instance.
    id: String,
    /// Labels attached to this step itself.
    labels: Vec<String>,
}

impl IdStep {
    /// Builds an `id()` step with the id `id_0` and no labels.
    pub fn new() -> Self {
        let id = String::from("id_0");
        Self {
            id,
            labels: Vec::new(),
        }
    }
}

impl Default for IdStep {
    /// Same as [`IdStep::new`].
    fn default() -> Self {
        IdStep::new()
    }
}
478
479impl Step for IdStep {
480    fn id(&self) -> &str {
481        &self.id
482    }
483
484    fn name(&self) -> &str {
485        "IdStep"
486    }
487
488    fn labels(&self) -> &[String] {
489        &self.labels
490    }
491
492    fn add_label(&mut self, label: String) {
493        if !self.labels.contains(&label) {
494            self.labels.push(label);
495        }
496    }
497
498    fn requirements(&self) -> &[TraverserRequirement] {
499        &[]
500    }
501
502    fn process_traverser(&self, traverser: Traverser) -> StepResult {
503        let new_value = self.map(&traverser);
504        StepResult::emit_one(traverser.clone_with_value(new_value))
505    }
506
507    fn reset(&mut self) {}
508
509    fn clone_step(&self) -> Box<dyn Step> {
510        Box::new(self.clone())
511    }
512
513    fn as_any(&self) -> &dyn Any {
514        self
515    }
516
517    fn as_any_mut(&mut self) -> &mut dyn Any {
518        self
519    }
520}
521
522impl MapStep for IdStep {
523    fn map(&self, traverser: &Traverser) -> TraverserValue {
524        match traverser.value() {
525            TraverserValue::Vertex(id) => TraverserValue::String(id.clone()),
526            TraverserValue::Edge { id, .. } => TraverserValue::String(id.clone()),
527            _ => TraverserValue::Null,
528        }
529    }
530}
531
/// Step backing `label()`: replaces an element with its label.
#[derive(Debug, Clone)]
pub struct LabelStep {
    /// Identifier of this step instance.
    id: String,
    /// Labels attached to this step itself.
    labels: Vec<String>,
}

impl LabelStep {
    /// Builds a `label()` step with the id `label_0` and no labels.
    pub fn new() -> Self {
        let id = String::from("label_0");
        Self {
            id,
            labels: Vec::new(),
        }
    }
}

impl Default for LabelStep {
    /// Same as [`LabelStep::new`].
    fn default() -> Self {
        LabelStep::new()
    }
}
554
555impl Step for LabelStep {
556    fn id(&self) -> &str {
557        &self.id
558    }
559
560    fn name(&self) -> &str {
561        "LabelStep"
562    }
563
564    fn labels(&self) -> &[String] {
565        &self.labels
566    }
567
568    fn add_label(&mut self, label: String) {
569        if !self.labels.contains(&label) {
570            self.labels.push(label);
571        }
572    }
573
574    fn requirements(&self) -> &[TraverserRequirement] {
575        &[]
576    }
577
578    fn process_traverser(&self, traverser: Traverser) -> StepResult {
579        let new_value = self.map(&traverser);
580        StepResult::emit_one(traverser.clone_with_value(new_value))
581    }
582
583    fn reset(&mut self) {}
584
585    fn clone_step(&self) -> Box<dyn Step> {
586        Box::new(self.clone())
587    }
588
589    fn as_any(&self) -> &dyn Any {
590        self
591    }
592
593    fn as_any_mut(&mut self) -> &mut dyn Any {
594        self
595    }
596}
597
598impl MapStep for LabelStep {
599    fn map(&self, traverser: &Traverser) -> TraverserValue {
600        match traverser.value() {
601            TraverserValue::Edge { label, .. } => TraverserValue::String(label.clone()),
602            // Vertices would need type lookup
603            _ => TraverserValue::Null,
604        }
605    }
606}
607
/// Step backing `count()`: counts traversers (a reducing step).
#[derive(Debug, Clone)]
pub struct CountStep {
    /// Identifier of this step instance.
    id: String,
    /// Labels attached to this step itself.
    labels: Vec<String>,
}

impl CountStep {
    /// Builds a `count()` step with the id `count_0` and no labels.
    pub fn new() -> Self {
        let id = String::from("count_0");
        Self {
            id,
            labels: Vec::new(),
        }
    }
}

impl Default for CountStep {
    /// Same as [`CountStep::new`].
    fn default() -> Self {
        CountStep::new()
    }
}
630
631impl Step for CountStep {
632    fn id(&self) -> &str {
633        &self.id
634    }
635
636    fn name(&self) -> &str {
637        "CountStep"
638    }
639
640    fn labels(&self) -> &[String] {
641        &self.labels
642    }
643
644    fn add_label(&mut self, label: String) {
645        if !self.labels.contains(&label) {
646            self.labels.push(label);
647        }
648    }
649
650    fn requirements(&self) -> &[TraverserRequirement] {
651        static REQS: &[TraverserRequirement] =
652            &[TraverserRequirement::Barrier, TraverserRequirement::Bulk];
653        REQS
654    }
655
656    fn process_traverser(&self, traverser: Traverser) -> StepResult {
657        // Count is a barrier - would accumulate and then emit single count
658        // For single traverser, count its bulk
659        let count = traverser.bulk();
660        StepResult::emit_one(traverser.clone_with_value(TraverserValue::Integer(count as i64)))
661    }
662
663    fn reset(&mut self) {}
664
665    fn clone_step(&self) -> Box<dyn Step> {
666        Box::new(self.clone())
667    }
668
669    fn as_any(&self) -> &dyn Any {
670        self
671    }
672
673    fn as_any_mut(&mut self) -> &mut dyn Any {
674        self
675    }
676}
677
678impl MapStep for CountStep {
679    fn map(&self, traverser: &Traverser) -> TraverserValue {
680        TraverserValue::Integer(traverser.bulk() as i64)
681    }
682}
683
#[cfg(test)]
mod tests {
    use super::*;

    /// A single-label select keeps its label and reports its step name.
    #[test]
    fn test_select_single() {
        let step = SelectStep::new(vec!["a".to_string()]);
        assert_eq!(step.select_labels(), &["a"]);
        assert_eq!(step.name(), "SelectStep");
    }

    /// A multi-label select keeps every label it was built with.
    #[test]
    fn test_select_multiple() {
        let step = SelectStep::new(vec!["a".to_string(), "b".to_string()]);
        assert_eq!(step.select_labels().len(), 2);
    }

    /// A project step keeps every key it was built with.
    #[test]
    fn test_project_step() {
        let step = ProjectStep::new(vec!["name".to_string(), "age".to_string()]);
        assert_eq!(step.keys().len(), 2);
    }

    /// Mapping through a path step yields a path value.
    #[test]
    fn test_path_step() {
        let step = PathStep::new();
        assert_eq!(step.name(), "PathStep");

        let mut traverser = Traverser::new("v1");
        traverser.enable_path();

        let result = step.map(&traverser);
        assert!(matches!(result, TraverserValue::Path(_)));
    }

    /// With tokens enabled, the value map carries the id and type tokens.
    #[test]
    fn test_value_map_step() {
        let step = ValueMapStep::new().with_tokens();
        assert_eq!(step.name(), "ValueMapStep");

        let traverser = Traverser::new("v1");
        let result = step.map(&traverser);

        // Fail loudly on a non-map result; an `if let` here would let the
        // test pass without checking anything.
        match result {
            TraverserValue::Map(map) => {
                assert!(map.contains_key("@id"));
                assert!(map.contains_key("@type"));
            }
            _ => panic!("valueMap() should produce a TraverserValue::Map"),
        }
    }

    /// The id step returns the id the traverser was created with.
    #[test]
    fn test_id_step() {
        let step = IdStep::new();

        let traverser = Traverser::new("vertex123");
        let result = step.map(&traverser);
        assert!(matches!(result, TraverserValue::String(id) if id == "vertex123"));
    }

    /// The label step returns an edge's label.
    #[test]
    fn test_label_step() {
        let step = LabelStep::new();

        let edge = TraverserValue::Edge {
            id: "e1".to_string(),
            source: "v1".to_string(),
            target: "v2".to_string(),
            label: "knows".to_string(),
        };
        let traverser = Traverser::with_value(edge);
        let result = step.map(&traverser);
        assert!(matches!(result, TraverserValue::String(l) if l == "knows"));
    }

    /// The count step reports the traverser's bulk.
    #[test]
    fn test_count_step() {
        let step = CountStep::new();

        let mut traverser = Traverser::new("v1");
        traverser.set_bulk(5);

        let result = step.map(&traverser);
        assert!(matches!(result, TraverserValue::Integer(5)));
    }
}