Skip to main content

grafeo_engine/transaction/
prepared.rs

1//! Two-phase commit with inspection and metadata.
2//!
3//! `PreparedCommit` lets you inspect pending changes before finalizing a
4//! transaction. This is useful for external integrations that need to
5//! validate, audit, or attach metadata before committing.
6//!
7//! # Example
8//!
9//! ```no_run
10//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
11//! use grafeo_engine::GrafeoDB;
12//!
13//! let db = GrafeoDB::new_in_memory();
14//! let mut session = db.session();
15//!
16//! session.begin_tx()?;
17//! session.execute("INSERT (:Person {name: 'Alice'})")?;
18//!
19//! let mut prepared = session.prepare_commit()?;
20//! let info = prepared.info();
21//! assert_eq!(info.nodes_written, 1);
22//!
23//! prepared.set_metadata("audit_user", "admin");
24//! prepared.commit()?;
25//! # Ok(())
26//! # }
27//! ```
28
29use std::collections::HashMap;
30
31use grafeo_common::types::{EpochId, TxId};
32use grafeo_common::utils::error::{Error, Result, TransactionError};
33
34use crate::Session;
35
36/// Summary of pending transaction mutations.
37#[derive(Debug, Clone)]
38pub struct CommitInfo {
39    /// Transaction ID.
40    pub txn_id: TxId,
41    /// Snapshot epoch the transaction read from.
42    pub start_epoch: EpochId,
43    /// Number of node entities in the write set.
44    pub nodes_written: u64,
45    /// Number of edge entities in the write set.
46    pub edges_written: u64,
47}
48
49/// A transaction that has been validated and is ready to commit.
50///
51/// Created by [`Session::prepare_commit`]. The mutable borrow on the session
52/// prevents any concurrent operations while the commit is pending.
53///
54/// If dropped without calling [`commit`](Self::commit) or [`abort`](Self::abort),
55/// the transaction is automatically rolled back.
56pub struct PreparedCommit<'a> {
57    session: &'a mut Session,
58    metadata: HashMap<String, String>,
59    info: CommitInfo,
60    finalized: bool,
61}
62
63impl<'a> PreparedCommit<'a> {
64    /// Creates a new prepared commit from a session with an active transaction.
65    pub(crate) fn new(session: &'a mut Session) -> Result<Self> {
66        let tx_id = session.current_tx_id().ok_or_else(|| {
67            Error::Transaction(TransactionError::InvalidState(
68                "No active transaction to prepare".to_string(),
69            ))
70        })?;
71
72        let start_epoch = session
73            .tx_manager()
74            .start_epoch(tx_id)
75            .unwrap_or(EpochId::new(0));
76
77        // Compute mutation counts from store deltas since begin_tx.
78        let (start_nodes, current_nodes) = session.node_count_delta();
79        let (start_edges, current_edges) = session.edge_count_delta();
80        let nodes_written = current_nodes.saturating_sub(start_nodes) as u64;
81        let edges_written = current_edges.saturating_sub(start_edges) as u64;
82
83        let info = CommitInfo {
84            txn_id: tx_id,
85            start_epoch,
86            nodes_written,
87            edges_written,
88        };
89
90        Ok(Self {
91            session,
92            metadata: HashMap::new(),
93            info,
94            finalized: false,
95        })
96    }
97
98    /// Returns the commit info (mutation summary).
99    #[must_use]
100    pub fn info(&self) -> &CommitInfo {
101        &self.info
102    }
103
104    /// Attaches metadata to this commit.
105    ///
106    /// Metadata is available for logging, auditing, or CDC consumers.
107    pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
108        self.metadata.insert(key.into(), value.into());
109    }
110
111    /// Returns the attached metadata.
112    #[must_use]
113    pub fn metadata(&self) -> &HashMap<String, String> {
114        &self.metadata
115    }
116
117    /// Finalizes the commit, persisting all changes.
118    ///
119    /// Consumes self to prevent double-commit.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the commit fails (write-write conflict, SSI violation, etc.).
124    pub fn commit(mut self) -> Result<EpochId> {
125        self.finalized = true;
126        self.session.commit()?;
127        Ok(self.session.tx_manager().current_epoch())
128    }
129
130    /// Explicitly aborts the transaction, discarding all changes.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the rollback fails.
135    pub fn abort(mut self) -> Result<()> {
136        self.finalized = true;
137        self.session.rollback()
138    }
139}
140
141impl Drop for PreparedCommit<'_> {
142    fn drop(&mut self) {
143        if !self.finalized {
144            // Best-effort rollback on drop
145            let _ = self.session.rollback();
146        }
147    }
148}
149
#[cfg(test)]
mod tests {
    use crate::GrafeoDB;

    /// One inserted node shows up as one node write and no edge writes, and
    /// committing yields a non-zero epoch.
    #[test]
    fn test_prepared_commit_basic() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();
        sess.begin_tx().unwrap();
        sess.execute("INSERT (:Person {name: 'Alice'})").unwrap();

        let pending = sess.prepare_commit().unwrap();
        assert_eq!(pending.info().nodes_written, 1);
        assert_eq!(pending.info().edges_written, 0);

        let committed_at = pending.commit().unwrap();
        assert!(committed_at.as_u64() > 0);
    }

    /// Two nodes plus one connecting edge are both reflected in the summary.
    #[test]
    fn test_prepared_commit_with_edges() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();
        sess.begin_tx().unwrap();

        sess.execute("INSERT (:Person {name: 'Alice'})").unwrap();
        sess.execute("INSERT (:Person {name: 'Bob'})").unwrap();
        sess.execute(
            "MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) INSERT (a)-[:KNOWS]->(b)",
        )
        .unwrap();

        let pending = sess.prepare_commit().unwrap();
        assert_eq!(pending.info().nodes_written, 2);
        assert_eq!(pending.info().edges_written, 1);

        pending.commit().unwrap();
    }

    /// Metadata set on the prepared commit can be read back before committing.
    #[test]
    fn test_prepared_commit_metadata() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();
        sess.begin_tx().unwrap();
        sess.execute("INSERT (:Person {name: 'Alice'})").unwrap();

        let mut pending = sess.prepare_commit().unwrap();
        pending.set_metadata("audit_user", "admin");
        pending.set_metadata("source", "api");

        let attached = pending.metadata();
        assert_eq!(attached.len(), 2);
        assert_eq!(attached.get("audit_user").map(String::as_str), Some("admin"));

        pending.commit().unwrap();
    }

    /// An explicit abort discards the inserted node.
    #[test]
    fn test_prepared_commit_abort() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();
        sess.begin_tx().unwrap();
        sess.execute("INSERT (:Person {name: 'Alice'})").unwrap();

        sess.prepare_commit().unwrap().abort().unwrap();

        // Nothing from the aborted transaction may be visible afterwards.
        let people = sess.execute("MATCH (n:Person) RETURN n").unwrap();
        assert_eq!(people.rows.len(), 0);
    }

    /// Dropping the prepared commit without finalizing it rolls back.
    #[test]
    fn test_prepared_commit_drop_rollback() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();
        sess.begin_tx().unwrap();
        sess.execute("INSERT (:Person {name: 'Alice'})").unwrap();

        // Neither commit nor abort is called; the drop itself must roll back.
        let pending = sess.prepare_commit().unwrap();
        drop(pending);

        let people = sess.execute("MATCH (n:Person) RETURN n").unwrap();
        assert_eq!(people.rows.len(), 0);
    }

    /// Preparing a commit without an open transaction is an error.
    #[test]
    fn test_prepared_commit_no_transaction() {
        let database = GrafeoDB::new_in_memory();
        let mut sess = database.session();

        let attempt = sess.prepare_commit();
        assert!(attempt.is_err());
    }
}