Skip to main content

uni_algo/algo/cypher/
shortest_path.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! uni.algo.shortestPath procedure implementation.
5
6use crate::algo::DirectTraversal;
7use crate::algo::procedures::{
8    AlgoContext, AlgoProcedure, AlgoResultRow, ProcedureSignature, ValueType,
9};
10use anyhow::{Result, anyhow};
11use futures::stream::{self, BoxStream, StreamExt};
12use serde_json::{Value, json};
13use uni_common::core::id::Vid;
14use uni_store::storage::direction::Direction;
15
16pub struct ShortestPathProcedure;
17
18impl AlgoProcedure for ShortestPathProcedure {
19    fn name(&self) -> &str {
20        "uni.algo.shortestPath"
21    }
22
23    fn signature(&self) -> ProcedureSignature {
24        ProcedureSignature {
25            args: vec![
26                ("sourceNode", ValueType::Node),
27                ("targetNode", ValueType::Node),
28                ("relationshipTypes", ValueType::List),
29            ],
30            optional_args: Vec::new(),
31            yields: vec![
32                ("nodeIds", ValueType::List),
33                ("edgeIds", ValueType::List),
34                ("length", ValueType::Int),
35            ],
36        }
37    }
38
39    fn execute(
40        &self,
41        ctx: AlgoContext,
42        args: Vec<Value>,
43    ) -> BoxStream<'static, Result<AlgoResultRow>> {
44        let signature = self.signature();
45        let args = match signature.validate_args(args) {
46            Ok(a) => a,
47            Err(e) => return stream::once(async { Err(e) }).boxed(),
48        };
49
50        let source_vid = match vid_from_value(&args[0]) {
51            Ok(v) => v,
52            Err(e) => return stream::once(async move { Err(e) }).boxed(),
53        };
54        let target_vid = match vid_from_value(&args[1]) {
55            Ok(v) => v,
56            Err(e) => return stream::once(async move { Err(e) }).boxed(),
57        };
58        let edge_types_str: Vec<String> = args[2]
59            .as_array()
60            .unwrap()
61            .iter()
62            .map(|v| v.as_str().unwrap().to_string())
63            .collect();
64
65        // Use stream::once with an async block for the single result
66        let result_stream = async move {
67            // 1. Resolve edge types and warm adjacency
68            let schema = ctx.storage.schema_manager().schema();
69            let mut edge_type_ids = Vec::new();
70
71            for type_name in &edge_types_str {
72                let meta = schema
73                    .edge_types
74                    .get(type_name)
75                    .ok_or_else(|| anyhow!("Edge type {} not found", type_name))?;
76                edge_type_ids.push(meta.id);
77
78                let edge_ver = ctx.storage.get_edge_version_by_id(meta.id);
79
80                // Warm Outgoing
81                ctx.storage
82                    .warm_adjacency(meta.id, Direction::Outgoing, edge_ver)
83                    .await?;
84
85                // Warm Incoming
86                ctx.storage
87                    .warm_adjacency(meta.id, Direction::Incoming, edge_ver)
88                    .await?;
89            }
90
91            let am = ctx.storage.adjacency_manager();
92            let traversal = DirectTraversal::new(&am, edge_type_ids);
93
94            if let Some(path) = traversal.shortest_path(source_vid, target_vid, Direction::Outgoing)
95            {
96                Ok(Some(AlgoResultRow {
97                    values: vec![
98                        json!(path.vertices.iter().map(|v| v.as_u64()).collect::<Vec<_>>()),
99                        json!(path.edges.iter().map(|e| e.as_u64()).collect::<Vec<_>>()),
100                        json!(path.len()),
101                    ],
102                }))
103            } else {
104                Ok(None)
105            }
106        };
107
108        // Convert the async block to a stream, filtering out None results
109        stream::once(result_stream)
110            .filter_map(|res: Result<Option<AlgoResultRow>>| async move {
111                match res {
112                    Ok(Some(row)) => Some(Ok(row)),
113                    Ok(None) => None,
114                    Err(e) => Some(Err(e)),
115                }
116            })
117            .boxed()
118    }
119}
120
121fn vid_from_value(val: &Value) -> Result<Vid> {
122    // In the new storage model, VIDs are pure auto-increment integers
123    if let Some(s) = val.as_str() {
124        // Try parsing as simple integer first
125        if let Ok(id) = s.parse::<u64>() {
126            return Ok(Vid::new(id));
127        }
128        // Legacy format "label:offset" - parse and combine
129        let parts: Vec<_> = s.split(':').collect();
130        if parts.len() == 2
131            && let (Ok(l), Ok(o)) = (parts[0].parse::<u16>(), parts[1].parse::<u64>())
132        {
133            // Legacy: combine label and offset for backward compat
134            return Ok(Vid::new((l as u64) << 48 | o));
135        }
136    }
137    if let Some(v) = val.as_u64() {
138        return Ok(Vid::from(v));
139    }
140    Err(anyhow!("Invalid Vid format: {:?}", val))
141}