drasi_lib/component_graph/transaction.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::stable_graph::{EdgeIndex, NodeIndex};
16
17use crate::channels::ComponentEvent;
18
19use super::graph::ComponentGraph;
20use super::{ComponentNode, RelationshipKind};
21
22// ============================================================================
23// Graph Transaction
24// ============================================================================
25
26/// A transactional wrapper for batching graph mutations.
27///
28/// Collects added nodes and edges, deferring event emission until [`commit()`](Self::commit).
29/// If dropped without commit, all additions are rolled back (nodes and edges removed).
30///
31/// The `&'g mut ComponentGraph` borrow ensures Rust's borrow checker prevents
32/// any concurrent access to the graph during the transaction — zero-cost safety.
33///
34/// # Cross-system usage
35///
36/// For operations that span the graph and external systems (runtime initialization,
37/// HashMap insertion), use this pattern:
38///
39/// ```ignore
40/// // Phase 1: Graph transaction
41/// {
42/// let mut graph = self.graph.write().await;
43/// let mut txn = graph.begin();
44/// txn.add_component(node)?;
45/// txn.commit(); // graph is consistent
46/// }
47///
48/// // Phase 2: Runtime init (no graph lock)
49/// let instance = match RuntimeType::new(...) {
50/// Ok(i) => i,
51/// Err(e) => {
52/// // Compensating rollback
53/// let mut graph = self.graph.write().await;
54/// let _ = graph.remove_component(&id);
55/// return Err(e);
56/// }
57/// };
58/// ```
59pub struct GraphTransaction<'g> {
60 graph: &'g mut ComponentGraph,
61 added_nodes: Vec<(NodeIndex, String)>,
62 added_edges: Vec<EdgeIndex>,
63 pending_events: Vec<ComponentEvent>,
64 committed: bool,
65}
66
67impl<'g> GraphTransaction<'g> {
68 pub(super) fn new(graph: &'g mut ComponentGraph) -> Self {
69 Self {
70 graph,
71 added_nodes: Vec::new(),
72 added_edges: Vec::new(),
73 pending_events: Vec::new(),
74 committed: false,
75 }
76 }
77
78 /// Add a component node to the graph within this transaction.
79 ///
80 /// The node is added immediately (so subsequent `add_relationship` calls
81 /// can reference it), but the event is deferred until `commit()`.
82 /// On rollback (drop without commit), the node and its ownership edges
83 /// are removed.
84 pub fn add_component(&mut self, node: ComponentNode) -> anyhow::Result<NodeIndex> {
85 let id = node.id.clone();
86 let (node_idx, event, owns_edge, owned_by_edge) =
87 self.graph.add_component_internal(node)?;
88 self.added_nodes.push((node_idx, id));
89
90 // Record the ownership edges for rollback
91 self.added_edges.push(owns_edge);
92 self.added_edges.push(owned_by_edge);
93
94 if let Some(event) = event {
95 self.pending_events.push(event);
96 }
97
98 Ok(node_idx)
99 }
100
101 /// Add a bidirectional relationship within this transaction.
102 ///
103 /// The edges are added immediately; on rollback they are removed.
104 /// Idempotent: if the relationship already exists, this is a no-op.
105 pub fn add_relationship(
106 &mut self,
107 from_id: &str,
108 to_id: &str,
109 forward: RelationshipKind,
110 ) -> anyhow::Result<()> {
111 let (fwd, rev) = self
112 .graph
113 .add_relationship_internal(from_id, to_id, forward)?;
114 if let Some(e) = fwd {
115 self.added_edges.push(e);
116 }
117 if let Some(e) = rev {
118 self.added_edges.push(e);
119 }
120 Ok(())
121 }
122
123 /// Commit the transaction: emit all deferred events.
124 ///
125 /// After commit, the mutations are permanent and cannot be rolled back
126 /// through this transaction. For cross-system rollback, use compensating
127 /// actions (e.g., `graph.remove_component()`).
128 pub fn commit(mut self) {
129 self.committed = true;
130 for event in &self.pending_events {
131 let _ = self.graph.event_tx.send(event.clone());
132 }
133 }
134}
135
136impl<'g> Drop for GraphTransaction<'g> {
137 fn drop(&mut self) {
138 if self.committed {
139 return;
140 }
141
142 // Rollback: remove all added edges first (reverse order to maintain index stability)
143 for &edge_idx in self.added_edges.iter().rev() {
144 self.graph.graph.remove_edge(edge_idx);
145 }
146
147 // Remove all added nodes (reverse order)
148 for (node_idx, ref id) in self.added_nodes.iter().rev() {
149 self.graph.index.remove(id.as_str());
150 self.graph.graph.remove_node(*node_idx);
151 }
152 }
153}