Skip to main content

reddb_server/storage/query/executors/
gremlin.rs

1//! Gremlin Traversal Executor
2//!
3//! Implements TinkerPop-inspired traversal execution with:
4//! - `Traverser`: Tracks current position, path history, and bulk count
5//! - Step execution: V, E, out, in, has, filter, etc.
6//! - Path tracking for return path queries
7//! - Loop detection for repeat() steps
8//!
9//! # Architecture (inspired by TinkerPop)
10//!
11//! ```text
12//! GremlinTraversal → [Step, Step, ...] → Traverser stream → Result
13//!                    ↓                    ↓
14//!                 GremlinStep           TraverserSet
15//! ```
16
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19
20use crate::storage::engine::graph_store::GraphStore;
21use crate::storage::query::modes::gremlin::{
22    GremlinParser, GremlinPredicate, GremlinStep, GremlinTraversal, GremlinValue, TraversalSource,
23};
24use crate::storage::query::unified::{
25    ExecutionError, GraphPath, MatchedEdge, MatchedNode, QueryStats, UnifiedRecord, UnifiedResult,
26};
27
28/// A traverser represents a position in the graph during traversal
29#[derive(Debug, Clone)]
30pub struct Traverser {
31    /// Current element (node ID or edge ID)
32    pub current: TraverserElement,
33    /// Path history
34    pub path: Vec<TraverserElement>,
35    /// Labels assigned via as() step
36    pub labels: HashMap<String, TraverserElement>,
37    /// Bulk count (optimization for duplicate paths)
38    pub bulk: u64,
39    /// Loop counters for repeat() steps
40    pub loops: HashMap<String, u32>,
41    /// Sack values for side-effect accumulation
42    pub sack: Option<SackValue>,
43}
44
45/// Element that a traverser can point to
46#[derive(Debug, Clone, PartialEq)]
47pub enum TraverserElement {
48    Node(String),
49    Edge {
50        from: String,
51        edge_label: String,
52        to: String,
53        weight: f32,
54    },
55    Value(GremlinValue),
56}
57
58impl TraverserElement {
59    /// Get as node ID if this is a node
60    pub fn as_node_id(&self) -> Option<&str> {
61        match self {
62            Self::Node(id) => Some(id),
63            _ => None,
64        }
65    }
66}
67
68/// Sack value for accumulation
69#[derive(Debug, Clone)]
70pub enum SackValue {
71    Integer(i64),
72    Float(f64),
73    List(Vec<GremlinValue>),
74    Map(HashMap<String, GremlinValue>),
75}
76
77/// Gremlin executor with traverser-based execution
78pub struct GremlinExecutor {
79    graph: Arc<GraphStore>,
80}
81
82impl GremlinExecutor {
83    /// Create a new Gremlin executor
84    pub fn new(graph: Arc<GraphStore>) -> Self {
85        Self { graph }
86    }
87
88    /// Execute a Gremlin query string
89    pub fn execute(&self, query: &str) -> Result<UnifiedResult, ExecutionError> {
90        let traversal =
91            GremlinParser::parse(query).map_err(|e| ExecutionError::new(e.to_string()))?;
92        self.execute_traversal(&traversal)
93    }
94
95    /// Execute a parsed traversal
96    pub fn execute_traversal(
97        &self,
98        traversal: &GremlinTraversal,
99    ) -> Result<UnifiedResult, ExecutionError> {
100        let mut stats = QueryStats::default();
101
102        // Initialize traverser set from source type
103        let mut traversers = Vec::new();
104
105        // First find the V() or E() step that initializes the traversal
106        for step in &traversal.steps {
107            match step {
108                GremlinStep::V(id_opt) => {
109                    if let Some(id) = id_opt {
110                        // Specific vertex
111                        if self.graph.get_node(id).is_some() {
112                            traversers.push(Traverser::at_node(id));
113                            stats.nodes_scanned += 1;
114                        }
115                    } else {
116                        // All vertices
117                        for node in self.graph.iter_nodes() {
118                            stats.nodes_scanned += 1;
119                            traversers.push(Traverser::at_node(&node.id));
120                        }
121                    }
122                    break;
123                }
124                GremlinStep::E(id_opt) => {
125                    if let Some(id) = id_opt {
126                        // Specific edge by ID (format: "from->to")
127                        if let Some((from, to)) = id.split_once("->") {
128                            for (edge_type, target, weight) in self.graph.outgoing_edges(from) {
129                                if target == to {
130                                    traversers
131                                        .push(Traverser::at_edge(from, edge_type, &target, weight));
132                                    stats.edges_scanned += 1;
133                                }
134                            }
135                        }
136                    } else {
137                        // All edges
138                        for node in self.graph.iter_nodes() {
139                            for (edge_type, target, weight) in self.graph.outgoing_edges(&node.id) {
140                                stats.edges_scanned += 1;
141                                traversers
142                                    .push(Traverser::at_edge(&node.id, edge_type, &target, weight));
143                            }
144                        }
145                    }
146                    break;
147                }
148                _ => {}
149            }
150        }
151
152        // If no V()/E() step found and source is Anonymous, start empty
153        if traversers.is_empty() && traversal.source == TraversalSource::Anonymous {
154            // Anonymous traversals are typically used as inner traversals
155        }
156
157        // Execute each step after the source step
158        let mut found_source = false;
159        for step in &traversal.steps {
160            // Skip the source step we already processed
161            if !found_source && matches!(step, GremlinStep::V(_) | GremlinStep::E(_)) {
162                found_source = true;
163                continue;
164            }
165
166            traversers = self.execute_step(traversers, step, &mut stats)?;
167            if traversers.is_empty() {
168                break;
169            }
170        }
171
172        // Convert traversers to result
173        self.traversers_to_result(traversers, stats)
174    }
175
176    /// Execute a single step on all traversers
177    fn execute_step(
178        &self,
179        traversers: Vec<Traverser>,
180        step: &GremlinStep,
181        stats: &mut QueryStats,
182    ) -> Result<Vec<Traverser>, ExecutionError> {
183        let mut result = Vec::new();
184
185        match step {
186            // ======== Source Steps (already processed) ========
187            GremlinStep::V(_) | GremlinStep::E(_) => {
188                // Already handled in initialization
189                result = traversers;
190            }
191
192            // ======== Navigation Steps ========
193            GremlinStep::Out(label_opt) => {
194                for t in traversers {
195                    if let Some(node_id) = t.current.as_node_id() {
196                        for (edge_type, target, _) in self.graph.outgoing_edges(node_id) {
197                            stats.edges_scanned += 1;
198                            if self.edge_matches_label(edge_type.as_str(), label_opt.as_deref()) {
199                                result.push(t.move_to_node(&target));
200                            }
201                        }
202                    }
203                }
204            }
205            GremlinStep::In(label_opt) => {
206                for t in traversers {
207                    if let Some(node_id) = t.current.as_node_id() {
208                        for (edge_type, source, _) in self.graph.incoming_edges(node_id) {
209                            stats.edges_scanned += 1;
210                            if self.edge_matches_label(edge_type.as_str(), label_opt.as_deref()) {
211                                result.push(t.move_to_node(&source));
212                            }
213                        }
214                    }
215                }
216            }
217            GremlinStep::Both(label_opt) => {
218                for t in traversers {
219                    if let Some(node_id) = t.current.as_node_id() {
220                        // Outgoing
221                        for (edge_type, target, _) in self.graph.outgoing_edges(node_id) {
222                            stats.edges_scanned += 1;
223                            if self.edge_matches_label(edge_type.as_str(), label_opt.as_deref()) {
224                                result.push(t.move_to_node(&target));
225                            }
226                        }
227                        // Incoming
228                        for (edge_type, source, _) in self.graph.incoming_edges(node_id) {
229                            stats.edges_scanned += 1;
230                            if self.edge_matches_label(edge_type.as_str(), label_opt.as_deref()) {
231                                result.push(t.move_to_node(&source));
232                            }
233                        }
234                    }
235                }
236            }
237            GremlinStep::OutE(label_opt) => {
238                for t in traversers {
239                    if let Some(node_id) = t.current.as_node_id() {
240                        for (edge_type, target, weight) in self.graph.outgoing_edges(node_id) {
241                            stats.edges_scanned += 1;
242                            if self.edge_matches_label(edge_type.as_str(), label_opt.as_deref()) {
243                                result.push(t.move_to_edge(node_id, edge_type, &target, weight));
244                            }
245                        }
246                    }
247                }
248            }
249            GremlinStep::InE(label_opt) => {
250                for t in traversers {
251                    if let Some(node_id) = t.current.as_node_id() {
252                        for (edge_type, source, weight) in self.graph.incoming_edges(node_id) {
253                            stats.edges_scanned += 1;
254                            if self.edge_matches_label(edge_type.as_str(), label_opt.as_deref()) {
255                                result.push(t.move_to_edge(&source, edge_type, node_id, weight));
256                            }
257                        }
258                    }
259                }
260            }
261            GremlinStep::BothE(label_opt) => {
262                for t in traversers {
263                    if let Some(node_id) = t.current.as_node_id() {
264                        for (edge_type, target, weight) in self.graph.outgoing_edges(node_id) {
265                            stats.edges_scanned += 1;
266                            if self.edge_matches_label(edge_type.as_str(), label_opt.as_deref()) {
267                                result.push(t.move_to_edge(node_id, edge_type, &target, weight));
268                            }
269                        }
270                        for (edge_type, source, weight) in self.graph.incoming_edges(node_id) {
271                            stats.edges_scanned += 1;
272                            if self.edge_matches_label(edge_type.as_str(), label_opt.as_deref()) {
273                                result.push(t.move_to_edge(&source, edge_type, node_id, weight));
274                            }
275                        }
276                    }
277                }
278            }
279            GremlinStep::OutV => {
280                for t in traversers {
281                    if let TraverserElement::Edge { from, .. } = &t.current {
282                        result.push(t.move_to_node(from));
283                    }
284                }
285            }
286            GremlinStep::InV => {
287                for t in traversers {
288                    if let TraverserElement::Edge { to, .. } = &t.current {
289                        result.push(t.move_to_node(to));
290                    }
291                }
292            }
293            GremlinStep::BothV => {
294                for t in traversers {
295                    if let TraverserElement::Edge { from, to, .. } = &t.current {
296                        result.push(t.move_to_node(from));
297                        result.push(t.move_to_node(to));
298                    }
299                }
300            }
301            GremlinStep::OtherV => {
302                // Other vertex from edge - requires context of where we came from
303                for t in traversers {
304                    if let TraverserElement::Edge { from, to, .. } = &t.current {
305                        // Look at previous element in path to determine direction
306                        if let Some(prev) = t.path.last() {
307                            if let Some(prev_id) = prev.as_node_id() {
308                                if prev_id == from {
309                                    result.push(t.move_to_node(to));
310                                } else {
311                                    result.push(t.move_to_node(from));
312                                }
313                            }
314                        }
315                    }
316                }
317            }
318
319            // ======== Filter Steps ========
320            GremlinStep::Has(key, value_opt) => {
321                for t in traversers {
322                    if self.check_has(&t, key, value_opt.as_ref(), stats) {
323                        result.push(t);
324                    }
325                }
326            }
327            GremlinStep::HasNot(key) => {
328                for t in traversers {
329                    if !self.check_has(&t, key, None, stats) {
330                        result.push(t);
331                    }
332                }
333            }
334            GremlinStep::HasLabel(label) => {
335                for t in traversers {
336                    if self.check_has_label(&t, label, stats) {
337                        result.push(t);
338                    }
339                }
340            }
341            GremlinStep::HasId(id) => {
342                for t in traversers {
343                    if let Some(node_id) = t.current.as_node_id() {
344                        if node_id == id {
345                            result.push(t);
346                        }
347                    }
348                }
349            }
350            GremlinStep::Where(inner) => {
351                for t in traversers {
352                    let inner_result =
353                        self.execute_inner_traversal(vec![t.clone()], inner, stats)?;
354                    if !inner_result.is_empty() {
355                        result.push(t);
356                    }
357                }
358            }
359            GremlinStep::Filter(inner) => {
360                for t in traversers {
361                    let inner_result =
362                        self.execute_inner_traversal(vec![t.clone()], inner, stats)?;
363                    if !inner_result.is_empty() {
364                        result.push(t);
365                    }
366                }
367            }
368            GremlinStep::Dedup => {
369                let mut seen = HashSet::new();
370                for t in traversers {
371                    let key = format!("{:?}", t.current);
372                    if seen.insert(key) {
373                        result.push(t);
374                    }
375                }
376            }
377            GremlinStep::Limit(n) => {
378                result = traversers.into_iter().take(*n as usize).collect();
379            }
380            GremlinStep::Skip(n) => {
381                result = traversers.into_iter().skip(*n as usize).collect();
382            }
383            GremlinStep::Range(from, to) => {
384                result = traversers
385                    .into_iter()
386                    .skip(*from as usize)
387                    .take((*to - *from) as usize)
388                    .collect();
389            }
390
391            // ======== Side Effect Steps ========
392            GremlinStep::As(label) => {
393                for mut t in traversers {
394                    t.labels.insert(label.clone(), t.current.clone());
395                    result.push(t);
396                }
397            }
398            GremlinStep::Store(_) | GremlinStep::Aggregate(_) => {
399                // Side effects - just pass through
400                result = traversers;
401            }
402            GremlinStep::By(_) => {
403                // Modifier step - pass through
404                result = traversers;
405            }
406
407            // ======== Branch Steps ========
408            GremlinStep::Repeat(inner) => {
409                // Basic repeat - look for following Times/Until modifiers
410                // For now, execute once
411                result = self.execute_inner_traversal(traversers, inner, stats)?;
412            }
413            GremlinStep::Times(n) => {
414                // Modifier for repeat - handled in Repeat
415                // If we get here, just pass through
416                let _ = n;
417                result = traversers;
418            }
419            GremlinStep::Until(inner) => {
420                // Modifier for repeat - handled in Repeat
421                let _ = inner;
422                result = traversers;
423            }
424            GremlinStep::Emit => {
425                // Modifier for repeat - handled in Repeat
426                result = traversers;
427            }
428            GremlinStep::Union(branches) => {
429                for t in traversers {
430                    for branch in branches {
431                        result.extend(self.execute_inner_traversal(
432                            vec![t.clone()],
433                            branch,
434                            stats,
435                        )?);
436                    }
437                }
438            }
439            GremlinStep::Choose(cond, if_branch, else_branch) => {
440                for t in traversers {
441                    let matches = self.execute_inner_traversal(vec![t.clone()], cond, stats)?;
442                    if !matches.is_empty() {
443                        result.extend(self.execute_inner_traversal(vec![t], if_branch, stats)?);
444                    } else if let Some(else_br) = else_branch {
445                        result.extend(self.execute_inner_traversal(vec![t], else_br, stats)?);
446                    }
447                }
448            }
449            GremlinStep::Coalesce(branches) => {
450                for t in traversers {
451                    for branch in branches {
452                        let branch_result =
453                            self.execute_inner_traversal(vec![t.clone()], branch, stats)?;
454                        if !branch_result.is_empty() {
455                            result.extend(branch_result);
456                            break;
457                        }
458                    }
459                }
460            }
461
462            // ======== Map Steps ========
463            GremlinStep::Id => {
464                for t in traversers {
465                    if let Some(id) = t.current.as_node_id() {
466                        result.push(t.with_value(GremlinValue::String(id.to_string())));
467                    }
468                }
469            }
470            GremlinStep::Label => {
471                for t in traversers {
472                    if let Some(id) = t.current.as_node_id() {
473                        if let Some(node) = self.graph.get_node(id) {
474                            result.push(t.with_value(GremlinValue::String(node.label.clone())));
475                        }
476                    }
477                }
478            }
479            GremlinStep::Values(keys) => {
480                for t in traversers {
481                    for key in keys {
482                        if let Some(val) = self.get_property(&t, key) {
483                            result.push(t.with_value(GremlinValue::String(val)));
484                        }
485                    }
486                }
487            }
488            GremlinStep::ValueMap(keys) => {
489                // Return all properties as a map (simplified)
490                let _ = keys;
491                for t in traversers {
492                    if let Some(id) = t.current.as_node_id() {
493                        if let Some(_node) = self.graph.get_node(id) {
494                            result.push(t);
495                        }
496                    }
497                }
498            }
499            GremlinStep::Properties(keys) => {
500                // Same as values for now
501                for t in traversers {
502                    for key in keys {
503                        if let Some(val) = self.get_property(&t, key) {
504                            result.push(t.with_value(GremlinValue::String(val)));
505                        }
506                    }
507                }
508            }
509            GremlinStep::Select(labels) => {
510                for t in traversers {
511                    let mut new_t = t.clone();
512                    let selected: HashMap<_, _> = labels
513                        .iter()
514                        .filter_map(|l| t.labels.get(l).map(|v| (l.clone(), v.clone())))
515                        .collect();
516                    new_t.labels = selected;
517                    result.push(new_t);
518                }
519            }
520            GremlinStep::Project(keys) => {
521                // Similar to select but for computed values
522                let _ = keys;
523                result = traversers;
524            }
525            GremlinStep::Path => {
526                // Already tracking path, just pass through
527                result = traversers;
528            }
529            GremlinStep::SimplePath => {
530                for t in traversers {
531                    let mut seen = HashSet::new();
532                    let is_simple = t
533                        .path
534                        .iter()
535                        .filter_map(|e| e.as_node_id())
536                        .all(|id| seen.insert(id.to_string()));
537                    if is_simple {
538                        result.push(t);
539                    }
540                }
541            }
542            GremlinStep::CyclicPath => {
543                for t in traversers {
544                    let mut seen = HashSet::new();
545                    let has_cycle = !t
546                        .path
547                        .iter()
548                        .filter_map(|e| e.as_node_id())
549                        .all(|id| seen.insert(id.to_string()));
550                    if has_cycle {
551                        result.push(t);
552                    }
553                }
554            }
555
556            // ======== Aggregate Steps ========
557            GremlinStep::Count => {
558                let count = traversers.len() as i64;
559                let t = Traverser {
560                    current: TraverserElement::Value(GremlinValue::Integer(count)),
561                    path: Vec::new(),
562                    labels: HashMap::new(),
563                    bulk: 1,
564                    loops: HashMap::new(),
565                    sack: None,
566                };
567                result.push(t);
568            }
569            GremlinStep::Sum => {
570                let sum: f64 = traversers
571                    .iter()
572                    .filter_map(|t| match &t.current {
573                        TraverserElement::Value(GremlinValue::Integer(i)) => Some(*i as f64),
574                        TraverserElement::Value(GremlinValue::Float(f)) => Some(*f),
575                        _ => None,
576                    })
577                    .sum();
578                result.push(Traverser {
579                    current: TraverserElement::Value(GremlinValue::Float(sum)),
580                    path: Vec::new(),
581                    labels: HashMap::new(),
582                    bulk: 1,
583                    loops: HashMap::new(),
584                    sack: None,
585                });
586            }
587            GremlinStep::Min => {
588                let min = traversers
589                    .iter()
590                    .filter_map(|t| match &t.current {
591                        TraverserElement::Value(GremlinValue::Integer(i)) => Some(*i as f64),
592                        TraverserElement::Value(GremlinValue::Float(f)) => Some(*f),
593                        _ => None,
594                    })
595                    .fold(f64::INFINITY, |a, b| a.min(b));
596                if min.is_finite() {
597                    result.push(Traverser {
598                        current: TraverserElement::Value(GremlinValue::Float(min)),
599                        path: Vec::new(),
600                        labels: HashMap::new(),
601                        bulk: 1,
602                        loops: HashMap::new(),
603                        sack: None,
604                    });
605                }
606            }
607            GremlinStep::Max => {
608                let max = traversers
609                    .iter()
610                    .filter_map(|t| match &t.current {
611                        TraverserElement::Value(GremlinValue::Integer(i)) => Some(*i as f64),
612                        TraverserElement::Value(GremlinValue::Float(f)) => Some(*f),
613                        _ => None,
614                    })
615                    .fold(f64::NEG_INFINITY, |a, b| a.max(b));
616                if max.is_finite() {
617                    result.push(Traverser {
618                        current: TraverserElement::Value(GremlinValue::Float(max)),
619                        path: Vec::new(),
620                        labels: HashMap::new(),
621                        bulk: 1,
622                        loops: HashMap::new(),
623                        sack: None,
624                    });
625                }
626            }
627            GremlinStep::Mean => {
628                let vals: Vec<f64> = traversers
629                    .iter()
630                    .filter_map(|t| match &t.current {
631                        TraverserElement::Value(GremlinValue::Integer(i)) => Some(*i as f64),
632                        TraverserElement::Value(GremlinValue::Float(f)) => Some(*f),
633                        _ => None,
634                    })
635                    .collect();
636                if !vals.is_empty() {
637                    let mean = vals.iter().sum::<f64>() / vals.len() as f64;
638                    result.push(Traverser {
639                        current: TraverserElement::Value(GremlinValue::Float(mean)),
640                        path: Vec::new(),
641                        labels: HashMap::new(),
642                        bulk: 1,
643                        loops: HashMap::new(),
644                        sack: None,
645                    });
646                }
647            }
648            GremlinStep::Group => {
649                // Group step without modifier - pass through
650                result = traversers;
651            }
652            GremlinStep::GroupCount => {
653                // GroupCount - count occurrences
654                let mut counts: HashMap<String, u64> = HashMap::new();
655                for t in &traversers {
656                    let val = t.current.as_node_id().unwrap_or("unknown").to_string();
657                    *counts.entry(val).or_insert(0) += t.bulk;
658                }
659                // Return as single traverser (simplified)
660                result = traversers;
661            }
662            GremlinStep::Fold => {
663                result = vec![Traverser {
664                    current: TraverserElement::Value(
665                        GremlinValue::Integer(traversers.len() as i64),
666                    ),
667                    path: Vec::new(),
668                    labels: HashMap::new(),
669                    bulk: 1,
670                    loops: HashMap::new(),
671                    sack: None,
672                }];
673            }
674
675            // ======== Terminal Steps ========
676            GremlinStep::ToList | GremlinStep::ToSet | GremlinStep::Next => {
677                result = traversers;
678            }
679        }
680
681        Ok(result)
682    }
683
684    /// Execute an inner traversal (for repeat, where, etc.)
685    fn execute_inner_traversal(
686        &self,
687        traversers: Vec<Traverser>,
688        inner: &GremlinTraversal,
689        stats: &mut QueryStats,
690    ) -> Result<Vec<Traverser>, ExecutionError> {
691        let mut current = traversers;
692        for step in &inner.steps {
693            current = self.execute_step(current, step, stats)?;
694            if current.is_empty() {
695                break;
696            }
697        }
698        Ok(current)
699    }
700
701    /// Check if edge label matches a Gremlin label filter.
702    fn edge_matches_label(&self, edge_label: &str, label: Option<&str>) -> bool {
703        match label {
704            None => true,
705            Some(l) => {
706                let edge_str = edge_label.to_lowercase();
707                edge_str.contains(&l.to_lowercase())
708            }
709        }
710    }
711
712    /// Check has() filter
713    fn check_has(
714        &self,
715        t: &Traverser,
716        key: &str,
717        value: Option<&GremlinValue>,
718        stats: &mut QueryStats,
719    ) -> bool {
720        if let Some(node_id) = t.current.as_node_id() {
721            if let Some(node) = self.graph.get_node(node_id) {
722                stats.nodes_scanned += 1;
723                match key {
724                    "label" | "type" | "nodeType" => {
725                        let node_type_str = format!("{:?}", node.node_type);
726                        if let Some(val) = value {
727                            matches_gremlin_value(&node_type_str, val)
728                        } else {
729                            true
730                        }
731                    }
732                    "id" => {
733                        if let Some(val) = value {
734                            matches_gremlin_value(&node.id, val)
735                        } else {
736                            true
737                        }
738                    }
739                    "name" => {
740                        if let Some(val) = value {
741                            matches_gremlin_value(&node.label, val)
742                        } else {
743                            true
744                        }
745                    }
746                    _ => {
747                        if let Some(val) = value {
748                            match val {
749                                GremlinValue::String(s) => node.label.contains(s),
750                                _ => false,
751                            }
752                        } else {
753                            node.label.contains(key)
754                        }
755                    }
756                }
757            } else {
758                false
759            }
760        } else {
761            false
762        }
763    }
764
765    /// Check hasLabel() filter
766    fn check_has_label(&self, t: &Traverser, label: &str, stats: &mut QueryStats) -> bool {
767        if let Some(node_id) = t.current.as_node_id() {
768            if let Some(node) = self.graph.get_node(node_id) {
769                stats.nodes_scanned += 1;
770                let node_type_str = format!("{:?}", node.node_type).to_lowercase();
771                node_type_str.contains(&label.to_lowercase())
772                    || node.label.to_lowercase().contains(&label.to_lowercase())
773            } else {
774                false
775            }
776        } else {
777            false
778        }
779    }
780
781    /// Get property value from traverser
782    fn get_property(&self, t: &Traverser, key: &str) -> Option<String> {
783        if let Some(node_id) = t.current.as_node_id() {
784            if let Some(node) = self.graph.get_node(node_id) {
785                match key {
786                    "id" => Some(node.id.clone()),
787                    "label" | "name" => Some(node.label.clone()),
788                    "type" => Some(format!("{:?}", node.node_type)),
789                    _ => None,
790                }
791            } else {
792                None
793            }
794        } else {
795            None
796        }
797    }
798
799    /// Convert traversers to unified result
800    fn traversers_to_result(
801        &self,
802        traversers: Vec<Traverser>,
803        stats: QueryStats,
804    ) -> Result<UnifiedResult, ExecutionError> {
805        let mut result = UnifiedResult::empty();
806        result.stats = stats;
807
808        for t in traversers {
809            let mut record = UnifiedRecord::new();
810
811            // Add current element
812            match &t.current {
813                TraverserElement::Node(id) => {
814                    if let Some(node) = self.graph.get_node(id) {
815                        record.set_node("_", MatchedNode::from_stored(&node));
816                    }
817                }
818                TraverserElement::Edge {
819                    from,
820                    edge_label,
821                    to,
822                    weight,
823                } => {
824                    record.set_edge(
825                        "_",
826                        MatchedEdge::from_tuple(from, edge_label.clone(), to, *weight),
827                    );
828                }
829                TraverserElement::Value(v) => match v {
830                    GremlinValue::String(s) => {
831                        record.set("_value", crate::storage::schema::Value::text(s.clone()))
832                    }
833                    GremlinValue::Integer(i) => {
834                        record.set("_value", crate::storage::schema::Value::Integer(*i))
835                    }
836                    GremlinValue::Float(f) => {
837                        record.set("_value", crate::storage::schema::Value::Float(*f))
838                    }
839                    GremlinValue::Boolean(b) => {
840                        record.set("_value", crate::storage::schema::Value::Boolean(*b))
841                    }
842                    GremlinValue::Predicate(_) => {}
843                },
844            }
845
846            // Add labeled elements
847            for (label, elem) in &t.labels {
848                match elem {
849                    TraverserElement::Node(id) => {
850                        if let Some(node) = self.graph.get_node(id) {
851                            record.set_node(label, MatchedNode::from_stored(&node));
852                        }
853                    }
854                    TraverserElement::Edge {
855                        from,
856                        edge_label,
857                        to,
858                        weight,
859                    } => {
860                        record.set_edge(
861                            label,
862                            MatchedEdge::from_tuple(from, edge_label.clone(), to, *weight),
863                        );
864                    }
865                    _ => {}
866                }
867            }
868
869            // Add path if present
870            if !t.path.is_empty() {
871                // Collect node IDs from path elements
872                let node_ids: Vec<String> = t
873                    .path
874                    .iter()
875                    .filter_map(|elem| elem.as_node_id().map(|s| s.to_string()))
876                    .collect();
877
878                if let Some(first_id) = node_ids.first() {
879                    let mut path = GraphPath::start(first_id);
880                    // Add remaining nodes to the path
881                    for id in node_ids.iter().skip(1) {
882                        path.nodes.push(id.clone());
883                    }
884                    record.paths.push(path);
885                }
886            }
887
888            result.push(record);
889        }
890
891        Ok(result)
892    }
893}
894
895impl Traverser {
896    /// Create traverser at a node
897    fn at_node(id: &str) -> Self {
898        Self {
899            current: TraverserElement::Node(id.to_string()),
900            path: vec![TraverserElement::Node(id.to_string())],
901            labels: HashMap::new(),
902            bulk: 1,
903            loops: HashMap::new(),
904            sack: None,
905        }
906    }
907
908    /// Create traverser at an edge
909    fn at_edge(from: &str, edge_label: impl Into<String>, to: &str, weight: f32) -> Self {
910        Self {
911            current: TraverserElement::Edge {
912                from: from.to_string(),
913                edge_label: edge_label.into(),
914                to: to.to_string(),
915                weight,
916            },
917            path: Vec::new(),
918            labels: HashMap::new(),
919            bulk: 1,
920            loops: HashMap::new(),
921            sack: None,
922        }
923    }
924
925    /// Move to a new node
926    fn move_to_node(&self, id: &str) -> Self {
927        let mut new_path = self.path.clone();
928        new_path.push(TraverserElement::Node(id.to_string()));
929        Self {
930            current: TraverserElement::Node(id.to_string()),
931            path: new_path,
932            labels: self.labels.clone(),
933            bulk: self.bulk,
934            loops: self.loops.clone(),
935            sack: self.sack.clone(),
936        }
937    }
938
939    /// Move to an edge
940    fn move_to_edge(
941        &self,
942        from: &str,
943        edge_label: impl Into<String>,
944        to: &str,
945        weight: f32,
946    ) -> Self {
947        let edge_label = edge_label.into();
948        let mut new_path = self.path.clone();
949        new_path.push(TraverserElement::Edge {
950            from: from.to_string(),
951            edge_label: edge_label.clone(),
952            to: to.to_string(),
953            weight,
954        });
955        Self {
956            current: TraverserElement::Edge {
957                from: from.to_string(),
958                edge_label,
959                to: to.to_string(),
960                weight,
961            },
962            path: new_path,
963            labels: self.labels.clone(),
964            bulk: self.bulk,
965            loops: self.loops.clone(),
966            sack: self.sack.clone(),
967        }
968    }
969
970    /// Convert to value traverser
971    fn with_value(&self, value: GremlinValue) -> Self {
972        Self {
973            current: TraverserElement::Value(value),
974            path: self.path.clone(),
975            labels: self.labels.clone(),
976            bulk: self.bulk,
977            loops: self.loops.clone(),
978            sack: self.sack.clone(),
979        }
980    }
981}
982
983/// Check if a string matches a Gremlin value
984fn matches_gremlin_value(s: &str, value: &GremlinValue) -> bool {
985    match value {
986        GremlinValue::String(v) => {
987            s.to_lowercase() == v.to_lowercase() || s.to_lowercase().contains(&v.to_lowercase())
988        }
989        GremlinValue::Integer(i) => s.parse::<i64>().map(|n| n == *i).unwrap_or(false),
990        GremlinValue::Float(f) => s
991            .parse::<f64>()
992            .map(|n| (n - *f).abs() < 0.0001)
993            .unwrap_or(false),
994        GremlinValue::Boolean(b) => s.parse::<bool>().map(|n| n == *b).unwrap_or(false),
995        GremlinValue::Predicate(pred) => evaluate_predicate(s, pred),
996    }
997}
998
999/// Evaluate a Gremlin predicate
1000fn evaluate_predicate(s: &str, pred: &GremlinPredicate) -> bool {
1001    match pred {
1002        GremlinPredicate::Eq(v) => matches_gremlin_value(s, v),
1003        GremlinPredicate::Neq(v) => !matches_gremlin_value(s, v),
1004        GremlinPredicate::Lt(v) => compare_values(s, v, |a, b| a < b),
1005        GremlinPredicate::Lte(v) => compare_values(s, v, |a, b| a <= b),
1006        GremlinPredicate::Gt(v) => compare_values(s, v, |a, b| a > b),
1007        GremlinPredicate::Gte(v) => compare_values(s, v, |a, b| a >= b),
1008        GremlinPredicate::Between(a, b) => {
1009            compare_values(s, a, |x, y| x >= y) && compare_values(s, b, |x, y| x <= y)
1010        }
1011        GremlinPredicate::Inside(a, b) => {
1012            compare_values(s, a, |x, y| x > y) && compare_values(s, b, |x, y| x < y)
1013        }
1014        GremlinPredicate::Outside(a, b) => {
1015            compare_values(s, a, |x, y| x < y) || compare_values(s, b, |x, y| x > y)
1016        }
1017        GremlinPredicate::Within(vals) => vals.iter().any(|v| matches_gremlin_value(s, v)),
1018        GremlinPredicate::Without(vals) => !vals.iter().any(|v| matches_gremlin_value(s, v)),
1019        GremlinPredicate::StartingWith(prefix) => s.starts_with(prefix),
1020        GremlinPredicate::EndingWith(suffix) => s.ends_with(suffix),
1021        GremlinPredicate::Containing(substring) => s.contains(substring),
1022        GremlinPredicate::Regex(pattern) => s.contains(pattern), // Simplified
1023    }
1024}
1025
1026/// Compare string value with GremlinValue
1027fn compare_values<F>(s: &str, v: &GremlinValue, f: F) -> bool
1028where
1029    F: Fn(f64, f64) -> bool,
1030{
1031    match v {
1032        GremlinValue::Integer(i) => s.parse::<f64>().map(|n| f(n, *i as f64)).unwrap_or(false),
1033        GremlinValue::Float(fl) => s.parse::<f64>().map(|n| f(n, *fl)).unwrap_or(false),
1034        _ => false,
1035    }
1036}
1037
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::query::test_support::service_graph;

    /// Shared fixture graph used by every test in this module.
    fn create_test_graph() -> Arc<GraphStore> {
        service_graph()
    }

    /// Run `query` on a fresh fixture graph and return how many records it yields.
    fn record_count(query: &str) -> usize {
        let executor = GremlinExecutor::new(create_test_graph());
        executor.execute(query).unwrap().records.len()
    }

    #[test]
    fn test_gremlin_v() {
        // 4 nodes
        assert_eq!(record_count("g.V()"), 4);
    }

    #[test]
    fn test_gremlin_v_out() {
        // 2 services + 1 host
        assert_eq!(record_count("g.V('host:10.0.0.1').out()"), 3);
    }

    #[test]
    fn test_gremlin_has_label() {
        // 2 hosts
        assert_eq!(record_count("g.V().hasLabel('Host')"), 2);
    }

    #[test]
    fn test_gremlin_limit() {
        assert_eq!(record_count("g.V().limit(2)"), 2);
    }

    #[test]
    fn test_gremlin_count() {
        assert_eq!(record_count("g.V().count()"), 1);
    }

    #[test]
    fn test_gremlin_path() {
        assert_eq!(record_count("g.V('host:10.0.0.1').out().path()"), 3);
    }

    #[test]
    fn test_gremlin_as_select() {
        assert_eq!(
            record_count("g.V('host:10.0.0.1').as('a').out().as('b').select('a', 'b')"),
            3
        );
    }
}