rust_logic_graph/multi_db/
transaction.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tracing::{error, info, warn};
5
6use crate::error::{ErrorContext, RustLogicGraphError};
7
8/// Transaction state for distributed transactions
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum TransactionState {
11    /// Transaction initiated, waiting for prepare phase
12    Initiated,
13    /// All participants prepared, ready to commit
14    Prepared,
15    /// Transaction committed successfully
16    Committed,
17    /// Transaction aborted/rolled back
18    Aborted,
19}
20
21/// Participant in a distributed transaction
22#[derive(Debug, Clone)]
23pub struct TransactionParticipant {
24    pub id: String,
25    pub database: String,
26    pub state: TransactionState,
27}
28
29/// Distributed transaction using Two-Phase Commit (2PC) protocol
30///
31/// Coordinates transactions across multiple databases to ensure atomicity.
32///
33/// # Example
34/// ```no_run
35/// use rust_logic_graph::multi_db::DistributedTransaction;
36///
37/// #[tokio::main]
38/// async fn main() -> anyhow::Result<()> {
39///     let mut txn = DistributedTransaction::new("order_txn_123");
40///     
41///     // Register participants
42///     txn.add_participant("orders_db", "Insert order");
43///     txn.add_participant("inventory_db", "Decrement stock");
44///     txn.add_participant("payments_db", "Charge customer");
45///     
46///     // Phase 1: Prepare
47///     txn.prepare().await?;
48///     
49///     // Phase 2: Commit (or abort if any participant fails)
50///     if txn.can_commit() {
51///         txn.commit().await?;
52///     } else {
53///         txn.abort().await?;
54///     }
55///     
56///     Ok(())
57/// }
58/// ```
59#[derive(Clone)]
60pub struct DistributedTransaction {
61    pub id: String,
62    pub participants: Vec<TransactionParticipant>,
63    pub state: TransactionState,
64    pub metadata: HashMap<String, String>,
65}
66
67impl DistributedTransaction {
68    /// Create a new distributed transaction
69    pub fn new(id: impl Into<String>) -> Self {
70        let txn_id = id.into();
71        info!(
72            "🔄 Distributed Transaction: Creating transaction '{}'",
73            txn_id
74        );
75
76        Self {
77            id: txn_id,
78            participants: Vec::new(),
79            state: TransactionState::Initiated,
80            metadata: HashMap::new(),
81        }
82    }
83
84    /// Add a participant to the transaction
85    pub fn add_participant(
86        &mut self,
87        database: impl Into<String>,
88        id: impl Into<String>,
89    ) -> &mut Self {
90        let participant = TransactionParticipant {
91            id: id.into(),
92            database: database.into(),
93            state: TransactionState::Initiated,
94        };
95
96        info!(
97            "  ➕ Adding participant: {} ({})",
98            participant.id, participant.database
99        );
100        self.participants.push(participant);
101        self
102    }
103
104    /// Add metadata to the transaction
105    pub fn add_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
106        self.metadata.insert(key.into(), value.into());
107        self
108    }
109
110    /// Phase 1: Prepare - Ask all participants if they can commit
111    ///
112    /// In a real implementation, this would:
113    /// 1. Lock resources in each database
114    /// 2. Validate constraints
115    /// 3. Write to transaction log
116    /// 4. Return ready/abort status
117    pub async fn prepare(&mut self) -> Result<bool, RustLogicGraphError> {
118        info!(
119            "🔄 Transaction '{}': PREPARE phase starting ({} participants)",
120            self.id,
121            self.participants.len()
122        );
123
124        if self.state != TransactionState::Initiated {
125            return Err(RustLogicGraphError::configuration_error(format!(
126                "Cannot prepare transaction in state {:?}",
127                self.state
128            )));
129        }
130
131        let mut all_prepared = true;
132
133        // Ask each participant to prepare
134        let participant_count = self.participants.len();
135        for i in 0..participant_count {
136            info!(
137                "  🔍 Preparing participant: {} ({})",
138                self.participants[i].id, self.participants[i].database
139            );
140
141            // TODO: In real implementation, call database-specific prepare
142            // For now, simulate success
143            let participant = &self.participants[i];
144            let prepared = self.simulate_prepare(participant).await?;
145
146            if prepared {
147                self.participants[i].state = TransactionState::Prepared;
148                info!(
149                    "  ✅ Participant {} prepared successfully",
150                    self.participants[i].id
151                );
152            } else {
153                self.participants[i].state = TransactionState::Aborted;
154                warn!(
155                    "  ❌ Participant {} failed to prepare",
156                    self.participants[i].id
157                );
158                all_prepared = false;
159                break;
160            }
161        }
162
163        if all_prepared {
164            self.state = TransactionState::Prepared;
165            info!("✅ Transaction '{}': All participants prepared", self.id);
166        } else {
167            self.state = TransactionState::Aborted;
168            warn!("âš ī¸  Transaction '{}': Prepare phase failed", self.id);
169        }
170
171        Ok(all_prepared)
172    }
173
174    /// Check if transaction can commit (all participants prepared)
175    pub fn can_commit(&self) -> bool {
176        self.state == TransactionState::Prepared
177            && self
178                .participants
179                .iter()
180                .all(|p| p.state == TransactionState::Prepared)
181    }
182
183    /// Phase 2: Commit - Instruct all participants to commit
184    pub async fn commit(&mut self) -> Result<(), RustLogicGraphError> {
185        info!("🔄 Transaction '{}': COMMIT phase starting", self.id);
186
187        if !self.can_commit() {
188            return Err(RustLogicGraphError::configuration_error(format!(
189                "Cannot commit transaction in state {:?}",
190                self.state
191            )));
192        }
193
194        // Commit all participants
195        let participant_count = self.participants.len();
196        for i in 0..participant_count {
197            info!(
198                "  💾 Committing participant: {} ({})",
199                self.participants[i].id, self.participants[i].database
200            );
201
202            // TODO: In real implementation, call database-specific commit
203            let participant = &self.participants[i];
204            self.simulate_commit(participant).await?;
205
206            self.participants[i].state = TransactionState::Committed;
207            info!("  ✅ Participant {} committed", self.participants[i].id);
208        }
209
210        self.state = TransactionState::Committed;
211        info!("✅ Transaction '{}': Successfully committed", self.id);
212
213        Ok(())
214    }
215
216    /// Abort/rollback the transaction
217    pub async fn abort(&mut self) -> Result<(), RustLogicGraphError> {
218        warn!("🔄 Transaction '{}': ABORT phase starting", self.id);
219
220        // Rollback all participants
221        let participant_count = self.participants.len();
222        for i in 0..participant_count {
223            if self.participants[i].state == TransactionState::Prepared {
224                warn!(
225                    "  â†Šī¸  Rolling back participant: {} ({})",
226                    self.participants[i].id, self.participants[i].database
227                );
228
229                // TODO: In real implementation, call database-specific rollback
230                let participant = &self.participants[i];
231                self.simulate_rollback(participant).await?;
232
233                self.participants[i].state = TransactionState::Aborted;
234                warn!("  ✅ Participant {} rolled back", self.participants[i].id);
235            }
236        }
237
238        self.state = TransactionState::Aborted;
239        warn!("âš ī¸  Transaction '{}': Aborted and rolled back", self.id);
240
241        Ok(())
242    }
243
244    // Simulation methods (to be replaced with real database calls)
245
246    async fn simulate_prepare(
247        &self,
248        _participant: &TransactionParticipant,
249    ) -> Result<bool, RustLogicGraphError> {
250        // Simulate network delay
251        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
252        Ok(true) // Always succeed for now
253    }
254
255    async fn simulate_commit(
256        &self,
257        _participant: &TransactionParticipant,
258    ) -> Result<(), RustLogicGraphError> {
259        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
260        Ok(())
261    }
262
263    async fn simulate_rollback(
264        &self,
265        _participant: &TransactionParticipant,
266    ) -> Result<(), RustLogicGraphError> {
267        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
268        Ok(())
269    }
270}
271
272/// Transaction coordinator managing multiple distributed transactions
273pub struct TransactionCoordinator {
274    transactions: Arc<Mutex<HashMap<String, DistributedTransaction>>>,
275}
276
277impl TransactionCoordinator {
278    /// Create a new transaction coordinator
279    pub fn new() -> Self {
280        Self {
281            transactions: Arc::new(Mutex::new(HashMap::new())),
282        }
283    }
284
285    /// Begin a new distributed transaction
286    pub async fn begin(&self, txn_id: impl Into<String>) -> Result<String, RustLogicGraphError> {
287        let id = txn_id.into();
288        let txn = DistributedTransaction::new(id.clone());
289
290        let mut txns = self.transactions.lock().await;
291        if txns.contains_key(&id) {
292            return Err(RustLogicGraphError::configuration_error(format!(
293                "Transaction '{}' already exists",
294                id
295            )));
296        }
297
298        txns.insert(id.clone(), txn);
299        Ok(id)
300    }
301
302    /// Get a transaction by ID
303    pub async fn get(&self, txn_id: &str) -> Result<DistributedTransaction, RustLogicGraphError> {
304        let txns = self.transactions.lock().await;
305        txns.get(txn_id).cloned().ok_or_else(|| {
306            RustLogicGraphError::configuration_error(format!("Transaction '{}' not found", txn_id))
307        })
308    }
309
310    /// Update a transaction
311    pub async fn update(&self, txn: DistributedTransaction) -> Result<(), RustLogicGraphError> {
312        let mut txns = self.transactions.lock().await;
313        txns.insert(txn.id.clone(), txn);
314        Ok(())
315    }
316
317    /// Remove a completed transaction
318    pub async fn remove(&self, txn_id: &str) -> Result<(), RustLogicGraphError> {
319        let mut txns = self.transactions.lock().await;
320        txns.remove(txn_id);
321        Ok(())
322    }
323
324    /// Get all active transactions
325    pub async fn active_transactions(&self) -> Vec<String> {
326        let txns = self.transactions.lock().await;
327        txns.keys().cloned().collect()
328    }
329}
330
331impl Default for TransactionCoordinator {
332    fn default() -> Self {
333        Self::new()
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340
341    #[tokio::test]
342    async fn test_transaction_lifecycle() {
343        let mut txn = DistributedTransaction::new("test_txn");
344
345        txn.add_participant("db1", "op1");
346        txn.add_participant("db2", "op2");
347
348        // Prepare
349        let prepared = txn.prepare().await.unwrap();
350        assert!(prepared);
351        assert_eq!(txn.state, TransactionState::Prepared);
352
353        // Commit
354        txn.commit().await.unwrap();
355        assert_eq!(txn.state, TransactionState::Committed);
356    }
357
358    #[tokio::test]
359    async fn test_transaction_abort() {
360        let mut txn = DistributedTransaction::new("test_txn");
361
362        txn.add_participant("db1", "op1");
363
364        // Prepare
365        txn.prepare().await.unwrap();
366
367        // Abort instead of commit
368        txn.abort().await.unwrap();
369        assert_eq!(txn.state, TransactionState::Aborted);
370    }
371
372    #[tokio::test]
373    async fn test_coordinator() {
374        let coordinator = TransactionCoordinator::new();
375
376        let txn_id = coordinator.begin("txn1").await.unwrap();
377        assert_eq!(txn_id, "txn1");
378
379        let txn = coordinator.get(&txn_id).await.unwrap();
380        assert_eq!(txn.id, "txn1");
381
382        let active = coordinator.active_transactions().await;
383        assert_eq!(active.len(), 1);
384    }
385}