Skip to main content

reddb_server/runtime/
graph_dsl.rs

1use super::*;
2
3pub(super) fn materialize_graph(store: &UnifiedStore) -> RedDBResult<GraphStore> {
4    materialize_graph_with_projection(store, None)
5}
6
7pub(super) fn materialize_graph_with_projection(
8    store: &UnifiedStore,
9    projection: Option<&RuntimeGraphProjection>,
10) -> RedDBResult<GraphStore> {
11    let graph = GraphStore::new();
12    // Phase 1.2 MVCC universal: capture the current connection's
13    // snapshot before `query_all` spawns parallel scan threads — the
14    // thread-local CURRENT_SNAPSHOT does not propagate into spawned
15    // workers, so we hand the context to the filter closure by move.
16    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
17    let entities = store.query_all(move |e| {
18        crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), e)
19    });
20    let node_label_filters = projection
21        .and_then(|projection| normalize_token_filter_list(projection.node_labels.clone()));
22    let node_type_filters = projection
23        .and_then(|projection| normalize_token_filter_list(projection.node_types.clone()));
24    let edge_label_filters = projection
25        .and_then(|projection| normalize_token_filter_list(projection.edge_labels.clone()));
26    let mut allowed_nodes = HashSet::new();
27
28    for (_, entity) in &entities {
29        if let EntityKind::GraphNode(ref node) = &entity.kind {
30            if !matches_graph_node_projection(
31                &node.label,
32                &node.node_type,
33                node_label_filters.as_ref(),
34                node_type_filters.as_ref(),
35            ) {
36                continue;
37            }
38            graph
39                .add_node_with_label(
40                    &entity.id.raw().to_string(),
41                    &node.label,
42                    &graph_node_label(&node.node_type),
43                )
44                .map_err(|err| RedDBError::Query(err.to_string()))?;
45            allowed_nodes.insert(entity.id.raw().to_string());
46        }
47    }
48
49    for (_, entity) in &entities {
50        if let EntityKind::GraphEdge(ref edge) = &entity.kind {
51            if !allowed_nodes.contains(&edge.from_node) || !allowed_nodes.contains(&edge.to_node) {
52                continue;
53            }
54            if !matches_graph_edge_projection(&edge.label, edge_label_filters.as_ref()) {
55                continue;
56            }
57            let resolved_weight = match &entity.data {
58                EntityData::Edge(e) => e.weight,
59                _ => edge.weight as f32 / 1000.0,
60            };
61
62            graph
63                .add_edge_with_label(
64                    &edge.from_node,
65                    &edge.to_node,
66                    &graph_edge_label(&edge.label),
67                    resolved_weight,
68                )
69                .map_err(|err| RedDBError::Query(err.to_string()))?;
70        }
71    }
72
73    Ok(graph)
74}
75
76/// Lazy graph materialization — only loads nodes reachable from seed IDs via BFS.
77/// Much faster than materialize_graph() when you only need a subgraph.
78pub(super) fn materialize_graph_lazy(
79    store: &UnifiedStore,
80    seed_entity_ids: &[u64],
81    max_depth: usize,
82) -> RedDBResult<GraphStore> {
83    let graph = GraphStore::new();
84    let mut visited_nodes: HashSet<String> = HashSet::new();
85    let mut queue: VecDeque<(String, usize)> = VecDeque::new();
86
87    // Phase 1: Load seed nodes
88    for &id in seed_entity_ids {
89        let id_str = id.to_string();
90        if visited_nodes.contains(&id_str) {
91            continue;
92        }
93        if let Some((_, entity)) = store.get_any(EntityId::new(id)) {
94            if let EntityKind::GraphNode(ref node) = &entity.kind {
95                let _ = graph.add_node_with_label(
96                    &id_str,
97                    &node.label,
98                    &graph_node_label(&node.node_type),
99                );
100                visited_nodes.insert(id_str.clone());
101                queue.push_back((id_str, 0));
102            }
103        }
104    }
105
106    // Phase 2: BFS — load neighbors on demand
107    // Collect edges from all collections in parallel
108    let collections = store.list_collections();
109    let use_parallel = collections.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
110    let all_edges: Vec<UnifiedEntity> = if use_parallel {
111        let store_ref = &store;
112        let edge_batches: Vec<Vec<UnifiedEntity>> = std::thread::scope(|s| {
113            collections
114                .iter()
115                .map(|col| {
116                    s.spawn(move || {
117                        store_ref
118                            .get_collection(col)
119                            .map(|m| m.query_all(|e| matches!(e.kind, EntityKind::GraphEdge(_))))
120                            .unwrap_or_default()
121                    })
122                })
123                .collect::<Vec<_>>()
124                .into_iter()
125                .map(|h| h.join().unwrap_or_default())
126                .collect()
127        });
128        edge_batches.into_iter().flatten().collect()
129    } else {
130        collections
131            .iter()
132            .flat_map(|col| {
133                store
134                    .get_collection(col)
135                    .map(|m| m.query_all(|e| matches!(e.kind, EntityKind::GraphEdge(_))))
136                    .unwrap_or_default()
137            })
138            .collect()
139    };
140
141    // Build adjacency from edges
142    let mut adjacency: HashMap<String, Vec<(String, String, String, f32)>> = HashMap::new();
143    for entity in &all_edges {
144        if let EntityKind::GraphEdge(ref edge) = &entity.kind {
145            let w = match &entity.data {
146                EntityData::Edge(e) => e.weight,
147                _ => edge.weight as f32 / 1000.0,
148            };
149            adjacency.entry(edge.from_node.clone()).or_default().push((
150                edge.to_node.clone(),
151                edge.label.clone(),
152                entity.id.raw().to_string(),
153                w,
154            ));
155            adjacency.entry(edge.to_node.clone()).or_default().push((
156                edge.from_node.clone(),
157                edge.label.clone(),
158                entity.id.raw().to_string(),
159                w,
160            ));
161        }
162    }
163
164    while let Some((node_id, depth)) = queue.pop_front() {
165        if depth >= max_depth {
166            continue;
167        }
168        if let Some(neighbors) = adjacency.get(&node_id) {
169            for (neighbor_id, label, _edge_id, weight) in neighbors {
170                // Add neighbor node if not visited
171                if !visited_nodes.contains(neighbor_id) {
172                    if let Ok(parsed) = neighbor_id.parse::<u64>() {
173                        if let Some((_, entity)) = store.get_any(EntityId::new(parsed)) {
174                            if let EntityKind::GraphNode(ref node) = &entity.kind {
175                                let _ = graph.add_node_with_label(
176                                    neighbor_id,
177                                    &node.label,
178                                    &graph_node_label(&node.node_type),
179                                );
180                                visited_nodes.insert(neighbor_id.clone());
181                                queue.push_back((neighbor_id.clone(), depth + 1));
182                            }
183                        }
184                    }
185                }
186                // Add edge
187                if visited_nodes.contains(neighbor_id) {
188                    let _ = graph.add_edge_with_label(
189                        &node_id,
190                        neighbor_id,
191                        &graph_edge_label(label),
192                        *weight,
193                    );
194                }
195            }
196        }
197    }
198
199    Ok(graph)
200}
201
202pub(super) fn materialize_graph_node_properties(
203    store: &UnifiedStore,
204) -> RedDBResult<HashMap<String, HashMap<String, Value>>> {
205    let mut node_properties = HashMap::new();
206
207    for (_, entity) in store.query_all(|_| true) {
208        if let (EntityKind::GraphNode(_), EntityData::Node(node)) = (&entity.kind, &entity.data) {
209            node_properties.insert(entity.id.raw().to_string(), node.properties.clone());
210        }
211    }
212
213    Ok(node_properties)
214}
215
216pub(super) fn materialize_graph_edge_properties(
217    store: &UnifiedStore,
218) -> RedDBResult<crate::storage::query::unified::EdgeProperties> {
219    let mut edge_properties = HashMap::new();
220
221    for (_, entity) in store.query_all(|_| true) {
222        if let (EntityKind::GraphEdge(edge), EntityData::Edge(edge_data)) =
223            (&entity.kind, &entity.data)
224        {
225            edge_properties.insert(
226                (
227                    edge.from_node.clone(),
228                    graph_edge_label(&edge.label),
229                    edge.to_node.clone(),
230                ),
231                edge_data.properties.clone(),
232            );
233        }
234    }
235
236    Ok(edge_properties)
237}
238
239pub(super) fn normalize_token_filter_list(values: Option<Vec<String>>) -> Option<BTreeSet<String>> {
240    values
241        .map(|values| {
242            values
243                .into_iter()
244                .map(|value| normalize_graph_token(&value))
245                .filter(|value| !value.is_empty())
246                .collect::<BTreeSet<_>>()
247        })
248        .filter(|set| !set.is_empty())
249}
250
251pub(super) fn matches_graph_node_projection(
252    label: &str,
253    node_type: &str,
254    label_filters: Option<&BTreeSet<String>>,
255    node_type_filters: Option<&BTreeSet<String>>,
256) -> bool {
257    let label_ok =
258        label_filters.is_none_or(|filters| filters.contains(&normalize_graph_token(label)));
259    let node_type_ok =
260        node_type_filters.is_none_or(|filters| filters.contains(&normalize_graph_token(node_type)));
261    label_ok && node_type_ok
262}
263
264pub(super) fn matches_graph_edge_projection(
265    label: &str,
266    edge_filters: Option<&BTreeSet<String>>,
267) -> bool {
268    edge_filters.is_none_or(|filters| filters.contains(&normalize_graph_token(label)))
269}
270
271pub(super) fn ensure_graph_node(graph: &GraphStore, id: &str) -> RedDBResult<()> {
272    if graph.has_node(id) {
273        Ok(())
274    } else {
275        Err(RedDBError::NotFound(id.to_string()))
276    }
277}
278
279/// Resolve a user-supplied graph node reference to its canonical entity id.
280///
281/// Accepts either a numeric entity id (e.g. `"177"`) — returned as-is when the
282/// node exists — or a node label (e.g. `"cinderella"`) resolved via the label
283/// secondary index. Errors when the label resolves to more than one node, so
284/// callers can fall back to the numeric id form.
285pub(super) fn resolve_graph_node_id(graph: &GraphStore, input: &str) -> RedDBResult<String> {
286    if graph.has_node(input) {
287        return Ok(input.to_string());
288    }
289    let matches = graph.nodes_by_label(input);
290    match matches.len() {
291        0 => Err(RedDBError::NotFound(input.to_string())),
292        1 => Ok(matches.into_iter().next().unwrap().id),
293        n => Err(RedDBError::Query(format!(
294            "ambiguous graph node reference '{input}': matches {n} nodes by label; use the numeric id"
295        ))),
296    }
297}
298
299pub(super) fn stored_node_to_runtime(node: StoredNode) -> RuntimeGraphNode {
300    RuntimeGraphNode {
301        id: node.id,
302        label: node.label,
303        node_type: node.node_type.as_str().to_string(),
304        out_edge_count: node.out_edge_count,
305        in_edge_count: node.in_edge_count,
306    }
307}
308
309pub(super) fn path_to_runtime(
310    graph: &GraphStore,
311    path: &crate::storage::engine::pathfinding::Path,
312) -> RuntimeGraphPath {
313    let nodes = path
314        .nodes
315        .iter()
316        .filter_map(|id| graph.get_node(id))
317        .map(stored_node_to_runtime)
318        .collect();
319
320    let mut edges = Vec::new();
321    for index in 0..path.edge_types.len() {
322        let Some(source) = path.nodes.get(index) else {
323            continue;
324        };
325        let Some(target) = path.nodes.get(index + 1) else {
326            continue;
327        };
328        let Some(edge_type) = path.edge_types.get(index) else {
329            continue;
330        };
331        let weight = graph
332            .outgoing_edges(source)
333            .into_iter()
334            .find(|(candidate_type, candidate_target, _)| {
335                candidate_type.as_str() == edge_type.as_str() && candidate_target == target
336            })
337            .map(|(_, _, weight)| weight)
338            .unwrap_or(0.0);
339        edges.push(RuntimeGraphEdge {
340            source: source.clone(),
341            target: target.clone(),
342            edge_type: edge_type.as_str().to_string(),
343            weight,
344        });
345    }
346
347    RuntimeGraphPath {
348        hop_count: path.len(),
349        total_weight: path.total_weight,
350        nodes,
351        edges,
352    }
353}
354
355pub(super) fn cycle_to_runtime(
356    graph: &GraphStore,
357    cycle: crate::storage::engine::Cycle,
358) -> RuntimeGraphPath {
359    let nodes = cycle
360        .nodes
361        .iter()
362        .filter_map(|id| graph.get_node(id))
363        .map(stored_node_to_runtime)
364        .collect::<Vec<_>>();
365    let mut edges = Vec::new();
366    let mut total_weight = 0.0;
367
368    for window in cycle.nodes.windows(2) {
369        let Some(source) = window.first() else {
370            continue;
371        };
372        let Some(target) = window.get(1) else {
373            continue;
374        };
375        if let Some((edge_type, _, weight)) = graph
376            .outgoing_edges(source)
377            .into_iter()
378            .find(|(_, candidate_target, _)| candidate_target == target)
379        {
380            total_weight += weight as f64;
381            edges.push(RuntimeGraphEdge {
382                source: source.clone(),
383                target: target.clone(),
384                edge_type: edge_type.as_str().to_string(),
385                weight,
386            });
387        }
388    }
389
390    RuntimeGraphPath {
391        hop_count: cycle.length,
392        total_weight,
393        nodes,
394        edges,
395    }
396}
397
398pub(super) fn normalize_edge_filters(edge_labels: Option<Vec<String>>) -> Option<BTreeSet<String>> {
399    edge_labels
400        .map(|labels| {
401            labels
402                .into_iter()
403                .map(|label| normalize_graph_token(&label))
404                .filter(|label| !label.is_empty())
405                .collect()
406        })
407        .filter(|set: &BTreeSet<String>| !set.is_empty())
408}
409
410pub(super) fn merge_edge_filters(
411    edge_labels: Option<Vec<String>>,
412    projection: Option<&RuntimeGraphProjection>,
413) -> Option<BTreeSet<String>> {
414    let mut merged = BTreeSet::new();
415
416    if let Some(filters) = normalize_edge_filters(edge_labels) {
417        merged.extend(filters);
418    }
419
420    if let Some(filters) = projection
421        .and_then(|projection| normalize_token_filter_list(projection.edge_labels.clone()))
422    {
423        merged.extend(filters);
424    }
425
426    if merged.is_empty() {
427        None
428    } else {
429        Some(merged)
430    }
431}
432
433pub(super) fn merge_runtime_projection(
434    base: Option<RuntimeGraphProjection>,
435    overlay: Option<RuntimeGraphProjection>,
436) -> Option<RuntimeGraphProjection> {
437    let merge_list =
438        |left: Option<Vec<String>>, right: Option<Vec<String>>| -> Option<Vec<String>> {
439            let mut values = BTreeSet::new();
440            if let Some(left) = left {
441                values.extend(left);
442            }
443            if let Some(right) = right {
444                values.extend(right);
445            }
446            if values.is_empty() {
447                None
448            } else {
449                Some(values.into_iter().collect())
450            }
451        };
452
453    let _ = base.clone().or(overlay.clone())?;
454
455    Some(RuntimeGraphProjection {
456        node_labels: merge_list(
457            base.as_ref()
458                .and_then(|projection| projection.node_labels.clone()),
459            overlay
460                .as_ref()
461                .and_then(|projection| projection.node_labels.clone()),
462        ),
463        node_types: merge_list(
464            base.as_ref()
465                .and_then(|projection| projection.node_types.clone()),
466            overlay
467                .as_ref()
468                .and_then(|projection| projection.node_types.clone()),
469        ),
470        edge_labels: merge_list(
471            base.as_ref()
472                .and_then(|projection| projection.edge_labels.clone()),
473            overlay
474                .as_ref()
475                .and_then(|projection| projection.edge_labels.clone()),
476        ),
477    })
478}
479
480pub(super) fn edge_allowed(edge_label: &str, filters: Option<&BTreeSet<String>>) -> bool {
481    filters.is_none_or(|filters| filters.contains(&normalize_graph_token(edge_label)))
482}
483
484pub(super) fn graph_adjacent_edges(
485    graph: &GraphStore,
486    node: &str,
487    direction: RuntimeGraphDirection,
488    edge_filters: Option<&BTreeSet<String>>,
489) -> Vec<(String, RuntimeGraphEdge)> {
490    let mut adjacent = Vec::new();
491
492    if matches!(
493        direction,
494        RuntimeGraphDirection::Outgoing | RuntimeGraphDirection::Both
495    ) {
496        for (edge_type, target, weight) in graph.outgoing_edges(node) {
497            if edge_allowed(edge_type.as_str(), edge_filters) {
498                adjacent.push((
499                    target.clone(),
500                    RuntimeGraphEdge {
501                        source: node.to_string(),
502                        target,
503                        edge_type: edge_type.as_str().to_string(),
504                        weight,
505                    },
506                ));
507            }
508        }
509    }
510
511    if matches!(
512        direction,
513        RuntimeGraphDirection::Incoming | RuntimeGraphDirection::Both
514    ) {
515        for (edge_type, source, weight) in graph.incoming_edges(node) {
516            if edge_allowed(edge_type.as_str(), edge_filters) {
517                adjacent.push((
518                    source.clone(),
519                    RuntimeGraphEdge {
520                        source,
521                        target: node.to_string(),
522                        edge_type: edge_type.as_str().to_string(),
523                        weight,
524                    },
525                ));
526            }
527        }
528    }
529
530    adjacent
531}
532
533pub(super) fn push_runtime_edge(
534    edges: &mut Vec<RuntimeGraphEdge>,
535    seen_edges: &mut HashSet<(String, String, String, u32)>,
536    edge: RuntimeGraphEdge,
537) {
538    let key = (
539        edge.source.clone(),
540        edge.target.clone(),
541        edge.edge_type.clone(),
542        edge.weight.to_bits(),
543    );
544    if seen_edges.insert(key) {
545        edges.push(edge);
546    }
547}
548
549#[derive(Clone)]
550pub(super) struct RuntimeDijkstraState {
551    node: String,
552    cost: f64,
553}
554
555impl PartialEq for RuntimeDijkstraState {
556    fn eq(&self, other: &Self) -> bool {
557        self.node == other.node && self.cost == other.cost
558    }
559}
560
561impl Eq for RuntimeDijkstraState {}
562
563impl Ord for RuntimeDijkstraState {
564    fn cmp(&self, other: &Self) -> Ordering {
565        other
566            .cost
567            .partial_cmp(&self.cost)
568            .unwrap_or(Ordering::Equal)
569    }
570}
571
572impl PartialOrd for RuntimeDijkstraState {
573    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
574        Some(self.cmp(other))
575    }
576}
577
578pub(super) fn shortest_path_runtime(
579    graph: &GraphStore,
580    source: &str,
581    target: &str,
582    direction: RuntimeGraphDirection,
583    algorithm: RuntimeGraphPathAlgorithm,
584    edge_filters: Option<&BTreeSet<String>>,
585) -> RedDBResult<RuntimeGraphPathResult> {
586    let mut nodes_visited = 0;
587    let (path, negative_cycle_detected) = match algorithm {
588        RuntimeGraphPathAlgorithm::Bfs => {
589            let mut queue = VecDeque::new();
590            let mut visited = HashSet::new();
591            let mut previous: HashMap<String, (String, RuntimeGraphEdge)> = HashMap::new();
592
593            queue.push_back(source.to_string());
594            visited.insert(source.to_string());
595
596            while let Some(current) = queue.pop_front() {
597                nodes_visited += 1;
598                if current == target {
599                    break;
600                }
601                let mut adjacent = graph_adjacent_edges(graph, &current, direction, edge_filters);
602                adjacent.sort_by(|left, right| left.0.cmp(&right.0));
603                for (neighbor, edge) in adjacent {
604                    if visited.insert(neighbor.clone()) {
605                        previous.insert(neighbor.clone(), (current.clone(), edge));
606                        queue.push_back(neighbor);
607                    }
608                }
609            }
610
611            (rebuild_runtime_path(graph, source, target, &previous), None)
612        }
613        RuntimeGraphPathAlgorithm::Dijkstra | RuntimeGraphPathAlgorithm::AStar => {
614            let mut dist: HashMap<String, f64> = HashMap::new();
615            let mut previous: HashMap<String, (String, RuntimeGraphEdge)> = HashMap::new();
616            let mut heap = BinaryHeap::new();
617
618            dist.insert(source.to_string(), 0.0);
619            heap.push(RuntimeDijkstraState {
620                node: source.to_string(),
621                cost: 0.0,
622            });
623
624            while let Some(RuntimeDijkstraState { node, cost }) = heap.pop() {
625                nodes_visited += 1;
626                if node == target {
627                    break;
628                }
629                if let Some(best) = dist.get(&node) {
630                    if cost > *best {
631                        continue;
632                    }
633                }
634
635                let mut adjacent = graph_adjacent_edges(graph, &node, direction, edge_filters);
636                adjacent.sort_by(|left, right| left.0.cmp(&right.0));
637                for (neighbor, edge) in adjacent {
638                    let next_cost = cost + edge.weight as f64;
639                    if dist.get(&neighbor).is_none_or(|best| next_cost < *best) {
640                        dist.insert(neighbor.clone(), next_cost);
641                        previous.insert(neighbor.clone(), (node.clone(), edge));
642                        heap.push(RuntimeDijkstraState {
643                            node: neighbor,
644                            cost: next_cost,
645                        });
646                    }
647                }
648            }
649
650            (rebuild_runtime_path(graph, source, target, &previous), None)
651        }
652        RuntimeGraphPathAlgorithm::BellmanFord => {
653            let nodes: Vec<String> = graph.iter_nodes().map(|node| node.id.clone()).collect();
654            let mut dist: HashMap<String, f64> = nodes
655                .iter()
656                .map(|node| (node.clone(), f64::INFINITY))
657                .collect();
658            let mut previous: HashMap<String, (String, RuntimeGraphEdge)> = HashMap::new();
659
660            dist.insert(source.to_string(), 0.0);
661
662            for _ in 0..nodes.len().saturating_sub(1) {
663                let mut changed = false;
664
665                for node in &nodes {
666                    nodes_visited += 1;
667                    let Some(current_dist) = dist.get(node).copied() else {
668                        continue;
669                    };
670                    if !current_dist.is_finite() {
671                        continue;
672                    }
673
674                    let mut adjacent = graph_adjacent_edges(graph, node, direction, edge_filters);
675                    adjacent.sort_by(|left, right| left.0.cmp(&right.0));
676                    for (neighbor, edge) in adjacent {
677                        let next_cost = current_dist + edge.weight as f64;
678                        if dist.get(&neighbor).is_none_or(|best| next_cost < *best) {
679                            dist.insert(neighbor.clone(), next_cost);
680                            previous.insert(neighbor, (node.clone(), edge));
681                            changed = true;
682                        }
683                    }
684                }
685
686                if !changed {
687                    break;
688                }
689            }
690
691            let mut has_negative_cycle = false;
692            for node in &nodes {
693                let Some(current_dist) = dist.get(node).copied() else {
694                    continue;
695                };
696                if !current_dist.is_finite() {
697                    continue;
698                }
699
700                let adjacent = graph_adjacent_edges(graph, node, direction, edge_filters);
701                for (neighbor, edge) in adjacent {
702                    let next_cost = current_dist + edge.weight as f64;
703                    if dist.get(&neighbor).is_none_or(|best| next_cost < *best) {
704                        has_negative_cycle = true;
705                        break;
706                    }
707                }
708
709                if has_negative_cycle {
710                    break;
711                }
712            }
713
714            let path = if has_negative_cycle {
715                None
716            } else {
717                rebuild_runtime_path(graph, source, target, &previous)
718            };
719            (path, Some(has_negative_cycle))
720        }
721    };
722
723    Ok(RuntimeGraphPathResult {
724        source: source.to_string(),
725        target: target.to_string(),
726        direction,
727        algorithm,
728        nodes_visited,
729        negative_cycle_detected,
730        path,
731    })
732}
733
734pub(super) fn rebuild_runtime_path(
735    graph: &GraphStore,
736    source: &str,
737    target: &str,
738    previous: &HashMap<String, (String, RuntimeGraphEdge)>,
739) -> Option<RuntimeGraphPath> {
740    if source != target && !previous.contains_key(target) {
741        return None;
742    }
743
744    let mut node_ids = vec![target.to_string()];
745    let mut edges = Vec::new();
746    let mut current = target.to_string();
747
748    while current != source {
749        let (parent, edge) = previous.get(&current)?.clone();
750        edges.push(edge);
751        node_ids.push(parent.clone());
752        current = parent;
753    }
754
755    node_ids.reverse();
756    edges.reverse();
757
758    let total_weight = edges.iter().map(|edge| edge.weight as f64).sum();
759    let nodes = node_ids
760        .iter()
761        .filter_map(|id| graph.get_node(id))
762        .map(stored_node_to_runtime)
763        .collect();
764
765    Some(RuntimeGraphPath {
766        hop_count: node_ids.len().saturating_sub(1),
767        total_weight,
768        nodes,
769        edges,
770    })
771}
772
773pub(super) fn top_runtime_scores(
774    graph: &GraphStore,
775    scores: HashMap<String, f64>,
776    top_k: usize,
777) -> Vec<RuntimeGraphCentralityScore> {
778    let mut pairs: Vec<_> = scores.into_iter().collect();
779    pairs.sort_by(|left, right| {
780        right
781            .1
782            .partial_cmp(&left.1)
783            .unwrap_or(Ordering::Equal)
784            .then_with(|| left.0.cmp(&right.0))
785    });
786    pairs.truncate(top_k.max(1));
787    pairs
788        .into_iter()
789        .filter_map(|(node_id, score)| {
790            graph
791                .get_node(&node_id)
792                .map(|node| RuntimeGraphCentralityScore {
793                    node: stored_node_to_runtime(node),
794                    score,
795                })
796        })
797        .collect()
798}
799
800/// Normalise a user-supplied node-type token to its canonical lower-snake-case
801/// form. Pentest-flavoured aliases (`tech`, `cert`) are kept as a courtesy
802/// but the result is just a label string the caller can intern into the
803/// [`crate::storage::engine::graph_store::LabelRegistry`].
804pub(super) fn graph_node_label(input: &str) -> String {
805    let token = normalize_graph_token(input);
806    match token.as_str() {
807        "host" | "service" | "credential" | "vulnerability" | "endpoint" | "technology"
808        | "user" | "domain" | "certificate" => token,
809        "tech" => "technology".to_string(),
810        "cert" => "certificate".to_string(),
811        // Unknown token: pass through so callers can register new labels.
812        _ if !token.is_empty() => token,
813        _ => "endpoint".to_string(),
814    }
815}
816
817/// Edge-label counterpart to [`graph_node_label`].
818pub(super) fn graph_edge_label(input: &str) -> String {
819    let token = normalize_graph_token(input);
820    match token.as_str() {
821        "hasservice" => "has_service".to_string(),
822        "hasendpoint" => "has_endpoint".to_string(),
823        "usestech" | "usestechnology" => "uses_tech".to_string(),
824        "authaccess" | "hascredential" => "auth_access".to_string(),
825        "affectedby" => "affected_by".to_string(),
826        "contains" => "contains".to_string(),
827        "connectsto" | "connects" => "connects_to".to_string(),
828        "relatedto" | "related" => "related_to".to_string(),
829        "hasuser" => "has_user".to_string(),
830        "hascert" | "hascertificate" => "has_cert".to_string(),
831        _ if !token.is_empty() => input.trim().to_ascii_lowercase(),
832        _ => "related_to".to_string(),
833    }
834}
835
836pub(super) fn normalize_graph_token(input: &str) -> String {
837    input
838        .chars()
839        .filter(|ch| ch.is_ascii_alphanumeric())
840        .flat_map(|ch| ch.to_lowercase())
841        .collect()
842}
843
844#[derive(Debug, Clone)]
845pub struct RuntimeGraphPattern {
846    pub node_label: Option<String>,
847    pub node_type: Option<String>,
848    pub edge_labels: Vec<String>,
849}
850
851#[derive(Debug, Clone, Default)]
852pub struct RuntimeGraphProjection {
853    pub node_labels: Option<Vec<String>>,
854    pub node_types: Option<Vec<String>>,
855    pub edge_labels: Option<Vec<String>>,
856}
857
858#[derive(Debug, Clone, Copy)]
859pub struct RuntimeQueryWeights {
860    pub vector: f32,
861    pub graph: f32,
862    pub filter: f32,
863}
864
865#[derive(Debug, Clone)]
866pub struct RuntimeFilter {
867    pub field: String,
868    pub op: String,
869    pub value: Option<RuntimeFilterValue>,
870}
871
872#[derive(Debug, Clone)]
873pub enum RuntimeFilterValue {
874    String(String),
875    Int(i64),
876    Float(f64),
877    Bool(bool),
878    Null,
879    List(Vec<RuntimeFilterValue>),
880    Range(Box<RuntimeFilterValue>, Box<RuntimeFilterValue>),
881}
882
883pub(super) fn runtime_filter_to_dsl(filter: RuntimeFilter) -> RedDBResult<DslFilter> {
884    Ok(DslFilter {
885        field: filter.field,
886        op: parse_runtime_filter_op(&filter.op)?,
887        value: match filter.value {
888            Some(value) => runtime_filter_value_to_dsl(value),
889            None => DslFilterValue::Null,
890        },
891    })
892}
893
894pub(super) fn parse_runtime_filter_op(op: &str) -> RedDBResult<DslFilterOp> {
895    match op.trim().to_ascii_lowercase().as_str() {
896        "eq" | "equals" => Ok(DslFilterOp::Equals),
897        "ne" | "not_equals" | "not-equals" => Ok(DslFilterOp::NotEquals),
898        "gt" | "greater_than" | "greater-than" => Ok(DslFilterOp::GreaterThan),
899        "gte" | "greater_than_or_equals" | "greater-than-or-equals" => {
900            Ok(DslFilterOp::GreaterThanOrEquals)
901        }
902        "lt" | "less_than" | "less-than" => Ok(DslFilterOp::LessThan),
903        "lte" | "less_than_or_equals" | "less-than-or-equals" => Ok(DslFilterOp::LessThanOrEquals),
904        "contains" => Ok(DslFilterOp::Contains),
905        "starts_with" | "starts-with" => Ok(DslFilterOp::StartsWith),
906        "ends_with" | "ends-with" => Ok(DslFilterOp::EndsWith),
907        "in" | "in_list" | "in-list" => Ok(DslFilterOp::In),
908        "between" => Ok(DslFilterOp::Between),
909        "is_null" | "is-null" => Ok(DslFilterOp::IsNull),
910        "is_not_null" | "is-not-null" => Ok(DslFilterOp::IsNotNull),
911        other => Err(RedDBError::Query(format!(
912            "unsupported hybrid filter op: {other}"
913        ))),
914    }
915}
916
917pub(super) fn runtime_filter_value_to_dsl(value: RuntimeFilterValue) -> DslFilterValue {
918    match value {
919        RuntimeFilterValue::String(value) => DslFilterValue::String(value),
920        RuntimeFilterValue::Int(value) => DslFilterValue::Int(value),
921        RuntimeFilterValue::Float(value) => DslFilterValue::Float(value),
922        RuntimeFilterValue::Bool(value) => DslFilterValue::Bool(value),
923        RuntimeFilterValue::Null => DslFilterValue::Null,
924        RuntimeFilterValue::List(values) => DslFilterValue::List(
925            values
926                .into_iter()
927                .map(runtime_filter_value_to_dsl)
928                .collect(),
929        ),
930        RuntimeFilterValue::Range(start, end) => DslFilterValue::Range(
931            Box::new(runtime_filter_value_to_dsl(*start)),
932            Box::new(runtime_filter_value_to_dsl(*end)),
933        ),
934    }
935}