uni_algo/algo/cypher/
shortest_path.rs1use crate::algo::DirectTraversal;
7use crate::algo::procedures::{
8 AlgoContext, AlgoProcedure, AlgoResultRow, ProcedureSignature, ValueType,
9};
10use anyhow::{Result, anyhow};
11use futures::stream::{self, BoxStream, StreamExt};
12use serde_json::{Value, json};
13use uni_common::core::id::Vid;
14use uni_store::storage::direction::Direction;
15
16pub struct ShortestPathProcedure;
17
18impl AlgoProcedure for ShortestPathProcedure {
19 fn name(&self) -> &str {
20 "uni.algo.shortestPath"
21 }
22
23 fn signature(&self) -> ProcedureSignature {
24 ProcedureSignature {
25 args: vec![
26 ("sourceNode", ValueType::Node),
27 ("targetNode", ValueType::Node),
28 ("relationshipTypes", ValueType::List),
29 ],
30 optional_args: Vec::new(),
31 yields: vec![
32 ("nodeIds", ValueType::List),
33 ("edgeIds", ValueType::List),
34 ("length", ValueType::Int),
35 ],
36 }
37 }
38
39 fn execute(
40 &self,
41 ctx: AlgoContext,
42 args: Vec<Value>,
43 ) -> BoxStream<'static, Result<AlgoResultRow>> {
44 let signature = self.signature();
45 let args = match signature.validate_args(args) {
46 Ok(a) => a,
47 Err(e) => return stream::once(async { Err(e) }).boxed(),
48 };
49
50 let source_vid = match vid_from_value(&args[0]) {
51 Ok(v) => v,
52 Err(e) => return stream::once(async move { Err(e) }).boxed(),
53 };
54 let target_vid = match vid_from_value(&args[1]) {
55 Ok(v) => v,
56 Err(e) => return stream::once(async move { Err(e) }).boxed(),
57 };
58 let edge_types_str: Vec<String> = args[2]
59 .as_array()
60 .unwrap()
61 .iter()
62 .map(|v| v.as_str().unwrap().to_string())
63 .collect();
64
65 let result_stream = async move {
67 let schema = ctx.storage.schema_manager().schema();
69 let mut edge_type_ids = Vec::new();
70
71 for type_name in &edge_types_str {
72 let meta = schema
73 .edge_types
74 .get(type_name)
75 .ok_or_else(|| anyhow!("Edge type {} not found", type_name))?;
76 edge_type_ids.push(meta.id);
77
78 let edge_ver = ctx.storage.get_edge_version_by_id(meta.id);
79
80 ctx.storage
82 .warm_adjacency(meta.id, Direction::Outgoing, edge_ver)
83 .await?;
84
85 ctx.storage
87 .warm_adjacency(meta.id, Direction::Incoming, edge_ver)
88 .await?;
89 }
90
91 let am = ctx.storage.adjacency_manager();
92 let traversal = DirectTraversal::new(&am, edge_type_ids);
93
94 if let Some(path) = traversal.shortest_path(source_vid, target_vid, Direction::Outgoing)
95 {
96 Ok(Some(AlgoResultRow {
97 values: vec![
98 json!(path.vertices.iter().map(|v| v.as_u64()).collect::<Vec<_>>()),
99 json!(path.edges.iter().map(|e| e.as_u64()).collect::<Vec<_>>()),
100 json!(path.len()),
101 ],
102 }))
103 } else {
104 Ok(None)
105 }
106 };
107
108 stream::once(result_stream)
110 .filter_map(|res: Result<Option<AlgoResultRow>>| async move {
111 match res {
112 Ok(Some(row)) => Some(Ok(row)),
113 Ok(None) => None,
114 Err(e) => Some(Err(e)),
115 }
116 })
117 .boxed()
118 }
119}
120
121fn vid_from_value(val: &Value) -> Result<Vid> {
122 if let Some(s) = val.as_str() {
124 if let Ok(id) = s.parse::<u64>() {
126 return Ok(Vid::new(id));
127 }
128 let parts: Vec<_> = s.split(':').collect();
130 if parts.len() == 2
131 && let (Ok(l), Ok(o)) = (parts[0].parse::<u16>(), parts[1].parse::<u64>())
132 {
133 return Ok(Vid::new((l as u64) << 48 | o));
135 }
136 }
137 if let Some(v) = val.as_u64() {
138 return Ok(Vid::from(v));
139 }
140 Err(anyhow!("Invalid Vid format: {:?}", val))
141}