Skip to main content

reddb_server/storage/query/step/
mod.rs

1//! Step Architecture
2//!
3//! TinkerPop-inspired step hierarchy for graph traversal execution.
4//!
5//! # Architecture
6//!
7//! ```text
8//! Step (trait)
9//! ├── SourceStep       - Start of traversal (V(), E())
10//! ├── FilterStep       - Predicate filtering (has, where)
11//! ├── MapStep          - 1:1 transformation (map, select)
12//! ├── FlatMapStep      - 1:N expansion (out, in, both)
13//! ├── SideEffectStep   - Side-effect execution (store, aggregate)
14//! ├── BranchStep       - Conditional branching (choose, union)
15//! └── BarrierStep      - Synchronization (fold, group, dedup)
16//! ```
17//!
18//! # TraverserRequirements
19//!
20//! Steps declare their requirements, enabling optimization:
21//! - `PATH`: Needs path tracking
22//! - `BULK`: Supports bulk processing
23//! - `LOOP`: Uses loop counters
24//! - `LABELS`: Uses step labels
25
26pub mod barrier;
27pub mod branch;
28pub mod filter;
29pub mod flatmap;
30pub mod map;
31pub mod sideeffect;
32pub mod source;
33pub mod traverser;
34
35// Re-export common types
36pub use barrier::{
37    BarrierStep, CollectingBarrierStep, FoldStep, GroupStep, OrderStep, ReducingBarrierStep,
38};
39pub use branch::{BranchStep, ChooseStep, OptionalStep, RepeatStep, UnionStep};
40pub use filter::{DedupStep, FilterStep, HasStep, LimitStep, Predicate, RangeStep, WhereStep};
41pub use flatmap::{BothStep, Direction, EdgeStep, FlatMapStep, InStep, OutStep, VertexStep};
42pub use map::{IdStep, MapStep, PathStep, ProjectStep, SelectStep, ValueMapStep};
43pub use sideeffect::{AggregateStep, PropertyStep, SideEffectStep, StoreStep};
44pub use source::{EdgeSourceStep, SourceStep, VertexSourceStep};
45pub use traverser::{
46    LoopState, Path, Traverser, TraverserGenerator, TraverserRequirement, TraverserValue,
47};
48
49use std::any::Any;
50use std::fmt::Debug;
51
52/// Core step trait - all traversal steps implement this
53pub trait Step: Send + Sync + Debug {
54    /// Step identifier (unique in traversal)
55    fn id(&self) -> &str;
56
57    /// Human-readable name
58    fn name(&self) -> &str;
59
60    /// Labels assigned to this step
61    fn labels(&self) -> &[String];
62
63    /// Add a label to this step
64    fn add_label(&mut self, label: String);
65
66    /// Requirements this step declares
67    fn requirements(&self) -> &[TraverserRequirement];
68
69    /// Process a single traverser (standard algorithm)
70    fn process_traverser(&self, traverser: Traverser) -> StepResult;
71
72    /// Reset step state for reuse
73    fn reset(&mut self);
74
75    /// Clone as trait object
76    fn clone_step(&self) -> Box<dyn Step>;
77
78    /// Downcast to concrete type
79    fn as_any(&self) -> &dyn Any;
80
81    /// Downcast to mutable concrete type
82    fn as_any_mut(&mut self) -> &mut dyn Any;
83}
84
85/// Result of processing a traverser through a step
86#[derive(Debug, Clone)]
87pub enum StepResult {
88    /// Emit these traversers to next step
89    Emit(Vec<Traverser>),
90    /// Filter out (no traversers emitted)
91    Filter,
92    /// Hold traversers (barrier steps)
93    Hold(Vec<Traverser>),
94    /// Error during processing
95    Error(String),
96}
97
98impl StepResult {
99    /// Create emit result with single traverser
100    pub fn emit_one(traverser: Traverser) -> Self {
101        StepResult::Emit(vec![traverser])
102    }
103
104    /// Create emit result with multiple traversers
105    pub fn emit_many(traversers: Vec<Traverser>) -> Self {
106        if traversers.is_empty() {
107            StepResult::Filter
108        } else {
109            StepResult::Emit(traversers)
110        }
111    }
112
113    /// Check if result is a filter
114    pub fn is_filter(&self) -> bool {
115        matches!(self, StepResult::Filter)
116    }
117
118    /// Check if result has traversers
119    pub fn has_traversers(&self) -> bool {
120        match self {
121            StepResult::Emit(t) => !t.is_empty(),
122            StepResult::Hold(t) => !t.is_empty(),
123            _ => false,
124        }
125    }
126
127    /// Extract traversers if present
128    pub fn into_traversers(self) -> Vec<Traverser> {
129        match self {
130            StepResult::Emit(t) | StepResult::Hold(t) => t,
131            _ => Vec::new(),
132        }
133    }
134}
135
136/// Step execution mode
137#[derive(Debug, Clone, Copy, PartialEq, Eq)]
138pub enum ExecutionMode {
139    /// Single-machine standard execution
140    Standard,
141    /// Distributed graph computer execution
142    Computer,
143}
144
145/// Step position in traversal
146#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub enum StepPosition {
148    /// Start step (source)
149    Start,
150    /// Middle step
151    Middle,
152    /// End step (terminal)
153    End,
154}
155
156/// Traversal parent for nested traversals
157pub trait TraversalParent: Send + Sync {
158    /// Get global child traversals (executed once)
159    fn global_children(&self) -> Vec<&dyn Traversal>;
160
161    /// Get local child traversals (per-traverser)
162    fn local_children(&self) -> Vec<&dyn Traversal>;
163
164    /// Get all child traversals
165    fn children(&self) -> Vec<&dyn Traversal> {
166        let mut all = self.global_children();
167        all.extend(self.local_children());
168        all
169    }
170}
171
172/// Traversal interface for step pipelines
173pub trait Traversal: Send + Sync + Debug {
174    /// Get all steps in order
175    fn steps(&self) -> &[Box<dyn Step>];
176
177    /// Get mutable steps
178    fn steps_mut(&mut self) -> &mut Vec<Box<dyn Step>>;
179
180    /// Add a step to the end
181    fn add_step(&mut self, step: Box<dyn Step>);
182
183    /// Insert step at position
184    fn insert_step(&mut self, index: usize, step: Box<dyn Step>);
185
186    /// Remove step at position
187    fn remove_step(&mut self, index: usize) -> Box<dyn Step>;
188
189    /// Get step by index
190    fn get_step(&self, index: usize) -> Option<&dyn Step>;
191
192    /// Find step by id
193    fn find_step(&self, id: &str) -> Option<&dyn Step>;
194
195    /// Get aggregated requirements
196    fn requirements(&self) -> Vec<TraverserRequirement>;
197
198    /// Reset all steps
199    fn reset(&mut self);
200}
201
202/// Basic traversal implementation
203#[derive(Debug)]
204pub struct BasicTraversal {
205    steps: Vec<Box<dyn Step>>,
206    requirements_cache: Option<Vec<TraverserRequirement>>,
207}
208
209impl BasicTraversal {
210    /// Create empty traversal
211    pub fn new() -> Self {
212        Self {
213            steps: Vec::new(),
214            requirements_cache: None,
215        }
216    }
217
218    /// Create traversal with steps
219    pub fn with_steps(steps: Vec<Box<dyn Step>>) -> Self {
220        Self {
221            steps,
222            requirements_cache: None,
223        }
224    }
225
226    /// Invalidate requirements cache
227    fn invalidate_cache(&mut self) {
228        self.requirements_cache = None;
229    }
230}
231
232impl Default for BasicTraversal {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
238impl Clone for BasicTraversal {
239    fn clone(&self) -> Self {
240        Self {
241            steps: self.steps.iter().map(|s| s.clone_step()).collect(),
242            requirements_cache: self.requirements_cache.clone(),
243        }
244    }
245}
246
247impl Traversal for BasicTraversal {
248    fn steps(&self) -> &[Box<dyn Step>] {
249        &self.steps
250    }
251
252    fn steps_mut(&mut self) -> &mut Vec<Box<dyn Step>> {
253        self.invalidate_cache();
254        &mut self.steps
255    }
256
257    fn add_step(&mut self, step: Box<dyn Step>) {
258        self.invalidate_cache();
259        self.steps.push(step);
260    }
261
262    fn insert_step(&mut self, index: usize, step: Box<dyn Step>) {
263        self.invalidate_cache();
264        self.steps.insert(index, step);
265    }
266
267    fn remove_step(&mut self, index: usize) -> Box<dyn Step> {
268        self.invalidate_cache();
269        self.steps.remove(index)
270    }
271
272    fn get_step(&self, index: usize) -> Option<&dyn Step> {
273        self.steps.get(index).map(|s| s.as_ref())
274    }
275
276    fn find_step(&self, id: &str) -> Option<&dyn Step> {
277        self.steps.iter().find(|s| s.id() == id).map(|s| s.as_ref())
278    }
279
280    fn requirements(&self) -> Vec<TraverserRequirement> {
281        if let Some(ref cached) = self.requirements_cache {
282            return cached.clone();
283        }
284
285        let mut reqs: Vec<TraverserRequirement> = Vec::new();
286        for step in &self.steps {
287            for req in step.requirements() {
288                if !reqs.contains(req) {
289                    reqs.push(req.clone());
290                }
291            }
292        }
293        reqs
294    }
295
296    fn reset(&mut self) {
297        for step in &mut self.steps {
298            step.reset();
299        }
300    }
301}
302
303#[cfg(test)]
304mod tests {
305    use super::*;
306
307    #[test]
308    fn test_step_result_emit_one() {
309        let traverser = Traverser::new("test");
310        let result = StepResult::emit_one(traverser);
311        assert!(result.has_traversers());
312    }
313
314    #[test]
315    fn test_step_result_filter() {
316        let result = StepResult::Filter;
317        assert!(result.is_filter());
318        assert!(!result.has_traversers());
319    }
320
321    #[test]
322    fn test_step_result_into_traversers() {
323        let t1 = Traverser::new("a");
324        let t2 = Traverser::new("b");
325        let result = StepResult::emit_many(vec![t1, t2]);
326        let traversers = result.into_traversers();
327        assert_eq!(traversers.len(), 2);
328    }
329
330    #[test]
331    fn test_basic_traversal() {
332        let traversal = BasicTraversal::new();
333        assert_eq!(traversal.steps().len(), 0);
334    }
335}