jellyflow_runtime/rules/connection/insert/
split_edge.rs1use crate::rules::{ConnectPlan, InsertNodeSpec};
2use jellyflow_core::core::{Edge, EdgeId, Graph, Port, PortDirection, PortKind};
3use jellyflow_core::ops::EdgeEndpoints;
4
5use super::super::common::{
6 edge_like, ensure_edge_id_available, reject_edge_kind_incompatible, reject_missing_edge,
7 validate_insert_node_spec,
8};
9use super::batch::InsertNodeMutationBuilder;
10
11pub fn plan_split_edge_by_inserting_node(
13 graph: &Graph,
14 edge_id: EdgeId,
15 new_edge_id: EdgeId,
16 inserted: InsertNodeSpec,
17) -> ConnectPlan {
18 let split_edge = match SplittableEdge::resolve(graph, edge_id) {
19 Ok(split_edge) => split_edge,
20 Err(plan) => return plan,
21 };
22 if let Err(reject) = ensure_edge_id_available(graph, new_edge_id) {
23 return reject;
24 }
25
26 let inserted_ports = match validate_insert_node_spec(
27 graph,
28 &inserted,
29 split_edge.from_port.node,
30 split_edge.to_port.node,
31 split_edge.expected_port_kind(),
32 ) {
33 Ok(inserted_ports) => inserted_ports,
34 Err(plan) => return plan,
35 };
36
37 let mut batch = match InsertNodeMutationBuilder::new(graph, inserted) {
38 Ok(batch) => batch,
39 Err(plan) => return plan,
40 };
41 if let Err(plan) = batch.set_edge_endpoints(
42 edge_id,
43 EdgeEndpoints::new(split_edge.edge.from, inserted_ports.input),
44 ) {
45 return plan;
46 }
47 if let Err(plan) = batch.add_edge(
48 new_edge_id,
49 edge_like(split_edge.edge, inserted_ports.output, split_edge.edge.to),
50 ) {
51 return plan;
52 }
53
54 ConnectPlan::from_ops(batch.into_ops())
55}
56
57struct SplittableEdge<'a> {
58 edge: &'a Edge,
59 from_port: &'a Port,
60 to_port: &'a Port,
61}
62
63impl<'a> SplittableEdge<'a> {
64 fn resolve(graph: &'a Graph, edge_id: EdgeId) -> Result<Self, ConnectPlan> {
65 let Some(edge) = graph.edges.get(&edge_id) else {
66 return Err(reject_missing_edge(edge_id));
67 };
68 let Some(from_port) = graph.ports.get(&edge.from) else {
69 return Err(ConnectPlan::reject("missing edge.from port"));
70 };
71 let Some(to_port) = graph.ports.get(&edge.to) else {
72 return Err(ConnectPlan::reject("missing edge.to port"));
73 };
74
75 let split_edge = Self {
76 edge,
77 from_port,
78 to_port,
79 };
80 split_edge.ensure_direction()?;
81 split_edge.ensure_port_kinds()?;
82
83 Ok(split_edge)
84 }
85
86 fn expected_port_kind(&self) -> PortKind {
87 self.edge.kind.port_kind()
88 }
89
90 fn ensure_direction(&self) -> Result<(), ConnectPlan> {
91 if self.from_port.dir != PortDirection::Out || self.to_port.dir != PortDirection::In {
92 return Err(ConnectPlan::reject("edge must be out -> in"));
93 }
94 Ok(())
95 }
96
97 fn ensure_port_kinds(&self) -> Result<(), ConnectPlan> {
98 let expected_port_kind = self.expected_port_kind();
99 if self.from_port.kind != expected_port_kind || self.to_port.kind != expected_port_kind {
100 return Err(reject_edge_kind_incompatible());
101 }
102 Ok(())
103 }
104}