jellyflow_core/core/
subgraph.rs1use std::collections::BTreeSet;
2
3use serde_json::Value;
4use uuid::Uuid;
5
6use super::{Graph, GraphId, Node, NodeId};
7
8pub const SUBGRAPH_NODE_KIND: &str = "jellyflow.subgraph";
12
13#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
14pub enum SubgraphNodeError {
15 #[error("subgraph node missing graph_id: node={node:?}")]
16 MissingGraphId { node: NodeId },
17
18 #[error("subgraph node graph_id is not a string: node={node:?}")]
19 GraphIdNotString { node: NodeId },
20
21 #[error("subgraph node graph_id is not a valid uuid: node={node:?} value={value:?}")]
22 InvalidGraphId { node: NodeId, value: String },
23}
24
25#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
26pub enum SubgraphBindingError {
27 #[error("subgraph target is not declared in graph imports: node={node:?} graph_id={graph_id}")]
28 TargetNotImported { node: NodeId, graph_id: GraphId },
29}
30
31pub fn is_subgraph_node(node: &Node) -> bool {
32 node.kind.0 == SUBGRAPH_NODE_KIND
33}
34
35pub fn subgraph_target_graph_id(
41 node_id: NodeId,
42 node: &Node,
43) -> Result<Option<GraphId>, SubgraphNodeError> {
44 if !is_subgraph_node(node) {
45 return Ok(None);
46 }
47
48 let Some(obj) = node.data.as_object() else {
49 return Err(SubgraphNodeError::MissingGraphId { node: node_id });
50 };
51
52 let Some(raw) = obj.get("graph_id") else {
53 return Err(SubgraphNodeError::MissingGraphId { node: node_id });
54 };
55
56 let Some(s) = raw.as_str() else {
57 return Err(SubgraphNodeError::GraphIdNotString { node: node_id });
58 };
59
60 let uuid = Uuid::parse_str(s).map_err(|_| SubgraphNodeError::InvalidGraphId {
61 node: node_id,
62 value: s.to_string(),
63 })?;
64
65 Ok(Some(GraphId(uuid)))
66}
67
68pub fn collect_subgraph_targets(graph: &Graph) -> Result<BTreeSet<GraphId>, SubgraphNodeError> {
72 let mut out = BTreeSet::new();
73 for (node_id, node) in &graph.nodes {
74 if let Some(target) = subgraph_target_graph_id(*node_id, node)? {
75 out.insert(target);
76 }
77 }
78 Ok(out)
79}
80
81pub fn validate_subgraph_targets_are_imported(
83 graph: &Graph,
84) -> Result<(), Vec<SubgraphBindingError>> {
85 let mut errors = Vec::new();
86 for (node_id, node) in &graph.nodes {
87 let Ok(Some(target)) = subgraph_target_graph_id(*node_id, node) else {
88 continue;
89 };
90 if !graph.imports.contains_key(&target) {
91 errors.push(SubgraphBindingError::TargetNotImported {
92 node: *node_id,
93 graph_id: target,
94 });
95 }
96 }
97
98 if errors.is_empty() {
99 Ok(())
100 } else {
101 Err(errors)
102 }
103}
104
105pub fn subgraph_node_data(graph_id: GraphId) -> Value {
107 serde_json::json!({ "graph_id": graph_id })
108}