reddb_server/storage/query/step/
map.rs1use super::{Path, Step, StepResult, Traverser, TraverserRequirement, TraverserValue};
15use crate::json;
16use crate::serde_json::Value;
17use std::any::Any;
18use std::collections::HashMap;
19
20pub trait MapStep: Step {
22 fn map(&self, traverser: &Traverser) -> TraverserValue;
24}
25
26#[derive(Debug, Clone)]
28pub struct SelectStep {
29 id: String,
30 labels: Vec<String>,
31 select_labels: Vec<String>,
33 pop: Pop,
35}
36
/// Chooses which occurrence(s) of a label a `SelectStep` returns.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Pop {
    /// Use the first entry returned by the path's `get_all` for the label.
    First,
    /// Use whatever the path's `get` returns for the label
    /// (presumably the most recent binding; confirm against `Path::get`).
    Last,
    /// Collect every entry returned by `get_all` into a list.
    All,
    /// Handled exactly like `Last` by `SelectStep` in this file.
    Mixed,
}
49
50impl SelectStep {
51 pub fn new(select_labels: Vec<String>) -> Self {
53 Self {
54 id: format!("select_{}", select_labels.join("_")),
55 labels: Vec::new(),
56 select_labels,
57 pop: Pop::Last,
58 }
59 }
60
61 pub fn with_pop(select_labels: Vec<String>, pop: Pop) -> Self {
63 Self {
64 id: format!("select_{:?}_{}", pop, select_labels.join("_")),
65 labels: Vec::new(),
66 select_labels,
67 pop,
68 }
69 }
70
71 pub fn select_labels(&self) -> &[String] {
73 &self.select_labels
74 }
75}
76
77impl Step for SelectStep {
78 fn id(&self) -> &str {
79 &self.id
80 }
81
82 fn name(&self) -> &str {
83 "SelectStep"
84 }
85
86 fn labels(&self) -> &[String] {
87 &self.labels
88 }
89
90 fn add_label(&mut self, label: String) {
91 if !self.labels.contains(&label) {
92 self.labels.push(label);
93 }
94 }
95
96 fn requirements(&self) -> &[TraverserRequirement] {
97 static REQS: &[TraverserRequirement] =
98 &[TraverserRequirement::Path, TraverserRequirement::Labels];
99 REQS
100 }
101
102 fn process_traverser(&self, traverser: Traverser) -> StepResult {
103 let new_value = self.map(&traverser);
104 if new_value.is_null() {
105 StepResult::Filter
106 } else {
107 StepResult::emit_one(traverser.clone_with_value(new_value))
108 }
109 }
110
111 fn reset(&mut self) {}
112
113 fn clone_step(&self) -> Box<dyn Step> {
114 Box::new(self.clone())
115 }
116
117 fn as_any(&self) -> &dyn Any {
118 self
119 }
120
121 fn as_any_mut(&mut self) -> &mut dyn Any {
122 self
123 }
124}
125
126impl MapStep for SelectStep {
127 fn map(&self, traverser: &Traverser) -> TraverserValue {
128 if let Some(path) = traverser.path() {
129 if self.select_labels.len() == 1 {
130 let label = &self.select_labels[0];
132 match self.pop {
133 Pop::Last => path.get(label).cloned().unwrap_or(TraverserValue::Null),
134 Pop::First => {
135 let all = path.get_all(label);
136 all.first()
137 .cloned()
138 .cloned()
139 .unwrap_or(TraverserValue::Null)
140 }
141 Pop::All => {
142 let all: Vec<Value> = path
143 .get_all(label)
144 .into_iter()
145 .map(|v| v.to_json())
146 .collect();
147 TraverserValue::List(all)
148 }
149 Pop::Mixed => path.get(label).cloned().unwrap_or(TraverserValue::Null),
150 }
151 } else {
152 let mut map = HashMap::new();
154 for label in &self.select_labels {
155 if let Some(value) = path.get(label) {
156 map.insert(label.clone(), value.to_json());
157 }
158 }
159 TraverserValue::Map(map)
160 }
161 } else {
162 TraverserValue::Null
163 }
164 }
165}
166
/// Step that projects a map-valued traverser onto a fixed list of keys.
#[derive(Clone, Debug)]
pub struct ProjectStep {
    /// Identifier returned by `Step::id`; the constructor builds it from the keys.
    id: String,
    /// Labels attached to this step through `Step::add_label`.
    labels: Vec<String>,
    /// Keys that appear in the projected map.
    keys: Vec<String>,
}
176
177impl ProjectStep {
178 pub fn new(keys: Vec<String>) -> Self {
180 Self {
181 id: format!("project_{}", keys.join("_")),
182 labels: Vec::new(),
183 keys,
184 }
185 }
186
187 pub fn keys(&self) -> &[String] {
189 &self.keys
190 }
191}
192
193impl Step for ProjectStep {
194 fn id(&self) -> &str {
195 &self.id
196 }
197
198 fn name(&self) -> &str {
199 "ProjectStep"
200 }
201
202 fn labels(&self) -> &[String] {
203 &self.labels
204 }
205
206 fn add_label(&mut self, label: String) {
207 if !self.labels.contains(&label) {
208 self.labels.push(label);
209 }
210 }
211
212 fn requirements(&self) -> &[TraverserRequirement] {
213 &[]
214 }
215
216 fn process_traverser(&self, traverser: Traverser) -> StepResult {
217 let new_value = self.map(&traverser);
218 StepResult::emit_one(traverser.clone_with_value(new_value))
219 }
220
221 fn reset(&mut self) {}
222
223 fn clone_step(&self) -> Box<dyn Step> {
224 Box::new(self.clone())
225 }
226
227 fn as_any(&self) -> &dyn Any {
228 self
229 }
230
231 fn as_any_mut(&mut self) -> &mut dyn Any {
232 self
233 }
234}
235
236impl MapStep for ProjectStep {
237 fn map(&self, traverser: &Traverser) -> TraverserValue {
238 let mut result = HashMap::new();
239
240 if let TraverserValue::Map(current) = traverser.value() {
243 for key in &self.keys {
244 if let Some(value) = current.get(key) {
245 result.insert(key.clone(), value.clone());
246 } else {
247 result.insert(key.clone(), Value::Null);
248 }
249 }
250 }
251
252 TraverserValue::Map(result)
253 }
254}
255
/// Step that replaces the traverser's value with its path.
#[derive(Clone, Debug)]
pub struct PathStep {
    /// Identifier returned by `Step::id`.
    id: String,
    /// Labels attached to this step through `Step::add_label`.
    labels: Vec<String>,
}
262
263impl PathStep {
264 pub fn new() -> Self {
266 Self {
267 id: "path_0".to_string(),
268 labels: Vec::new(),
269 }
270 }
271}
272
273impl Default for PathStep {
274 fn default() -> Self {
275 Self::new()
276 }
277}
278
279impl Step for PathStep {
280 fn id(&self) -> &str {
281 &self.id
282 }
283
284 fn name(&self) -> &str {
285 "PathStep"
286 }
287
288 fn labels(&self) -> &[String] {
289 &self.labels
290 }
291
292 fn add_label(&mut self, label: String) {
293 if !self.labels.contains(&label) {
294 self.labels.push(label);
295 }
296 }
297
298 fn requirements(&self) -> &[TraverserRequirement] {
299 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Path];
300 REQS
301 }
302
303 fn process_traverser(&self, traverser: Traverser) -> StepResult {
304 let new_value = self.map(&traverser);
305 StepResult::emit_one(traverser.clone_with_value(new_value))
306 }
307
308 fn reset(&mut self) {}
309
310 fn clone_step(&self) -> Box<dyn Step> {
311 Box::new(self.clone())
312 }
313
314 fn as_any(&self) -> &dyn Any {
315 self
316 }
317
318 fn as_any_mut(&mut self) -> &mut dyn Any {
319 self
320 }
321}
322
323impl MapStep for PathStep {
324 fn map(&self, traverser: &Traverser) -> TraverserValue {
325 if let Some(path) = traverser.path() {
326 TraverserValue::Path(path.clone())
327 } else {
328 TraverserValue::Path(Path::new())
329 }
330 }
331}
332
/// Step that turns the traverser's value into a key/value map.
#[derive(Clone, Debug)]
pub struct ValueMapStep {
    /// Identifier returned by `Step::id`.
    id: String,
    /// Labels attached to this step through `Step::add_label`.
    labels: Vec<String>,
    /// Keys to keep from a map value; an empty list keeps every key.
    keys: Vec<String>,
    /// When set, vertex and edge values produce `@`-prefixed token entries.
    with_tokens: bool,
}
343
344impl ValueMapStep {
345 pub fn new() -> Self {
347 Self {
348 id: "valueMap_0".to_string(),
349 labels: Vec::new(),
350 keys: Vec::new(),
351 with_tokens: false,
352 }
353 }
354
355 pub fn with_keys(keys: Vec<String>) -> Self {
357 Self {
358 id: format!("valueMap_{}", keys.join("_")),
359 labels: Vec::new(),
360 keys,
361 with_tokens: false,
362 }
363 }
364
365 pub fn with_tokens(mut self) -> Self {
367 self.with_tokens = true;
368 self
369 }
370}
371
372impl Default for ValueMapStep {
373 fn default() -> Self {
374 Self::new()
375 }
376}
377
378impl Step for ValueMapStep {
379 fn id(&self) -> &str {
380 &self.id
381 }
382
383 fn name(&self) -> &str {
384 "ValueMapStep"
385 }
386
387 fn labels(&self) -> &[String] {
388 &self.labels
389 }
390
391 fn add_label(&mut self, label: String) {
392 if !self.labels.contains(&label) {
393 self.labels.push(label);
394 }
395 }
396
397 fn requirements(&self) -> &[TraverserRequirement] {
398 &[]
399 }
400
401 fn process_traverser(&self, traverser: Traverser) -> StepResult {
402 let new_value = self.map(&traverser);
403 StepResult::emit_one(traverser.clone_with_value(new_value))
404 }
405
406 fn reset(&mut self) {}
407
408 fn clone_step(&self) -> Box<dyn Step> {
409 Box::new(self.clone())
410 }
411
412 fn as_any(&self) -> &dyn Any {
413 self
414 }
415
416 fn as_any_mut(&mut self) -> &mut dyn Any {
417 self
418 }
419}
420
421impl MapStep for ValueMapStep {
422 fn map(&self, traverser: &Traverser) -> TraverserValue {
423 let mut result = HashMap::new();
424
425 match traverser.value() {
426 TraverserValue::Vertex(id) if self.with_tokens => {
427 result.insert("@id".to_string(), json!(id));
428 result.insert("@type".to_string(), json!("vertex"));
429 }
430 TraverserValue::Edge {
431 id,
432 source,
433 target,
434 label,
435 } if self.with_tokens => {
436 result.insert("@id".to_string(), json!(id));
437 result.insert("@type".to_string(), json!("edge"));
438 result.insert("@label".to_string(), json!(label));
439 result.insert("@source".to_string(), json!(source));
440 result.insert("@target".to_string(), json!(target));
441 }
442 TraverserValue::Map(map) => {
443 for (key, value) in map {
444 if self.keys.is_empty() || self.keys.contains(key) {
445 result.insert(key.clone(), value.clone());
446 }
447 }
448 }
449 _ => {}
450 }
451
452 TraverserValue::Map(result)
453 }
454}
455
/// Step that replaces a vertex or edge value with its id.
#[derive(Clone, Debug)]
pub struct IdStep {
    /// Identifier returned by `Step::id`.
    id: String,
    /// Labels attached to this step through `Step::add_label`.
    labels: Vec<String>,
}
462
463impl IdStep {
464 pub fn new() -> Self {
466 Self {
467 id: "id_0".to_string(),
468 labels: Vec::new(),
469 }
470 }
471}
472
473impl Default for IdStep {
474 fn default() -> Self {
475 Self::new()
476 }
477}
478
479impl Step for IdStep {
480 fn id(&self) -> &str {
481 &self.id
482 }
483
484 fn name(&self) -> &str {
485 "IdStep"
486 }
487
488 fn labels(&self) -> &[String] {
489 &self.labels
490 }
491
492 fn add_label(&mut self, label: String) {
493 if !self.labels.contains(&label) {
494 self.labels.push(label);
495 }
496 }
497
498 fn requirements(&self) -> &[TraverserRequirement] {
499 &[]
500 }
501
502 fn process_traverser(&self, traverser: Traverser) -> StepResult {
503 let new_value = self.map(&traverser);
504 StepResult::emit_one(traverser.clone_with_value(new_value))
505 }
506
507 fn reset(&mut self) {}
508
509 fn clone_step(&self) -> Box<dyn Step> {
510 Box::new(self.clone())
511 }
512
513 fn as_any(&self) -> &dyn Any {
514 self
515 }
516
517 fn as_any_mut(&mut self) -> &mut dyn Any {
518 self
519 }
520}
521
522impl MapStep for IdStep {
523 fn map(&self, traverser: &Traverser) -> TraverserValue {
524 match traverser.value() {
525 TraverserValue::Vertex(id) => TraverserValue::String(id.clone()),
526 TraverserValue::Edge { id, .. } => TraverserValue::String(id.clone()),
527 _ => TraverserValue::Null,
528 }
529 }
530}
531
/// Step that replaces an edge value with its label.
#[derive(Clone, Debug)]
pub struct LabelStep {
    /// Identifier returned by `Step::id`.
    id: String,
    /// Labels attached to this step through `Step::add_label`.
    labels: Vec<String>,
}
538
539impl LabelStep {
540 pub fn new() -> Self {
542 Self {
543 id: "label_0".to_string(),
544 labels: Vec::new(),
545 }
546 }
547}
548
549impl Default for LabelStep {
550 fn default() -> Self {
551 Self::new()
552 }
553}
554
555impl Step for LabelStep {
556 fn id(&self) -> &str {
557 &self.id
558 }
559
560 fn name(&self) -> &str {
561 "LabelStep"
562 }
563
564 fn labels(&self) -> &[String] {
565 &self.labels
566 }
567
568 fn add_label(&mut self, label: String) {
569 if !self.labels.contains(&label) {
570 self.labels.push(label);
571 }
572 }
573
574 fn requirements(&self) -> &[TraverserRequirement] {
575 &[]
576 }
577
578 fn process_traverser(&self, traverser: Traverser) -> StepResult {
579 let new_value = self.map(&traverser);
580 StepResult::emit_one(traverser.clone_with_value(new_value))
581 }
582
583 fn reset(&mut self) {}
584
585 fn clone_step(&self) -> Box<dyn Step> {
586 Box::new(self.clone())
587 }
588
589 fn as_any(&self) -> &dyn Any {
590 self
591 }
592
593 fn as_any_mut(&mut self) -> &mut dyn Any {
594 self
595 }
596}
597
598impl MapStep for LabelStep {
599 fn map(&self, traverser: &Traverser) -> TraverserValue {
600 match traverser.value() {
601 TraverserValue::Edge { label, .. } => TraverserValue::String(label.clone()),
602 _ => TraverserValue::Null,
604 }
605 }
606}
607
/// Step that replaces the traverser's value with its bulk count.
#[derive(Clone, Debug)]
pub struct CountStep {
    /// Identifier returned by `Step::id`.
    id: String,
    /// Labels attached to this step through `Step::add_label`.
    labels: Vec<String>,
}
614
615impl CountStep {
616 pub fn new() -> Self {
618 Self {
619 id: "count_0".to_string(),
620 labels: Vec::new(),
621 }
622 }
623}
624
625impl Default for CountStep {
626 fn default() -> Self {
627 Self::new()
628 }
629}
630
631impl Step for CountStep {
632 fn id(&self) -> &str {
633 &self.id
634 }
635
636 fn name(&self) -> &str {
637 "CountStep"
638 }
639
640 fn labels(&self) -> &[String] {
641 &self.labels
642 }
643
644 fn add_label(&mut self, label: String) {
645 if !self.labels.contains(&label) {
646 self.labels.push(label);
647 }
648 }
649
650 fn requirements(&self) -> &[TraverserRequirement] {
651 static REQS: &[TraverserRequirement] =
652 &[TraverserRequirement::Barrier, TraverserRequirement::Bulk];
653 REQS
654 }
655
656 fn process_traverser(&self, traverser: Traverser) -> StepResult {
657 let count = traverser.bulk();
660 StepResult::emit_one(traverser.clone_with_value(TraverserValue::Integer(count as i64)))
661 }
662
663 fn reset(&mut self) {}
664
665 fn clone_step(&self) -> Box<dyn Step> {
666 Box::new(self.clone())
667 }
668
669 fn as_any(&self) -> &dyn Any {
670 self
671 }
672
673 fn as_any_mut(&mut self) -> &mut dyn Any {
674 self
675 }
676}
677
678impl MapStep for CountStep {
679 fn map(&self, traverser: &Traverser) -> TraverserValue {
680 TraverserValue::Integer(traverser.bulk() as i64)
681 }
682}
683
#[cfg(test)]
mod tests {
    use super::*;

    /// A single-label selector keeps its label and reports its step name.
    #[test]
    fn test_select_single() {
        let step = SelectStep::new(vec!["a".to_string()]);
        assert_eq!(step.select_labels(), &["a"]);
        assert_eq!(step.name(), "SelectStep");
    }

    /// A multi-label selector keeps every label it was given.
    #[test]
    fn test_select_multiple() {
        let step = SelectStep::new(vec!["a".to_string(), "b".to_string()]);
        assert_eq!(step.select_labels().len(), 2);
    }

    /// `with_pop` embeds the pop mode in the step id.
    #[test]
    fn test_select_with_pop_id() {
        let step = SelectStep::with_pop(vec!["a".to_string()], Pop::First);
        assert_eq!(step.id(), "select_First_a");
    }

    /// The projection keeps every key it was given.
    #[test]
    fn test_project_step() {
        let step = ProjectStep::new(vec!["name".to_string(), "age".to_string()]);
        assert_eq!(step.keys().len(), 2);
    }

    /// Mapping a path-enabled traverser yields a `Path` value.
    #[test]
    fn test_path_step() {
        let step = PathStep::new();
        assert_eq!(step.name(), "PathStep");

        let mut traverser = Traverser::new("v1");
        traverser.enable_path();

        let result = step.map(&traverser);
        assert!(matches!(result, TraverserValue::Path(_)));
    }

    /// Adding the same label twice stores it only once.
    #[test]
    fn test_add_label_deduplicates() {
        let mut step = PathStep::new();
        step.add_label("x".to_string());
        step.add_label("x".to_string());
        assert_eq!(step.labels().len(), 1);
    }

    /// With tokens enabled, a vertex maps to `@id` and `@type` entries.
    #[test]
    fn test_value_map_step() {
        let step = ValueMapStep::new().with_tokens();
        assert_eq!(step.name(), "ValueMapStep");

        let traverser = Traverser::new("v1");

        // Fail loudly on a non-map result instead of skipping the assertions.
        match step.map(&traverser) {
            TraverserValue::Map(map) => {
                assert!(map.contains_key("@id"));
                assert!(map.contains_key("@type"));
            }
            _ => panic!("ValueMapStep::map must return TraverserValue::Map"),
        }
    }

    /// A vertex traverser maps to its id string.
    #[test]
    fn test_id_step() {
        let step = IdStep::new();

        let traverser = Traverser::new("vertex123");
        let result = step.map(&traverser);
        assert!(matches!(result, TraverserValue::String(id) if id == "vertex123"));
    }

    /// An edge traverser maps to its label string.
    #[test]
    fn test_label_step() {
        let step = LabelStep::new();

        let edge = TraverserValue::Edge {
            id: "e1".to_string(),
            source: "v1".to_string(),
            target: "v2".to_string(),
            label: "knows".to_string(),
        };
        let traverser = Traverser::with_value(edge);
        let result = step.map(&traverser);
        assert!(matches!(result, TraverserValue::String(l) if l == "knows"));
    }

    /// The count step reports the traverser's bulk.
    #[test]
    fn test_count_step() {
        let step = CountStep::new();

        let mut traverser = Traverser::new("v1");
        traverser.set_bulk(5);

        let result = step.map(&traverser);
        assert!(matches!(result, TraverserValue::Integer(5)));
    }
}