grafeo_engine/transaction/
prepared.rs1use std::collections::HashMap;
30
31use grafeo_common::types::{EpochId, TxId};
32use grafeo_common::utils::error::{Error, Result, TransactionError};
33
34use crate::Session;
35
36#[derive(Debug, Clone)]
38pub struct CommitInfo {
39 pub txn_id: TxId,
41 pub start_epoch: EpochId,
43 pub nodes_written: u64,
45 pub edges_written: u64,
47}
48
49pub struct PreparedCommit<'a> {
57 session: &'a mut Session,
58 metadata: HashMap<String, String>,
59 info: CommitInfo,
60 finalized: bool,
61}
62
63impl<'a> PreparedCommit<'a> {
64 pub(crate) fn new(session: &'a mut Session) -> Result<Self> {
66 let tx_id = session.current_tx_id().ok_or_else(|| {
67 Error::Transaction(TransactionError::InvalidState(
68 "No active transaction to prepare".to_string(),
69 ))
70 })?;
71
72 let start_epoch = session
73 .tx_manager()
74 .start_epoch(tx_id)
75 .unwrap_or(EpochId::new(0));
76
77 let (start_nodes, current_nodes) = session.node_count_delta();
79 let (start_edges, current_edges) = session.edge_count_delta();
80 let nodes_written = current_nodes.saturating_sub(start_nodes) as u64;
81 let edges_written = current_edges.saturating_sub(start_edges) as u64;
82
83 let info = CommitInfo {
84 txn_id: tx_id,
85 start_epoch,
86 nodes_written,
87 edges_written,
88 };
89
90 Ok(Self {
91 session,
92 metadata: HashMap::new(),
93 info,
94 finalized: false,
95 })
96 }
97
98 #[must_use]
100 pub fn info(&self) -> &CommitInfo {
101 &self.info
102 }
103
104 pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
108 self.metadata.insert(key.into(), value.into());
109 }
110
111 #[must_use]
113 pub fn metadata(&self) -> &HashMap<String, String> {
114 &self.metadata
115 }
116
117 pub fn commit(mut self) -> Result<EpochId> {
125 self.finalized = true;
126 self.session.commit()?;
127 Ok(self.session.tx_manager().current_epoch())
128 }
129
130 pub fn abort(mut self) -> Result<()> {
136 self.finalized = true;
137 self.session.rollback()
138 }
139}
140
141impl Drop for PreparedCommit<'_> {
142 fn drop(&mut self) {
143 if !self.finalized {
144 let _ = self.session.rollback();
146 }
147 }
148}
149
150#[cfg(test)]
151mod tests {
152 use crate::GrafeoDB;
153
154 #[test]
155 fn test_prepared_commit_basic() {
156 let db = GrafeoDB::new_in_memory();
157 let mut session = db.session();
158
159 session.begin_tx().unwrap();
160 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
161
162 let prepared = session.prepare_commit().unwrap();
163 let info = prepared.info();
164
165 assert_eq!(info.nodes_written, 1);
166 assert_eq!(info.edges_written, 0);
167
168 let epoch = prepared.commit().unwrap();
169 assert!(epoch.as_u64() > 0);
170 }
171
172 #[test]
173 fn test_prepared_commit_with_edges() {
174 let db = GrafeoDB::new_in_memory();
175 let mut session = db.session();
176
177 session.begin_tx().unwrap();
178 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
179 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
180 session
181 .execute(
182 "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
183 )
184 .unwrap();
185
186 let prepared = session.prepare_commit().unwrap();
187 let info = prepared.info();
188
189 assert_eq!(info.nodes_written, 2);
190 assert_eq!(info.edges_written, 1);
191
192 prepared.commit().unwrap();
193 }
194
195 #[test]
196 fn test_prepared_commit_metadata() {
197 let db = GrafeoDB::new_in_memory();
198 let mut session = db.session();
199
200 session.begin_tx().unwrap();
201 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
202
203 let mut prepared = session.prepare_commit().unwrap();
204 prepared.set_metadata("audit_user", "admin");
205 prepared.set_metadata("source", "api");
206
207 assert_eq!(prepared.metadata().len(), 2);
208 assert_eq!(prepared.metadata().get("audit_user").unwrap(), "admin");
209
210 prepared.commit().unwrap();
211 }
212
213 #[test]
214 fn test_prepared_commit_abort() {
215 let db = GrafeoDB::new_in_memory();
216 let mut session = db.session();
217
218 session.begin_tx().unwrap();
219 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
220
221 let prepared = session.prepare_commit().unwrap();
222 prepared.abort().unwrap();
223
224 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
226 assert_eq!(result.rows.len(), 0);
227 }
228
229 #[test]
230 fn test_prepared_commit_drop_rollback() {
231 let db = GrafeoDB::new_in_memory();
232 let mut session = db.session();
233
234 session.begin_tx().unwrap();
235 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
236
237 {
238 let _prepared = session.prepare_commit().unwrap();
239 }
241
242 let result = session.execute("MATCH (n:Person) RETURN n").unwrap();
244 assert_eq!(result.rows.len(), 0);
245 }
246
247 #[test]
248 fn test_prepared_commit_no_transaction() {
249 let db = GrafeoDB::new_in_memory();
250 let mut session = db.session();
251
252 let result = session.prepare_commit();
254 assert!(result.is_err());
255 }
256}