Skip to main content

jellyflow_core/ops/mutation/planner/
nodes.rs

1use std::collections::BTreeSet;
2
3use crate::core::{Binding, BindingId, Edge, EdgeId, Graph, Node, NodeId, Port, PortId};
4use crate::ops::{GraphOp, GraphTransaction};
5
6use super::GraphMutationPlanner;
7use crate::ops::mutation::GraphMutationError;
8use crate::ops::mutation::collect::{
9    bindings_for_node_removal, incident_edges_for_ports, ports_for_node,
10};
11
12impl GraphMutationPlanner<'_> {
13    pub fn add_node_with_ports_tx(
14        &self,
15        id: NodeId,
16        node: Node,
17        ports: impl IntoIterator<Item = (PortId, Port)>,
18        label: impl Into<String>,
19    ) -> Result<GraphTransaction, GraphMutationError> {
20        Ok(GraphTransaction::new()
21            .with_label(label)
22            .with_ops(self.add_node_with_ports_ops(id, node, ports)?))
23    }
24
25    pub fn add_node_with_ports_ops(
26        &self,
27        id: NodeId,
28        mut node: Node,
29        ports: impl IntoIterator<Item = (PortId, Port)>,
30    ) -> Result<Vec<GraphOp>, GraphMutationError> {
31        if self.graph.nodes.contains_key(&id) {
32            return Err(GraphMutationError::NodeAlreadyExists(id));
33        }
34        if let Some(parent) = node.parent
35            && !self.graph.groups.contains_key(&parent)
36        {
37            return Err(GraphMutationError::MissingGroup(parent));
38        }
39
40        let NodePortsForInsert { ports, order } =
41            NodePortsForInsert::collect(self.graph, id, ports)?;
42
43        node.ports = Vec::new();
44        let mut ops = vec![GraphOp::AddNode { id, node }];
45        for (port_id, port) in ports {
46            ops.push(GraphOp::AddPort { id: port_id, port });
47        }
48        if !order.is_empty() {
49            ops.push(GraphOp::SetNodePorts {
50                id,
51                from: Vec::new(),
52                to: order,
53            });
54        }
55        Ok(ops)
56    }
57
58    pub fn remove_node_op(&self, id: NodeId) -> Result<GraphOp, GraphMutationError> {
59        let snapshot = NodeRemovalSnapshot::capture(self.graph, id)?;
60
61        Ok(GraphOp::RemoveNode {
62            id,
63            node: snapshot.node,
64            ports: snapshot.ports,
65            edges: snapshot.edges,
66            bindings: snapshot.bindings,
67        })
68    }
69
70    pub fn remove_node_tx(
71        &self,
72        id: NodeId,
73        label: impl Into<String>,
74    ) -> Result<GraphTransaction, GraphMutationError> {
75        Ok(GraphTransaction::new()
76            .with_label(label)
77            .with_ops([self.remove_node_op(id)?]))
78    }
79}
80
81struct NodePortsForInsert {
82    ports: Vec<(PortId, Port)>,
83    order: Vec<PortId>,
84}
85
86struct NodeRemovalSnapshot {
87    node: Node,
88    ports: Vec<(PortId, Port)>,
89    edges: Vec<(EdgeId, Edge)>,
90    bindings: Vec<(BindingId, Binding)>,
91}
92
93impl NodePortsForInsert {
94    fn collect(
95        graph: &Graph,
96        node: NodeId,
97        ports: impl IntoIterator<Item = (PortId, Port)>,
98    ) -> Result<Self, GraphMutationError> {
99        let mut seen = BTreeSet::new();
100        let mut collected = Vec::new();
101        let mut order = Vec::new();
102
103        for (port_id, port) in ports {
104            if graph.ports.contains_key(&port_id) {
105                return Err(GraphMutationError::PortAlreadyExists(port_id));
106            }
107            if !seen.insert(port_id) {
108                return Err(GraphMutationError::DuplicateNodePort {
109                    node,
110                    port: port_id,
111                });
112            }
113            if port.node != node {
114                return Err(GraphMutationError::PortOwnerMismatch {
115                    port: port_id,
116                    expected: node,
117                    got: port.node,
118                });
119            }
120            order.push(port_id);
121            collected.push((port_id, port));
122        }
123
124        Ok(Self {
125            ports: collected,
126            order,
127        })
128    }
129}
130
131impl NodeRemovalSnapshot {
132    fn capture(graph: &Graph, node_id: NodeId) -> Result<Self, GraphMutationError> {
133        let node = graph
134            .nodes
135            .get(&node_id)
136            .cloned()
137            .ok_or(GraphMutationError::MissingNode(node_id))?;
138
139        let ports = ports_for_node(graph, node_id);
140        let port_ids = Self::port_ids(&ports);
141        let edges = incident_edges_for_ports(graph, &port_ids);
142
143        let bindings = bindings_for_node_removal(graph, node_id, &ports, &edges);
144
145        Ok(Self {
146            node,
147            ports,
148            edges,
149            bindings,
150        })
151    }
152
153    fn port_ids(ports: &[(PortId, Port)]) -> BTreeSet<PortId> {
154        ports.iter().map(|(port_id, _)| *port_id).collect()
155    }
156}