jellyflow_core/ops/mutation/planner/
nodes.rs1use std::collections::BTreeSet;
2
3use crate::core::{Binding, BindingId, Edge, EdgeId, Graph, Node, NodeId, Port, PortId};
4use crate::ops::{GraphOp, GraphTransaction};
5
6use super::GraphMutationPlanner;
7use crate::ops::mutation::GraphMutationError;
8use crate::ops::mutation::collect::{
9 bindings_for_node_removal, incident_edges_for_ports, ports_for_node,
10};
11
12impl GraphMutationPlanner<'_> {
13 pub fn add_node_with_ports_tx(
14 &self,
15 id: NodeId,
16 node: Node,
17 ports: impl IntoIterator<Item = (PortId, Port)>,
18 label: impl Into<String>,
19 ) -> Result<GraphTransaction, GraphMutationError> {
20 Ok(GraphTransaction::new()
21 .with_label(label)
22 .with_ops(self.add_node_with_ports_ops(id, node, ports)?))
23 }
24
25 pub fn add_node_with_ports_ops(
26 &self,
27 id: NodeId,
28 mut node: Node,
29 ports: impl IntoIterator<Item = (PortId, Port)>,
30 ) -> Result<Vec<GraphOp>, GraphMutationError> {
31 if self.graph.nodes.contains_key(&id) {
32 return Err(GraphMutationError::NodeAlreadyExists(id));
33 }
34 if let Some(parent) = node.parent
35 && !self.graph.groups.contains_key(&parent)
36 {
37 return Err(GraphMutationError::MissingGroup(parent));
38 }
39
40 let NodePortsForInsert { ports, order } =
41 NodePortsForInsert::collect(self.graph, id, ports)?;
42
43 node.ports = Vec::new();
44 let mut ops = vec![GraphOp::AddNode { id, node }];
45 for (port_id, port) in ports {
46 ops.push(GraphOp::AddPort { id: port_id, port });
47 }
48 if !order.is_empty() {
49 ops.push(GraphOp::SetNodePorts {
50 id,
51 from: Vec::new(),
52 to: order,
53 });
54 }
55 Ok(ops)
56 }
57
58 pub fn remove_node_op(&self, id: NodeId) -> Result<GraphOp, GraphMutationError> {
59 let snapshot = NodeRemovalSnapshot::capture(self.graph, id)?;
60
61 Ok(GraphOp::RemoveNode {
62 id,
63 node: snapshot.node,
64 ports: snapshot.ports,
65 edges: snapshot.edges,
66 bindings: snapshot.bindings,
67 })
68 }
69
70 pub fn remove_node_tx(
71 &self,
72 id: NodeId,
73 label: impl Into<String>,
74 ) -> Result<GraphTransaction, GraphMutationError> {
75 Ok(GraphTransaction::new()
76 .with_label(label)
77 .with_ops([self.remove_node_op(id)?]))
78 }
79}
80
81struct NodePortsForInsert {
82 ports: Vec<(PortId, Port)>,
83 order: Vec<PortId>,
84}
85
86struct NodeRemovalSnapshot {
87 node: Node,
88 ports: Vec<(PortId, Port)>,
89 edges: Vec<(EdgeId, Edge)>,
90 bindings: Vec<(BindingId, Binding)>,
91}
92
93impl NodePortsForInsert {
94 fn collect(
95 graph: &Graph,
96 node: NodeId,
97 ports: impl IntoIterator<Item = (PortId, Port)>,
98 ) -> Result<Self, GraphMutationError> {
99 let mut seen = BTreeSet::new();
100 let mut collected = Vec::new();
101 let mut order = Vec::new();
102
103 for (port_id, port) in ports {
104 if graph.ports.contains_key(&port_id) {
105 return Err(GraphMutationError::PortAlreadyExists(port_id));
106 }
107 if !seen.insert(port_id) {
108 return Err(GraphMutationError::DuplicateNodePort {
109 node,
110 port: port_id,
111 });
112 }
113 if port.node != node {
114 return Err(GraphMutationError::PortOwnerMismatch {
115 port: port_id,
116 expected: node,
117 got: port.node,
118 });
119 }
120 order.push(port_id);
121 collected.push((port_id, port));
122 }
123
124 Ok(Self {
125 ports: collected,
126 order,
127 })
128 }
129}
130
131impl NodeRemovalSnapshot {
132 fn capture(graph: &Graph, node_id: NodeId) -> Result<Self, GraphMutationError> {
133 let node = graph
134 .nodes
135 .get(&node_id)
136 .cloned()
137 .ok_or(GraphMutationError::MissingNode(node_id))?;
138
139 let ports = ports_for_node(graph, node_id);
140 let port_ids = Self::port_ids(&ports);
141 let edges = incident_edges_for_ports(graph, &port_ids);
142
143 let bindings = bindings_for_node_removal(graph, node_id, &ports, &edges);
144
145 Ok(Self {
146 node,
147 ports,
148 edges,
149 bindings,
150 })
151 }
152
153 fn port_ids(ports: &[(PortId, Port)]) -> BTreeSet<PortId> {
154 ports.iter().map(|(port_id, _)| *port_id).collect()
155 }
156}