Skip to main content

jellyflow_core/ops/mutation/planner/
ports.rs

1use crate::core::{NodeId, Port, PortId};
2use crate::ops::{GraphOp, GraphTransaction};
3
4use super::GraphMutationPlanner;
5use crate::ops::mutation::collect::{
6    bindings_for_port_removal, incident_edges_for_port, remove_edge_ops_for_port,
7};
8use crate::ops::mutation::{GraphMutationError, PortInsert};
9
10impl GraphMutationPlanner<'_> {
11    pub fn add_port_tx(
12        &self,
13        id: PortId,
14        port: Port,
15        insert: PortInsert,
16        label: impl Into<String>,
17    ) -> Result<GraphTransaction, GraphMutationError> {
18        Ok(GraphTransaction::new()
19            .with_label(label)
20            .with_ops(self.add_port_ops(id, port, insert)?))
21    }
22
23    pub fn add_port_ops(
24        &self,
25        id: PortId,
26        port: Port,
27        insert: PortInsert,
28    ) -> Result<Vec<GraphOp>, GraphMutationError> {
29        let node_id = port.node;
30        if self.graph.ports.contains_key(&id) {
31            return Err(GraphMutationError::PortAlreadyExists(id));
32        }
33        let node = self
34            .graph
35            .nodes
36            .get(&node_id)
37            .ok_or(GraphMutationError::MissingNode(node_id))?;
38        if node.ports.contains(&id) {
39            return Err(GraphMutationError::DuplicateNodePort {
40                node: node_id,
41                port: id,
42            });
43        }
44
45        let order = NodePortOrderEdit::insert(node_id, &node.ports, id, insert)?;
46
47        Ok(vec![
48            GraphOp::AddPort { id, port },
49            GraphOp::SetNodePorts {
50                id: node_id,
51                from: order.from,
52                to: order.to,
53            },
54        ])
55    }
56
57    pub fn remove_port_op(&self, id: PortId) -> Result<GraphOp, GraphMutationError> {
58        let port = self
59            .graph
60            .ports
61            .get(&id)
62            .cloned()
63            .ok_or(GraphMutationError::MissingPort(id))?;
64
65        let edges = incident_edges_for_port(self.graph, id);
66        Ok(GraphOp::RemovePort {
67            id,
68            port,
69            bindings: bindings_for_port_removal(self.graph, id, &edges),
70            edges,
71        })
72    }
73
74    pub fn remove_port_ops(&self, id: PortId) -> Result<Vec<GraphOp>, GraphMutationError> {
75        let remove_op = self.remove_port_op(id)?;
76        let mut ops = Vec::new();
77
78        if let GraphOp::RemovePort { port, .. } = &remove_op
79            && let Some(node) = self.graph.nodes.get(&port.node)
80        {
81            let order = NodePortOrderEdit::remove(&node.ports, id);
82            if order.from != order.to {
83                ops.push(GraphOp::SetNodePorts {
84                    id: port.node,
85                    from: order.from,
86                    to: order.to,
87                });
88            }
89        }
90
91        ops.push(remove_op);
92        Ok(ops)
93    }
94
95    pub fn disconnect_port_ops(&self, id: PortId) -> Result<Vec<GraphOp>, GraphMutationError> {
96        self.graph
97            .ports
98            .get(&id)
99            .ok_or(GraphMutationError::MissingPort(id))?;
100
101        Ok(remove_edge_ops_for_port(self.graph, id))
102    }
103
104    pub fn remove_port_tx(
105        &self,
106        id: PortId,
107        label: impl Into<String>,
108    ) -> Result<GraphTransaction, GraphMutationError> {
109        Ok(GraphTransaction::new()
110            .with_label(label)
111            .with_ops(self.remove_port_ops(id)?))
112    }
113}
114
115struct NodePortOrderEdit {
116    from: Vec<PortId>,
117    to: Vec<PortId>,
118}
119
120impl NodePortOrderEdit {
121    fn insert(
122        node: NodeId,
123        existing: &[PortId],
124        inserted: PortId,
125        insert: PortInsert,
126    ) -> Result<Self, GraphMutationError> {
127        let from = existing.to_vec();
128        let mut to = from.clone();
129        match insert {
130            PortInsert::Append => to.push(inserted),
131            PortInsert::At(index) => {
132                if index > to.len() {
133                    return Err(GraphMutationError::PortInsertOutOfBounds {
134                        node,
135                        index,
136                        len: to.len(),
137                    });
138                }
139                to.insert(index, inserted);
140            }
141        }
142
143        Ok(Self { from, to })
144    }
145
146    fn remove(existing: &[PortId], removed: PortId) -> Self {
147        let from = existing.to_vec();
148        let mut to = from.clone();
149        to.retain(|id| *id != removed);
150
151        Self { from, to }
152    }
153}