Skip to main content

jellyflow_runtime/rules/connection/insert/
split_edge.rs

1use crate::rules::{ConnectPlan, InsertNodeSpec};
2use jellyflow_core::core::{Edge, EdgeId, Graph, Port, PortDirection, PortKind};
3use jellyflow_core::ops::EdgeEndpoints;
4
5use super::super::common::{
6    edge_like, ensure_edge_id_available, reject_edge_kind_incompatible, reject_missing_edge,
7    validate_insert_node_spec,
8};
9use super::batch::InsertNodeMutationBuilder;
10
11/// Plans splitting an existing edge by inserting a node (preserving the edge identity for the first segment).
12pub fn plan_split_edge_by_inserting_node(
13    graph: &Graph,
14    edge_id: EdgeId,
15    new_edge_id: EdgeId,
16    inserted: InsertNodeSpec,
17) -> ConnectPlan {
18    let split_edge = match SplittableEdge::resolve(graph, edge_id) {
19        Ok(split_edge) => split_edge,
20        Err(plan) => return plan,
21    };
22    if let Err(reject) = ensure_edge_id_available(graph, new_edge_id) {
23        return reject;
24    }
25
26    let inserted_ports = match validate_insert_node_spec(
27        graph,
28        &inserted,
29        split_edge.from_port.node,
30        split_edge.to_port.node,
31        split_edge.expected_port_kind(),
32    ) {
33        Ok(inserted_ports) => inserted_ports,
34        Err(plan) => return plan,
35    };
36
37    let mut batch = match InsertNodeMutationBuilder::new(graph, inserted) {
38        Ok(batch) => batch,
39        Err(plan) => return plan,
40    };
41    if let Err(plan) = batch.set_edge_endpoints(
42        edge_id,
43        EdgeEndpoints::new(split_edge.edge.from, inserted_ports.input),
44    ) {
45        return plan;
46    }
47    if let Err(plan) = batch.add_edge(
48        new_edge_id,
49        edge_like(split_edge.edge, inserted_ports.output, split_edge.edge.to),
50    ) {
51        return plan;
52    }
53
54    ConnectPlan::from_ops(batch.into_ops())
55}
56
57struct SplittableEdge<'a> {
58    edge: &'a Edge,
59    from_port: &'a Port,
60    to_port: &'a Port,
61}
62
63impl<'a> SplittableEdge<'a> {
64    fn resolve(graph: &'a Graph, edge_id: EdgeId) -> Result<Self, ConnectPlan> {
65        let Some(edge) = graph.edges.get(&edge_id) else {
66            return Err(reject_missing_edge(edge_id));
67        };
68        let Some(from_port) = graph.ports.get(&edge.from) else {
69            return Err(ConnectPlan::reject("missing edge.from port"));
70        };
71        let Some(to_port) = graph.ports.get(&edge.to) else {
72            return Err(ConnectPlan::reject("missing edge.to port"));
73        };
74
75        let split_edge = Self {
76            edge,
77            from_port,
78            to_port,
79        };
80        split_edge.ensure_direction()?;
81        split_edge.ensure_port_kinds()?;
82
83        Ok(split_edge)
84    }
85
86    fn expected_port_kind(&self) -> PortKind {
87        self.edge.kind.port_kind()
88    }
89
90    fn ensure_direction(&self) -> Result<(), ConnectPlan> {
91        if self.from_port.dir != PortDirection::Out || self.to_port.dir != PortDirection::In {
92            return Err(ConnectPlan::reject("edge must be out -> in"));
93        }
94        Ok(())
95    }
96
97    fn ensure_port_kinds(&self) -> Result<(), ConnectPlan> {
98        let expected_port_kind = self.expected_port_kind();
99        if self.from_port.kind != expected_port_kind || self.to_port.kind != expected_port_kind {
100            return Err(reject_edge_kind_incompatible());
101        }
102        Ok(())
103    }
104}