Skip to main content

grafeo_engine/transaction/
prepared.rs

//! Two-phase commit with inspection and metadata.
//!
//! `PreparedCommit` lets you inspect pending changes before finalizing a
//! transaction. This is useful for external integrations that need to
//! validate, audit, or attach metadata before committing.
//!
//! # Example
//!
//! ```no_run
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! use grafeo_engine::GrafeoDB;
//!
//! let db = GrafeoDB::new_in_memory();
//! let mut session = db.session();
//!
//! session.begin_transaction()?;
//! session.execute("INSERT (:Person {name: 'Alix'})")?;
//!
//! let mut prepared = session.prepare_commit()?;
//! let info = prepared.info();
//! assert_eq!(info.nodes_written, 1);
//!
//! prepared.set_metadata("audit_user", "admin");
//! prepared.commit()?;
//! # Ok(())
//! # }
//! ```
28
29use std::collections::HashMap;
30
31use grafeo_common::types::{EpochId, TransactionId};
32use grafeo_common::utils::error::{Error, Result, TransactionError};
33
34use crate::Session;
35
36/// Summary of pending transaction mutations.
37#[derive(Debug, Clone)]
38pub struct CommitInfo {
39    /// Transaction ID.
40    pub txn_id: TransactionId,
41    /// Snapshot epoch the transaction read from.
42    pub start_epoch: EpochId,
43    /// Number of node entities in the write set.
44    pub nodes_written: u64,
45    /// Number of edge entities in the write set.
46    pub edges_written: u64,
47}
48
49/// A transaction that has been validated and is ready to commit.
50///
51/// Created by [`Session::prepare_commit`]. The mutable borrow on the session
52/// prevents any concurrent operations while the commit is pending.
53///
54/// If dropped without calling [`commit`](Self::commit) or [`abort`](Self::abort),
55/// the transaction is automatically rolled back.
56pub struct PreparedCommit<'a> {
57    session: &'a mut Session,
58    metadata: HashMap<String, String>,
59    info: CommitInfo,
60    finalized: bool,
61}
62
63impl<'a> PreparedCommit<'a> {
64    /// Creates a new prepared commit from a session with an active transaction.
65    pub(crate) fn new(session: &'a mut Session) -> Result<Self> {
66        let transaction_id = session.current_transaction_id().ok_or_else(|| {
67            Error::Transaction(TransactionError::InvalidState(
68                "No active transaction to prepare".to_string(),
69            ))
70        })?;
71
72        let start_epoch = session
73            .transaction_manager()
74            .start_epoch(transaction_id)
75            .unwrap_or(EpochId::new(0));
76
77        // Compute mutation counts from store deltas since begin_transaction.
78        let (start_nodes, current_nodes) = session.node_count_delta();
79        let (start_edges, current_edges) = session.edge_count_delta();
80        let nodes_written = current_nodes.saturating_sub(start_nodes) as u64;
81        let edges_written = current_edges.saturating_sub(start_edges) as u64;
82
83        let info = CommitInfo {
84            txn_id: transaction_id,
85            start_epoch,
86            nodes_written,
87            edges_written,
88        };
89
90        Ok(Self {
91            session,
92            metadata: HashMap::new(),
93            info,
94            finalized: false,
95        })
96    }
97
98    /// Returns the commit info (mutation summary).
99    #[must_use]
100    pub fn info(&self) -> &CommitInfo {
101        &self.info
102    }
103
104    /// Attaches metadata to this commit.
105    ///
106    /// Metadata is available for logging, auditing, or CDC consumers.
107    pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
108        self.metadata.insert(key.into(), value.into());
109    }
110
111    /// Returns the attached metadata.
112    #[must_use]
113    pub fn metadata(&self) -> &HashMap<String, String> {
114        &self.metadata
115    }
116
117    /// Finalizes the commit, persisting all changes.
118    ///
119    /// Consumes self to prevent double-commit.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the commit fails (write-write conflict, SSI violation, etc.).
124    pub fn commit(mut self) -> Result<EpochId> {
125        self.finalized = true;
126        self.session.commit()?;
127        Ok(self.session.transaction_manager().current_epoch())
128    }
129
130    /// Explicitly aborts the transaction, discarding all changes.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the rollback fails.
135    pub fn abort(mut self) -> Result<()> {
136        self.finalized = true;
137        self.session.rollback()
138    }
139}
140
141impl Drop for PreparedCommit<'_> {
142    fn drop(&mut self) {
143        if !self.finalized {
144            // Best-effort rollback on drop
145            let _ = self.session.rollback();
146        }
147    }
148}
149
#[cfg(test)]
mod tests {
    use crate::GrafeoDB;

    /// Inserts a single `Person` node named Alix.
    const INSERT_ALIX: &str = "INSERT (:Person {name: 'Alix'})";
    /// Inserts a single `Person` node named Gus.
    const INSERT_GUS: &str = "INSERT (:Person {name: 'Gus'})";
    /// Returns every `Person` node visible to the querying session.
    const MATCH_PEOPLE: &str = "MATCH (n:Person) RETURN n";

    /// Happy path: prepare, inspect, commit, and observe the node afterwards.
    #[test]
    fn test_prepared_commit_basic() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();

        sess.begin_transaction().unwrap();
        sess.execute(INSERT_ALIX).unwrap();

        let pending = sess.prepare_commit().unwrap();

        // Only the edge counter is asserted: no edge was inserted. Per the
        // original author's note, uncommitted versions carry
        // EpochId::PENDING and are invisible to node_count(), which
        // node_count_delta() relies on, so the node counter reads 0 until the
        // epochs are finalized at commit; `nodes_written` is not checked here.
        assert_eq!(pending.info().edges_written, 0);

        let committed_at = pending.commit().unwrap();
        assert!(committed_at.as_u64() > 0);

        // Per the original note, finalize_version_epochs() swaps PENDING for
        // the real commit epoch at commit time, which makes the node visible.
        assert_eq!(database.node_count(), 1, "Node should be visible after commit");
    }

    /// An edge created inside a prepared commit is visible once committed.
    #[test]
    fn test_prepared_commit_with_edges() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();

        // The endpoints are committed in their own transaction first. Per the
        // original note, a MATCH inside the same transaction cannot find
        // PENDING-epoch nodes for the cross-product join.
        sess.begin_transaction().unwrap();
        for statement in [INSERT_ALIX, INSERT_GUS] {
            sess.execute(statement).unwrap();
        }
        sess.commit().unwrap();

        assert_eq!(
            database.node_count(),
            2,
            "Both nodes should be visible after first commit"
        );

        // Second transaction: connect the two committed nodes.
        sess.begin_transaction().unwrap();
        sess.execute(
            "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
        )
        .unwrap();

        sess.prepare_commit().unwrap().commit().unwrap();

        // Nodes are unchanged and the new edge is readable from a fresh session.
        assert_eq!(
            database.node_count(),
            2,
            "Both nodes should be visible after commit"
        );
        let reader = database.session();
        let knows = reader
            .execute("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name")
            .unwrap();
        assert_eq!(knows.row_count(), 1, "Edge should be visible after commit");
    }

    /// Metadata set on a prepared commit can be read back before committing.
    #[test]
    fn test_prepared_commit_metadata() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();

        sess.begin_transaction().unwrap();
        sess.execute(INSERT_ALIX).unwrap();

        let mut pending = sess.prepare_commit().unwrap();
        pending.set_metadata("audit_user", "admin");
        pending.set_metadata("source", "api");

        let annotations = pending.metadata();
        assert_eq!(annotations.len(), 2);
        assert_eq!(annotations.get("audit_user").unwrap(), "admin");

        pending.commit().unwrap();
    }

    /// An explicit abort discards the transaction's writes.
    #[test]
    fn test_prepared_commit_abort() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();

        sess.begin_transaction().unwrap();
        sess.execute(INSERT_ALIX).unwrap();

        sess.prepare_commit().unwrap().abort().unwrap();

        // Nothing inserted by the aborted transaction may be visible.
        let people = sess.execute(MATCH_PEOPLE).unwrap();
        assert_eq!(people.rows.len(), 0);
    }

    /// Dropping a prepared commit without finishing it rolls the writes back.
    #[test]
    fn test_prepared_commit_drop_rollback() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();

        sess.begin_transaction().unwrap();
        sess.execute(INSERT_ALIX).unwrap();

        // Discard the handle without calling commit or abort.
        drop(sess.prepare_commit().unwrap());

        // The drop-triggered rollback must have removed the pending insert.
        let people = sess.execute(MATCH_PEOPLE).unwrap();
        assert_eq!(people.rows.len(), 0);
    }

    /// Preparing a commit with no open transaction is an error.
    #[test]
    fn test_prepared_commit_no_transaction() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();

        // No begin_transaction() was issued, so there is nothing to prepare.
        let attempt = sess.prepare_commit();
        assert!(attempt.is_err());
    }
}