Skip to main content

drasi_lib/component_graph/
transaction.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use petgraph::stable_graph::{EdgeIndex, NodeIndex};
16
17use crate::channels::ComponentEvent;
18
19use super::graph::ComponentGraph;
20use super::{ComponentNode, RelationshipKind};
21
22// ============================================================================
23// Graph Transaction
24// ============================================================================
25
26/// A transactional wrapper for batching graph mutations.
27///
28/// Collects added nodes and edges, deferring event emission until [`commit()`](Self::commit).
29/// If dropped without commit, all additions are rolled back (nodes and edges removed).
30///
31/// The `&'g mut ComponentGraph` borrow ensures Rust's borrow checker prevents
32/// any concurrent access to the graph during the transaction — zero-cost safety.
33///
34/// # Cross-system usage
35///
36/// For operations that span the graph and external systems (runtime initialization,
37/// HashMap insertion), use this pattern:
38///
39/// ```ignore
40/// // Phase 1: Graph transaction
41/// {
42///     let mut graph = self.graph.write().await;
43///     let mut txn = graph.begin();
44///     txn.add_component(node)?;
45///     txn.commit(); // graph is consistent
46/// }
47///
48/// // Phase 2: Runtime init (no graph lock)
49/// let instance = match RuntimeType::new(...) {
50///     Ok(i) => i,
51///     Err(e) => {
52///         // Compensating rollback
53///         let mut graph = self.graph.write().await;
54///         let _ = graph.remove_component(&id);
55///         return Err(e);
56///     }
57/// };
58/// ```
59pub struct GraphTransaction<'g> {
60    graph: &'g mut ComponentGraph,
61    added_nodes: Vec<(NodeIndex, String)>,
62    added_edges: Vec<EdgeIndex>,
63    pending_events: Vec<ComponentEvent>,
64    committed: bool,
65}
66
67impl<'g> GraphTransaction<'g> {
68    pub(super) fn new(graph: &'g mut ComponentGraph) -> Self {
69        Self {
70            graph,
71            added_nodes: Vec::new(),
72            added_edges: Vec::new(),
73            pending_events: Vec::new(),
74            committed: false,
75        }
76    }
77
78    /// Add a component node to the graph within this transaction.
79    ///
80    /// The node is added immediately (so subsequent `add_relationship` calls
81    /// can reference it), but the event is deferred until `commit()`.
82    /// On rollback (drop without commit), the node and its ownership edges
83    /// are removed.
84    pub fn add_component(&mut self, node: ComponentNode) -> anyhow::Result<NodeIndex> {
85        let id = node.id.clone();
86        let (node_idx, event, owns_edge, owned_by_edge) =
87            self.graph.add_component_internal(node)?;
88        self.added_nodes.push((node_idx, id));
89
90        // Record the ownership edges for rollback
91        self.added_edges.push(owns_edge);
92        self.added_edges.push(owned_by_edge);
93
94        if let Some(event) = event {
95            self.pending_events.push(event);
96        }
97
98        Ok(node_idx)
99    }
100
101    /// Add a bidirectional relationship within this transaction.
102    ///
103    /// The edges are added immediately; on rollback they are removed.
104    /// Idempotent: if the relationship already exists, this is a no-op.
105    pub fn add_relationship(
106        &mut self,
107        from_id: &str,
108        to_id: &str,
109        forward: RelationshipKind,
110    ) -> anyhow::Result<()> {
111        let (fwd, rev) = self
112            .graph
113            .add_relationship_internal(from_id, to_id, forward)?;
114        if let Some(e) = fwd {
115            self.added_edges.push(e);
116        }
117        if let Some(e) = rev {
118            self.added_edges.push(e);
119        }
120        Ok(())
121    }
122
123    /// Commit the transaction: emit all deferred events.
124    ///
125    /// After commit, the mutations are permanent and cannot be rolled back
126    /// through this transaction. For cross-system rollback, use compensating
127    /// actions (e.g., `graph.remove_component()`).
128    pub fn commit(mut self) {
129        self.committed = true;
130        for event in &self.pending_events {
131            let _ = self.graph.event_tx.send(event.clone());
132        }
133    }
134}
135
136impl<'g> Drop for GraphTransaction<'g> {
137    fn drop(&mut self) {
138        if self.committed {
139            return;
140        }
141
142        // Rollback: remove all added edges first (reverse order to maintain index stability)
143        for &edge_idx in self.added_edges.iter().rev() {
144            self.graph.graph.remove_edge(edge_idx);
145        }
146
147        // Remove all added nodes (reverse order)
148        for (node_idx, ref id) in self.added_nodes.iter().rev() {
149            self.graph.index.remove(id.as_str());
150            self.graph.graph.remove_node(*node_idx);
151        }
152    }
153}