Skip to main content

reddb_server/storage/query/step/
barrier.rs

//! Barrier Steps
//!
//! Steps that synchronize traversers before emitting results.
//! A barrier absorbs every upstream traverser before it emits anything.
//!
//! # Steps
//!
//! - `fold()`: Collect all values into a single list
//! - `toList()`, `toSet()`, `toBulkSet()`: Collect into a list, a set of unique values, or a bulk collection
//! - `group()`: Group values by key
//! - `groupCount()`: Count traversers by key
//! - `order()`: Sort traversers
//! - `sum()`: Sum numeric values (`max()`, `min()` and `mean()` are not implemented in this module)
13
14use super::{Step, StepResult, Traverser, TraverserRequirement, TraverserValue};
15use crate::json;
16use crate::serde_json::Value;
17use std::any::Any;
18use std::collections::HashMap;
19
20/// Trait for barrier steps (synchronization points)
21pub trait BarrierStep: Step {
22    /// Add traverser to barrier
23    fn add_to_barrier(&mut self, traverser: Traverser);
24
25    /// Flush barrier and produce results
26    fn flush_barrier(&mut self) -> Vec<Traverser>;
27
28    /// Check if barrier has accumulated data
29    fn has_data(&self) -> bool;
30
31    /// Check if barrier is ready to flush
32    fn is_ready(&self) -> bool;
33}
34
/// Generic reducing barrier step.
///
/// Holds a seed value, an optional running accumulator and the name of the
/// reduction function. Trait bounds are intentionally left to the `impl`
/// blocks that need them; `#[derive]` already adds `T: Debug` / `T: Clone`
/// bounds to the generated `Debug` / `Clone` impls.
#[derive(Debug, Clone)]
pub struct ReducingBarrierStep<T> {
    id: String,
    labels: Vec<String>,
    /// Initial (seed) value of the reduction.
    seed: T,
    /// Current accumulated value, if any.
    accumulator: Option<T>,
    /// Reduction function name
    reducer_name: String,
}
47
48/// Fold step - collects all traversers into a list
49#[derive(Debug, Clone)]
50pub struct FoldStep {
51    id: String,
52    labels: Vec<String>,
53    /// Accumulated values
54    values: Vec<Value>,
55}
56
57impl FoldStep {
58    /// Create fold() step
59    pub fn new() -> Self {
60        Self {
61            id: "fold_0".to_string(),
62            labels: Vec::new(),
63            values: Vec::new(),
64        }
65    }
66}
67
68impl Default for FoldStep {
69    fn default() -> Self {
70        Self::new()
71    }
72}
73
74impl Step for FoldStep {
75    fn id(&self) -> &str {
76        &self.id
77    }
78
79    fn name(&self) -> &str {
80        "FoldStep"
81    }
82
83    fn labels(&self) -> &[String] {
84        &self.labels
85    }
86
87    fn add_label(&mut self, label: String) {
88        if !self.labels.contains(&label) {
89            self.labels.push(label);
90        }
91    }
92
93    fn requirements(&self) -> &[TraverserRequirement] {
94        static REQS: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
95        REQS
96    }
97
98    fn process_traverser(&self, traverser: Traverser) -> StepResult {
99        // Barrier steps hold traversers
100        StepResult::Hold(vec![traverser])
101    }
102
103    fn reset(&mut self) {
104        self.values.clear();
105    }
106
107    fn clone_step(&self) -> Box<dyn Step> {
108        Box::new(self.clone())
109    }
110
111    fn as_any(&self) -> &dyn Any {
112        self
113    }
114
115    fn as_any_mut(&mut self) -> &mut dyn Any {
116        self
117    }
118}
119
120impl BarrierStep for FoldStep {
121    fn add_to_barrier(&mut self, traverser: Traverser) {
122        // Add value based on bulk
123        for _ in 0..traverser.bulk() {
124            self.values.push(traverser.value().to_json());
125        }
126    }
127
128    fn flush_barrier(&mut self) -> Vec<Traverser> {
129        if self.values.is_empty() {
130            return vec![Traverser::with_value(TraverserValue::List(vec![]))];
131        }
132
133        let result = std::mem::take(&mut self.values);
134        vec![Traverser::with_value(TraverserValue::List(result))]
135    }
136
137    fn has_data(&self) -> bool {
138        !self.values.is_empty()
139    }
140
141    fn is_ready(&self) -> bool {
142        true // Always ready - needs external signal
143    }
144}
145
146/// Collecting barrier step - collects into various containers
147#[derive(Debug, Clone)]
148pub struct CollectingBarrierStep {
149    id: String,
150    labels: Vec<String>,
151    /// Collected values
152    values: Vec<Value>,
153    /// Container type
154    container: ContainerType,
155}
156
157/// Container types for collecting
158#[derive(Debug, Clone, Copy, PartialEq, Eq)]
159pub enum ContainerType {
160    /// Standard list
161    List,
162    /// Set (unique values)
163    Set,
164    /// Bulk map (value -> count)
165    BulkMap,
166}
167
168impl CollectingBarrierStep {
169    /// Create list collector
170    pub fn to_list() -> Self {
171        Self {
172            id: "toList_0".to_string(),
173            labels: Vec::new(),
174            values: Vec::new(),
175            container: ContainerType::List,
176        }
177    }
178
179    /// Create set collector
180    pub fn to_set() -> Self {
181        Self {
182            id: "toSet_0".to_string(),
183            labels: Vec::new(),
184            values: Vec::new(),
185            container: ContainerType::Set,
186        }
187    }
188
189    /// Create bulk map collector
190    pub fn to_bulk_map() -> Self {
191        Self {
192            id: "toBulkSet_0".to_string(),
193            labels: Vec::new(),
194            values: Vec::new(),
195            container: ContainerType::BulkMap,
196        }
197    }
198}
199
200impl Step for CollectingBarrierStep {
201    fn id(&self) -> &str {
202        &self.id
203    }
204
205    fn name(&self) -> &str {
206        match self.container {
207            ContainerType::List => "ToListStep",
208            ContainerType::Set => "ToSetStep",
209            ContainerType::BulkMap => "ToBulkSetStep",
210        }
211    }
212
213    fn labels(&self) -> &[String] {
214        &self.labels
215    }
216
217    fn add_label(&mut self, label: String) {
218        if !self.labels.contains(&label) {
219            self.labels.push(label);
220        }
221    }
222
223    fn requirements(&self) -> &[TraverserRequirement] {
224        static REQS: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
225        REQS
226    }
227
228    fn process_traverser(&self, traverser: Traverser) -> StepResult {
229        StepResult::Hold(vec![traverser])
230    }
231
232    fn reset(&mut self) {
233        self.values.clear();
234    }
235
236    fn clone_step(&self) -> Box<dyn Step> {
237        Box::new(self.clone())
238    }
239
240    fn as_any(&self) -> &dyn Any {
241        self
242    }
243
244    fn as_any_mut(&mut self) -> &mut dyn Any {
245        self
246    }
247}
248
249impl BarrierStep for CollectingBarrierStep {
250    fn add_to_barrier(&mut self, traverser: Traverser) {
251        let value = traverser.value().to_json();
252
253        match self.container {
254            ContainerType::List => {
255                for _ in 0..traverser.bulk() {
256                    self.values.push(value.clone());
257                }
258            }
259            ContainerType::Set => {
260                if !self.values.contains(&value) {
261                    self.values.push(value);
262                }
263            }
264            ContainerType::BulkMap => {
265                // Would use map instead of vec
266                self.values.push(value);
267            }
268        }
269    }
270
271    fn flush_barrier(&mut self) -> Vec<Traverser> {
272        let result = std::mem::take(&mut self.values);
273        vec![Traverser::with_value(TraverserValue::List(result))]
274    }
275
276    fn has_data(&self) -> bool {
277        !self.values.is_empty()
278    }
279
280    fn is_ready(&self) -> bool {
281        true
282    }
283}
284
285/// Group step - groups by key
286#[derive(Debug, Clone)]
287pub struct GroupStep {
288    id: String,
289    labels: Vec<String>,
290    /// Group storage: key -> values
291    groups: HashMap<String, Vec<Value>>,
292    /// Current state: 'k' for key, 'v' for value
293    state: char,
294}
295
296impl GroupStep {
297    /// Create group() step
298    pub fn new() -> Self {
299        Self {
300            id: "group_0".to_string(),
301            labels: Vec::new(),
302            groups: HashMap::new(),
303            state: 'k',
304        }
305    }
306}
307
308impl Default for GroupStep {
309    fn default() -> Self {
310        Self::new()
311    }
312}
313
314impl Step for GroupStep {
315    fn id(&self) -> &str {
316        &self.id
317    }
318
319    fn name(&self) -> &str {
320        "GroupStep"
321    }
322
323    fn labels(&self) -> &[String] {
324        &self.labels
325    }
326
327    fn add_label(&mut self, label: String) {
328        if !self.labels.contains(&label) {
329            self.labels.push(label);
330        }
331    }
332
333    fn requirements(&self) -> &[TraverserRequirement] {
334        static REQS: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
335        REQS
336    }
337
338    fn process_traverser(&self, traverser: Traverser) -> StepResult {
339        StepResult::Hold(vec![traverser])
340    }
341
342    fn reset(&mut self) {
343        self.groups.clear();
344        self.state = 'k';
345    }
346
347    fn clone_step(&self) -> Box<dyn Step> {
348        Box::new(self.clone())
349    }
350
351    fn as_any(&self) -> &dyn Any {
352        self
353    }
354
355    fn as_any_mut(&mut self) -> &mut dyn Any {
356        self
357    }
358}
359
360impl BarrierStep for GroupStep {
361    fn add_to_barrier(&mut self, traverser: Traverser) {
362        // In real impl, would use key/value child traversals
363        // For now, use value.to_string() as key
364        let key = match traverser.value() {
365            TraverserValue::String(s) => s.clone(),
366            TraverserValue::Vertex(id) => id.clone(),
367            other => format!("{:?}", other),
368        };
369
370        self.groups
371            .entry(key)
372            .or_default()
373            .push(traverser.value().to_json());
374    }
375
376    fn flush_barrier(&mut self) -> Vec<Traverser> {
377        let result: HashMap<String, Value> = self
378            .groups
379            .drain()
380            .map(|(k, v)| (k, Value::Array(v)))
381            .collect();
382
383        vec![Traverser::with_value(TraverserValue::Map(result))]
384    }
385
386    fn has_data(&self) -> bool {
387        !self.groups.is_empty()
388    }
389
390    fn is_ready(&self) -> bool {
391        true
392    }
393}
394
395/// GroupCount step - counts by key
396#[derive(Debug, Clone)]
397pub struct GroupCountStep {
398    id: String,
399    labels: Vec<String>,
400    /// Count storage: key -> count
401    counts: HashMap<String, u64>,
402}
403
404impl GroupCountStep {
405    /// Create groupCount() step
406    pub fn new() -> Self {
407        Self {
408            id: "groupCount_0".to_string(),
409            labels: Vec::new(),
410            counts: HashMap::new(),
411        }
412    }
413}
414
415impl Default for GroupCountStep {
416    fn default() -> Self {
417        Self::new()
418    }
419}
420
421impl Step for GroupCountStep {
422    fn id(&self) -> &str {
423        &self.id
424    }
425
426    fn name(&self) -> &str {
427        "GroupCountStep"
428    }
429
430    fn labels(&self) -> &[String] {
431        &self.labels
432    }
433
434    fn add_label(&mut self, label: String) {
435        if !self.labels.contains(&label) {
436            self.labels.push(label);
437        }
438    }
439
440    fn requirements(&self) -> &[TraverserRequirement] {
441        static REQS: &[TraverserRequirement] =
442            &[TraverserRequirement::Barrier, TraverserRequirement::Bulk];
443        REQS
444    }
445
446    fn process_traverser(&self, traverser: Traverser) -> StepResult {
447        StepResult::Hold(vec![traverser])
448    }
449
450    fn reset(&mut self) {
451        self.counts.clear();
452    }
453
454    fn clone_step(&self) -> Box<dyn Step> {
455        Box::new(self.clone())
456    }
457
458    fn as_any(&self) -> &dyn Any {
459        self
460    }
461
462    fn as_any_mut(&mut self) -> &mut dyn Any {
463        self
464    }
465}
466
467impl BarrierStep for GroupCountStep {
468    fn add_to_barrier(&mut self, traverser: Traverser) {
469        let key = match traverser.value() {
470            TraverserValue::String(s) => s.clone(),
471            TraverserValue::Vertex(id) => id.clone(),
472            other => format!("{:?}", other),
473        };
474
475        *self.counts.entry(key).or_insert(0) += traverser.bulk();
476    }
477
478    fn flush_barrier(&mut self) -> Vec<Traverser> {
479        let result: HashMap<String, Value> =
480            self.counts.drain().map(|(k, v)| (k, json!(v))).collect();
481
482        vec![Traverser::with_value(TraverserValue::Map(result))]
483    }
484
485    fn has_data(&self) -> bool {
486        !self.counts.is_empty()
487    }
488
489    fn is_ready(&self) -> bool {
490        true
491    }
492}
493
494/// Order step - sorts traversers
495#[derive(Debug, Clone)]
496pub struct OrderStep {
497    id: String,
498    labels: Vec<String>,
499    /// Collected traversers for sorting
500    traversers: Vec<Traverser>,
501    /// Sort direction
502    ascending: bool,
503}
504
505impl OrderStep {
506    /// Create order() step
507    pub fn new() -> Self {
508        Self {
509            id: "order_0".to_string(),
510            labels: Vec::new(),
511            traversers: Vec::new(),
512            ascending: true,
513        }
514    }
515
516    /// Create descending order
517    pub fn descending() -> Self {
518        Self {
519            id: "order_desc_0".to_string(),
520            labels: Vec::new(),
521            traversers: Vec::new(),
522            ascending: false,
523        }
524    }
525}
526
527impl Default for OrderStep {
528    fn default() -> Self {
529        Self::new()
530    }
531}
532
533impl Step for OrderStep {
534    fn id(&self) -> &str {
535        &self.id
536    }
537
538    fn name(&self) -> &str {
539        "OrderStep"
540    }
541
542    fn labels(&self) -> &[String] {
543        &self.labels
544    }
545
546    fn add_label(&mut self, label: String) {
547        if !self.labels.contains(&label) {
548            self.labels.push(label);
549        }
550    }
551
552    fn requirements(&self) -> &[TraverserRequirement] {
553        static REQS: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
554        REQS
555    }
556
557    fn process_traverser(&self, traverser: Traverser) -> StepResult {
558        StepResult::Hold(vec![traverser])
559    }
560
561    fn reset(&mut self) {
562        self.traversers.clear();
563    }
564
565    fn clone_step(&self) -> Box<dyn Step> {
566        Box::new(self.clone())
567    }
568
569    fn as_any(&self) -> &dyn Any {
570        self
571    }
572
573    fn as_any_mut(&mut self) -> &mut dyn Any {
574        self
575    }
576}
577
578impl BarrierStep for OrderStep {
579    fn add_to_barrier(&mut self, traverser: Traverser) {
580        self.traversers.push(traverser);
581    }
582
583    fn flush_barrier(&mut self) -> Vec<Traverser> {
584        // Sort by value string representation
585        let ascending = self.ascending;
586        self.traversers.sort_by(|a, b| {
587            let a_str = format!("{:?}", a.value());
588            let b_str = format!("{:?}", b.value());
589            if ascending {
590                a_str.cmp(&b_str)
591            } else {
592                b_str.cmp(&a_str)
593            }
594        });
595
596        std::mem::take(&mut self.traversers)
597    }
598
599    fn has_data(&self) -> bool {
600        !self.traversers.is_empty()
601    }
602
603    fn is_ready(&self) -> bool {
604        true
605    }
606}
607
608/// Sum step - sums numeric values
609#[derive(Debug, Clone)]
610pub struct SumStep {
611    id: String,
612    labels: Vec<String>,
613    sum: f64,
614}
615
616impl SumStep {
617    /// Create sum() step
618    pub fn new() -> Self {
619        Self {
620            id: "sum_0".to_string(),
621            labels: Vec::new(),
622            sum: 0.0,
623        }
624    }
625}
626
627impl Default for SumStep {
628    fn default() -> Self {
629        Self::new()
630    }
631}
632
633impl Step for SumStep {
634    fn id(&self) -> &str {
635        &self.id
636    }
637
638    fn name(&self) -> &str {
639        "SumStep"
640    }
641
642    fn labels(&self) -> &[String] {
643        &self.labels
644    }
645
646    fn add_label(&mut self, label: String) {
647        if !self.labels.contains(&label) {
648            self.labels.push(label);
649        }
650    }
651
652    fn requirements(&self) -> &[TraverserRequirement] {
653        static REQS: &[TraverserRequirement] =
654            &[TraverserRequirement::Barrier, TraverserRequirement::Bulk];
655        REQS
656    }
657
658    fn process_traverser(&self, traverser: Traverser) -> StepResult {
659        StepResult::Hold(vec![traverser])
660    }
661
662    fn reset(&mut self) {
663        self.sum = 0.0;
664    }
665
666    fn clone_step(&self) -> Box<dyn Step> {
667        Box::new(self.clone())
668    }
669
670    fn as_any(&self) -> &dyn Any {
671        self
672    }
673
674    fn as_any_mut(&mut self) -> &mut dyn Any {
675        self
676    }
677}
678
679impl BarrierStep for SumStep {
680    fn add_to_barrier(&mut self, traverser: Traverser) {
681        let value = match traverser.value() {
682            TraverserValue::Integer(i) => *i as f64,
683            TraverserValue::Float(f) => *f,
684            _ => 0.0,
685        };
686        self.sum += value * traverser.bulk() as f64;
687    }
688
689    fn flush_barrier(&mut self) -> Vec<Traverser> {
690        let result = self.sum;
691        self.sum = 0.0;
692        vec![Traverser::with_value(TraverserValue::Float(result))]
693    }
694
695    fn has_data(&self) -> bool {
696        self.sum != 0.0
697    }
698
699    fn is_ready(&self) -> bool {
700        true
701    }
702}
703
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a traverser holding a string value.
    fn string_traverser(s: &str) -> Traverser {
        Traverser::with_value(TraverserValue::String(s.to_string()))
    }

    /// Unwrap the single `List` traverser produced by a flush.
    ///
    /// Panics on any other shape, so a wrong result cannot make a test pass
    /// vacuously.
    fn expect_list(result: &[Traverser]) -> &[Value] {
        assert_eq!(result.len(), 1, "barrier should emit exactly one traverser");
        match result[0].value() {
            TraverserValue::List(list) => list.as_slice(),
            other => panic!("expected List, got {:?}", other),
        }
    }

    /// Unwrap the single `Map` traverser produced by a flush.
    fn expect_map(result: &[Traverser]) -> &HashMap<String, Value> {
        assert_eq!(result.len(), 1, "barrier should emit exactly one traverser");
        match result[0].value() {
            TraverserValue::Map(map) => map,
            other => panic!("expected Map, got {:?}", other),
        }
    }

    /// Unwrap the single `Float` traverser produced by a flush.
    fn expect_float(result: &[Traverser]) -> f64 {
        assert_eq!(result.len(), 1, "barrier should emit exactly one traverser");
        match result[0].value() {
            TraverserValue::Float(value) => *value,
            other => panic!("expected Float, got {:?}", other),
        }
    }

    /// Collect the string payloads of `result`, in order.
    fn string_values(result: &[Traverser]) -> Vec<String> {
        result
            .iter()
            .map(|t| match t.value() {
                TraverserValue::String(s) => s.clone(),
                other => panic!("expected String, got {:?}", other),
            })
            .collect()
    }

    #[test]
    fn test_fold_step() {
        let mut step = FoldStep::new();

        step.add_to_barrier(Traverser::new("v1"));
        step.add_to_barrier(Traverser::new("v2"));
        step.add_to_barrier(Traverser::new("v3"));

        let result = step.flush_barrier();
        assert_eq!(expect_list(&result).len(), 3);
        assert!(!step.has_data(), "flush should empty the barrier");
    }

    #[test]
    fn test_fold_with_bulk() {
        let mut step = FoldStep::new();

        let mut t = Traverser::new("v1");
        t.set_bulk(3);
        step.add_to_barrier(t);

        let result = step.flush_barrier();
        assert_eq!(expect_list(&result).len(), 3); // Bulk expanded
    }

    #[test]
    fn test_fold_empty_flush() {
        let mut step = FoldStep::new();
        let result = step.flush_barrier();
        assert!(expect_list(&result).is_empty());
    }

    #[test]
    fn test_group_step() {
        let mut step = GroupStep::new();

        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));
        step.add_to_barrier(string_traverser("a"));

        let result = step.flush_barrier();
        let map = expect_map(&result);
        assert_eq!(map.len(), 2); // Two groups: a and b
        match map.get("a") {
            Some(Value::Array(items)) => assert_eq!(items.len(), 2),
            other => panic!("expected array for group 'a', got {:?}", other),
        }
    }

    #[test]
    fn test_group_count_step() {
        let mut step = GroupCountStep::new();

        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));

        let mut t = string_traverser("a");
        t.set_bulk(2);
        step.add_to_barrier(t);

        let result = step.flush_barrier();
        let map = expect_map(&result);
        assert_eq!(map.get("a"), Some(&json!(3)));
        assert_eq!(map.get("b"), Some(&json!(1)));
    }

    #[test]
    fn test_order_step() {
        let mut step = OrderStep::new();

        step.add_to_barrier(string_traverser("c"));
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));

        let result = step.flush_barrier();
        assert_eq!(string_values(&result), vec!["a", "b", "c"]);
    }

    #[test]
    fn test_order_descending() {
        let mut step = OrderStep::descending();

        step.add_to_barrier(string_traverser("c"));
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));

        let result = step.flush_barrier();
        assert_eq!(string_values(&result), vec!["c", "b", "a"]);
    }

    #[test]
    fn test_sum_step() {
        let mut step = SumStep::new();

        step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(10)));
        step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(20)));
        step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(30)));

        let result = step.flush_barrier();
        assert_eq!(expect_float(&result), 60.0);
    }

    #[test]
    fn test_sum_with_bulk() {
        let mut step = SumStep::new();

        let mut t = Traverser::with_value(TraverserValue::Integer(5));
        t.set_bulk(2);
        step.add_to_barrier(t);
        step.add_to_barrier(Traverser::with_value(TraverserValue::Float(1.5)));

        let result = step.flush_barrier();
        assert_eq!(expect_float(&result), 11.5);
    }

    #[test]
    fn test_collecting_to_set() {
        let mut step = CollectingBarrierStep::to_set();

        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));

        let result = step.flush_barrier();
        assert_eq!(expect_list(&result).len(), 2); // Duplicates removed
    }

    #[test]
    fn test_collecting_to_list_with_bulk() {
        let mut step = CollectingBarrierStep::to_list();

        let mut t = string_traverser("a");
        t.set_bulk(2);
        step.add_to_barrier(t);
        step.add_to_barrier(string_traverser("b"));

        let result = step.flush_barrier();
        assert_eq!(expect_list(&result).len(), 3);
    }
}