uni_algo/algo/cypher/
all_simple_paths.rs1use crate::algo::ProjectionBuilder;
7use crate::algo::algorithms::{Algorithm, AllSimplePaths, AllSimplePathsConfig};
8use crate::algo::procedures::{
9 AlgoContext, AlgoProcedure, AlgoResultRow, ProcedureSignature, ValueType,
10};
11use anyhow::Result;
12use futures::stream::{self, BoxStream, StreamExt};
13use serde_json::{Value, json};
14use uni_common::core::id::Vid;
15
/// Stateless procedure type implementing `uni.algo.allSimplePaths`.
///
/// The type carries no data; all inputs arrive through the procedure
/// arguments at execution time.
#[derive(Debug, Clone, Copy, Default)]
pub struct AllSimplePathsProcedure;
17
18impl AlgoProcedure for AllSimplePathsProcedure {
19 fn name(&self) -> &str {
20 "uni.algo.allSimplePaths"
21 }
22
23 fn signature(&self) -> ProcedureSignature {
24 ProcedureSignature {
25 args: vec![
26 ("startNode", ValueType::Node),
27 ("endNode", ValueType::Node),
28 ("relationshipTypes", ValueType::List),
29 ("maxLength", ValueType::Int),
30 ],
31 optional_args: vec![("nodeLabels", ValueType::List, Value::Null)],
32 yields: vec![("path", ValueType::List)],
33 }
34 }
35
36 fn execute(
37 &self,
38 ctx: AlgoContext,
39 args: Vec<Value>,
40 ) -> BoxStream<'static, Result<AlgoResultRow>> {
41 let signature = self.signature();
42 let args = match signature.validate_args(args) {
43 Ok(a) => a,
44 Err(e) => return stream::once(async { Err(e) }).boxed(),
45 };
46
47 let start_vid_u64 = match &args[0] {
48 Value::String(s) => s.parse::<u64>().unwrap_or(0),
49 Value::Number(n) => n.as_u64().unwrap_or(0),
50 _ => return stream::once(async { Err(anyhow::anyhow!("Invalid start node")) }).boxed(),
51 };
52
53 let end_vid_u64 = match &args[1] {
54 Value::String(s) => s.parse::<u64>().unwrap_or(0),
55 Value::Number(n) => n.as_u64().unwrap_or(0),
56 _ => return stream::once(async { Err(anyhow::anyhow!("Invalid end node")) }).boxed(),
57 };
58
59 let edge_types = args[2]
60 .as_array()
61 .unwrap()
62 .iter()
63 .map(|v| v.as_str().unwrap().to_string())
64 .collect::<Vec<_>>();
65 let max_len = args[3].as_u64().unwrap() as usize;
66
67 let node_labels = if args[4].is_null() {
68 Vec::new()
69 } else {
70 args[4]
71 .as_array()
72 .unwrap()
73 .iter()
74 .map(|v| v.as_str().unwrap().to_string())
75 .collect::<Vec<_>>()
76 };
77
78 let start_vid = Vid::from(start_vid_u64);
79 let end_vid = Vid::from(end_vid_u64);
80
81 let stream = async_stream::try_stream! {
82 let schema = ctx.storage.schema_manager().schema();
83
84 if !node_labels.is_empty() {
85 for label in &node_labels {
86 if !schema.labels.contains_key(label) {
87 Err(anyhow::anyhow!("Label '{}' not found", label))?;
88 }
89 }
90 }
91 for etype in &edge_types {
92 if !schema.edge_types.contains_key(etype) {
93 Err(anyhow::anyhow!("Edge type '{}' not found", etype))?;
94 }
95 }
96
97 let mut builder = ProjectionBuilder::new(ctx.storage.clone())
98 .l0_manager(ctx.l0_manager.clone())
99 .edge_types(&edge_types.iter().map(|s| s.as_str()).collect::<Vec<_>>());
100
101 if !node_labels.is_empty() {
102 builder = builder.node_labels(&node_labels.iter().map(|s| s.as_str()).collect::<Vec<_>>());
103 }
104
105 let projection = builder.build().await?;
106
107 let config = AllSimplePathsConfig {
108 source: start_vid,
109 target: end_vid,
110 max_depth: max_len,
111 limit: 1000,
112 min_depth: 0,
113 };
114
115 let result = tokio::task::spawn_blocking(move || {
116 AllSimplePaths::run(&projection, config)
117 }).await?;
118
119 for path in result.paths {
120 let path_json: Vec<Value> = path.into_iter().map(|v| json!(v.as_u64())).collect();
121 yield AlgoResultRow {
122 values: vec![
123 Value::Array(path_json),
124 ],
125 };
126 }
127 };
128
129 Box::pin(stream)
130 }
131}