Skip to main content

uni_algo/algo/cypher/
all_simple_paths.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! uni.algo.allSimplePaths procedure implementation.
5
6use crate::algo::ProjectionBuilder;
7use crate::algo::algorithms::{Algorithm, AllSimplePaths, AllSimplePathsConfig};
8use crate::algo::procedures::{
9    AlgoContext, AlgoProcedure, AlgoResultRow, ProcedureSignature, ValueType,
10};
11use anyhow::Result;
12use futures::stream::{self, BoxStream, StreamExt};
13use serde_json::{Value, json};
14use uni_common::core::id::Vid;
15
16pub struct AllSimplePathsProcedure;
17
18impl AlgoProcedure for AllSimplePathsProcedure {
19    fn name(&self) -> &str {
20        "uni.algo.allSimplePaths"
21    }
22
23    fn signature(&self) -> ProcedureSignature {
24        ProcedureSignature {
25            args: vec![
26                ("startNode", ValueType::Node),
27                ("endNode", ValueType::Node),
28                ("relationshipTypes", ValueType::List),
29                ("maxLength", ValueType::Int),
30            ],
31            optional_args: vec![("nodeLabels", ValueType::List, Value::Null)],
32            yields: vec![("path", ValueType::List)],
33        }
34    }
35
36    fn execute(
37        &self,
38        ctx: AlgoContext,
39        args: Vec<Value>,
40    ) -> BoxStream<'static, Result<AlgoResultRow>> {
41        let signature = self.signature();
42        let args = match signature.validate_args(args) {
43            Ok(a) => a,
44            Err(e) => return stream::once(async { Err(e) }).boxed(),
45        };
46
47        let start_vid_u64 = match &args[0] {
48            Value::String(s) => s.parse::<u64>().unwrap_or(0),
49            Value::Number(n) => n.as_u64().unwrap_or(0),
50            _ => return stream::once(async { Err(anyhow::anyhow!("Invalid start node")) }).boxed(),
51        };
52
53        let end_vid_u64 = match &args[1] {
54            Value::String(s) => s.parse::<u64>().unwrap_or(0),
55            Value::Number(n) => n.as_u64().unwrap_or(0),
56            _ => return stream::once(async { Err(anyhow::anyhow!("Invalid end node")) }).boxed(),
57        };
58
59        let edge_types = args[2]
60            .as_array()
61            .unwrap()
62            .iter()
63            .map(|v| v.as_str().unwrap().to_string())
64            .collect::<Vec<_>>();
65        let max_len = args[3].as_u64().unwrap() as usize;
66
67        let node_labels = if args[4].is_null() {
68            Vec::new()
69        } else {
70            args[4]
71                .as_array()
72                .unwrap()
73                .iter()
74                .map(|v| v.as_str().unwrap().to_string())
75                .collect::<Vec<_>>()
76        };
77
78        let start_vid = Vid::from(start_vid_u64);
79        let end_vid = Vid::from(end_vid_u64);
80
81        let stream = async_stream::try_stream! {
82            let schema = ctx.storage.schema_manager().schema();
83
84            if !node_labels.is_empty() {
85                for label in &node_labels {
86                    if !schema.labels.contains_key(label) {
87                        Err(anyhow::anyhow!("Label '{}' not found", label))?;
88                    }
89                }
90            }
91            for etype in &edge_types {
92                if !schema.edge_types.contains_key(etype) {
93                    Err(anyhow::anyhow!("Edge type '{}' not found", etype))?;
94                }
95            }
96
97            let mut builder = ProjectionBuilder::new(ctx.storage.clone())
98                .l0_manager(ctx.l0_manager.clone())
99                .edge_types(&edge_types.iter().map(|s| s.as_str()).collect::<Vec<_>>());
100
101            if !node_labels.is_empty() {
102                builder = builder.node_labels(&node_labels.iter().map(|s| s.as_str()).collect::<Vec<_>>());
103            }
104
105            let projection = builder.build().await?;
106
107            let config = AllSimplePathsConfig {
108                source: start_vid,
109                target: end_vid,
110                max_depth: max_len,
111                limit: 1000,
112                min_depth: 0,
113            };
114
115            let result = tokio::task::spawn_blocking(move || {
116                AllSimplePaths::run(&projection, config)
117            }).await?;
118
119            for path in result.paths {
120                let path_json: Vec<Value> = path.into_iter().map(|v| json!(v.as_u64())).collect();
121                yield AlgoResultRow {
122                    values: vec![
123                        Value::Array(path_json),
124                    ],
125                };
126            }
127        };
128
129        Box::pin(stream)
130    }
131}