reddb_server/storage/query/step/
branch.rs1use super::{
14 BasicTraversal, Step, StepResult, Traversal, Traverser, TraverserRequirement, TraverserValue,
15};
16use std::any::Any;
17
18pub trait BranchStep: Step {
20 fn branches(&self) -> Vec<&dyn Traversal>;
22}
23
24#[derive(Debug, Clone)]
26pub struct ChooseStep {
27 id: String,
28 labels: Vec<String>,
29 condition: Option<BasicTraversal>,
31 options: Vec<(TraverserValue, BasicTraversal)>,
33 default_option: Option<BasicTraversal>,
35}
36
37impl ChooseStep {
38 pub fn new() -> Self {
40 Self {
41 id: "choose_0".to_string(),
42 labels: Vec::new(),
43 condition: None,
44 options: Vec::new(),
45 default_option: None,
46 }
47 }
48
49 pub fn with_condition(mut self, condition: BasicTraversal) -> Self {
51 self.condition = Some(condition);
52 self
53 }
54
55 pub fn option(mut self, value: TraverserValue, traversal: BasicTraversal) -> Self {
57 self.options.push((value, traversal));
58 self
59 }
60
61 pub fn default(mut self, traversal: BasicTraversal) -> Self {
63 self.default_option = Some(traversal);
64 self
65 }
66}
67
68impl Default for ChooseStep {
69 fn default() -> Self {
70 Self::new()
71 }
72}
73
74impl Step for ChooseStep {
75 fn id(&self) -> &str {
76 &self.id
77 }
78
79 fn name(&self) -> &str {
80 "ChooseStep"
81 }
82
83 fn labels(&self) -> &[String] {
84 &self.labels
85 }
86
87 fn add_label(&mut self, label: String) {
88 if !self.labels.contains(&label) {
89 self.labels.push(label);
90 }
91 }
92
93 fn requirements(&self) -> &[TraverserRequirement] {
94 &[]
95 }
96
97 fn process_traverser(&self, traverser: Traverser) -> StepResult {
98 if self.default_option.is_some() {
104 StepResult::emit_one(traverser)
105 } else {
106 StepResult::Filter
107 }
108 }
109
110 fn reset(&mut self) {}
111
112 fn clone_step(&self) -> Box<dyn Step> {
113 Box::new(self.clone())
114 }
115
116 fn as_any(&self) -> &dyn Any {
117 self
118 }
119
120 fn as_any_mut(&mut self) -> &mut dyn Any {
121 self
122 }
123}
124
125#[derive(Debug, Clone)]
127pub struct UnionStep {
128 id: String,
129 labels: Vec<String>,
130 branches: Vec<BasicTraversal>,
132 is_start: bool,
134}
135
136impl UnionStep {
137 pub fn new(branches: Vec<BasicTraversal>) -> Self {
139 Self {
140 id: format!("union_{}", branches.len()),
141 labels: Vec::new(),
142 branches,
143 is_start: false,
144 }
145 }
146
147 pub fn as_start(mut self) -> Self {
149 self.is_start = true;
150 self
151 }
152
153 pub fn get_branches(&self) -> &[BasicTraversal] {
155 &self.branches
156 }
157}
158
159impl Step for UnionStep {
160 fn id(&self) -> &str {
161 &self.id
162 }
163
164 fn name(&self) -> &str {
165 "UnionStep"
166 }
167
168 fn labels(&self) -> &[String] {
169 &self.labels
170 }
171
172 fn add_label(&mut self, label: String) {
173 if !self.labels.contains(&label) {
174 self.labels.push(label);
175 }
176 }
177
178 fn requirements(&self) -> &[TraverserRequirement] {
179 &[]
180 }
181
182 fn process_traverser(&self, traverser: Traverser) -> StepResult {
183 let results: Vec<Traverser> = self.branches.iter().map(|_| traverser.split()).collect();
186 StepResult::emit_many(results)
187 }
188
189 fn reset(&mut self) {
190 for branch in &mut self.branches {
191 branch.reset();
192 }
193 }
194
195 fn clone_step(&self) -> Box<dyn Step> {
196 Box::new(self.clone())
197 }
198
199 fn as_any(&self) -> &dyn Any {
200 self
201 }
202
203 fn as_any_mut(&mut self) -> &mut dyn Any {
204 self
205 }
206}
207
208#[derive(Debug, Clone)]
210pub struct CoalesceStep {
211 id: String,
212 labels: Vec<String>,
213 branches: Vec<BasicTraversal>,
215}
216
217impl CoalesceStep {
218 pub fn new(branches: Vec<BasicTraversal>) -> Self {
220 Self {
221 id: format!("coalesce_{}", branches.len()),
222 labels: Vec::new(),
223 branches,
224 }
225 }
226}
227
228impl Step for CoalesceStep {
229 fn id(&self) -> &str {
230 &self.id
231 }
232
233 fn name(&self) -> &str {
234 "CoalesceStep"
235 }
236
237 fn labels(&self) -> &[String] {
238 &self.labels
239 }
240
241 fn add_label(&mut self, label: String) {
242 if !self.labels.contains(&label) {
243 self.labels.push(label);
244 }
245 }
246
247 fn requirements(&self) -> &[TraverserRequirement] {
248 &[]
249 }
250
251 fn process_traverser(&self, traverser: Traverser) -> StepResult {
252 if !self.branches.is_empty() {
255 StepResult::emit_one(traverser)
256 } else {
257 StepResult::Filter
258 }
259 }
260
261 fn reset(&mut self) {
262 for branch in &mut self.branches {
263 branch.reset();
264 }
265 }
266
267 fn clone_step(&self) -> Box<dyn Step> {
268 Box::new(self.clone())
269 }
270
271 fn as_any(&self) -> &dyn Any {
272 self
273 }
274
275 fn as_any_mut(&mut self) -> &mut dyn Any {
276 self
277 }
278}
279
280#[derive(Debug, Clone)]
282pub struct OptionalStep {
283 id: String,
284 labels: Vec<String>,
285 traversal: BasicTraversal,
287}
288
289impl OptionalStep {
290 pub fn new(traversal: BasicTraversal) -> Self {
292 Self {
293 id: "optional_0".to_string(),
294 labels: Vec::new(),
295 traversal,
296 }
297 }
298}
299
300impl Step for OptionalStep {
301 fn id(&self) -> &str {
302 &self.id
303 }
304
305 fn name(&self) -> &str {
306 "OptionalStep"
307 }
308
309 fn labels(&self) -> &[String] {
310 &self.labels
311 }
312
313 fn add_label(&mut self, label: String) {
314 if !self.labels.contains(&label) {
315 self.labels.push(label);
316 }
317 }
318
319 fn requirements(&self) -> &[TraverserRequirement] {
320 &[]
321 }
322
323 fn process_traverser(&self, traverser: Traverser) -> StepResult {
324 StepResult::emit_one(traverser)
328 }
329
330 fn reset(&mut self) {
331 self.traversal.reset();
332 }
333
334 fn clone_step(&self) -> Box<dyn Step> {
335 Box::new(self.clone())
336 }
337
338 fn as_any(&self) -> &dyn Any {
339 self
340 }
341
342 fn as_any_mut(&mut self) -> &mut dyn Any {
343 self
344 }
345}
346
347#[derive(Debug, Clone)]
349pub struct RepeatStep {
350 id: String,
351 labels: Vec<String>,
352 loop_name: String,
354 repeat_traversal: BasicTraversal,
356 until_traversal: Option<BasicTraversal>,
358 emit_traversal: Option<BasicTraversal>,
360 times: Option<u32>,
362 until_first: bool,
364 emit_first: bool,
366}
367
368impl RepeatStep {
369 pub fn new(repeat_traversal: BasicTraversal) -> Self {
371 static COUNTER: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
372 let id = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
373
374 Self {
375 id: format!("repeat_{}", id),
376 labels: Vec::new(),
377 loop_name: format!("repeat_{}", id),
378 repeat_traversal,
379 until_traversal: None,
380 emit_traversal: None,
381 times: None,
382 until_first: false,
383 emit_first: false,
384 }
385 }
386
387 pub fn with_name(mut self, name: String) -> Self {
389 self.loop_name = name;
390 self
391 }
392
393 pub fn until(mut self, traversal: BasicTraversal) -> Self {
395 self.until_traversal = Some(traversal);
396 self
397 }
398
399 pub fn emit(mut self, traversal: BasicTraversal) -> Self {
401 self.emit_traversal = Some(traversal);
402 self
403 }
404
405 pub fn times(mut self, times: u32) -> Self {
407 self.times = Some(times);
408 self
409 }
410
411 pub fn until_first(mut self) -> Self {
413 self.until_first = true;
414 self
415 }
416
417 pub fn emit_first(mut self) -> Self {
419 self.emit_first = true;
420 self
421 }
422
423 pub fn loop_name(&self) -> &str {
425 &self.loop_name
426 }
427}
428
429impl Step for RepeatStep {
430 fn id(&self) -> &str {
431 &self.id
432 }
433
434 fn name(&self) -> &str {
435 "RepeatStep"
436 }
437
438 fn labels(&self) -> &[String] {
439 &self.labels
440 }
441
442 fn add_label(&mut self, label: String) {
443 if !self.labels.contains(&label) {
444 self.labels.push(label);
445 }
446 }
447
448 fn requirements(&self) -> &[TraverserRequirement] {
449 static REQS: &[TraverserRequirement] =
450 &[TraverserRequirement::SingleLoop, TraverserRequirement::Path];
451 REQS
452 }
453
454 fn process_traverser(&self, mut traverser: Traverser) -> StepResult {
455 traverser.init_loop(&self.loop_name);
457
458 if let Some(times) = self.times {
460 if traverser.loop_count(&self.loop_name) >= times {
461 return StepResult::emit_one(traverser);
462 }
463 }
464
465 traverser.incr_loop(&self.loop_name);
475 StepResult::emit_one(traverser)
476 }
477
478 fn reset(&mut self) {
479 self.repeat_traversal.reset();
480 if let Some(ref mut t) = self.until_traversal {
481 t.reset();
482 }
483 if let Some(ref mut t) = self.emit_traversal {
484 t.reset();
485 }
486 }
487
488 fn clone_step(&self) -> Box<dyn Step> {
489 Box::new(self.clone())
490 }
491
492 fn as_any(&self) -> &dyn Any {
493 self
494 }
495
496 fn as_any_mut(&mut self) -> &mut dyn Any {
497 self
498 }
499}
500
501#[derive(Debug, Clone)]
503pub struct LocalStep {
504 id: String,
505 labels: Vec<String>,
506 traversal: BasicTraversal,
508}
509
510impl LocalStep {
511 pub fn new(traversal: BasicTraversal) -> Self {
513 Self {
514 id: "local_0".to_string(),
515 labels: Vec::new(),
516 traversal,
517 }
518 }
519}
520
521impl Step for LocalStep {
522 fn id(&self) -> &str {
523 &self.id
524 }
525
526 fn name(&self) -> &str {
527 "LocalStep"
528 }
529
530 fn labels(&self) -> &[String] {
531 &self.labels
532 }
533
534 fn add_label(&mut self, label: String) {
535 if !self.labels.contains(&label) {
536 self.labels.push(label);
537 }
538 }
539
540 fn requirements(&self) -> &[TraverserRequirement] {
541 &[]
542 }
543
544 fn process_traverser(&self, traverser: Traverser) -> StepResult {
545 StepResult::emit_one(traverser)
547 }
548
549 fn reset(&mut self) {
550 self.traversal.reset();
551 }
552
553 fn clone_step(&self) -> Box<dyn Step> {
554 Box::new(self.clone())
555 }
556
557 fn as_any(&self) -> &dyn Any {
558 self
559 }
560
561 fn as_any_mut(&mut self) -> &mut dyn Any {
562 self
563 }
564}
565
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built `ChooseStep` reports its fixed step name.
    #[test]
    fn test_choose_step() {
        let step = ChooseStep::new();
        assert_eq!(step.name(), "ChooseStep");
    }

    /// A union over two branches emits two traversers for one input.
    #[test]
    fn test_union_step() {
        let branch1 = BasicTraversal::new();
        let branch2 = BasicTraversal::new();
        let step = UnionStep::new(vec![branch1, branch2]);

        assert_eq!(step.get_branches().len(), 2);

        let traverser = Traverser::new("v1");
        // Match exhaustively so a non-`Emit` result fails the test instead of
        // silently skipping the length assertion.
        match step.process_traverser(traverser) {
            StepResult::Emit(traversers) => assert_eq!(traversers.len(), 2),
            _ => panic!("UnionStep with two branches should emit traversers"),
        }
    }

    /// A coalesce step with at least one branch emits the traverser.
    #[test]
    fn test_coalesce_step() {
        let step = CoalesceStep::new(vec![BasicTraversal::new()]);
        assert_eq!(step.name(), "CoalesceStep");

        let traverser = Traverser::new("v1");
        let result = step.process_traverser(traverser);
        assert!(matches!(result, StepResult::Emit(_)));
    }

    /// An optional step always emits the traverser.
    #[test]
    fn test_optional_step() {
        let step = OptionalStep::new(BasicTraversal::new());
        assert_eq!(step.name(), "OptionalStep");

        let traverser = Traverser::new("v1");
        let result = step.process_traverser(traverser);
        assert!(matches!(result, StepResult::Emit(_)));
    }

    /// The first pass through a repeat step bumps its loop counter to 1.
    #[test]
    fn test_repeat_step() {
        let repeat_t = BasicTraversal::new();
        let step = RepeatStep::new(repeat_t).times(3);

        assert_eq!(step.name(), "RepeatStep");

        let traverser = Traverser::new("v1");
        // As above: fail loudly when the step does not emit, rather than
        // letting the counter assertion be skipped.
        match step.process_traverser(traverser) {
            StepResult::Emit(emitted) => {
                assert_eq!(emitted.len(), 1);
                assert_eq!(emitted[0].loop_count(step.loop_name()), 1);
            }
            _ => panic!("RepeatStep should emit the traverser on its first pass"),
        }
    }

    /// Builder calls record both the `until_first` flag and the `times` limit.
    #[test]
    fn test_repeat_with_until_first() {
        let step = RepeatStep::new(BasicTraversal::new())
            .until_first()
            .times(5);

        assert!(step.until_first);
        assert_eq!(step.times, Some(5));
    }

    /// A freshly built `LocalStep` reports its fixed step name.
    #[test]
    fn test_local_step() {
        let step = LocalStep::new(BasicTraversal::new());
        assert_eq!(step.name(), "LocalStep");
    }
}