reddb_server/storage/query/step/
barrier.rs1use super::{Step, StepResult, Traverser, TraverserRequirement, TraverserValue};
15use crate::json;
16use crate::serde_json::Value;
17use std::any::Any;
18use std::collections::HashMap;
19
20pub trait BarrierStep: Step {
22 fn add_to_barrier(&mut self, traverser: Traverser);
24
25 fn flush_barrier(&mut self) -> Vec<Traverser>;
27
28 fn has_data(&self) -> bool;
30
31 fn is_ready(&self) -> bool;
33}
34
35#[derive(Debug, Clone)]
37pub struct ReducingBarrierStep<T: Clone + Send + Sync + std::fmt::Debug> {
38 id: String,
39 labels: Vec<String>,
40 seed: T,
42 accumulator: Option<T>,
44 reducer_name: String,
46}
47
48#[derive(Debug, Clone)]
50pub struct FoldStep {
51 id: String,
52 labels: Vec<String>,
53 values: Vec<Value>,
55}
56
57impl FoldStep {
58 pub fn new() -> Self {
60 Self {
61 id: "fold_0".to_string(),
62 labels: Vec::new(),
63 values: Vec::new(),
64 }
65 }
66}
67
68impl Default for FoldStep {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl Step for FoldStep {
75 fn id(&self) -> &str {
76 &self.id
77 }
78
79 fn name(&self) -> &str {
80 "FoldStep"
81 }
82
83 fn labels(&self) -> &[String] {
84 &self.labels
85 }
86
87 fn add_label(&mut self, label: String) {
88 if !self.labels.contains(&label) {
89 self.labels.push(label);
90 }
91 }
92
93 fn requirements(&self) -> &[TraverserRequirement] {
94 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
95 REQS
96 }
97
98 fn process_traverser(&self, traverser: Traverser) -> StepResult {
99 StepResult::Hold(vec![traverser])
101 }
102
103 fn reset(&mut self) {
104 self.values.clear();
105 }
106
107 fn clone_step(&self) -> Box<dyn Step> {
108 Box::new(self.clone())
109 }
110
111 fn as_any(&self) -> &dyn Any {
112 self
113 }
114
115 fn as_any_mut(&mut self) -> &mut dyn Any {
116 self
117 }
118}
119
120impl BarrierStep for FoldStep {
121 fn add_to_barrier(&mut self, traverser: Traverser) {
122 for _ in 0..traverser.bulk() {
124 self.values.push(traverser.value().to_json());
125 }
126 }
127
128 fn flush_barrier(&mut self) -> Vec<Traverser> {
129 if self.values.is_empty() {
130 return vec![Traverser::with_value(TraverserValue::List(vec![]))];
131 }
132
133 let result = std::mem::take(&mut self.values);
134 vec![Traverser::with_value(TraverserValue::List(result))]
135 }
136
137 fn has_data(&self) -> bool {
138 !self.values.is_empty()
139 }
140
141 fn is_ready(&self) -> bool {
142 true }
144}
145
146#[derive(Debug, Clone)]
148pub struct CollectingBarrierStep {
149 id: String,
150 labels: Vec<String>,
151 values: Vec<Value>,
153 container: ContainerType,
155}
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq)]
159pub enum ContainerType {
160 List,
162 Set,
164 BulkMap,
166}
167
168impl CollectingBarrierStep {
169 pub fn to_list() -> Self {
171 Self {
172 id: "toList_0".to_string(),
173 labels: Vec::new(),
174 values: Vec::new(),
175 container: ContainerType::List,
176 }
177 }
178
179 pub fn to_set() -> Self {
181 Self {
182 id: "toSet_0".to_string(),
183 labels: Vec::new(),
184 values: Vec::new(),
185 container: ContainerType::Set,
186 }
187 }
188
189 pub fn to_bulk_map() -> Self {
191 Self {
192 id: "toBulkSet_0".to_string(),
193 labels: Vec::new(),
194 values: Vec::new(),
195 container: ContainerType::BulkMap,
196 }
197 }
198}
199
200impl Step for CollectingBarrierStep {
201 fn id(&self) -> &str {
202 &self.id
203 }
204
205 fn name(&self) -> &str {
206 match self.container {
207 ContainerType::List => "ToListStep",
208 ContainerType::Set => "ToSetStep",
209 ContainerType::BulkMap => "ToBulkSetStep",
210 }
211 }
212
213 fn labels(&self) -> &[String] {
214 &self.labels
215 }
216
217 fn add_label(&mut self, label: String) {
218 if !self.labels.contains(&label) {
219 self.labels.push(label);
220 }
221 }
222
223 fn requirements(&self) -> &[TraverserRequirement] {
224 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
225 REQS
226 }
227
228 fn process_traverser(&self, traverser: Traverser) -> StepResult {
229 StepResult::Hold(vec![traverser])
230 }
231
232 fn reset(&mut self) {
233 self.values.clear();
234 }
235
236 fn clone_step(&self) -> Box<dyn Step> {
237 Box::new(self.clone())
238 }
239
240 fn as_any(&self) -> &dyn Any {
241 self
242 }
243
244 fn as_any_mut(&mut self) -> &mut dyn Any {
245 self
246 }
247}
248
249impl BarrierStep for CollectingBarrierStep {
250 fn add_to_barrier(&mut self, traverser: Traverser) {
251 let value = traverser.value().to_json();
252
253 match self.container {
254 ContainerType::List => {
255 for _ in 0..traverser.bulk() {
256 self.values.push(value.clone());
257 }
258 }
259 ContainerType::Set => {
260 if !self.values.contains(&value) {
261 self.values.push(value);
262 }
263 }
264 ContainerType::BulkMap => {
265 self.values.push(value);
267 }
268 }
269 }
270
271 fn flush_barrier(&mut self) -> Vec<Traverser> {
272 let result = std::mem::take(&mut self.values);
273 vec![Traverser::with_value(TraverserValue::List(result))]
274 }
275
276 fn has_data(&self) -> bool {
277 !self.values.is_empty()
278 }
279
280 fn is_ready(&self) -> bool {
281 true
282 }
283}
284
285#[derive(Debug, Clone)]
287pub struct GroupStep {
288 id: String,
289 labels: Vec<String>,
290 groups: HashMap<String, Vec<Value>>,
292 state: char,
294}
295
296impl GroupStep {
297 pub fn new() -> Self {
299 Self {
300 id: "group_0".to_string(),
301 labels: Vec::new(),
302 groups: HashMap::new(),
303 state: 'k',
304 }
305 }
306}
307
308impl Default for GroupStep {
309 fn default() -> Self {
310 Self::new()
311 }
312}
313
314impl Step for GroupStep {
315 fn id(&self) -> &str {
316 &self.id
317 }
318
319 fn name(&self) -> &str {
320 "GroupStep"
321 }
322
323 fn labels(&self) -> &[String] {
324 &self.labels
325 }
326
327 fn add_label(&mut self, label: String) {
328 if !self.labels.contains(&label) {
329 self.labels.push(label);
330 }
331 }
332
333 fn requirements(&self) -> &[TraverserRequirement] {
334 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
335 REQS
336 }
337
338 fn process_traverser(&self, traverser: Traverser) -> StepResult {
339 StepResult::Hold(vec![traverser])
340 }
341
342 fn reset(&mut self) {
343 self.groups.clear();
344 self.state = 'k';
345 }
346
347 fn clone_step(&self) -> Box<dyn Step> {
348 Box::new(self.clone())
349 }
350
351 fn as_any(&self) -> &dyn Any {
352 self
353 }
354
355 fn as_any_mut(&mut self) -> &mut dyn Any {
356 self
357 }
358}
359
360impl BarrierStep for GroupStep {
361 fn add_to_barrier(&mut self, traverser: Traverser) {
362 let key = match traverser.value() {
365 TraverserValue::String(s) => s.clone(),
366 TraverserValue::Vertex(id) => id.clone(),
367 other => format!("{:?}", other),
368 };
369
370 self.groups
371 .entry(key)
372 .or_default()
373 .push(traverser.value().to_json());
374 }
375
376 fn flush_barrier(&mut self) -> Vec<Traverser> {
377 let result: HashMap<String, Value> = self
378 .groups
379 .drain()
380 .map(|(k, v)| (k, Value::Array(v)))
381 .collect();
382
383 vec![Traverser::with_value(TraverserValue::Map(result))]
384 }
385
386 fn has_data(&self) -> bool {
387 !self.groups.is_empty()
388 }
389
390 fn is_ready(&self) -> bool {
391 true
392 }
393}
394
395#[derive(Debug, Clone)]
397pub struct GroupCountStep {
398 id: String,
399 labels: Vec<String>,
400 counts: HashMap<String, u64>,
402}
403
404impl GroupCountStep {
405 pub fn new() -> Self {
407 Self {
408 id: "groupCount_0".to_string(),
409 labels: Vec::new(),
410 counts: HashMap::new(),
411 }
412 }
413}
414
415impl Default for GroupCountStep {
416 fn default() -> Self {
417 Self::new()
418 }
419}
420
421impl Step for GroupCountStep {
422 fn id(&self) -> &str {
423 &self.id
424 }
425
426 fn name(&self) -> &str {
427 "GroupCountStep"
428 }
429
430 fn labels(&self) -> &[String] {
431 &self.labels
432 }
433
434 fn add_label(&mut self, label: String) {
435 if !self.labels.contains(&label) {
436 self.labels.push(label);
437 }
438 }
439
440 fn requirements(&self) -> &[TraverserRequirement] {
441 static REQS: &[TraverserRequirement] =
442 &[TraverserRequirement::Barrier, TraverserRequirement::Bulk];
443 REQS
444 }
445
446 fn process_traverser(&self, traverser: Traverser) -> StepResult {
447 StepResult::Hold(vec![traverser])
448 }
449
450 fn reset(&mut self) {
451 self.counts.clear();
452 }
453
454 fn clone_step(&self) -> Box<dyn Step> {
455 Box::new(self.clone())
456 }
457
458 fn as_any(&self) -> &dyn Any {
459 self
460 }
461
462 fn as_any_mut(&mut self) -> &mut dyn Any {
463 self
464 }
465}
466
467impl BarrierStep for GroupCountStep {
468 fn add_to_barrier(&mut self, traverser: Traverser) {
469 let key = match traverser.value() {
470 TraverserValue::String(s) => s.clone(),
471 TraverserValue::Vertex(id) => id.clone(),
472 other => format!("{:?}", other),
473 };
474
475 *self.counts.entry(key).or_insert(0) += traverser.bulk();
476 }
477
478 fn flush_barrier(&mut self) -> Vec<Traverser> {
479 let result: HashMap<String, Value> =
480 self.counts.drain().map(|(k, v)| (k, json!(v))).collect();
481
482 vec![Traverser::with_value(TraverserValue::Map(result))]
483 }
484
485 fn has_data(&self) -> bool {
486 !self.counts.is_empty()
487 }
488
489 fn is_ready(&self) -> bool {
490 true
491 }
492}
493
494#[derive(Debug, Clone)]
496pub struct OrderStep {
497 id: String,
498 labels: Vec<String>,
499 traversers: Vec<Traverser>,
501 ascending: bool,
503}
504
505impl OrderStep {
506 pub fn new() -> Self {
508 Self {
509 id: "order_0".to_string(),
510 labels: Vec::new(),
511 traversers: Vec::new(),
512 ascending: true,
513 }
514 }
515
516 pub fn descending() -> Self {
518 Self {
519 id: "order_desc_0".to_string(),
520 labels: Vec::new(),
521 traversers: Vec::new(),
522 ascending: false,
523 }
524 }
525}
526
527impl Default for OrderStep {
528 fn default() -> Self {
529 Self::new()
530 }
531}
532
533impl Step for OrderStep {
534 fn id(&self) -> &str {
535 &self.id
536 }
537
538 fn name(&self) -> &str {
539 "OrderStep"
540 }
541
542 fn labels(&self) -> &[String] {
543 &self.labels
544 }
545
546 fn add_label(&mut self, label: String) {
547 if !self.labels.contains(&label) {
548 self.labels.push(label);
549 }
550 }
551
552 fn requirements(&self) -> &[TraverserRequirement] {
553 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
554 REQS
555 }
556
557 fn process_traverser(&self, traverser: Traverser) -> StepResult {
558 StepResult::Hold(vec![traverser])
559 }
560
561 fn reset(&mut self) {
562 self.traversers.clear();
563 }
564
565 fn clone_step(&self) -> Box<dyn Step> {
566 Box::new(self.clone())
567 }
568
569 fn as_any(&self) -> &dyn Any {
570 self
571 }
572
573 fn as_any_mut(&mut self) -> &mut dyn Any {
574 self
575 }
576}
577
578impl BarrierStep for OrderStep {
579 fn add_to_barrier(&mut self, traverser: Traverser) {
580 self.traversers.push(traverser);
581 }
582
583 fn flush_barrier(&mut self) -> Vec<Traverser> {
584 let ascending = self.ascending;
586 self.traversers.sort_by(|a, b| {
587 let a_str = format!("{:?}", a.value());
588 let b_str = format!("{:?}", b.value());
589 if ascending {
590 a_str.cmp(&b_str)
591 } else {
592 b_str.cmp(&a_str)
593 }
594 });
595
596 std::mem::take(&mut self.traversers)
597 }
598
599 fn has_data(&self) -> bool {
600 !self.traversers.is_empty()
601 }
602
603 fn is_ready(&self) -> bool {
604 true
605 }
606}
607
608#[derive(Debug, Clone)]
610pub struct SumStep {
611 id: String,
612 labels: Vec<String>,
613 sum: f64,
614}
615
616impl SumStep {
617 pub fn new() -> Self {
619 Self {
620 id: "sum_0".to_string(),
621 labels: Vec::new(),
622 sum: 0.0,
623 }
624 }
625}
626
627impl Default for SumStep {
628 fn default() -> Self {
629 Self::new()
630 }
631}
632
633impl Step for SumStep {
634 fn id(&self) -> &str {
635 &self.id
636 }
637
638 fn name(&self) -> &str {
639 "SumStep"
640 }
641
642 fn labels(&self) -> &[String] {
643 &self.labels
644 }
645
646 fn add_label(&mut self, label: String) {
647 if !self.labels.contains(&label) {
648 self.labels.push(label);
649 }
650 }
651
652 fn requirements(&self) -> &[TraverserRequirement] {
653 static REQS: &[TraverserRequirement] =
654 &[TraverserRequirement::Barrier, TraverserRequirement::Bulk];
655 REQS
656 }
657
658 fn process_traverser(&self, traverser: Traverser) -> StepResult {
659 StepResult::Hold(vec![traverser])
660 }
661
662 fn reset(&mut self) {
663 self.sum = 0.0;
664 }
665
666 fn clone_step(&self) -> Box<dyn Step> {
667 Box::new(self.clone())
668 }
669
670 fn as_any(&self) -> &dyn Any {
671 self
672 }
673
674 fn as_any_mut(&mut self) -> &mut dyn Any {
675 self
676 }
677}
678
679impl BarrierStep for SumStep {
680 fn add_to_barrier(&mut self, traverser: Traverser) {
681 let value = match traverser.value() {
682 TraverserValue::Integer(i) => *i as f64,
683 TraverserValue::Float(f) => *f,
684 _ => 0.0,
685 };
686 self.sum += value * traverser.bulk() as f64;
687 }
688
689 fn flush_barrier(&mut self) -> Vec<Traverser> {
690 let result = self.sum;
691 self.sum = 0.0;
692 vec![Traverser::with_value(TraverserValue::Float(result))]
693 }
694
695 fn has_data(&self) -> bool {
696 self.sum != 0.0
697 }
698
699 fn is_ready(&self) -> bool {
700 true
701 }
702}
703
704#[cfg(test)]
705mod tests {
706 use super::*;
707
708 #[test]
709 fn test_fold_step() {
710 let mut step = FoldStep::new();
711
712 step.add_to_barrier(Traverser::new("v1"));
713 step.add_to_barrier(Traverser::new("v2"));
714 step.add_to_barrier(Traverser::new("v3"));
715
716 let result = step.flush_barrier();
717 assert_eq!(result.len(), 1);
718
719 if let TraverserValue::List(list) = result[0].value() {
720 assert_eq!(list.len(), 3);
721 } else {
722 panic!("Expected list");
723 }
724 }
725
726 #[test]
727 fn test_fold_with_bulk() {
728 let mut step = FoldStep::new();
729
730 let mut t = Traverser::new("v1");
731 t.set_bulk(3);
732 step.add_to_barrier(t);
733
734 let result = step.flush_barrier();
735 if let TraverserValue::List(list) = result[0].value() {
736 assert_eq!(list.len(), 3); }
738 }
739
740 #[test]
741 fn test_group_step() {
742 let mut step = GroupStep::new();
743
744 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
745 "a".to_string(),
746 )));
747 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
748 "b".to_string(),
749 )));
750 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
751 "a".to_string(),
752 )));
753
754 let result = step.flush_barrier();
755 assert_eq!(result.len(), 1);
756
757 if let TraverserValue::Map(map) = result[0].value() {
758 assert_eq!(map.len(), 2); }
760 }
761
762 #[test]
763 fn test_group_count_step() {
764 let mut step = GroupCountStep::new();
765
766 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
767 "a".to_string(),
768 )));
769 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
770 "b".to_string(),
771 )));
772
773 let mut t = Traverser::with_value(TraverserValue::String("a".to_string()));
774 t.set_bulk(2);
775 step.add_to_barrier(t);
776
777 let result = step.flush_barrier();
778 if let TraverserValue::Map(map) = result[0].value() {
779 assert_eq!(map.get("a"), Some(&json!(3)));
780 assert_eq!(map.get("b"), Some(&json!(1)));
781 }
782 }
783
784 #[test]
785 fn test_order_step() {
786 let mut step = OrderStep::new();
787
788 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
789 "c".to_string(),
790 )));
791 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
792 "a".to_string(),
793 )));
794 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
795 "b".to_string(),
796 )));
797
798 let result = step.flush_barrier();
799 assert_eq!(result.len(), 3);
800 }
802
803 #[test]
804 fn test_sum_step() {
805 let mut step = SumStep::new();
806
807 step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(10)));
808 step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(20)));
809 step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(30)));
810
811 let result = step.flush_barrier();
812 if let TraverserValue::Float(sum) = result[0].value() {
813 assert_eq!(*sum, 60.0);
814 }
815 }
816
817 #[test]
818 fn test_collecting_to_set() {
819 let mut step = CollectingBarrierStep::to_set();
820
821 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
822 "a".to_string(),
823 )));
824 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
825 "a".to_string(),
826 )));
827 step.add_to_barrier(Traverser::with_value(TraverserValue::String(
828 "b".to_string(),
829 )));
830
831 let result = step.flush_barrier();
832 if let TraverserValue::List(list) = result[0].value() {
833 assert_eq!(list.len(), 2); }
835 }
836}