rust_logic_graph/multi_db/
transaction.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tracing::{info, warn, error};
5
6use crate::error::{RustLogicGraphError, ErrorContext};
7
/// Lifecycle state shared by a distributed transaction and each of its participants.
///
/// The enum carries no data, so it is `Copy` (cheap to pass by value) and `Hash`
/// (usable as a map/set key) in addition to being comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TransactionState {
    /// Created; the prepare phase has not completed yet.
    Initiated,
    /// The prepare phase succeeded.
    Prepared,
    /// The commit phase succeeded.
    Committed,
    /// Prepare failed or the work was rolled back.
    Aborted,
}
20
21#[derive(Debug, Clone)]
23pub struct TransactionParticipant {
24 pub id: String,
25 pub database: String,
26 pub state: TransactionState,
27}
28
29#[derive(Clone)]
60pub struct DistributedTransaction {
61 pub id: String,
62 pub participants: Vec<TransactionParticipant>,
63 pub state: TransactionState,
64 pub metadata: HashMap<String, String>,
65}
66
67impl DistributedTransaction {
68 pub fn new(id: impl Into<String>) -> Self {
70 let txn_id = id.into();
71 info!("đ Distributed Transaction: Creating transaction '{}'", txn_id);
72
73 Self {
74 id: txn_id,
75 participants: Vec::new(),
76 state: TransactionState::Initiated,
77 metadata: HashMap::new(),
78 }
79 }
80
81 pub fn add_participant(&mut self, database: impl Into<String>, id: impl Into<String>) -> &mut Self {
83 let participant = TransactionParticipant {
84 id: id.into(),
85 database: database.into(),
86 state: TransactionState::Initiated,
87 };
88
89 info!(" â Adding participant: {} ({})", participant.id, participant.database);
90 self.participants.push(participant);
91 self
92 }
93
94 pub fn add_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
96 self.metadata.insert(key.into(), value.into());
97 self
98 }
99
100 pub async fn prepare(&mut self) -> Result<bool, RustLogicGraphError> {
108 info!("đ Transaction '{}': PREPARE phase starting ({} participants)",
109 self.id, self.participants.len());
110
111 if self.state != TransactionState::Initiated {
112 return Err(RustLogicGraphError::configuration_error(
113 format!("Cannot prepare transaction in state {:?}", self.state)
114 ));
115 }
116
117 let mut all_prepared = true;
118
119 let participant_count = self.participants.len();
121 for i in 0..participant_count {
122 info!(" đ Preparing participant: {} ({})",
123 self.participants[i].id, self.participants[i].database);
124
125 let participant = &self.participants[i];
128 let prepared = self.simulate_prepare(participant).await?;
129
130 if prepared {
131 self.participants[i].state = TransactionState::Prepared;
132 info!(" â
Participant {} prepared successfully", self.participants[i].id);
133 } else {
134 self.participants[i].state = TransactionState::Aborted;
135 warn!(" â Participant {} failed to prepare", self.participants[i].id);
136 all_prepared = false;
137 break;
138 }
139 }
140
141 if all_prepared {
142 self.state = TransactionState::Prepared;
143 info!("â
Transaction '{}': All participants prepared", self.id);
144 } else {
145 self.state = TransactionState::Aborted;
146 warn!("â ī¸ Transaction '{}': Prepare phase failed", self.id);
147 }
148
149 Ok(all_prepared)
150 }
151
152 pub fn can_commit(&self) -> bool {
154 self.state == TransactionState::Prepared &&
155 self.participants.iter().all(|p| p.state == TransactionState::Prepared)
156 }
157
158 pub async fn commit(&mut self) -> Result<(), RustLogicGraphError> {
160 info!("đ Transaction '{}': COMMIT phase starting", self.id);
161
162 if !self.can_commit() {
163 return Err(RustLogicGraphError::configuration_error(
164 format!("Cannot commit transaction in state {:?}", self.state)
165 ));
166 }
167
168 let participant_count = self.participants.len();
170 for i in 0..participant_count {
171 info!(" đž Committing participant: {} ({})",
172 self.participants[i].id, self.participants[i].database);
173
174 let participant = &self.participants[i];
176 self.simulate_commit(participant).await?;
177
178 self.participants[i].state = TransactionState::Committed;
179 info!(" â
Participant {} committed", self.participants[i].id);
180 }
181
182 self.state = TransactionState::Committed;
183 info!("â
Transaction '{}': Successfully committed", self.id);
184
185 Ok(())
186 }
187
188 pub async fn abort(&mut self) -> Result<(), RustLogicGraphError> {
190 warn!("đ Transaction '{}': ABORT phase starting", self.id);
191
192 let participant_count = self.participants.len();
194 for i in 0..participant_count {
195 if self.participants[i].state == TransactionState::Prepared {
196 warn!(" âŠī¸ Rolling back participant: {} ({})",
197 self.participants[i].id, self.participants[i].database);
198
199 let participant = &self.participants[i];
201 self.simulate_rollback(participant).await?;
202
203 self.participants[i].state = TransactionState::Aborted;
204 warn!(" â
Participant {} rolled back", self.participants[i].id);
205 }
206 }
207
208 self.state = TransactionState::Aborted;
209 warn!("â ī¸ Transaction '{}': Aborted and rolled back", self.id);
210
211 Ok(())
212 }
213
214 async fn simulate_prepare(&self, _participant: &TransactionParticipant) -> Result<bool, RustLogicGraphError> {
217 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
219 Ok(true) }
221
222 async fn simulate_commit(&self, _participant: &TransactionParticipant) -> Result<(), RustLogicGraphError> {
223 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
224 Ok(())
225 }
226
227 async fn simulate_rollback(&self, _participant: &TransactionParticipant) -> Result<(), RustLogicGraphError> {
228 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
229 Ok(())
230 }
231}
232
233pub struct TransactionCoordinator {
235 transactions: Arc<Mutex<HashMap<String, DistributedTransaction>>>,
236}
237
238impl TransactionCoordinator {
239 pub fn new() -> Self {
241 Self {
242 transactions: Arc::new(Mutex::new(HashMap::new())),
243 }
244 }
245
246 pub async fn begin(&self, txn_id: impl Into<String>) -> Result<String, RustLogicGraphError> {
248 let id = txn_id.into();
249 let txn = DistributedTransaction::new(id.clone());
250
251 let mut txns = self.transactions.lock().await;
252 if txns.contains_key(&id) {
253 return Err(RustLogicGraphError::configuration_error(
254 format!("Transaction '{}' already exists", id)
255 ));
256 }
257
258 txns.insert(id.clone(), txn);
259 Ok(id)
260 }
261
262 pub async fn get(&self, txn_id: &str) -> Result<DistributedTransaction, RustLogicGraphError> {
264 let txns = self.transactions.lock().await;
265 txns.get(txn_id)
266 .cloned()
267 .ok_or_else(|| RustLogicGraphError::configuration_error(
268 format!("Transaction '{}' not found", txn_id)
269 ))
270 }
271
272 pub async fn update(&self, txn: DistributedTransaction) -> Result<(), RustLogicGraphError> {
274 let mut txns = self.transactions.lock().await;
275 txns.insert(txn.id.clone(), txn);
276 Ok(())
277 }
278
279 pub async fn remove(&self, txn_id: &str) -> Result<(), RustLogicGraphError> {
281 let mut txns = self.transactions.lock().await;
282 txns.remove(txn_id);
283 Ok(())
284 }
285
286 pub async fn active_transactions(&self) -> Vec<String> {
288 let txns = self.transactions.lock().await;
289 txns.keys().cloned().collect()
290 }
291}
292
293impl Default for TransactionCoordinator {
294 fn default() -> Self {
295 Self::new()
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Happy path: two participants prepare, then the transaction commits.
    #[tokio::test]
    async fn test_transaction_lifecycle() {
        let mut txn = DistributedTransaction::new("test_txn");
        txn.add_participant("db1", "op1")
            .add_participant("db2", "op2");

        let all_ready = txn.prepare().await.unwrap();
        assert!(all_ready);
        assert_eq!(txn.state, TransactionState::Prepared);

        txn.commit().await.unwrap();
        assert_eq!(txn.state, TransactionState::Committed);
    }

    /// A prepared transaction can be aborted and ends up `Aborted`.
    #[tokio::test]
    async fn test_transaction_abort() {
        let mut txn = DistributedTransaction::new("test_txn");
        txn.add_participant("db1", "op1");

        txn.prepare().await.unwrap();
        txn.abort().await.unwrap();

        assert_eq!(txn.state, TransactionState::Aborted);
    }

    /// The coordinator registers a transaction and can look it up and list it.
    #[tokio::test]
    async fn test_coordinator() {
        let coordinator = TransactionCoordinator::new();

        let txn_id = coordinator.begin("txn1").await.unwrap();
        assert_eq!(txn_id, "txn1");

        let stored = coordinator.get(&txn_id).await.unwrap();
        assert_eq!(stored.id, "txn1");

        let listed = coordinator.active_transactions().await;
        assert_eq!(listed.len(), 1);
    }
}