Skip to main content

jellyflow_runtime/rules/connection/insert/
between_ports.rs

1use crate::io::NodeGraphInteractionState;
2use crate::rules::{ConnectPlan, InsertNodeSpec};
3use jellyflow_core::core::{EdgeId, Graph, PortId};
4use jellyflow_core::interaction::NodeGraphConnectionMode;
5
6use super::super::common::{
7    ConnectionOpBuilder, edge_between, ensure_edge_id_available, resolve_policy_checked_connection,
8    validate_insert_node_spec,
9};
10use super::batch::InsertNodeMutationBuilder;
11
12/// Plans connecting two ports by inserting a node between them.
13///
14/// This is intended for "auto-fix" workflows like inserting a conversion node when types mismatch.
15pub fn plan_connect_by_inserting_node_with_policy(
16    graph: &Graph,
17    a: PortId,
18    b: PortId,
19    first_edge_id: EdgeId,
20    second_edge_id: EdgeId,
21    inserted: InsertNodeSpec,
22    state: &NodeGraphInteractionState,
23) -> ConnectPlan {
24    let endpoints = match resolve_policy_checked_connection(
25        graph,
26        a,
27        b,
28        NodeGraphConnectionMode::Strict,
29        state,
30    ) {
31        Ok(endpoints) => endpoints,
32        Err(plan) => return plan,
33    };
34
35    if let Err(reject) = ensure_edge_id_available(graph, first_edge_id) {
36        return reject;
37    }
38    if let Err(reject) = ensure_edge_id_available(graph, second_edge_id) {
39        return reject;
40    }
41
42    let expected_port_kind = endpoints.edge_kind.port_kind();
43    let inserted_ports = match validate_insert_node_spec(
44        graph,
45        &inserted,
46        endpoints.from.node,
47        endpoints.to.node,
48        expected_port_kind,
49    ) {
50        Ok(inserted_ports) => inserted_ports,
51        Err(plan) => return plan,
52    };
53
54    let mut ops = ConnectionOpBuilder::with_endpoint_capacity_disconnects(graph, &endpoints, None);
55
56    let mut batch = match InsertNodeMutationBuilder::new(graph, inserted) {
57        Ok(batch) => batch,
58        Err(plan) => return plan,
59    };
60    if let Err(plan) = batch.add_edge(
61        first_edge_id,
62        edge_between(endpoints.edge_kind, endpoints.from_id, inserted_ports.input),
63    ) {
64        return plan;
65    }
66    if let Err(plan) = batch.add_edge(
67        second_edge_id,
68        edge_between(endpoints.edge_kind, inserted_ports.output, endpoints.to_id),
69    ) {
70        return plan;
71    }
72    ops.extend(batch.into_ops());
73
74    ConnectPlan::from_ops(ops.into_ops())
75}
76
77/// Plans connecting two ports by inserting a node with default interaction policy.
78pub fn plan_connect_by_inserting_node(
79    graph: &Graph,
80    a: PortId,
81    b: PortId,
82    first_edge_id: EdgeId,
83    second_edge_id: EdgeId,
84    inserted: InsertNodeSpec,
85) -> ConnectPlan {
86    plan_connect_by_inserting_node_with_policy(
87        graph,
88        a,
89        b,
90        first_edge_id,
91        second_edge_id,
92        inserted,
93        &NodeGraphInteractionState::default(),
94    )
95}