Skip to main content

reddb_server/storage/query/step/
branch.rs

1//! Branch Steps
2//!
3//! Steps that branch traversal flow based on conditions.
4//!
5//! # Steps
6//!
7//! - `choose()`: If-then-else branching
8//! - `union()`: Execute multiple traversals in parallel
9//! - `coalesce()`: First non-empty result wins
10//! - `optional()`: Execute traversal or pass through
11//! - `repeat()`: Loop execution
12
13use super::{
14    BasicTraversal, Step, StepResult, Traversal, Traverser, TraverserRequirement, TraverserValue,
15};
16use std::any::Any;
17
18/// Trait for branch steps
19pub trait BranchStep: Step {
20    /// Get branch options
21    fn branches(&self) -> Vec<&dyn Traversal>;
22}
23
24/// Choose step - conditional branching
25#[derive(Debug, Clone)]
26pub struct ChooseStep {
27    id: String,
28    labels: Vec<String>,
29    /// Condition traversal (produces value for picking)
30    condition: Option<BasicTraversal>,
31    /// Options: value -> traversal
32    options: Vec<(TraverserValue, BasicTraversal)>,
33    /// Default option (none)
34    default_option: Option<BasicTraversal>,
35}
36
37impl ChooseStep {
38    /// Create choose() step
39    pub fn new() -> Self {
40        Self {
41            id: "choose_0".to_string(),
42            labels: Vec::new(),
43            condition: None,
44            options: Vec::new(),
45            default_option: None,
46        }
47    }
48
49    /// Set condition traversal
50    pub fn with_condition(mut self, condition: BasicTraversal) -> Self {
51        self.condition = Some(condition);
52        self
53    }
54
55    /// Add option
56    pub fn option(mut self, value: TraverserValue, traversal: BasicTraversal) -> Self {
57        self.options.push((value, traversal));
58        self
59    }
60
61    /// Set default option
62    pub fn default(mut self, traversal: BasicTraversal) -> Self {
63        self.default_option = Some(traversal);
64        self
65    }
66}
67
68impl Default for ChooseStep {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl Step for ChooseStep {
75    fn id(&self) -> &str {
76        &self.id
77    }
78
79    fn name(&self) -> &str {
80        "ChooseStep"
81    }
82
83    fn labels(&self) -> &[String] {
84        &self.labels
85    }
86
87    fn add_label(&mut self, label: String) {
88        if !self.labels.contains(&label) {
89            self.labels.push(label);
90        }
91    }
92
93    fn requirements(&self) -> &[TraverserRequirement] {
94        &[]
95    }
96
97    fn process_traverser(&self, traverser: Traverser) -> StepResult {
98        // In real impl, would:
99        // 1. Run condition traversal
100        // 2. Match result against options
101        // 3. Execute matching branch
102        // For now, pass through to default or filter
103        if self.default_option.is_some() {
104            StepResult::emit_one(traverser)
105        } else {
106            StepResult::Filter
107        }
108    }
109
110    fn reset(&mut self) {}
111
112    fn clone_step(&self) -> Box<dyn Step> {
113        Box::new(self.clone())
114    }
115
116    fn as_any(&self) -> &dyn Any {
117        self
118    }
119
120    fn as_any_mut(&mut self) -> &mut dyn Any {
121        self
122    }
123}
124
125/// Union step - parallel branch execution
126#[derive(Debug, Clone)]
127pub struct UnionStep {
128    id: String,
129    labels: Vec<String>,
130    /// Branch traversals
131    branches: Vec<BasicTraversal>,
132    /// Is this a start step
133    is_start: bool,
134}
135
136impl UnionStep {
137    /// Create union() step
138    pub fn new(branches: Vec<BasicTraversal>) -> Self {
139        Self {
140            id: format!("union_{}", branches.len()),
141            labels: Vec::new(),
142            branches,
143            is_start: false,
144        }
145    }
146
147    /// Mark as start step
148    pub fn as_start(mut self) -> Self {
149        self.is_start = true;
150        self
151    }
152
153    /// Get branches
154    pub fn get_branches(&self) -> &[BasicTraversal] {
155        &self.branches
156    }
157}
158
159impl Step for UnionStep {
160    fn id(&self) -> &str {
161        &self.id
162    }
163
164    fn name(&self) -> &str {
165        "UnionStep"
166    }
167
168    fn labels(&self) -> &[String] {
169        &self.labels
170    }
171
172    fn add_label(&mut self, label: String) {
173        if !self.labels.contains(&label) {
174            self.labels.push(label);
175        }
176    }
177
178    fn requirements(&self) -> &[TraverserRequirement] {
179        &[]
180    }
181
182    fn process_traverser(&self, traverser: Traverser) -> StepResult {
183        // In real impl, would execute all branches and combine results
184        // For now, just duplicate traverser for each branch
185        let results: Vec<Traverser> = self.branches.iter().map(|_| traverser.split()).collect();
186        StepResult::emit_many(results)
187    }
188
189    fn reset(&mut self) {
190        for branch in &mut self.branches {
191            branch.reset();
192        }
193    }
194
195    fn clone_step(&self) -> Box<dyn Step> {
196        Box::new(self.clone())
197    }
198
199    fn as_any(&self) -> &dyn Any {
200        self
201    }
202
203    fn as_any_mut(&mut self) -> &mut dyn Any {
204        self
205    }
206}
207
208/// Coalesce step - first non-empty result wins
209#[derive(Debug, Clone)]
210pub struct CoalesceStep {
211    id: String,
212    labels: Vec<String>,
213    /// Branch traversals (tried in order)
214    branches: Vec<BasicTraversal>,
215}
216
217impl CoalesceStep {
218    /// Create coalesce() step
219    pub fn new(branches: Vec<BasicTraversal>) -> Self {
220        Self {
221            id: format!("coalesce_{}", branches.len()),
222            labels: Vec::new(),
223            branches,
224        }
225    }
226}
227
228impl Step for CoalesceStep {
229    fn id(&self) -> &str {
230        &self.id
231    }
232
233    fn name(&self) -> &str {
234        "CoalesceStep"
235    }
236
237    fn labels(&self) -> &[String] {
238        &self.labels
239    }
240
241    fn add_label(&mut self, label: String) {
242        if !self.labels.contains(&label) {
243            self.labels.push(label);
244        }
245    }
246
247    fn requirements(&self) -> &[TraverserRequirement] {
248        &[]
249    }
250
251    fn process_traverser(&self, traverser: Traverser) -> StepResult {
252        // In real impl, would try each branch until one produces output
253        // For now, just pass through if any branches exist
254        if !self.branches.is_empty() {
255            StepResult::emit_one(traverser)
256        } else {
257            StepResult::Filter
258        }
259    }
260
261    fn reset(&mut self) {
262        for branch in &mut self.branches {
263            branch.reset();
264        }
265    }
266
267    fn clone_step(&self) -> Box<dyn Step> {
268        Box::new(self.clone())
269    }
270
271    fn as_any(&self) -> &dyn Any {
272        self
273    }
274
275    fn as_any_mut(&mut self) -> &mut dyn Any {
276        self
277    }
278}
279
280/// Optional step - execute traversal or pass through
281#[derive(Debug, Clone)]
282pub struct OptionalStep {
283    id: String,
284    labels: Vec<String>,
285    /// Child traversal
286    traversal: BasicTraversal,
287}
288
289impl OptionalStep {
290    /// Create optional() step
291    pub fn new(traversal: BasicTraversal) -> Self {
292        Self {
293            id: "optional_0".to_string(),
294            labels: Vec::new(),
295            traversal,
296        }
297    }
298}
299
300impl Step for OptionalStep {
301    fn id(&self) -> &str {
302        &self.id
303    }
304
305    fn name(&self) -> &str {
306        "OptionalStep"
307    }
308
309    fn labels(&self) -> &[String] {
310        &self.labels
311    }
312
313    fn add_label(&mut self, label: String) {
314        if !self.labels.contains(&label) {
315            self.labels.push(label);
316        }
317    }
318
319    fn requirements(&self) -> &[TraverserRequirement] {
320        &[]
321    }
322
323    fn process_traverser(&self, traverser: Traverser) -> StepResult {
324        // In real impl, would try child traversal
325        // If it produces output, return that
326        // Otherwise, return original traverser
327        StepResult::emit_one(traverser)
328    }
329
330    fn reset(&mut self) {
331        self.traversal.reset();
332    }
333
334    fn clone_step(&self) -> Box<dyn Step> {
335        Box::new(self.clone())
336    }
337
338    fn as_any(&self) -> &dyn Any {
339        self
340    }
341
342    fn as_any_mut(&mut self) -> &mut dyn Any {
343        self
344    }
345}
346
347/// Repeat step - loop execution
348#[derive(Debug, Clone)]
349pub struct RepeatStep {
350    id: String,
351    labels: Vec<String>,
352    /// Loop name (for nested loops)
353    loop_name: String,
354    /// Repeat traversal
355    repeat_traversal: BasicTraversal,
356    /// Until predicate traversal
357    until_traversal: Option<BasicTraversal>,
358    /// Emit predicate traversal
359    emit_traversal: Option<BasicTraversal>,
360    /// Times limit
361    times: Option<u32>,
362    /// Until first flag (until before repeat)
363    until_first: bool,
364    /// Emit first flag (emit before repeat)
365    emit_first: bool,
366}
367
368impl RepeatStep {
369    /// Create repeat() step
370    pub fn new(repeat_traversal: BasicTraversal) -> Self {
371        static COUNTER: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
372        let id = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
373
374        Self {
375            id: format!("repeat_{}", id),
376            labels: Vec::new(),
377            loop_name: format!("repeat_{}", id),
378            repeat_traversal,
379            until_traversal: None,
380            emit_traversal: None,
381            times: None,
382            until_first: false,
383            emit_first: false,
384        }
385    }
386
387    /// Set loop name
388    pub fn with_name(mut self, name: String) -> Self {
389        self.loop_name = name;
390        self
391    }
392
393    /// Set until condition
394    pub fn until(mut self, traversal: BasicTraversal) -> Self {
395        self.until_traversal = Some(traversal);
396        self
397    }
398
399    /// Set emit condition
400    pub fn emit(mut self, traversal: BasicTraversal) -> Self {
401        self.emit_traversal = Some(traversal);
402        self
403    }
404
405    /// Set times limit
406    pub fn times(mut self, times: u32) -> Self {
407        self.times = Some(times);
408        self
409    }
410
411    /// Set until-first (check until before repeat)
412    pub fn until_first(mut self) -> Self {
413        self.until_first = true;
414        self
415    }
416
417    /// Set emit-first (emit before repeat)
418    pub fn emit_first(mut self) -> Self {
419        self.emit_first = true;
420        self
421    }
422
423    /// Get loop name
424    pub fn loop_name(&self) -> &str {
425        &self.loop_name
426    }
427}
428
429impl Step for RepeatStep {
430    fn id(&self) -> &str {
431        &self.id
432    }
433
434    fn name(&self) -> &str {
435        "RepeatStep"
436    }
437
438    fn labels(&self) -> &[String] {
439        &self.labels
440    }
441
442    fn add_label(&mut self, label: String) {
443        if !self.labels.contains(&label) {
444            self.labels.push(label);
445        }
446    }
447
448    fn requirements(&self) -> &[TraverserRequirement] {
449        static REQS: &[TraverserRequirement] =
450            &[TraverserRequirement::SingleLoop, TraverserRequirement::Path];
451        REQS
452    }
453
454    fn process_traverser(&self, mut traverser: Traverser) -> StepResult {
455        // Initialize loop if needed
456        traverser.init_loop(&self.loop_name);
457
458        // Check times limit
459        if let Some(times) = self.times {
460            if traverser.loop_count(&self.loop_name) >= times {
461                return StepResult::emit_one(traverser);
462            }
463        }
464
465        // In real impl:
466        // 1. Check until condition (if until_first)
467        // 2. Emit if emit condition passes (if emit_first)
468        // 3. Execute repeat traversal
469        // 4. Increment loop counter
470        // 5. Check until condition (if not until_first)
471        // 6. Emit if emit condition passes (if not emit_first)
472        // 7. Loop back to step 1
473
474        traverser.incr_loop(&self.loop_name);
475        StepResult::emit_one(traverser)
476    }
477
478    fn reset(&mut self) {
479        self.repeat_traversal.reset();
480        if let Some(ref mut t) = self.until_traversal {
481            t.reset();
482        }
483        if let Some(ref mut t) = self.emit_traversal {
484            t.reset();
485        }
486    }
487
488    fn clone_step(&self) -> Box<dyn Step> {
489        Box::new(self.clone())
490    }
491
492    fn as_any(&self) -> &dyn Any {
493        self
494    }
495
496    fn as_any_mut(&mut self) -> &mut dyn Any {
497        self
498    }
499}
500
501/// Local step - execute child traversal for each traverser
502#[derive(Debug, Clone)]
503pub struct LocalStep {
504    id: String,
505    labels: Vec<String>,
506    /// Child traversal
507    traversal: BasicTraversal,
508}
509
510impl LocalStep {
511    /// Create local() step
512    pub fn new(traversal: BasicTraversal) -> Self {
513        Self {
514            id: "local_0".to_string(),
515            labels: Vec::new(),
516            traversal,
517        }
518    }
519}
520
521impl Step for LocalStep {
522    fn id(&self) -> &str {
523        &self.id
524    }
525
526    fn name(&self) -> &str {
527        "LocalStep"
528    }
529
530    fn labels(&self) -> &[String] {
531        &self.labels
532    }
533
534    fn add_label(&mut self, label: String) {
535        if !self.labels.contains(&label) {
536            self.labels.push(label);
537        }
538    }
539
540    fn requirements(&self) -> &[TraverserRequirement] {
541        &[]
542    }
543
544    fn process_traverser(&self, traverser: Traverser) -> StepResult {
545        // In real impl, would execute child traversal with this traverser
546        StepResult::emit_one(traverser)
547    }
548
549    fn reset(&mut self) {
550        self.traversal.reset();
551    }
552
553    fn clone_step(&self) -> Box<dyn Step> {
554        Box::new(self.clone())
555    }
556
557    fn as_any(&self) -> &dyn Any {
558        self
559    }
560
561    fn as_any_mut(&mut self) -> &mut dyn Any {
562        self
563    }
564}
565
566#[cfg(test)]
567mod tests {
568    use super::*;
569
570    #[test]
571    fn test_choose_step() {
572        let step = ChooseStep::new();
573        assert_eq!(step.name(), "ChooseStep");
574    }
575
576    #[test]
577    fn test_union_step() {
578        let branch1 = BasicTraversal::new();
579        let branch2 = BasicTraversal::new();
580        let step = UnionStep::new(vec![branch1, branch2]);
581
582        assert_eq!(step.get_branches().len(), 2);
583
584        let traverser = Traverser::new("v1");
585        let result = step.process_traverser(traverser);
586        if let StepResult::Emit(traversers) = result {
587            assert_eq!(traversers.len(), 2);
588        }
589    }
590
591    #[test]
592    fn test_coalesce_step() {
593        let step = CoalesceStep::new(vec![BasicTraversal::new()]);
594        assert_eq!(step.name(), "CoalesceStep");
595
596        let traverser = Traverser::new("v1");
597        let result = step.process_traverser(traverser);
598        assert!(matches!(result, StepResult::Emit(_)));
599    }
600
601    #[test]
602    fn test_optional_step() {
603        let step = OptionalStep::new(BasicTraversal::new());
604        assert_eq!(step.name(), "OptionalStep");
605
606        let traverser = Traverser::new("v1");
607        let result = step.process_traverser(traverser);
608        assert!(matches!(result, StepResult::Emit(_)));
609    }
610
611    #[test]
612    fn test_repeat_step() {
613        let repeat_t = BasicTraversal::new();
614        let step = RepeatStep::new(repeat_t).times(3);
615
616        assert_eq!(step.name(), "RepeatStep");
617
618        let traverser = Traverser::new("v1");
619        let result = step.process_traverser(traverser);
620        if let StepResult::Emit(t) = result {
621            assert_eq!(t[0].loop_count(step.loop_name()), 1);
622        }
623    }
624
625    #[test]
626    fn test_repeat_with_until_first() {
627        let step = RepeatStep::new(BasicTraversal::new())
628            .until_first()
629            .times(5);
630
631        assert!(step.until_first);
632    }
633
634    #[test]
635    fn test_local_step() {
636        let step = LocalStep::new(BasicTraversal::new());
637        assert_eq!(step.name(), "LocalStep");
638    }
639}