rust_logic_graph/multi_db/
transaction.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4use tracing::{info, warn, error};
5
6use crate::error::{RustLogicGraphError, ErrorContext};
7
8/// Transaction state for distributed transactions
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum TransactionState {
11    /// Transaction initiated, waiting for prepare phase
12    Initiated,
13    /// All participants prepared, ready to commit
14    Prepared,
15    /// Transaction committed successfully
16    Committed,
17    /// Transaction aborted/rolled back
18    Aborted,
19}
20
21/// Participant in a distributed transaction
22#[derive(Debug, Clone)]
23pub struct TransactionParticipant {
24    pub id: String,
25    pub database: String,
26    pub state: TransactionState,
27}
28
29/// Distributed transaction using Two-Phase Commit (2PC) protocol
30/// 
31/// Coordinates transactions across multiple databases to ensure atomicity.
32/// 
33/// # Example
34/// ```no_run
35/// use rust_logic_graph::multi_db::DistributedTransaction;
36/// 
37/// #[tokio::main]
38/// async fn main() -> anyhow::Result<()> {
39///     let mut txn = DistributedTransaction::new("order_txn_123");
40///     
41///     // Register participants
42///     txn.add_participant("orders_db", "Insert order");
43///     txn.add_participant("inventory_db", "Decrement stock");
44///     txn.add_participant("payments_db", "Charge customer");
45///     
46///     // Phase 1: Prepare
47///     txn.prepare().await?;
48///     
49///     // Phase 2: Commit (or abort if any participant fails)
50///     if txn.can_commit() {
51///         txn.commit().await?;
52///     } else {
53///         txn.abort().await?;
54///     }
55///     
56///     Ok(())
57/// }
58/// ```
59#[derive(Clone)]
60pub struct DistributedTransaction {
61    pub id: String,
62    pub participants: Vec<TransactionParticipant>,
63    pub state: TransactionState,
64    pub metadata: HashMap<String, String>,
65}
66
67impl DistributedTransaction {
68    /// Create a new distributed transaction
69    pub fn new(id: impl Into<String>) -> Self {
70        let txn_id = id.into();
71        info!("🔄 Distributed Transaction: Creating transaction '{}'", txn_id);
72        
73        Self {
74            id: txn_id,
75            participants: Vec::new(),
76            state: TransactionState::Initiated,
77            metadata: HashMap::new(),
78        }
79    }
80    
81    /// Add a participant to the transaction
82    pub fn add_participant(&mut self, database: impl Into<String>, id: impl Into<String>) -> &mut Self {
83        let participant = TransactionParticipant {
84            id: id.into(),
85            database: database.into(),
86            state: TransactionState::Initiated,
87        };
88        
89        info!("  ➕ Adding participant: {} ({})", participant.id, participant.database);
90        self.participants.push(participant);
91        self
92    }
93    
94    /// Add metadata to the transaction
95    pub fn add_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
96        self.metadata.insert(key.into(), value.into());
97        self
98    }
99    
100    /// Phase 1: Prepare - Ask all participants if they can commit
101    /// 
102    /// In a real implementation, this would:
103    /// 1. Lock resources in each database
104    /// 2. Validate constraints
105    /// 3. Write to transaction log
106    /// 4. Return ready/abort status
107    pub async fn prepare(&mut self) -> Result<bool, RustLogicGraphError> {
108        info!("🔄 Transaction '{}': PREPARE phase starting ({} participants)", 
109            self.id, self.participants.len());
110        
111        if self.state != TransactionState::Initiated {
112            return Err(RustLogicGraphError::configuration_error(
113                format!("Cannot prepare transaction in state {:?}", self.state)
114            ));
115        }
116        
117        let mut all_prepared = true;
118        
119        // Ask each participant to prepare
120        let participant_count = self.participants.len();
121        for i in 0..participant_count {
122            info!("  🔍 Preparing participant: {} ({})", 
123                self.participants[i].id, self.participants[i].database);
124            
125            // TODO: In real implementation, call database-specific prepare
126            // For now, simulate success
127            let participant = &self.participants[i];
128            let prepared = self.simulate_prepare(participant).await?;
129            
130            if prepared {
131                self.participants[i].state = TransactionState::Prepared;
132                info!("  ✅ Participant {} prepared successfully", self.participants[i].id);
133            } else {
134                self.participants[i].state = TransactionState::Aborted;
135                warn!("  ❌ Participant {} failed to prepare", self.participants[i].id);
136                all_prepared = false;
137                break;
138            }
139        }
140        
141        if all_prepared {
142            self.state = TransactionState::Prepared;
143            info!("✅ Transaction '{}': All participants prepared", self.id);
144        } else {
145            self.state = TransactionState::Aborted;
146            warn!("âš ī¸  Transaction '{}': Prepare phase failed", self.id);
147        }
148        
149        Ok(all_prepared)
150    }
151    
152    /// Check if transaction can commit (all participants prepared)
153    pub fn can_commit(&self) -> bool {
154        self.state == TransactionState::Prepared &&
155        self.participants.iter().all(|p| p.state == TransactionState::Prepared)
156    }
157    
158    /// Phase 2: Commit - Instruct all participants to commit
159    pub async fn commit(&mut self) -> Result<(), RustLogicGraphError> {
160        info!("🔄 Transaction '{}': COMMIT phase starting", self.id);
161        
162        if !self.can_commit() {
163            return Err(RustLogicGraphError::configuration_error(
164                format!("Cannot commit transaction in state {:?}", self.state)
165            ));
166        }
167        
168        // Commit all participants
169        let participant_count = self.participants.len();
170        for i in 0..participant_count {
171            info!("  💾 Committing participant: {} ({})", 
172                self.participants[i].id, self.participants[i].database);
173            
174            // TODO: In real implementation, call database-specific commit
175            let participant = &self.participants[i];
176            self.simulate_commit(participant).await?;
177            
178            self.participants[i].state = TransactionState::Committed;
179            info!("  ✅ Participant {} committed", self.participants[i].id);
180        }
181        
182        self.state = TransactionState::Committed;
183        info!("✅ Transaction '{}': Successfully committed", self.id);
184        
185        Ok(())
186    }
187    
188    /// Abort/rollback the transaction
189    pub async fn abort(&mut self) -> Result<(), RustLogicGraphError> {
190        warn!("🔄 Transaction '{}': ABORT phase starting", self.id);
191        
192        // Rollback all participants
193        let participant_count = self.participants.len();
194        for i in 0..participant_count {
195            if self.participants[i].state == TransactionState::Prepared {
196                warn!("  â†Šī¸  Rolling back participant: {} ({})", 
197                    self.participants[i].id, self.participants[i].database);
198                
199                // TODO: In real implementation, call database-specific rollback
200                let participant = &self.participants[i];
201                self.simulate_rollback(participant).await?;
202                
203                self.participants[i].state = TransactionState::Aborted;
204                warn!("  ✅ Participant {} rolled back", self.participants[i].id);
205            }
206        }
207        
208        self.state = TransactionState::Aborted;
209        warn!("âš ī¸  Transaction '{}': Aborted and rolled back", self.id);
210        
211        Ok(())
212    }
213    
214    // Simulation methods (to be replaced with real database calls)
215    
216    async fn simulate_prepare(&self, _participant: &TransactionParticipant) -> Result<bool, RustLogicGraphError> {
217        // Simulate network delay
218        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
219        Ok(true) // Always succeed for now
220    }
221    
222    async fn simulate_commit(&self, _participant: &TransactionParticipant) -> Result<(), RustLogicGraphError> {
223        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
224        Ok(())
225    }
226    
227    async fn simulate_rollback(&self, _participant: &TransactionParticipant) -> Result<(), RustLogicGraphError> {
228        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
229        Ok(())
230    }
231}
232
233/// Transaction coordinator managing multiple distributed transactions
234pub struct TransactionCoordinator {
235    transactions: Arc<Mutex<HashMap<String, DistributedTransaction>>>,
236}
237
238impl TransactionCoordinator {
239    /// Create a new transaction coordinator
240    pub fn new() -> Self {
241        Self {
242            transactions: Arc::new(Mutex::new(HashMap::new())),
243        }
244    }
245    
246    /// Begin a new distributed transaction
247    pub async fn begin(&self, txn_id: impl Into<String>) -> Result<String, RustLogicGraphError> {
248        let id = txn_id.into();
249        let txn = DistributedTransaction::new(id.clone());
250        
251        let mut txns = self.transactions.lock().await;
252        if txns.contains_key(&id) {
253            return Err(RustLogicGraphError::configuration_error(
254                format!("Transaction '{}' already exists", id)
255            ));
256        }
257        
258        txns.insert(id.clone(), txn);
259        Ok(id)
260    }
261    
262    /// Get a transaction by ID
263    pub async fn get(&self, txn_id: &str) -> Result<DistributedTransaction, RustLogicGraphError> {
264        let txns = self.transactions.lock().await;
265        txns.get(txn_id)
266            .cloned()
267            .ok_or_else(|| RustLogicGraphError::configuration_error(
268                format!("Transaction '{}' not found", txn_id)
269            ))
270    }
271    
272    /// Update a transaction
273    pub async fn update(&self, txn: DistributedTransaction) -> Result<(), RustLogicGraphError> {
274        let mut txns = self.transactions.lock().await;
275        txns.insert(txn.id.clone(), txn);
276        Ok(())
277    }
278    
279    /// Remove a completed transaction
280    pub async fn remove(&self, txn_id: &str) -> Result<(), RustLogicGraphError> {
281        let mut txns = self.transactions.lock().await;
282        txns.remove(txn_id);
283        Ok(())
284    }
285    
286    /// Get all active transactions
287    pub async fn active_transactions(&self) -> Vec<String> {
288        let txns = self.transactions.lock().await;
289        txns.keys().cloned().collect()
290    }
291}
292
293impl Default for TransactionCoordinator {
294    fn default() -> Self {
295        Self::new()
296    }
297}
298
299#[cfg(test)]
300mod tests {
301    use super::*;
302    
303    #[tokio::test]
304    async fn test_transaction_lifecycle() {
305        let mut txn = DistributedTransaction::new("test_txn");
306        
307        txn.add_participant("db1", "op1");
308        txn.add_participant("db2", "op2");
309        
310        // Prepare
311        let prepared = txn.prepare().await.unwrap();
312        assert!(prepared);
313        assert_eq!(txn.state, TransactionState::Prepared);
314        
315        // Commit
316        txn.commit().await.unwrap();
317        assert_eq!(txn.state, TransactionState::Committed);
318    }
319    
320    #[tokio::test]
321    async fn test_transaction_abort() {
322        let mut txn = DistributedTransaction::new("test_txn");
323        
324        txn.add_participant("db1", "op1");
325        
326        // Prepare
327        txn.prepare().await.unwrap();
328        
329        // Abort instead of commit
330        txn.abort().await.unwrap();
331        assert_eq!(txn.state, TransactionState::Aborted);
332    }
333    
334    #[tokio::test]
335    async fn test_coordinator() {
336        let coordinator = TransactionCoordinator::new();
337        
338        let txn_id = coordinator.begin("txn1").await.unwrap();
339        assert_eq!(txn_id, "txn1");
340        
341        let txn = coordinator.get(&txn_id).await.unwrap();
342        assert_eq!(txn.id, "txn1");
343        
344        let active = coordinator.active_transactions().await;
345        assert_eq!(active.len(), 1);
346    }
347}