grafeo_engine/transaction/
prepared.rs1use std::collections::HashMap;
30
31use grafeo_common::types::{EpochId, TransactionId};
32use grafeo_common::utils::error::{Error, Result, TransactionError};
33
34use crate::Session;
35
36#[derive(Debug, Clone)]
38pub struct CommitInfo {
39 pub txn_id: TransactionId,
41 pub start_epoch: EpochId,
43 pub nodes_written: u64,
45 pub edges_written: u64,
47}
48
49pub struct PreparedCommit<'a> {
57 session: &'a mut Session,
58 metadata: HashMap<String, String>,
59 info: CommitInfo,
60 finalized: bool,
61}
62
63impl<'a> PreparedCommit<'a> {
64 pub(crate) fn new(session: &'a mut Session) -> Result<Self> {
66 let transaction_id = session.current_transaction_id().ok_or_else(|| {
67 Error::Transaction(TransactionError::InvalidState(
68 "No active transaction to prepare".to_string(),
69 ))
70 })?;
71
72 let start_epoch = session
73 .transaction_manager()
74 .start_epoch(transaction_id)
75 .unwrap_or(EpochId::new(0));
76
77 let (start_nodes, current_nodes) = session.node_count_delta();
79 let (start_edges, current_edges) = session.edge_count_delta();
80 let nodes_written = current_nodes.saturating_sub(start_nodes) as u64;
81 let edges_written = current_edges.saturating_sub(start_edges) as u64;
82
83 let info = CommitInfo {
84 txn_id: transaction_id,
85 start_epoch,
86 nodes_written,
87 edges_written,
88 };
89
90 Ok(Self {
91 session,
92 metadata: HashMap::new(),
93 info,
94 finalized: false,
95 })
96 }
97
98 #[must_use]
100 pub fn info(&self) -> &CommitInfo {
101 &self.info
102 }
103
104 pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
108 self.metadata.insert(key.into(), value.into());
109 }
110
111 #[must_use]
113 pub fn metadata(&self) -> &HashMap<String, String> {
114 &self.metadata
115 }
116
117 pub fn commit(mut self) -> Result<EpochId> {
125 self.finalized = true;
126 self.session.commit()?;
127 Ok(self.session.transaction_manager().current_epoch())
128 }
129
130 pub fn abort(mut self) -> Result<()> {
136 self.finalized = true;
137 self.session.rollback()
138 }
139}
140
141impl Drop for PreparedCommit<'_> {
142 fn drop(&mut self) {
143 if !self.finalized {
144 let _ = self.session.rollback();
146 }
147 }
148}
149
#[cfg(test)]
mod tests {
    use crate::GrafeoDB;

    /// A prepared commit reports no edges for a node-only insert and makes
    /// the node visible once committed.
    #[test]
    fn test_prepared_commit_basic() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let prepared = session.prepare_commit().unwrap();
        assert_eq!(prepared.info().edges_written, 0);

        let committed_epoch = prepared.commit().unwrap();
        assert!(committed_epoch.as_u64() > 0);

        assert_eq!(db.node_count(), 1, "Node should be visible after commit");
    }

    /// An edge inserted in a second transaction is visible from a fresh
    /// session after going through a prepared commit.
    #[test]
    fn test_prepared_commit_with_edges() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        // First transaction: create both endpoints with a plain commit.
        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
        session.commit().unwrap();

        assert_eq!(
            db.node_count(),
            2,
            "Both nodes should be visible after first commit"
        );

        // Second transaction: connect them, then finalize via prepare/commit.
        session.begin_transaction().unwrap();
        session
            .execute(
                "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
            )
            .unwrap();

        session.prepare_commit().unwrap().commit().unwrap();

        assert_eq!(
            db.node_count(),
            2,
            "Both nodes should be visible after commit"
        );

        let reader = db.session();
        let edges = reader
            .execute("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name")
            .unwrap();
        assert_eq!(edges.row_count(), 1, "Edge should be visible after commit");
    }

    /// Metadata set on a prepared commit can be read back before committing.
    #[test]
    fn test_prepared_commit_metadata() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let mut prepared = session.prepare_commit().unwrap();
        prepared.set_metadata("audit_user", "admin");
        prepared.set_metadata("source", "api");

        let stored = prepared.metadata();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored.get("audit_user").unwrap(), "admin");

        prepared.commit().unwrap();
    }

    /// Aborting a prepared commit discards the inserted node.
    #[test]
    fn test_prepared_commit_abort() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        session.prepare_commit().unwrap().abort().unwrap();

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();
        assert_eq!(people.rows.len(), 0);
    }

    /// Dropping a prepared commit without finalizing it rolls back.
    #[test]
    fn test_prepared_commit_drop_rollback() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        session.begin_transaction().unwrap();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        {
            // Goes out of scope at the end of this block without commit/abort.
            let _prepared = session.prepare_commit().unwrap();
        }

        let people = session.execute("MATCH (n:Person) RETURN n").unwrap();
        assert_eq!(people.rows.len(), 0);
    }

    /// Preparing a commit without an active transaction is an error.
    #[test]
    fn test_prepared_commit_no_transaction() {
        let db = GrafeoDB::new_in_memory();
        let mut session = db.session();

        let outcome = session.prepare_commit();
        assert!(outcome.is_err());
    }
}