Skip to main content

oxirs_core/distributed/
transaction.rs

1//! Cross-shard distributed transactions with 2PC optimization
2//!
//! This module implements the Two-Phase Commit (2PC) protocol, with optimizations
4//! for distributed transactions across multiple RDF shards.
5
6use crate::distributed::sharding::{ShardId, ShardManager};
7use crate::model::{BlankNode, Literal, NamedNode, Triple};
8use anyhow::{anyhow, Result};
9use dashmap::DashMap;
10use parking_lot::RwLock;
11use scirs2_core::random::{Random, RngExt};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16use tokio::sync::{mpsc, oneshot};
17use uuid::Uuid;
18
/// Transaction ID.
///
/// `TransactionCoordinator::begin_transaction` allocates these with `Uuid::new_v4()`.
pub type TransactionId = Uuid;

/// Numeric identifier of a node in the distributed system.
pub type NodeId = u64;
24
/// Tuning knobs for the transaction coordinator.
#[derive(Debug, Clone)]
pub struct TransactionConfig {
    /// Maximum age of a transaction when it is committed; a transaction older
    /// than this is aborted instead of committed.
    pub timeout: Duration,

    /// Let read-only transactions complete without running two-phase commit.
    pub enable_read_only_optimization: bool,

    /// Let single-shard transactions commit directly, skipping the prepare phase.
    pub enable_single_shard_optimization: bool,

    /// Maximum retry attempts.
    /// NOTE(review): not read by the coordinator code in this file.
    pub max_retries: usize,

    /// Await participant votes concurrently instead of one after another.
    pub enable_parallel_prepare: bool,

    /// Deadlock detection timeout.
    /// NOTE(review): not read by the coordinator code in this file.
    pub deadlock_timeout: Duration,
}

impl Default for TransactionConfig {
    /// Defaults: 30 s transaction timeout, 10 s deadlock timeout, 3 retries, and
    /// every optimisation switched on.
    fn default() -> Self {
        let timeout = Duration::from_secs(30);
        let deadlock_timeout = Duration::from_secs(10);
        TransactionConfig {
            timeout,
            deadlock_timeout,
            max_retries: 3,
            enable_read_only_optimization: true,
            enable_single_shard_optimization: true,
            enable_parallel_prepare: true,
        }
    }
}
59
/// Lifecycle state of a distributed transaction.
///
/// On the full two-phase-commit path the coordinator moves a transaction through
/// `Active` -> `Preparing` -> `Prepared` -> `Committing` -> `Committed`. If any
/// participant votes no, it stays `Preparing` until it is aborted. The read-only
/// and single-shard fast paths go from `Active` directly to `Committed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// Transaction started; operations may still be added.
    Active,
    /// Prepare phase in progress (votes being collected).
    Preparing,
    /// Every participant voted yes; ready to commit.
    Prepared,
    /// Commit messages are being sent to participants.
    Committing,
    /// Committed successfully.
    Committed,
    /// Aborted.
    Aborted,
}
76
/// A single operation recorded in a transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransactionOp {
    /// Insert a triple (a write; routed to the shard that owns the triple).
    Insert(SerializableTriple),
    /// Remove a triple (a write; routed to the shard that owns the triple).
    Remove(SerializableTriple),
    /// Read query (currently routed to shards `0..16` by the coordinator).
    Read(ReadQuery),
}

/// Serialization-friendly form of an RDF triple used inside transactions.
///
/// When converted back into a `Triple`, `subject` and `predicate` are parsed as
/// IRIs (`NamedNode`), and `object` is interpreted according to `object_type`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableTriple {
    pub subject: String,
    pub predicate: String,
    pub object: String,
    pub object_type: ObjectType,
}

/// How [`SerializableTriple::object`] should be interpreted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ObjectType {
    /// The object is an IRI.
    NamedNode,
    /// The object is a blank-node identifier.
    BlankNode,
    /// The object is a literal's lexical form. When `language` is set a
    /// language-tagged literal is built and `datatype` is ignored; otherwise
    /// `datatype` (an IRI) is used when present, else a plain literal.
    Literal {
        datatype: Option<String>,
        language: Option<String>,
    },
}

/// Triple-pattern read query.
///
/// NOTE(review): presumably `None` acts as a wildcard for that position; the query
/// is not evaluated anywhere in this file, so confirm against the query executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadQuery {
    pub subject: Option<String>,
    pub predicate: Option<String>,
    pub object: Option<String>,
}
114
/// Transaction coordinator implementing two-phase commit (2PC) across shards.
// `participant_tx` is stored but never read in this file: participant messages are
// built and then dropped because participants are simulated in-process.
#[allow(dead_code)]
pub struct TransactionCoordinator {
    /// Configuration
    config: TransactionConfig,

    /// Transactions that have begun but not yet completed, keyed by ID.
    transactions: Arc<DashMap<TransactionId, Transaction>>,

    /// Maps triples to the shard that owns them.
    shard_manager: Arc<ShardManager>,

    /// Event log for recovery.
    transaction_log: Arc<RwLock<TransactionLog>>,

    /// Lock manager for deadlock detection.
    lock_manager: Arc<LockManager>,

    /// Message sender for participants (its receiver is dropped in `new`).
    participant_tx: mpsc::UnboundedSender<ParticipantMessage>,
}
136
/// A single in-flight distributed transaction.
pub struct Transaction {
    /// Transaction ID (also its key in the coordinator's transaction map).
    pub id: TransactionId,

    /// Current lifecycle state.
    pub state: Arc<RwLock<TransactionState>>,

    /// Operations in the order they were added.
    pub operations: Arc<RwLock<Vec<TransactionOp>>>,

    /// Shards touched by the operations.
    pub participants: Arc<RwLock<HashSet<ShardId>>>,

    /// Votes collected from participants during the prepare phase.
    pub votes: Arc<DashMap<ShardId, Vote>>,

    /// When the transaction began; compared against `TransactionConfig::timeout`
    /// at commit time.
    pub start_time: Instant,

    /// Completion channel, taken and signalled once when the transaction completes.
    pub completion_tx: Option<oneshot::Sender<Result<()>>>,

    /// Read-only flag. `begin_transaction` initialises it to `true`;
    /// `commit_transaction` uses it to choose the read-only fast path.
    pub is_read_only: bool,

    /// Single-shard flag. `begin_transaction` initialises it to `true`;
    /// `commit_transaction` uses it to choose the single-shard fast path.
    pub is_single_shard: bool,
}

/// A participant's answer to a prepare request.
#[derive(Debug, Clone, Copy)]
pub enum Vote {
    /// Participant votes to commit
    Yes,
    /// Participant votes to abort, with the reason
    No(AbortReason),
}

/// Why a participant voted to abort.
#[derive(Debug, Clone, Copy)]
pub enum AbortReason {
    /// Lock conflict
    LockConflict,
    /// Validation failure
    ValidationFailure,
    /// Timeout
    Timeout,
    /// Node failure (also used when a participant's reply channel is dropped)
    NodeFailure,
    /// Other error
    Other,
}
190
/// Append-only, in-memory log of transaction events, kept for recovery.
pub struct TransactionLog {
    /// Log entries in the order they were added.
    entries: Vec<LogEntry>,

    /// Persistent storage path. `TransactionLog::new` leaves this `None`, and
    /// `add_entry` does not write anything to disk yet.
    log_path: Option<String>,
}

/// One timestamped event for one transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    pub timestamp: SystemTime,
    pub transaction_id: TransactionId,
    pub event: LogEvent,
}

/// Kind of event recorded in the transaction log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogEvent {
    /// Transaction started
    Started,
    /// Prepare phase started with the given participant shards
    PrepareStarted { participants: Vec<ShardId> },
    /// Participant voted; `vote` is `true` for a `Yes` vote
    ParticipantVoted { shard: ShardId, vote: bool },
    /// Global decision made; `commit` is `true` when every participant voted yes
    GlobalDecision { commit: bool },
    /// Transaction completed (committed or aborted)
    Completed,
}

/// Message from the coordinator to a shard participant.
///
/// NOTE(review): the coordinator in this file constructs these but does not send
/// them; participant behaviour is simulated locally.
#[derive(Debug)]
pub enum ParticipantMessage {
    /// Prepare to commit the given operations
    Prepare {
        transaction_id: TransactionId,
        operations: Vec<TransactionOp>,
        /// Channel on which the participant returns its vote
        reply_tx: oneshot::Sender<Vote>,
    },

    /// Commit transaction
    Commit { transaction_id: TransactionId },

    /// Abort transaction
    Abort { transaction_id: TransactionId },
}
239
/// Tracks per-resource locks and a wait-for graph used for deadlock detection.
pub struct LockManager {
    /// Locks currently recorded as held by each transaction
    transaction_locks: Arc<DashMap<TransactionId, HashSet<LockId>>>,

    /// Wait-for graph: an edge `a -> b` means transaction `a` is waiting on a
    /// lock held by transaction `b`
    wait_graph: Arc<RwLock<HashMap<TransactionId, HashSet<TransactionId>>>>,

    /// Lock state per resource; an entry is created on the first acquisition attempt
    lock_table: Arc<DashMap<LockId, LockInfo>>,
}

/// Identifies a lockable resource within a shard.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct LockId {
    pub shard_id: ShardId,
    pub resource: String,
}

/// State of a single lock.
#[derive(Debug, Clone)]
pub struct LockInfo {
    /// Recorded holder. Only one holder is stored, even when the lock is shared.
    pub holder: Option<TransactionId>,
    /// Transactions that requested the lock and were refused
    pub waiters: Vec<TransactionId>,
    /// Mode the lock is currently held in
    pub lock_type: LockType,
}

/// Lock mode
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockType {
    /// Shared (read) lock
    Shared,
    /// Exclusive (write) lock
    Exclusive,
}
275
276impl TransactionCoordinator {
277    /// Create a new transaction coordinator
278    pub fn new(config: TransactionConfig, shard_manager: Arc<ShardManager>) -> Self {
279        let (participant_tx, _participant_rx) = mpsc::unbounded_channel();
280
281        Self {
282            config,
283            transactions: Arc::new(DashMap::new()),
284            shard_manager,
285            transaction_log: Arc::new(RwLock::new(TransactionLog::new())),
286            lock_manager: Arc::new(LockManager::new()),
287            participant_tx,
288        }
289    }
290
291    /// Begin a new transaction
292    pub async fn begin_transaction(&self) -> Result<TransactionId> {
293        let transaction_id = Uuid::new_v4();
294        let (completion_tx, _completion_rx) = oneshot::channel();
295
296        let transaction = Transaction {
297            id: transaction_id,
298            state: Arc::new(RwLock::new(TransactionState::Active)),
299            operations: Arc::new(RwLock::new(Vec::new())),
300            participants: Arc::new(RwLock::new(HashSet::new())),
301            votes: Arc::new(DashMap::new()),
302            start_time: Instant::now(),
303            completion_tx: Some(completion_tx),
304            is_read_only: true,    // Start as read-only, change if write op added
305            is_single_shard: true, // Start as single-shard
306        };
307
308        self.transactions.insert(transaction_id, transaction);
309
310        // Log transaction start
311        self.log_event(LogEntry {
312            timestamp: SystemTime::now(),
313            transaction_id,
314            event: LogEvent::Started,
315        });
316
317        Ok(transaction_id)
318    }
319
320    /// Add operation to transaction
321    pub async fn add_operation(
322        &self,
323        transaction_id: TransactionId,
324        operation: TransactionOp,
325    ) -> Result<()> {
326        let transaction = self
327            .transactions
328            .get(&transaction_id)
329            .ok_or_else(|| anyhow!("Transaction not found"))?;
330
331        // Check if transaction is still active
332        if *transaction.state.read() != TransactionState::Active {
333            return Err(anyhow!("Transaction is not active"));
334        }
335
336        // Determine affected shards
337        let affected_shards = self.get_affected_shards(&operation)?;
338
339        // Update transaction properties
340        {
341            let mut ops = transaction.operations.write();
342            ops.push(operation.clone());
343
344            // Update read-only flag
345            if matches!(
346                operation,
347                TransactionOp::Insert(_) | TransactionOp::Remove(_)
348            ) {
349                let state = transaction.state.write();
350                drop(state); // Release lock before updating atomic
351                             // Can't modify is_read_only directly, would need to refactor
352            }
353
354            // Update participants
355            let mut participants = transaction.participants.write();
356            for shard in affected_shards {
357                participants.insert(shard);
358            }
359
360            // Update single-shard flag
361            if participants.len() > 1 {
362                // Can't modify is_single_shard directly, would need to refactor
363            }
364        }
365
366        Ok(())
367    }
368
369    /// Commit transaction
370    pub async fn commit_transaction(&self, transaction_id: TransactionId) -> Result<()> {
371        let transaction = self
372            .transactions
373            .get(&transaction_id)
374            .ok_or_else(|| anyhow!("Transaction not found"))?;
375
376        // Check timeout
377        if transaction.start_time.elapsed() > self.config.timeout {
378            self.abort_transaction(transaction_id).await?;
379            return Err(anyhow!("Transaction timeout"));
380        }
381
382        // Optimizations
383        if transaction.is_read_only && self.config.enable_read_only_optimization {
384            // Read-only transactions don't need 2PC
385            self.complete_transaction(transaction_id, true).await?;
386            return Ok(());
387        }
388
389        if transaction.is_single_shard && self.config.enable_single_shard_optimization {
390            // Single-shard transactions can skip prepare phase
391            return self.commit_single_shard(transaction_id).await;
392        }
393
394        // Full 2PC protocol
395        self.two_phase_commit(transaction_id).await
396    }
397
398    /// Two-phase commit protocol
399    async fn two_phase_commit(&self, transaction_id: TransactionId) -> Result<()> {
400        // Phase 1: Prepare
401        let prepare_result = self.prepare_phase(transaction_id).await?;
402
403        // Phase 2: Commit or Abort based on votes
404        if prepare_result {
405            self.commit_phase(transaction_id).await
406        } else {
407            self.abort_phase(transaction_id).await
408        }
409    }
410
411    /// Prepare phase of 2PC
412    async fn prepare_phase(&self, transaction_id: TransactionId) -> Result<bool> {
413        let transaction = self
414            .transactions
415            .get(&transaction_id)
416            .ok_or_else(|| anyhow!("Transaction not found"))?;
417
418        // Update state
419        *transaction.state.write() = TransactionState::Preparing;
420
421        let participants = transaction.participants.read().clone();
422        let operations = transaction.operations.read().clone();
423
424        // Log prepare start
425        self.log_event(LogEntry {
426            timestamp: SystemTime::now(),
427            transaction_id,
428            event: LogEvent::PrepareStarted {
429                participants: participants.iter().copied().collect(),
430            },
431        });
432
433        // Send prepare messages to all participants
434        let mut prepare_futures = Vec::new();
435
436        for shard_id in participants {
437            let (reply_tx, _reply_rx) = oneshot::channel();
438
439            let _message = ParticipantMessage::Prepare {
440                transaction_id,
441                operations: self.filter_operations_for_shard(shard_id, &operations),
442                reply_tx,
443            };
444
445            // In real implementation, would send message to actual shard nodes
446            // For now, simulate local processing and send vote directly
447            let vote = self.simulate_participant_vote(shard_id, &operations);
448
449            // Simulate sending the vote through a separate channel for this simulation
450            let (sim_tx, sim_rx) = oneshot::channel();
451            let _ = sim_tx.send(vote);
452
453            prepare_futures
454                .push(async move { sim_rx.await.unwrap_or(Vote::No(AbortReason::NodeFailure)) });
455        }
456
457        // Collect votes (with parallelism if enabled)
458        let votes = if self.config.enable_parallel_prepare {
459            futures::future::join_all(prepare_futures).await
460        } else {
461            let mut votes = Vec::new();
462            for future in prepare_futures {
463                votes.push(future.await);
464            }
465            votes
466        };
467
468        // Process votes
469        let mut all_yes = true;
470        for (i, vote) in votes.iter().enumerate() {
471            let shard_id = *transaction
472                .participants
473                .read()
474                .iter()
475                .nth(i)
476                .expect("participant index should be valid");
477            transaction.votes.insert(shard_id, *vote);
478
479            // Log vote
480            self.log_event(LogEntry {
481                timestamp: SystemTime::now(),
482                transaction_id,
483                event: LogEvent::ParticipantVoted {
484                    shard: shard_id,
485                    vote: matches!(vote, Vote::Yes),
486                },
487            });
488
489            if !matches!(vote, Vote::Yes) {
490                all_yes = false;
491            }
492        }
493
494        // Update state
495        if all_yes {
496            *transaction.state.write() = TransactionState::Prepared;
497        }
498
499        // Log global decision
500        self.log_event(LogEntry {
501            timestamp: SystemTime::now(),
502            transaction_id,
503            event: LogEvent::GlobalDecision { commit: all_yes },
504        });
505
506        Ok(all_yes)
507    }
508
509    /// Commit phase of 2PC
510    async fn commit_phase(&self, transaction_id: TransactionId) -> Result<()> {
511        let transaction = self
512            .transactions
513            .get(&transaction_id)
514            .ok_or_else(|| anyhow!("Transaction not found"))?;
515
516        // Update state
517        *transaction.state.write() = TransactionState::Committing;
518
519        let participants = transaction.participants.read().clone();
520
521        // Send commit messages to all participants
522        for shard_id in participants {
523            let _message = ParticipantMessage::Commit { transaction_id };
524            // In real implementation, would send to actual shard nodes
525            self.simulate_participant_commit(shard_id, transaction_id)?;
526        }
527
528        // Complete transaction
529        self.complete_transaction(transaction_id, true).await
530    }
531
532    /// Abort phase of 2PC
533    async fn abort_phase(&self, transaction_id: TransactionId) -> Result<()> {
534        let transaction = self
535            .transactions
536            .get(&transaction_id)
537            .ok_or_else(|| anyhow!("Transaction not found"))?;
538
539        let participants = transaction.participants.read().clone();
540
541        // Send abort messages to all participants
542        for shard_id in participants {
543            let _message = ParticipantMessage::Abort { transaction_id };
544            // In real implementation, would send to actual shard nodes
545            self.simulate_participant_abort(shard_id, transaction_id)?;
546        }
547
548        // Complete transaction
549        self.complete_transaction(transaction_id, false).await
550    }
551
552    /// Abort a transaction
553    pub async fn abort_transaction(&self, transaction_id: TransactionId) -> Result<()> {
554        self.abort_phase(transaction_id).await
555    }
556
557    /// Complete a transaction
558    async fn complete_transaction(
559        &self,
560        transaction_id: TransactionId,
561        committed: bool,
562    ) -> Result<()> {
563        if let Some((_, transaction)) = self.transactions.remove(&transaction_id) {
564            // Update state
565            *transaction.state.write() = if committed {
566                TransactionState::Committed
567            } else {
568                TransactionState::Aborted
569            };
570
571            // Log completion
572            self.log_event(LogEntry {
573                timestamp: SystemTime::now(),
574                transaction_id,
575                event: LogEvent::Completed,
576            });
577
578            // Notify completion
579            if let Some(tx) = transaction.completion_tx {
580                let _ = tx.send(Ok(()));
581            }
582
583            // Release locks
584            self.lock_manager.release_transaction_locks(transaction_id);
585        }
586
587        Ok(())
588    }
589
590    /// Commit single-shard transaction (optimization)
591    async fn commit_single_shard(&self, transaction_id: TransactionId) -> Result<()> {
592        let transaction = self
593            .transactions
594            .get(&transaction_id)
595            .ok_or_else(|| anyhow!("Transaction not found"))?;
596
597        let shard_id = *transaction
598            .participants
599            .read()
600            .iter()
601            .next()
602            .ok_or_else(|| anyhow!("No participants"))?;
603
604        // Direct commit without prepare phase
605        let _message = ParticipantMessage::Commit { transaction_id };
606        self.simulate_participant_commit(shard_id, transaction_id)?;
607
608        self.complete_transaction(transaction_id, true).await
609    }
610
611    /// Get affected shards for an operation
612    fn get_affected_shards(&self, operation: &TransactionOp) -> Result<Vec<ShardId>> {
613        match operation {
614            TransactionOp::Insert(triple) | TransactionOp::Remove(triple) => {
615                let t = self.deserialize_triple(triple)?;
616                Ok(vec![self.shard_manager.get_shard_for_triple(&t)])
617            }
618            TransactionOp::Read(_query) => {
619                // For reads, might need multiple shards
620                // Simplified: return all shards for now
621                Ok((0..16).collect()) // Assuming 16 shards
622            }
623        }
624    }
625
626    /// Filter operations for a specific shard
627    fn filter_operations_for_shard(
628        &self,
629        shard_id: ShardId,
630        operations: &[TransactionOp],
631    ) -> Vec<TransactionOp> {
632        operations
633            .iter()
634            .filter(|op| match self.get_affected_shards(op) {
635                Ok(shards) => shards.contains(&shard_id),
636                Err(_) => false,
637            })
638            .cloned()
639            .collect()
640    }
641
642    /// Simulate participant vote (for testing)
643    fn simulate_participant_vote(&self, _shard_id: ShardId, _operations: &[TransactionOp]) -> Vote {
644        // Simulate 95% success rate
645        if {
646            let mut rng = Random::default();
647            rng.random::<f32>()
648        } < 0.95
649        {
650            Vote::Yes
651        } else {
652            Vote::No(AbortReason::LockConflict)
653        }
654    }
655
656    /// Simulate participant commit
657    fn simulate_participant_commit(
658        &self,
659        _shard_id: ShardId,
660        _transaction_id: TransactionId,
661    ) -> Result<()> {
662        // In real implementation, would apply operations to shard
663        Ok(())
664    }
665
666    /// Simulate participant abort
667    fn simulate_participant_abort(
668        &self,
669        _shard_id: ShardId,
670        _transaction_id: TransactionId,
671    ) -> Result<()> {
672        // In real implementation, would rollback operations on shard
673        Ok(())
674    }
675
676    /// Log event
677    fn log_event(&self, entry: LogEntry) {
678        self.transaction_log.write().add_entry(entry);
679    }
680
681    /// Deserialize triple
682    fn deserialize_triple(&self, st: &SerializableTriple) -> Result<Triple> {
683        let subject = NamedNode::new(&st.subject)?;
684        let predicate = NamedNode::new(&st.predicate)?;
685
686        let object = match &st.object_type {
687            ObjectType::NamedNode => crate::model::Object::NamedNode(NamedNode::new(&st.object)?),
688            ObjectType::BlankNode => crate::model::Object::BlankNode(BlankNode::new(&st.object)?),
689            ObjectType::Literal { datatype, language } => {
690                if let Some(lang) = language {
691                    crate::model::Object::Literal(Literal::new_language_tagged_literal(
692                        &st.object, lang,
693                    )?)
694                } else if let Some(dt) = datatype {
695                    crate::model::Object::Literal(Literal::new_typed(
696                        &st.object,
697                        NamedNode::new(dt)?,
698                    ))
699                } else {
700                    crate::model::Object::Literal(Literal::new(&st.object))
701                }
702            }
703        };
704
705        Ok(Triple::new(subject, predicate, object))
706    }
707}
708
709impl Default for TransactionLog {
710    fn default() -> Self {
711        Self::new()
712    }
713}
714
715impl TransactionLog {
716    /// Create a new transaction log
717    pub fn new() -> Self {
718        Self {
719            entries: Vec::new(),
720            log_path: None,
721        }
722    }
723
724    /// Add log entry
725    pub fn add_entry(&mut self, entry: LogEntry) {
726        self.entries.push(entry);
727
728        // In real implementation, would persist to disk
729        if let Some(_path) = &self.log_path {
730            // Write to persistent storage
731        }
732    }
733
734    /// Get entries for a transaction
735    pub fn get_transaction_entries(&self, transaction_id: TransactionId) -> Vec<&LogEntry> {
736        self.entries
737            .iter()
738            .filter(|e| e.transaction_id == transaction_id)
739            .collect()
740    }
741}
742
743impl Default for LockManager {
744    fn default() -> Self {
745        Self::new()
746    }
747}
748
749impl LockManager {
750    /// Create a new lock manager
751    pub fn new() -> Self {
752        Self {
753            transaction_locks: Arc::new(DashMap::new()),
754            wait_graph: Arc::new(RwLock::new(HashMap::new())),
755            lock_table: Arc::new(DashMap::new()),
756        }
757    }
758
759    /// Acquire lock
760    pub fn acquire_lock(
761        &self,
762        transaction_id: TransactionId,
763        lock_id: LockId,
764        lock_type: LockType,
765    ) -> Result<()> {
766        let mut lock_info = self.lock_table.entry(lock_id.clone()).or_insert(LockInfo {
767            holder: None,
768            waiters: Vec::new(),
769            lock_type: LockType::Shared,
770        });
771
772        // Check if lock can be granted
773        let can_grant = match (&lock_info.holder, lock_type) {
774            (None, _) => true,
775            (Some(holder), LockType::Shared) if *holder == transaction_id => true,
776            (Some(_), LockType::Shared) if lock_info.lock_type == LockType::Shared => true,
777            _ => false,
778        };
779
780        if can_grant {
781            lock_info.holder = Some(transaction_id);
782            lock_info.lock_type = lock_type;
783
784            // Record lock
785            self.transaction_locks
786                .entry(transaction_id)
787                .or_default()
788                .insert(lock_id);
789
790            Ok(())
791        } else {
792            // Add to waiters
793            lock_info.waiters.push(transaction_id);
794
795            // Update wait graph for deadlock detection
796            if let Some(holder) = lock_info.holder {
797                let mut wait_graph = self.wait_graph.write();
798                wait_graph.entry(transaction_id).or_default().insert(holder);
799            }
800
801            Err(anyhow!("Lock not available"))
802        }
803    }
804
805    /// Release all locks held by a transaction
806    pub fn release_transaction_locks(&self, transaction_id: TransactionId) {
807        if let Some((_, locks)) = self.transaction_locks.remove(&transaction_id) {
808            for lock_id in locks {
809                self.release_lock(transaction_id, &lock_id);
810            }
811        }
812
813        // Clean up wait graph
814        let mut wait_graph = self.wait_graph.write();
815        wait_graph.remove(&transaction_id);
816        for waiters in wait_graph.values_mut() {
817            waiters.remove(&transaction_id);
818        }
819    }
820
821    /// Release a specific lock
822    fn release_lock(&self, transaction_id: TransactionId, lock_id: &LockId) {
823        if let Some(mut lock_info) = self.lock_table.get_mut(lock_id) {
824            if lock_info.holder == Some(transaction_id) {
825                // Find next waiter
826                if let Some(next_holder) = lock_info.waiters.first().copied() {
827                    lock_info.holder = Some(next_holder);
828                    lock_info.waiters.remove(0);
829
830                    // Update wait graph
831                    let mut wait_graph = self.wait_graph.write();
832                    if let Some(waiting_on) = wait_graph.get_mut(&next_holder) {
833                        waiting_on.remove(&transaction_id);
834                    }
835                } else {
836                    lock_info.holder = None;
837                }
838            }
839        }
840    }
841
842    /// Detect deadlocks using cycle detection in wait graph
843    pub fn detect_deadlocks(&self) -> Vec<Vec<TransactionId>> {
844        let wait_graph = self.wait_graph.read();
845        let mut cycles = Vec::new();
846        let mut visited = HashSet::new();
847        let mut rec_stack = HashSet::new();
848
849        for &node in wait_graph.keys() {
850            if !visited.contains(&node) {
851                let mut path = Vec::new();
852                if Self::detect_cycle_dfs(
853                    &wait_graph,
854                    node,
855                    &mut visited,
856                    &mut rec_stack,
857                    &mut path,
858                    &mut cycles,
859                ) {
860                    // Cycle detected
861                }
862            }
863        }
864
865        cycles
866    }
867
868    /// DFS for cycle detection
869    fn detect_cycle_dfs(
870        graph: &HashMap<TransactionId, HashSet<TransactionId>>,
871        node: TransactionId,
872        visited: &mut HashSet<TransactionId>,
873        rec_stack: &mut HashSet<TransactionId>,
874        path: &mut Vec<TransactionId>,
875        cycles: &mut Vec<Vec<TransactionId>>,
876    ) -> bool {
877        visited.insert(node);
878        rec_stack.insert(node);
879        path.push(node);
880
881        if let Some(neighbors) = graph.get(&node) {
882            for &neighbor in neighbors {
883                if !visited.contains(&neighbor) {
884                    if Self::detect_cycle_dfs(graph, neighbor, visited, rec_stack, path, cycles) {
885                        return true;
886                    }
887                } else if rec_stack.contains(&neighbor) {
888                    // Found cycle
889                    let cycle_start = path
890                        .iter()
891                        .position(|&n| n == neighbor)
892                        .expect("neighbor should exist in path when cycle detected");
893                    cycles.push(path[cycle_start..].to_vec());
894                    return true;
895                }
896            }
897        }
898
899        path.pop();
900        rec_stack.remove(&node);
901        false
902    }
903}
904
905// Import futures for async operations
906use futures;
907
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distributed::sharding::ShardingStrategy;

    #[tokio::test]
    #[ignore] // Historically hung (reported as taking over 14 minutes); run with `cargo test -- --ignored`
    async fn test_basic_transaction() {
        use tokio::time::{timeout, Duration};

        let config = TransactionConfig {
            timeout: Duration::from_secs(5),
            ..Default::default()
        };
        let shard_config = crate::distributed::sharding::ShardingConfig::default();
        let shard_manager = Arc::new(ShardManager::new(shard_config, ShardingStrategy::Hash));
        let coordinator = TransactionCoordinator::new(config, shard_manager);

        let tx_id = timeout(Duration::from_secs(2), coordinator.begin_transaction())
            .await
            .expect("begin_transaction timed out")
            .expect("begin_transaction failed");

        let op = TransactionOp::Insert(SerializableTriple {
            subject: "http://example.org/s".to_string(),
            predicate: "http://example.org/p".to_string(),
            object: "value".to_string(),
            object_type: ObjectType::Literal {
                datatype: None,
                language: None,
            },
        });

        timeout(Duration::from_secs(2), coordinator.add_operation(tx_id, op))
            .await
            .expect("add_operation timed out")
            .expect("add_operation failed");

        // Check that participants were added. Both the DashMap guard and the
        // participants lock are confined to this block. `commit_transaction`
        // removes the entry from the map, and DashMap deadlocks if a guard into
        // the same shard is still alive at that point.
        {
            let transaction = coordinator
                .transactions
                .get(&tx_id)
                .expect("Transaction should exist");
            let participants = transaction.participants.read();
            assert!(
                !participants.is_empty(),
                "Transaction should have participants after adding operation"
            );
            println!("Participants: {:?}", *participants);
        }

        // A single Insert touches one shard, so the commit may take the
        // single-shard path.
        timeout(
            Duration::from_secs(2),
            coordinator.commit_transaction(tx_id),
        )
        .await
        .expect("commit_transaction timed out")
        .expect("commit_transaction failed");
    }

    #[test]
    fn test_lock_manager() {
        let lock_manager = LockManager::new();
        let tx1 = Uuid::new_v4();
        let tx2 = Uuid::new_v4();

        let lock1 = LockId {
            shard_id: 0,
            resource: "resource1".to_string(),
        };

        // TX1 acquires exclusive lock
        assert!(lock_manager
            .acquire_lock(tx1, lock1.clone(), LockType::Exclusive)
            .is_ok());

        // TX2 tries to acquire, should fail
        assert!(lock_manager
            .acquire_lock(tx2, lock1.clone(), LockType::Shared)
            .is_err());

        // Release TX1's locks
        lock_manager.release_transaction_locks(tx1);

        // Now TX2 should be able to acquire
        assert!(lock_manager
            .acquire_lock(tx2, lock1, LockType::Shared)
            .is_ok());
    }

    #[test]
    fn test_deadlock_detection() {
        let lock_manager = LockManager::new();

        // Create circular wait: TX1 -> TX2 -> TX3 -> TX1
        let mut wait_graph = lock_manager.wait_graph.write();
        let tx1 = Uuid::new_v4();
        let tx2 = Uuid::new_v4();
        let tx3 = Uuid::new_v4();

        wait_graph.insert(tx1, vec![tx2].into_iter().collect());
        wait_graph.insert(tx2, vec![tx3].into_iter().collect());
        wait_graph.insert(tx3, vec![tx1].into_iter().collect());
        drop(wait_graph);

        let cycles = lock_manager.detect_deadlocks();
        assert_eq!(cycles.len(), 1);
        assert_eq!(cycles[0].len(), 3);
    }
}