Skip to main content

reddb_server/storage/query/step/
sideeffect.rs

1//! Side Effect Steps
2//!
3//! Steps that perform side effects while passing through traversers.
4//!
5//! # Steps
6//!
7//! - `store()`: Store values in side-effect
8//! - `aggregate()`: Aggregate values globally
9//! - `property()`: Set property on element
10//! - `sack()`: Manipulate traverser sack
11//! - `profile()`: Gather profiling metrics
12
13use super::{
14    BasicTraversal, Step, StepResult, Traversal, Traverser, TraverserRequirement, TraverserValue,
15};
16use crate::json;
17use crate::serde_json::Value;
18use std::any::Any;
19use std::collections::HashMap;
20use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
21
22fn sideeffect_read<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
23    lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
24}
25
26fn sideeffect_write<'a, T>(lock: &'a RwLock<T>) -> RwLockWriteGuard<'a, T> {
27    lock.write()
28        .unwrap_or_else(|poisoned| poisoned.into_inner())
29}
30
31/// Trait for side effect steps
32pub trait SideEffectStep: Step {
33    /// Execute side effect
34    fn side_effect(&self, traverser: &Traverser);
35}
36
37/// Store step - stores values in side-effect key
38#[derive(Debug)]
39pub struct StoreStep {
40    id: String,
41    labels: Vec<String>,
42    /// Side-effect key
43    key: String,
44    /// Stored values
45    values: Arc<RwLock<Vec<Value>>>,
46}
47
48impl Clone for StoreStep {
49    fn clone(&self) -> Self {
50        Self {
51            id: self.id.clone(),
52            labels: self.labels.clone(),
53            key: self.key.clone(),
54            values: Arc::clone(&self.values),
55        }
56    }
57}
58
59impl StoreStep {
60    /// Create store() step
61    pub fn new(key: String) -> Self {
62        Self {
63            id: format!("store_{}", key),
64            labels: Vec::new(),
65            key,
66            values: Arc::new(RwLock::new(Vec::new())),
67        }
68    }
69
70    /// Get stored values
71    pub fn values(&self) -> Vec<Value> {
72        sideeffect_read(&self.values).clone()
73    }
74}
75
76impl Step for StoreStep {
77    fn id(&self) -> &str {
78        &self.id
79    }
80
81    fn name(&self) -> &str {
82        "StoreStep"
83    }
84
85    fn labels(&self) -> &[String] {
86        &self.labels
87    }
88
89    fn add_label(&mut self, label: String) {
90        if !self.labels.contains(&label) {
91            self.labels.push(label);
92        }
93    }
94
95    fn requirements(&self) -> &[TraverserRequirement] {
96        static REQS: &[TraverserRequirement] = &[TraverserRequirement::Sack];
97        REQS
98    }
99
100    fn process_traverser(&self, traverser: Traverser) -> StepResult {
101        self.side_effect(&traverser);
102        StepResult::emit_one(traverser)
103    }
104
105    fn reset(&mut self) {
106        sideeffect_write(&self.values).clear();
107    }
108
109    fn clone_step(&self) -> Box<dyn Step> {
110        Box::new(self.clone())
111    }
112
113    fn as_any(&self) -> &dyn Any {
114        self
115    }
116
117    fn as_any_mut(&mut self) -> &mut dyn Any {
118        self
119    }
120}
121
122impl SideEffectStep for StoreStep {
123    fn side_effect(&self, traverser: &Traverser) {
124        let value = traverser.value().to_json();
125        sideeffect_write(&self.values).push(value);
126    }
127}
128
129/// Aggregate step - aggregates values globally (barrier-like)
130#[derive(Debug)]
131pub struct AggregateStep {
132    id: String,
133    labels: Vec<String>,
134    /// Side-effect key
135    key: String,
136    /// Scope: local (per-traverser) or global
137    global: bool,
138    /// Aggregated values
139    values: Arc<RwLock<Vec<Value>>>,
140}
141
142impl Clone for AggregateStep {
143    fn clone(&self) -> Self {
144        Self {
145            id: self.id.clone(),
146            labels: self.labels.clone(),
147            key: self.key.clone(),
148            global: self.global,
149            values: Arc::clone(&self.values),
150        }
151    }
152}
153
154impl AggregateStep {
155    /// Create aggregate() step (global)
156    pub fn global(key: String) -> Self {
157        Self {
158            id: format!("aggregate_global_{}", key),
159            labels: Vec::new(),
160            key,
161            global: true,
162            values: Arc::new(RwLock::new(Vec::new())),
163        }
164    }
165
166    /// Create aggregate() step (local)
167    pub fn local(key: String) -> Self {
168        Self {
169            id: format!("aggregate_local_{}", key),
170            labels: Vec::new(),
171            key,
172            global: false,
173            values: Arc::new(RwLock::new(Vec::new())),
174        }
175    }
176
177    /// Get aggregated values
178    pub fn values(&self) -> Vec<Value> {
179        sideeffect_read(&self.values).clone()
180    }
181}
182
183impl Step for AggregateStep {
184    fn id(&self) -> &str {
185        &self.id
186    }
187
188    fn name(&self) -> &str {
189        if self.global {
190            "AggregateGlobalStep"
191        } else {
192            "AggregateLocalStep"
193        }
194    }
195
196    fn labels(&self) -> &[String] {
197        &self.labels
198    }
199
200    fn add_label(&mut self, label: String) {
201        if !self.labels.contains(&label) {
202            self.labels.push(label);
203        }
204    }
205
206    fn requirements(&self) -> &[TraverserRequirement] {
207        if self.global {
208            static REQS: &[TraverserRequirement] =
209                &[TraverserRequirement::Barrier, TraverserRequirement::Sack];
210            REQS
211        } else {
212            static REQS: &[TraverserRequirement] = &[TraverserRequirement::Sack];
213            REQS
214        }
215    }
216
217    fn process_traverser(&self, traverser: Traverser) -> StepResult {
218        self.side_effect(&traverser);
219        StepResult::emit_one(traverser)
220    }
221
222    fn reset(&mut self) {
223        sideeffect_write(&self.values).clear();
224    }
225
226    fn clone_step(&self) -> Box<dyn Step> {
227        Box::new(self.clone())
228    }
229
230    fn as_any(&self) -> &dyn Any {
231        self
232    }
233
234    fn as_any_mut(&mut self) -> &mut dyn Any {
235        self
236    }
237}
238
239impl SideEffectStep for AggregateStep {
240    fn side_effect(&self, traverser: &Traverser) {
241        let value = traverser.value().to_json();
242        sideeffect_write(&self.values).push(value);
243    }
244}
245
246/// Property step - sets property on element
247#[derive(Debug, Clone)]
248pub struct PropertyStep {
249    id: String,
250    labels: Vec<String>,
251    /// Property key
252    key: String,
253    /// Property value (or child traversal for value)
254    value: Option<Value>,
255    /// Value traversal
256    value_traversal: Option<BasicTraversal>,
257    /// Cardinality (single, list, set)
258    cardinality: PropertyCardinality,
259}
260
261/// Property cardinality
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub enum PropertyCardinality {
264    /// Single value (replaces)
265    Single,
266    /// List (appends)
267    List,
268    /// Set (unique)
269    Set,
270}
271
272impl PropertyStep {
273    /// Create property() step with value
274    pub fn with_value(key: String, value: Value) -> Self {
275        Self {
276            id: format!("property_{}_{}", key, value),
277            labels: Vec::new(),
278            key,
279            value: Some(value),
280            value_traversal: None,
281            cardinality: PropertyCardinality::Single,
282        }
283    }
284
285    /// Create property() step with traversal
286    pub fn with_traversal(key: String, traversal: BasicTraversal) -> Self {
287        Self {
288            id: format!("property_{}_traversal", key),
289            labels: Vec::new(),
290            key,
291            value: None,
292            value_traversal: Some(traversal),
293            cardinality: PropertyCardinality::Single,
294        }
295    }
296
297    /// Set cardinality
298    pub fn cardinality(mut self, cardinality: PropertyCardinality) -> Self {
299        self.cardinality = cardinality;
300        self
301    }
302
303    /// Get property key
304    pub fn key(&self) -> &str {
305        &self.key
306    }
307}
308
309impl Step for PropertyStep {
310    fn id(&self) -> &str {
311        &self.id
312    }
313
314    fn name(&self) -> &str {
315        "PropertyStep"
316    }
317
318    fn labels(&self) -> &[String] {
319        &self.labels
320    }
321
322    fn add_label(&mut self, label: String) {
323        if !self.labels.contains(&label) {
324            self.labels.push(label);
325        }
326    }
327
328    fn requirements(&self) -> &[TraverserRequirement] {
329        static REQS: &[TraverserRequirement] = &[TraverserRequirement::Mutates];
330        REQS
331    }
332
333    fn process_traverser(&self, traverser: Traverser) -> StepResult {
334        // In real impl, would modify the element in the graph
335        // For now, just pass through
336        self.side_effect(&traverser);
337        StepResult::emit_one(traverser)
338    }
339
340    fn reset(&mut self) {
341        if let Some(ref mut t) = self.value_traversal {
342            t.reset();
343        }
344    }
345
346    fn clone_step(&self) -> Box<dyn Step> {
347        Box::new(self.clone())
348    }
349
350    fn as_any(&self) -> &dyn Any {
351        self
352    }
353
354    fn as_any_mut(&mut self) -> &mut dyn Any {
355        self
356    }
357}
358
359impl SideEffectStep for PropertyStep {
360    fn side_effect(&self, _traverser: &Traverser) {
361        // Would mutate the graph element
362    }
363}
364
365/// Sack step - manipulates traverser sack
366#[derive(Debug, Clone)]
367pub struct SackStep {
368    id: String,
369    labels: Vec<String>,
370    /// Sack operation
371    operation: SackOperation,
372}
373
374/// Sack operation types
375#[derive(Debug, Clone)]
376pub enum SackOperation {
377    /// Set sack value
378    Set(Value),
379    /// Sum into sack
380    Sum,
381    /// Multiply into sack
382    Mult,
383    /// Merge map/list
384    Merge,
385}
386
387impl SackStep {
388    /// Create sack() step to set value
389    pub fn set(value: Value) -> Self {
390        Self {
391            id: "sack_set_0".to_string(),
392            labels: Vec::new(),
393            operation: SackOperation::Set(value),
394        }
395    }
396
397    /// Create sack() step for sum
398    pub fn sum() -> Self {
399        Self {
400            id: "sack_sum_0".to_string(),
401            labels: Vec::new(),
402            operation: SackOperation::Sum,
403        }
404    }
405
406    /// Create sack() step for multiply
407    pub fn mult() -> Self {
408        Self {
409            id: "sack_mult_0".to_string(),
410            labels: Vec::new(),
411            operation: SackOperation::Mult,
412        }
413    }
414
415    /// Create sack() step for merge
416    pub fn merge() -> Self {
417        Self {
418            id: "sack_merge_0".to_string(),
419            labels: Vec::new(),
420            operation: SackOperation::Merge,
421        }
422    }
423}
424
425impl Step for SackStep {
426    fn id(&self) -> &str {
427        &self.id
428    }
429
430    fn name(&self) -> &str {
431        "SackStep"
432    }
433
434    fn labels(&self) -> &[String] {
435        &self.labels
436    }
437
438    fn add_label(&mut self, label: String) {
439        if !self.labels.contains(&label) {
440            self.labels.push(label);
441        }
442    }
443
444    fn requirements(&self) -> &[TraverserRequirement] {
445        static REQS: &[TraverserRequirement] = &[TraverserRequirement::Sack];
446        REQS
447    }
448
449    fn process_traverser(&self, mut traverser: Traverser) -> StepResult {
450        match &self.operation {
451            SackOperation::Set(value) => {
452                traverser.set_sack(value.clone());
453            }
454            SackOperation::Sum => {
455                if let (Some(sack), TraverserValue::Integer(i)) =
456                    (traverser.sack(), traverser.value())
457                {
458                    if let Some(s) = sack.as_i64() {
459                        traverser.set_sack(json!(s + i));
460                    }
461                }
462            }
463            SackOperation::Mult => {
464                if let (Some(sack), TraverserValue::Integer(i)) =
465                    (traverser.sack(), traverser.value())
466                {
467                    if let Some(s) = sack.as_i64() {
468                        traverser.set_sack(json!(s * i));
469                    }
470                }
471            }
472            SackOperation::Merge => {
473                // Would merge maps/lists
474            }
475        }
476        StepResult::emit_one(traverser)
477    }
478
479    fn reset(&mut self) {}
480
481    fn clone_step(&self) -> Box<dyn Step> {
482        Box::new(self.clone())
483    }
484
485    fn as_any(&self) -> &dyn Any {
486        self
487    }
488
489    fn as_any_mut(&mut self) -> &mut dyn Any {
490        self
491    }
492}
493
494impl SideEffectStep for SackStep {
495    fn side_effect(&self, _traverser: &Traverser) {
496        // Side effect is handled in process_traverser
497    }
498}
499
500/// Profile step - gathers execution metrics
501#[derive(Debug)]
502pub struct ProfileStep {
503    id: String,
504    labels: Vec<String>,
505    /// Profile key
506    key: Option<String>,
507    /// Metrics storage
508    metrics: Arc<RwLock<ProfileMetrics>>,
509}
510
511/// Profile metrics
512#[derive(Debug, Clone, Default)]
513pub struct ProfileMetrics {
514    /// Step timings (step_id -> duration_ns)
515    pub step_times: HashMap<String, u64>,
516    /// Traverser counts
517    pub traverser_count: u64,
518    /// Total execution time
519    pub total_time: u64,
520}
521
522impl Clone for ProfileStep {
523    fn clone(&self) -> Self {
524        Self {
525            id: self.id.clone(),
526            labels: self.labels.clone(),
527            key: self.key.clone(),
528            metrics: Arc::clone(&self.metrics),
529        }
530    }
531}
532
533impl ProfileStep {
534    /// Create profile() step
535    pub fn new() -> Self {
536        Self {
537            id: "profile_0".to_string(),
538            labels: Vec::new(),
539            key: None,
540            metrics: Arc::new(RwLock::new(ProfileMetrics::default())),
541        }
542    }
543
544    /// Create profile() step with key
545    pub fn with_key(key: String) -> Self {
546        Self {
547            id: format!("profile_{}", key),
548            labels: Vec::new(),
549            key: Some(key),
550            metrics: Arc::new(RwLock::new(ProfileMetrics::default())),
551        }
552    }
553
554    /// Get metrics
555    pub fn metrics(&self) -> ProfileMetrics {
556        sideeffect_read(&self.metrics).clone()
557    }
558}
559
560impl Default for ProfileStep {
561    fn default() -> Self {
562        Self::new()
563    }
564}
565
566impl Step for ProfileStep {
567    fn id(&self) -> &str {
568        &self.id
569    }
570
571    fn name(&self) -> &str {
572        "ProfileStep"
573    }
574
575    fn labels(&self) -> &[String] {
576        &self.labels
577    }
578
579    fn add_label(&mut self, label: String) {
580        if !self.labels.contains(&label) {
581            self.labels.push(label);
582        }
583    }
584
585    fn requirements(&self) -> &[TraverserRequirement] {
586        &[]
587    }
588
589    fn process_traverser(&self, traverser: Traverser) -> StepResult {
590        self.side_effect(&traverser);
591        StepResult::emit_one(traverser)
592    }
593
594    fn reset(&mut self) {
595        *sideeffect_write(&self.metrics) = ProfileMetrics::default();
596    }
597
598    fn clone_step(&self) -> Box<dyn Step> {
599        Box::new(self.clone())
600    }
601
602    fn as_any(&self) -> &dyn Any {
603        self
604    }
605
606    fn as_any_mut(&mut self) -> &mut dyn Any {
607        self
608    }
609}
610
611impl SideEffectStep for ProfileStep {
612    fn side_effect(&self, _traverser: &Traverser) {
613        let mut metrics = sideeffect_write(&self.metrics);
614        metrics.traverser_count += 1;
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621
622    #[test]
623    fn test_store_step() {
624        let step = StoreStep::new("x".to_string());
625
626        step.side_effect(&Traverser::new("v1"));
627        step.side_effect(&Traverser::new("v2"));
628
629        let values = step.values();
630        assert_eq!(values.len(), 2);
631    }
632
633    #[test]
634    fn test_aggregate_step() {
635        let step = AggregateStep::global("x".to_string());
636        assert_eq!(step.name(), "AggregateGlobalStep");
637
638        step.side_effect(&Traverser::new("v1"));
639        assert_eq!(step.values().len(), 1);
640    }
641
642    #[test]
643    fn test_aggregate_local() {
644        let step = AggregateStep::local("x".to_string());
645        assert_eq!(step.name(), "AggregateLocalStep");
646    }
647
648    #[test]
649    fn test_property_step() {
650        let step = PropertyStep::with_value("status".to_string(), json!("active"));
651        assert_eq!(step.key(), "status");
652    }
653
654    #[test]
655    fn test_property_cardinality() {
656        let step = PropertyStep::with_value("tags".to_string(), json!("new"))
657            .cardinality(PropertyCardinality::List);
658
659        assert!(matches!(step.cardinality, PropertyCardinality::List));
660    }
661
662    #[test]
663    fn test_sack_step_set() {
664        let step = SackStep::set(json!(0));
665
666        let traverser = Traverser::new("v1");
667        let result = step.process_traverser(traverser);
668
669        if let StepResult::Emit(t) = result {
670            assert_eq!(t[0].sack(), Some(&json!(0)));
671        }
672    }
673
674    #[test]
675    fn test_profile_step() {
676        let step = ProfileStep::new();
677
678        step.side_effect(&Traverser::new("v1"));
679        step.side_effect(&Traverser::new("v2"));
680
681        let metrics = step.metrics();
682        assert_eq!(metrics.traverser_count, 2);
683    }
684
685    #[test]
686    fn test_store_step_recovers_after_values_lock_poisoning() {
687        let step = StoreStep::new("x".to_string());
688        let poison_target = step.clone();
689        let _ = std::thread::spawn(move || {
690            let _guard = poison_target
691                .values
692                .write()
693                .expect("store values lock should be acquired");
694            panic!("poison store values lock");
695        })
696        .join();
697
698        step.side_effect(&Traverser::new("v1"));
699        let values = step.values();
700        assert_eq!(values.len(), 1);
701        assert_eq!(values[0]["id"], json!("v1"));
702    }
703
704    #[test]
705    fn test_profile_step_recovers_after_metrics_lock_poisoning() {
706        let step = ProfileStep::new();
707        let poison_target = step.clone();
708        let _ = std::thread::spawn(move || {
709            let _guard = poison_target
710                .metrics
711                .write()
712                .expect("profile metrics lock should be acquired");
713            panic!("poison profile metrics lock");
714        })
715        .join();
716
717        step.side_effect(&Traverser::new("v1"));
718        assert_eq!(step.metrics().traverser_count, 1);
719    }
720}