Skip to main content

reddb_server/storage/query/step/
flatmap.rs

1//! FlatMap Steps
2//!
3//! Steps that expand traversers 1:N (one input produces N outputs).
4//!
5//! # Steps
6//!
7//! - `out()`: Follow outgoing edges
8//! - `in()`: Follow incoming edges
9//! - `both()`: Follow both directions
10//! - `outE()`: Get outgoing edges
11//! - `inE()`: Get incoming edges
12//! - `bothE()`: Get all adjacent edges
13//! - `outV()`, `inV()`, `bothV()`, `otherV()`: Edge vertex accessors
14
15use super::{Step, StepResult, Traverser, TraverserRequirement, TraverserValue};
16use std::any::Any;
17
18/// Trait for flatmap steps (1:N expansion)
19pub trait FlatMapStep: Step {
20    /// Expand a single traverser to multiple
21    fn flat_map(&self, traverser: &Traverser) -> Vec<Traverser>;
22}
23
24/// Direction for edge traversal
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum Direction {
27    /// Outgoing edges
28    Out,
29    /// Incoming edges
30    In,
31    /// Both directions
32    Both,
33}
34
35/// Generic vertex step - traverses edges to adjacent vertices
36#[derive(Debug, Clone)]
37pub struct VertexStep {
38    id: String,
39    labels: Vec<String>,
40    /// Direction to traverse
41    direction: Direction,
42    /// Edge labels to follow (empty = all)
43    edge_labels: Vec<String>,
44    /// Return edges instead of vertices
45    return_edges: bool,
46}
47
48impl VertexStep {
49    /// Create out() step
50    pub fn out(edge_labels: Vec<String>) -> Self {
51        Self {
52            id: format!("out_{}", edge_labels.join("_")),
53            labels: Vec::new(),
54            direction: Direction::Out,
55            edge_labels,
56            return_edges: false,
57        }
58    }
59
60    /// Create in() step
61    pub fn in_(edge_labels: Vec<String>) -> Self {
62        Self {
63            id: format!("in_{}", edge_labels.join("_")),
64            labels: Vec::new(),
65            direction: Direction::In,
66            edge_labels,
67            return_edges: false,
68        }
69    }
70
71    /// Create both() step
72    pub fn both(edge_labels: Vec<String>) -> Self {
73        Self {
74            id: format!("both_{}", edge_labels.join("_")),
75            labels: Vec::new(),
76            direction: Direction::Both,
77            edge_labels,
78            return_edges: false,
79        }
80    }
81
82    /// Create outE() step
83    pub fn out_e(edge_labels: Vec<String>) -> Self {
84        Self {
85            id: format!("outE_{}", edge_labels.join("_")),
86            labels: Vec::new(),
87            direction: Direction::Out,
88            edge_labels,
89            return_edges: true,
90        }
91    }
92
93    /// Create inE() step
94    pub fn in_e(edge_labels: Vec<String>) -> Self {
95        Self {
96            id: format!("inE_{}", edge_labels.join("_")),
97            labels: Vec::new(),
98            direction: Direction::In,
99            edge_labels,
100            return_edges: true,
101        }
102    }
103
104    /// Create bothE() step
105    pub fn both_e(edge_labels: Vec<String>) -> Self {
106        Self {
107            id: format!("bothE_{}", edge_labels.join("_")),
108            labels: Vec::new(),
109            direction: Direction::Both,
110            edge_labels,
111            return_edges: true,
112        }
113    }
114
115    /// Get direction
116    pub fn direction(&self) -> Direction {
117        self.direction
118    }
119
120    /// Get edge labels
121    pub fn edge_labels(&self) -> &[String] {
122        &self.edge_labels
123    }
124
125    /// Check if returning edges
126    pub fn returns_edges(&self) -> bool {
127        self.return_edges
128    }
129
130    /// Set step ID
131    pub fn with_id(mut self, id: String) -> Self {
132        self.id = id;
133        self
134    }
135}
136
137impl Step for VertexStep {
138    fn id(&self) -> &str {
139        &self.id
140    }
141
142    fn name(&self) -> &str {
143        match (self.direction, self.return_edges) {
144            (Direction::Out, false) => "OutStep",
145            (Direction::In, false) => "InStep",
146            (Direction::Both, false) => "BothStep",
147            (Direction::Out, true) => "OutEStep",
148            (Direction::In, true) => "InEStep",
149            (Direction::Both, true) => "BothEStep",
150        }
151    }
152
153    fn labels(&self) -> &[String] {
154        &self.labels
155    }
156
157    fn add_label(&mut self, label: String) {
158        if !self.labels.contains(&label) {
159            self.labels.push(label);
160        }
161    }
162
163    fn requirements(&self) -> &[TraverserRequirement] {
164        &[]
165    }
166
167    fn process_traverser(&self, traverser: Traverser) -> StepResult {
168        let new_traversers = self.flat_map(&traverser);
169        StepResult::emit_many(new_traversers)
170    }
171
172    fn reset(&mut self) {}
173
174    fn clone_step(&self) -> Box<dyn Step> {
175        Box::new(self.clone())
176    }
177
178    fn as_any(&self) -> &dyn Any {
179        self
180    }
181
182    fn as_any_mut(&mut self) -> &mut dyn Any {
183        self
184    }
185}
186
187impl FlatMapStep for VertexStep {
188    fn flat_map(&self, traverser: &Traverser) -> Vec<Traverser> {
189        // In real implementation, this would query the graph store
190        // For now, return empty - execution engine will populate
191        Vec::new()
192    }
193}
194
195/// Out step - convenience alias
196pub type OutStep = VertexStep;
197
198/// In step - convenience alias
199pub type InStep = VertexStep;
200
201/// Both step - convenience alias
202pub type BothStep = VertexStep;
203
204/// Edge step - get edges from current element
205#[derive(Debug, Clone)]
206pub struct EdgeStep {
207    id: String,
208    labels: Vec<String>,
209    /// Direction
210    direction: Direction,
211    /// Edge labels
212    edge_labels: Vec<String>,
213}
214
215impl EdgeStep {
216    /// Create outE() step
217    pub fn out(edge_labels: Vec<String>) -> Self {
218        Self {
219            id: format!("outE_{}", edge_labels.join("_")),
220            labels: Vec::new(),
221            direction: Direction::Out,
222            edge_labels,
223        }
224    }
225
226    /// Create inE() step
227    pub fn in_(edge_labels: Vec<String>) -> Self {
228        Self {
229            id: format!("inE_{}", edge_labels.join("_")),
230            labels: Vec::new(),
231            direction: Direction::In,
232            edge_labels,
233        }
234    }
235
236    /// Create bothE() step
237    pub fn both(edge_labels: Vec<String>) -> Self {
238        Self {
239            id: format!("bothE_{}", edge_labels.join("_")),
240            labels: Vec::new(),
241            direction: Direction::Both,
242            edge_labels,
243        }
244    }
245}
246
247impl Step for EdgeStep {
248    fn id(&self) -> &str {
249        &self.id
250    }
251
252    fn name(&self) -> &str {
253        match self.direction {
254            Direction::Out => "OutEStep",
255            Direction::In => "InEStep",
256            Direction::Both => "BothEStep",
257        }
258    }
259
260    fn labels(&self) -> &[String] {
261        &self.labels
262    }
263
264    fn add_label(&mut self, label: String) {
265        if !self.labels.contains(&label) {
266            self.labels.push(label);
267        }
268    }
269
270    fn requirements(&self) -> &[TraverserRequirement] {
271        &[]
272    }
273
274    fn process_traverser(&self, traverser: Traverser) -> StepResult {
275        let new_traversers = self.flat_map(&traverser);
276        StepResult::emit_many(new_traversers)
277    }
278
279    fn reset(&mut self) {}
280
281    fn clone_step(&self) -> Box<dyn Step> {
282        Box::new(self.clone())
283    }
284
285    fn as_any(&self) -> &dyn Any {
286        self
287    }
288
289    fn as_any_mut(&mut self) -> &mut dyn Any {
290        self
291    }
292}
293
294impl FlatMapStep for EdgeStep {
295    fn flat_map(&self, _traverser: &Traverser) -> Vec<Traverser> {
296        // Would query graph store for edges
297        Vec::new()
298    }
299}
300
301/// Edge vertex accessor - gets vertex from edge
302#[derive(Debug, Clone)]
303pub struct EdgeVertexStep {
304    id: String,
305    labels: Vec<String>,
306    /// Which vertex to get
307    direction: Direction,
308}
309
310impl EdgeVertexStep {
311    /// Create outV() step - get target vertex
312    pub fn out_v() -> Self {
313        Self {
314            id: "outV_0".to_string(),
315            labels: Vec::new(),
316            direction: Direction::Out,
317        }
318    }
319
320    /// Create inV() step - get source vertex
321    pub fn in_v() -> Self {
322        Self {
323            id: "inV_0".to_string(),
324            labels: Vec::new(),
325            direction: Direction::In,
326        }
327    }
328
329    /// Create bothV() step - get both vertices
330    pub fn both_v() -> Self {
331        Self {
332            id: "bothV_0".to_string(),
333            labels: Vec::new(),
334            direction: Direction::Both,
335        }
336    }
337
338    /// Create otherV() step - get opposite vertex from traversal
339    pub fn other_v() -> Self {
340        // otherV is context-dependent - needs path info
341        Self {
342            id: "otherV_0".to_string(),
343            labels: Vec::new(),
344            direction: Direction::Both, // Direction determined at runtime
345        }
346    }
347}
348
349impl Step for EdgeVertexStep {
350    fn id(&self) -> &str {
351        &self.id
352    }
353
354    fn name(&self) -> &str {
355        match self.direction {
356            Direction::Out => "OutVStep",
357            Direction::In => "InVStep",
358            Direction::Both => "BothVStep",
359        }
360    }
361
362    fn labels(&self) -> &[String] {
363        &self.labels
364    }
365
366    fn add_label(&mut self, label: String) {
367        if !self.labels.contains(&label) {
368            self.labels.push(label);
369        }
370    }
371
372    fn requirements(&self) -> &[TraverserRequirement] {
373        &[]
374    }
375
376    fn process_traverser(&self, traverser: Traverser) -> StepResult {
377        // Get edge and extract vertex
378        if let TraverserValue::Edge {
379            id: _,
380            source,
381            target,
382            label: _,
383        } = traverser.value()
384        {
385            let new_traversers = match self.direction {
386                Direction::Out => {
387                    vec![traverser.clone_with_value(TraverserValue::Vertex(target.clone()))]
388                }
389                Direction::In => {
390                    vec![traverser.clone_with_value(TraverserValue::Vertex(source.clone()))]
391                }
392                Direction::Both => vec![
393                    traverser.clone_with_value(TraverserValue::Vertex(source.clone())),
394                    traverser.clone_with_value(TraverserValue::Vertex(target.clone())),
395                ],
396            };
397            StepResult::emit_many(new_traversers)
398        } else {
399            StepResult::Filter
400        }
401    }
402
403    fn reset(&mut self) {}
404
405    fn clone_step(&self) -> Box<dyn Step> {
406        Box::new(self.clone())
407    }
408
409    fn as_any(&self) -> &dyn Any {
410        self
411    }
412
413    fn as_any_mut(&mut self) -> &mut dyn Any {
414        self
415    }
416}
417
418impl FlatMapStep for EdgeVertexStep {
419    fn flat_map(&self, traverser: &Traverser) -> Vec<Traverser> {
420        // Delegate to process_traverser
421        match self.process_traverser(traverser.clone()) {
422            StepResult::Emit(t) => t,
423            _ => Vec::new(),
424        }
425    }
426}
427
428/// Properties step - get all properties as map
429#[derive(Debug, Clone)]
430pub struct PropertiesStep {
431    id: String,
432    labels: Vec<String>,
433    /// Property keys to get (empty = all)
434    keys: Vec<String>,
435}
436
437impl PropertiesStep {
438    /// Create properties() step
439    pub fn new() -> Self {
440        Self {
441            id: "properties_0".to_string(),
442            labels: Vec::new(),
443            keys: Vec::new(),
444        }
445    }
446
447    /// Create properties(keys) step
448    pub fn with_keys(keys: Vec<String>) -> Self {
449        Self {
450            id: format!("properties_{}", keys.join("_")),
451            labels: Vec::new(),
452            keys,
453        }
454    }
455}
456
457impl Default for PropertiesStep {
458    fn default() -> Self {
459        Self::new()
460    }
461}
462
463impl Step for PropertiesStep {
464    fn id(&self) -> &str {
465        &self.id
466    }
467
468    fn name(&self) -> &str {
469        "PropertiesStep"
470    }
471
472    fn labels(&self) -> &[String] {
473        &self.labels
474    }
475
476    fn add_label(&mut self, label: String) {
477        if !self.labels.contains(&label) {
478            self.labels.push(label);
479        }
480    }
481
482    fn requirements(&self) -> &[TraverserRequirement] {
483        &[]
484    }
485
486    fn process_traverser(&self, traverser: Traverser) -> StepResult {
487        let new_traversers = self.flat_map(&traverser);
488        StepResult::emit_many(new_traversers)
489    }
490
491    fn reset(&mut self) {}
492
493    fn clone_step(&self) -> Box<dyn Step> {
494        Box::new(self.clone())
495    }
496
497    fn as_any(&self) -> &dyn Any {
498        self
499    }
500
501    fn as_any_mut(&mut self) -> &mut dyn Any {
502        self
503    }
504}
505
506impl FlatMapStep for PropertiesStep {
507    fn flat_map(&self, traverser: &Traverser) -> Vec<Traverser> {
508        // Would extract properties from vertex/edge
509        // For now, return single property traverser per key
510        if let TraverserValue::Map(map) = traverser.value() {
511            let mut result = Vec::new();
512            for (key, value) in map {
513                if self.keys.is_empty() || self.keys.contains(key) {
514                    result.push(
515                        traverser
516                            .clone_with_value(TraverserValue::Property(key.clone(), value.clone())),
517                    );
518                }
519            }
520            result
521        } else {
522            Vec::new()
523        }
524    }
525}
526
527#[cfg(test)]
528mod tests {
529    use super::*;
530
531    #[test]
532    fn test_out_step() {
533        let step = VertexStep::out(vec!["knows".to_string()]);
534        assert_eq!(step.direction(), Direction::Out);
535        assert_eq!(step.edge_labels(), &["knows"]);
536        assert!(!step.returns_edges());
537    }
538
539    #[test]
540    fn test_in_step() {
541        let step = VertexStep::in_(vec![]);
542        assert_eq!(step.direction(), Direction::In);
543        assert!(step.edge_labels().is_empty());
544    }
545
546    #[test]
547    fn test_both_step() {
548        let step = VertexStep::both(vec!["connects".to_string(), "knows".to_string()]);
549        assert_eq!(step.direction(), Direction::Both);
550        assert_eq!(step.edge_labels().len(), 2);
551    }
552
553    #[test]
554    fn test_out_e_step() {
555        let step = VertexStep::out_e(vec![]);
556        assert_eq!(step.direction(), Direction::Out);
557        assert!(step.returns_edges());
558    }
559
560    #[test]
561    fn test_edge_vertex_step() {
562        let edge = TraverserValue::Edge {
563            id: "e1".to_string(),
564            source: "v1".to_string(),
565            target: "v2".to_string(),
566            label: "knows".to_string(),
567        };
568        let traverser = Traverser::with_value(edge);
569
570        // outV should return target
571        let out_v = EdgeVertexStep::out_v();
572        let result = out_v.process_traverser(traverser.clone());
573        if let StepResult::Emit(t) = result {
574            assert_eq!(t.len(), 1);
575            assert!(matches!(t[0].value(), TraverserValue::Vertex(id) if id == "v2"));
576        }
577
578        // inV should return source
579        let in_v = EdgeVertexStep::in_v();
580        let result = in_v.process_traverser(traverser.clone());
581        if let StepResult::Emit(t) = result {
582            assert_eq!(t.len(), 1);
583            assert!(matches!(t[0].value(), TraverserValue::Vertex(id) if id == "v1"));
584        }
585
586        // bothV should return both
587        let both_v = EdgeVertexStep::both_v();
588        let result = both_v.process_traverser(traverser);
589        if let StepResult::Emit(t) = result {
590            assert_eq!(t.len(), 2);
591        }
592    }
593
594    #[test]
595    fn test_properties_step() {
596        let step = PropertiesStep::with_keys(vec!["name".to_string(), "age".to_string()]);
597        assert_eq!(step.name(), "PropertiesStep");
598    }
599}