jellyflow_core/ops/mutation/planner/
ports.rs1use crate::core::{NodeId, Port, PortId};
2use crate::ops::{GraphOp, GraphTransaction};
3
4use super::GraphMutationPlanner;
5use crate::ops::mutation::collect::{
6 bindings_for_port_removal, incident_edges_for_port, remove_edge_ops_for_port,
7};
8use crate::ops::mutation::{GraphMutationError, PortInsert};
9
10impl GraphMutationPlanner<'_> {
11 pub fn add_port_tx(
12 &self,
13 id: PortId,
14 port: Port,
15 insert: PortInsert,
16 label: impl Into<String>,
17 ) -> Result<GraphTransaction, GraphMutationError> {
18 Ok(GraphTransaction::new()
19 .with_label(label)
20 .with_ops(self.add_port_ops(id, port, insert)?))
21 }
22
23 pub fn add_port_ops(
24 &self,
25 id: PortId,
26 port: Port,
27 insert: PortInsert,
28 ) -> Result<Vec<GraphOp>, GraphMutationError> {
29 let node_id = port.node;
30 if self.graph.ports.contains_key(&id) {
31 return Err(GraphMutationError::PortAlreadyExists(id));
32 }
33 let node = self
34 .graph
35 .nodes
36 .get(&node_id)
37 .ok_or(GraphMutationError::MissingNode(node_id))?;
38 if node.ports.contains(&id) {
39 return Err(GraphMutationError::DuplicateNodePort {
40 node: node_id,
41 port: id,
42 });
43 }
44
45 let order = NodePortOrderEdit::insert(node_id, &node.ports, id, insert)?;
46
47 Ok(vec![
48 GraphOp::AddPort { id, port },
49 GraphOp::SetNodePorts {
50 id: node_id,
51 from: order.from,
52 to: order.to,
53 },
54 ])
55 }
56
57 pub fn remove_port_op(&self, id: PortId) -> Result<GraphOp, GraphMutationError> {
58 let port = self
59 .graph
60 .ports
61 .get(&id)
62 .cloned()
63 .ok_or(GraphMutationError::MissingPort(id))?;
64
65 let edges = incident_edges_for_port(self.graph, id);
66 Ok(GraphOp::RemovePort {
67 id,
68 port,
69 bindings: bindings_for_port_removal(self.graph, id, &edges),
70 edges,
71 })
72 }
73
74 pub fn remove_port_ops(&self, id: PortId) -> Result<Vec<GraphOp>, GraphMutationError> {
75 let remove_op = self.remove_port_op(id)?;
76 let mut ops = Vec::new();
77
78 if let GraphOp::RemovePort { port, .. } = &remove_op
79 && let Some(node) = self.graph.nodes.get(&port.node)
80 {
81 let order = NodePortOrderEdit::remove(&node.ports, id);
82 if order.from != order.to {
83 ops.push(GraphOp::SetNodePorts {
84 id: port.node,
85 from: order.from,
86 to: order.to,
87 });
88 }
89 }
90
91 ops.push(remove_op);
92 Ok(ops)
93 }
94
95 pub fn disconnect_port_ops(&self, id: PortId) -> Result<Vec<GraphOp>, GraphMutationError> {
96 self.graph
97 .ports
98 .get(&id)
99 .ok_or(GraphMutationError::MissingPort(id))?;
100
101 Ok(remove_edge_ops_for_port(self.graph, id))
102 }
103
104 pub fn remove_port_tx(
105 &self,
106 id: PortId,
107 label: impl Into<String>,
108 ) -> Result<GraphTransaction, GraphMutationError> {
109 Ok(GraphTransaction::new()
110 .with_label(label)
111 .with_ops(self.remove_port_ops(id)?))
112 }
113}
114
115struct NodePortOrderEdit {
116 from: Vec<PortId>,
117 to: Vec<PortId>,
118}
119
120impl NodePortOrderEdit {
121 fn insert(
122 node: NodeId,
123 existing: &[PortId],
124 inserted: PortId,
125 insert: PortInsert,
126 ) -> Result<Self, GraphMutationError> {
127 let from = existing.to_vec();
128 let mut to = from.clone();
129 match insert {
130 PortInsert::Append => to.push(inserted),
131 PortInsert::At(index) => {
132 if index > to.len() {
133 return Err(GraphMutationError::PortInsertOutOfBounds {
134 node,
135 index,
136 len: to.len(),
137 });
138 }
139 to.insert(index, inserted);
140 }
141 }
142
143 Ok(Self { from, to })
144 }
145
146 fn remove(existing: &[PortId], removed: PortId) -> Self {
147 let from = existing.to_vec();
148 let mut to = from.clone();
149 to.retain(|id| *id != removed);
150
151 Self { from, to }
152 }
153}