reddb_server/storage/query/step/
sideeffect.rs1use super::{
14 BasicTraversal, Step, StepResult, Traversal, Traverser, TraverserRequirement, TraverserValue,
15};
16use crate::json;
17use crate::serde_json::Value;
18use std::any::Any;
19use std::collections::HashMap;
20use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
21
22fn sideeffect_read<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
23 lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
24}
25
26fn sideeffect_write<'a, T>(lock: &'a RwLock<T>) -> RwLockWriteGuard<'a, T> {
27 lock.write()
28 .unwrap_or_else(|poisoned| poisoned.into_inner())
29}
30
31pub trait SideEffectStep: Step {
33 fn side_effect(&self, traverser: &Traverser);
35}
36
37#[derive(Debug)]
39pub struct StoreStep {
40 id: String,
41 labels: Vec<String>,
42 key: String,
44 values: Arc<RwLock<Vec<Value>>>,
46}
47
48impl Clone for StoreStep {
49 fn clone(&self) -> Self {
50 Self {
51 id: self.id.clone(),
52 labels: self.labels.clone(),
53 key: self.key.clone(),
54 values: Arc::clone(&self.values),
55 }
56 }
57}
58
59impl StoreStep {
60 pub fn new(key: String) -> Self {
62 Self {
63 id: format!("store_{}", key),
64 labels: Vec::new(),
65 key,
66 values: Arc::new(RwLock::new(Vec::new())),
67 }
68 }
69
70 pub fn values(&self) -> Vec<Value> {
72 sideeffect_read(&self.values).clone()
73 }
74}
75
76impl Step for StoreStep {
77 fn id(&self) -> &str {
78 &self.id
79 }
80
81 fn name(&self) -> &str {
82 "StoreStep"
83 }
84
85 fn labels(&self) -> &[String] {
86 &self.labels
87 }
88
89 fn add_label(&mut self, label: String) {
90 if !self.labels.contains(&label) {
91 self.labels.push(label);
92 }
93 }
94
95 fn requirements(&self) -> &[TraverserRequirement] {
96 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Sack];
97 REQS
98 }
99
100 fn process_traverser(&self, traverser: Traverser) -> StepResult {
101 self.side_effect(&traverser);
102 StepResult::emit_one(traverser)
103 }
104
105 fn reset(&mut self) {
106 sideeffect_write(&self.values).clear();
107 }
108
109 fn clone_step(&self) -> Box<dyn Step> {
110 Box::new(self.clone())
111 }
112
113 fn as_any(&self) -> &dyn Any {
114 self
115 }
116
117 fn as_any_mut(&mut self) -> &mut dyn Any {
118 self
119 }
120}
121
122impl SideEffectStep for StoreStep {
123 fn side_effect(&self, traverser: &Traverser) {
124 let value = traverser.value().to_json();
125 sideeffect_write(&self.values).push(value);
126 }
127}
128
129#[derive(Debug)]
131pub struct AggregateStep {
132 id: String,
133 labels: Vec<String>,
134 key: String,
136 global: bool,
138 values: Arc<RwLock<Vec<Value>>>,
140}
141
142impl Clone for AggregateStep {
143 fn clone(&self) -> Self {
144 Self {
145 id: self.id.clone(),
146 labels: self.labels.clone(),
147 key: self.key.clone(),
148 global: self.global,
149 values: Arc::clone(&self.values),
150 }
151 }
152}
153
154impl AggregateStep {
155 pub fn global(key: String) -> Self {
157 Self {
158 id: format!("aggregate_global_{}", key),
159 labels: Vec::new(),
160 key,
161 global: true,
162 values: Arc::new(RwLock::new(Vec::new())),
163 }
164 }
165
166 pub fn local(key: String) -> Self {
168 Self {
169 id: format!("aggregate_local_{}", key),
170 labels: Vec::new(),
171 key,
172 global: false,
173 values: Arc::new(RwLock::new(Vec::new())),
174 }
175 }
176
177 pub fn values(&self) -> Vec<Value> {
179 sideeffect_read(&self.values).clone()
180 }
181}
182
183impl Step for AggregateStep {
184 fn id(&self) -> &str {
185 &self.id
186 }
187
188 fn name(&self) -> &str {
189 if self.global {
190 "AggregateGlobalStep"
191 } else {
192 "AggregateLocalStep"
193 }
194 }
195
196 fn labels(&self) -> &[String] {
197 &self.labels
198 }
199
200 fn add_label(&mut self, label: String) {
201 if !self.labels.contains(&label) {
202 self.labels.push(label);
203 }
204 }
205
206 fn requirements(&self) -> &[TraverserRequirement] {
207 if self.global {
208 static REQS: &[TraverserRequirement] =
209 &[TraverserRequirement::Barrier, TraverserRequirement::Sack];
210 REQS
211 } else {
212 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Sack];
213 REQS
214 }
215 }
216
217 fn process_traverser(&self, traverser: Traverser) -> StepResult {
218 self.side_effect(&traverser);
219 StepResult::emit_one(traverser)
220 }
221
222 fn reset(&mut self) {
223 sideeffect_write(&self.values).clear();
224 }
225
226 fn clone_step(&self) -> Box<dyn Step> {
227 Box::new(self.clone())
228 }
229
230 fn as_any(&self) -> &dyn Any {
231 self
232 }
233
234 fn as_any_mut(&mut self) -> &mut dyn Any {
235 self
236 }
237}
238
239impl SideEffectStep for AggregateStep {
240 fn side_effect(&self, traverser: &Traverser) {
241 let value = traverser.value().to_json();
242 sideeffect_write(&self.values).push(value);
243 }
244}
245
246#[derive(Debug, Clone)]
248pub struct PropertyStep {
249 id: String,
250 labels: Vec<String>,
251 key: String,
253 value: Option<Value>,
255 value_traversal: Option<BasicTraversal>,
257 cardinality: PropertyCardinality,
259}
260
261#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub enum PropertyCardinality {
264 Single,
266 List,
268 Set,
270}
271
272impl PropertyStep {
273 pub fn with_value(key: String, value: Value) -> Self {
275 Self {
276 id: format!("property_{}_{}", key, value),
277 labels: Vec::new(),
278 key,
279 value: Some(value),
280 value_traversal: None,
281 cardinality: PropertyCardinality::Single,
282 }
283 }
284
285 pub fn with_traversal(key: String, traversal: BasicTraversal) -> Self {
287 Self {
288 id: format!("property_{}_traversal", key),
289 labels: Vec::new(),
290 key,
291 value: None,
292 value_traversal: Some(traversal),
293 cardinality: PropertyCardinality::Single,
294 }
295 }
296
297 pub fn cardinality(mut self, cardinality: PropertyCardinality) -> Self {
299 self.cardinality = cardinality;
300 self
301 }
302
303 pub fn key(&self) -> &str {
305 &self.key
306 }
307}
308
309impl Step for PropertyStep {
310 fn id(&self) -> &str {
311 &self.id
312 }
313
314 fn name(&self) -> &str {
315 "PropertyStep"
316 }
317
318 fn labels(&self) -> &[String] {
319 &self.labels
320 }
321
322 fn add_label(&mut self, label: String) {
323 if !self.labels.contains(&label) {
324 self.labels.push(label);
325 }
326 }
327
328 fn requirements(&self) -> &[TraverserRequirement] {
329 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Mutates];
330 REQS
331 }
332
333 fn process_traverser(&self, traverser: Traverser) -> StepResult {
334 self.side_effect(&traverser);
337 StepResult::emit_one(traverser)
338 }
339
340 fn reset(&mut self) {
341 if let Some(ref mut t) = self.value_traversal {
342 t.reset();
343 }
344 }
345
346 fn clone_step(&self) -> Box<dyn Step> {
347 Box::new(self.clone())
348 }
349
350 fn as_any(&self) -> &dyn Any {
351 self
352 }
353
354 fn as_any_mut(&mut self) -> &mut dyn Any {
355 self
356 }
357}
358
359impl SideEffectStep for PropertyStep {
360 fn side_effect(&self, _traverser: &Traverser) {
361 }
363}
364
365#[derive(Debug, Clone)]
367pub struct SackStep {
368 id: String,
369 labels: Vec<String>,
370 operation: SackOperation,
372}
373
374#[derive(Debug, Clone)]
376pub enum SackOperation {
377 Set(Value),
379 Sum,
381 Mult,
383 Merge,
385}
386
387impl SackStep {
388 pub fn set(value: Value) -> Self {
390 Self {
391 id: "sack_set_0".to_string(),
392 labels: Vec::new(),
393 operation: SackOperation::Set(value),
394 }
395 }
396
397 pub fn sum() -> Self {
399 Self {
400 id: "sack_sum_0".to_string(),
401 labels: Vec::new(),
402 operation: SackOperation::Sum,
403 }
404 }
405
406 pub fn mult() -> Self {
408 Self {
409 id: "sack_mult_0".to_string(),
410 labels: Vec::new(),
411 operation: SackOperation::Mult,
412 }
413 }
414
415 pub fn merge() -> Self {
417 Self {
418 id: "sack_merge_0".to_string(),
419 labels: Vec::new(),
420 operation: SackOperation::Merge,
421 }
422 }
423}
424
425impl Step for SackStep {
426 fn id(&self) -> &str {
427 &self.id
428 }
429
430 fn name(&self) -> &str {
431 "SackStep"
432 }
433
434 fn labels(&self) -> &[String] {
435 &self.labels
436 }
437
438 fn add_label(&mut self, label: String) {
439 if !self.labels.contains(&label) {
440 self.labels.push(label);
441 }
442 }
443
444 fn requirements(&self) -> &[TraverserRequirement] {
445 static REQS: &[TraverserRequirement] = &[TraverserRequirement::Sack];
446 REQS
447 }
448
449 fn process_traverser(&self, mut traverser: Traverser) -> StepResult {
450 match &self.operation {
451 SackOperation::Set(value) => {
452 traverser.set_sack(value.clone());
453 }
454 SackOperation::Sum => {
455 if let (Some(sack), TraverserValue::Integer(i)) =
456 (traverser.sack(), traverser.value())
457 {
458 if let Some(s) = sack.as_i64() {
459 traverser.set_sack(json!(s + i));
460 }
461 }
462 }
463 SackOperation::Mult => {
464 if let (Some(sack), TraverserValue::Integer(i)) =
465 (traverser.sack(), traverser.value())
466 {
467 if let Some(s) = sack.as_i64() {
468 traverser.set_sack(json!(s * i));
469 }
470 }
471 }
472 SackOperation::Merge => {
473 }
475 }
476 StepResult::emit_one(traverser)
477 }
478
479 fn reset(&mut self) {}
480
481 fn clone_step(&self) -> Box<dyn Step> {
482 Box::new(self.clone())
483 }
484
485 fn as_any(&self) -> &dyn Any {
486 self
487 }
488
489 fn as_any_mut(&mut self) -> &mut dyn Any {
490 self
491 }
492}
493
494impl SideEffectStep for SackStep {
495 fn side_effect(&self, _traverser: &Traverser) {
496 }
498}
499
500#[derive(Debug)]
502pub struct ProfileStep {
503 id: String,
504 labels: Vec<String>,
505 key: Option<String>,
507 metrics: Arc<RwLock<ProfileMetrics>>,
509}
510
511#[derive(Debug, Clone, Default)]
513pub struct ProfileMetrics {
514 pub step_times: HashMap<String, u64>,
516 pub traverser_count: u64,
518 pub total_time: u64,
520}
521
522impl Clone for ProfileStep {
523 fn clone(&self) -> Self {
524 Self {
525 id: self.id.clone(),
526 labels: self.labels.clone(),
527 key: self.key.clone(),
528 metrics: Arc::clone(&self.metrics),
529 }
530 }
531}
532
533impl ProfileStep {
534 pub fn new() -> Self {
536 Self {
537 id: "profile_0".to_string(),
538 labels: Vec::new(),
539 key: None,
540 metrics: Arc::new(RwLock::new(ProfileMetrics::default())),
541 }
542 }
543
544 pub fn with_key(key: String) -> Self {
546 Self {
547 id: format!("profile_{}", key),
548 labels: Vec::new(),
549 key: Some(key),
550 metrics: Arc::new(RwLock::new(ProfileMetrics::default())),
551 }
552 }
553
554 pub fn metrics(&self) -> ProfileMetrics {
556 sideeffect_read(&self.metrics).clone()
557 }
558}
559
560impl Default for ProfileStep {
561 fn default() -> Self {
562 Self::new()
563 }
564}
565
566impl Step for ProfileStep {
567 fn id(&self) -> &str {
568 &self.id
569 }
570
571 fn name(&self) -> &str {
572 "ProfileStep"
573 }
574
575 fn labels(&self) -> &[String] {
576 &self.labels
577 }
578
579 fn add_label(&mut self, label: String) {
580 if !self.labels.contains(&label) {
581 self.labels.push(label);
582 }
583 }
584
585 fn requirements(&self) -> &[TraverserRequirement] {
586 &[]
587 }
588
589 fn process_traverser(&self, traverser: Traverser) -> StepResult {
590 self.side_effect(&traverser);
591 StepResult::emit_one(traverser)
592 }
593
594 fn reset(&mut self) {
595 *sideeffect_write(&self.metrics) = ProfileMetrics::default();
596 }
597
598 fn clone_step(&self) -> Box<dyn Step> {
599 Box::new(self.clone())
600 }
601
602 fn as_any(&self) -> &dyn Any {
603 self
604 }
605
606 fn as_any_mut(&mut self) -> &mut dyn Any {
607 self
608 }
609}
610
611impl SideEffectStep for ProfileStep {
612 fn side_effect(&self, _traverser: &Traverser) {
613 let mut metrics = sideeffect_write(&self.metrics);
614 metrics.traverser_count += 1;
615 }
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621
622 #[test]
623 fn test_store_step() {
624 let step = StoreStep::new("x".to_string());
625
626 step.side_effect(&Traverser::new("v1"));
627 step.side_effect(&Traverser::new("v2"));
628
629 let values = step.values();
630 assert_eq!(values.len(), 2);
631 }
632
633 #[test]
634 fn test_aggregate_step() {
635 let step = AggregateStep::global("x".to_string());
636 assert_eq!(step.name(), "AggregateGlobalStep");
637
638 step.side_effect(&Traverser::new("v1"));
639 assert_eq!(step.values().len(), 1);
640 }
641
642 #[test]
643 fn test_aggregate_local() {
644 let step = AggregateStep::local("x".to_string());
645 assert_eq!(step.name(), "AggregateLocalStep");
646 }
647
648 #[test]
649 fn test_property_step() {
650 let step = PropertyStep::with_value("status".to_string(), json!("active"));
651 assert_eq!(step.key(), "status");
652 }
653
654 #[test]
655 fn test_property_cardinality() {
656 let step = PropertyStep::with_value("tags".to_string(), json!("new"))
657 .cardinality(PropertyCardinality::List);
658
659 assert!(matches!(step.cardinality, PropertyCardinality::List));
660 }
661
662 #[test]
663 fn test_sack_step_set() {
664 let step = SackStep::set(json!(0));
665
666 let traverser = Traverser::new("v1");
667 let result = step.process_traverser(traverser);
668
669 if let StepResult::Emit(t) = result {
670 assert_eq!(t[0].sack(), Some(&json!(0)));
671 }
672 }
673
674 #[test]
675 fn test_profile_step() {
676 let step = ProfileStep::new();
677
678 step.side_effect(&Traverser::new("v1"));
679 step.side_effect(&Traverser::new("v2"));
680
681 let metrics = step.metrics();
682 assert_eq!(metrics.traverser_count, 2);
683 }
684
685 #[test]
686 fn test_store_step_recovers_after_values_lock_poisoning() {
687 let step = StoreStep::new("x".to_string());
688 let poison_target = step.clone();
689 let _ = std::thread::spawn(move || {
690 let _guard = poison_target
691 .values
692 .write()
693 .expect("store values lock should be acquired");
694 panic!("poison store values lock");
695 })
696 .join();
697
698 step.side_effect(&Traverser::new("v1"));
699 let values = step.values();
700 assert_eq!(values.len(), 1);
701 assert_eq!(values[0]["id"], json!("v1"));
702 }
703
704 #[test]
705 fn test_profile_step_recovers_after_metrics_lock_poisoning() {
706 let step = ProfileStep::new();
707 let poison_target = step.clone();
708 let _ = std::thread::spawn(move || {
709 let _guard = poison_target
710 .metrics
711 .write()
712 .expect("profile metrics lock should be acquired");
713 panic!("poison profile metrics lock");
714 })
715 .join();
716
717 step.side_effect(&Traverser::new("v1"));
718 assert_eq!(step.metrics().traverser_count, 1);
719 }
720}