1use crate::distributed::sharding::{ShardId, ShardManager};
7use crate::model::{BlankNode, Literal, NamedNode, Triple};
8use anyhow::{anyhow, Result};
9use dashmap::DashMap;
10use parking_lot::RwLock;
11use scirs2_core::random::{Random, RngExt};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, HashSet};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16use tokio::sync::{mpsc, oneshot};
17use uuid::Uuid;
18
19pub type TransactionId = Uuid;
21
22pub type NodeId = u64;
24
25#[derive(Debug, Clone)]
27pub struct TransactionConfig {
28 pub timeout: Duration,
30
31 pub enable_read_only_optimization: bool,
33
34 pub enable_single_shard_optimization: bool,
36
37 pub max_retries: usize,
39
40 pub enable_parallel_prepare: bool,
42
43 pub deadlock_timeout: Duration,
45}
46
47impl Default for TransactionConfig {
48 fn default() -> Self {
49 Self {
50 timeout: Duration::from_secs(30),
51 enable_read_only_optimization: true,
52 enable_single_shard_optimization: true,
53 max_retries: 3,
54 enable_parallel_prepare: true,
55 deadlock_timeout: Duration::from_secs(10),
56 }
57 }
58}
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub enum TransactionState {
63 Active,
65 Preparing,
67 Prepared,
69 Committing,
71 Committed,
73 Aborted,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub enum TransactionOp {
80 Insert(SerializableTriple),
82 Remove(SerializableTriple),
84 Read(ReadQuery),
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct SerializableTriple {
91 pub subject: String,
92 pub predicate: String,
93 pub object: String,
94 pub object_type: ObjectType,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub enum ObjectType {
99 NamedNode,
100 BlankNode,
101 Literal {
102 datatype: Option<String>,
103 language: Option<String>,
104 },
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ReadQuery {
110 pub subject: Option<String>,
111 pub predicate: Option<String>,
112 pub object: Option<String>,
113}
114
115#[allow(dead_code)]
117pub struct TransactionCoordinator {
118 config: TransactionConfig,
120
121 transactions: Arc<DashMap<TransactionId, Transaction>>,
123
124 shard_manager: Arc<ShardManager>,
126
127 transaction_log: Arc<RwLock<TransactionLog>>,
129
130 lock_manager: Arc<LockManager>,
132
133 participant_tx: mpsc::UnboundedSender<ParticipantMessage>,
135}
136
137pub struct Transaction {
139 pub id: TransactionId,
141
142 pub state: Arc<RwLock<TransactionState>>,
144
145 pub operations: Arc<RwLock<Vec<TransactionOp>>>,
147
148 pub participants: Arc<RwLock<HashSet<ShardId>>>,
150
151 pub votes: Arc<DashMap<ShardId, Vote>>,
153
154 pub start_time: Instant,
156
157 pub completion_tx: Option<oneshot::Sender<Result<()>>>,
159
160 pub is_read_only: bool,
162
163 pub is_single_shard: bool,
165}
166
167#[derive(Debug, Clone, Copy)]
169pub enum Vote {
170 Yes,
172 No(AbortReason),
174}
175
176#[derive(Debug, Clone, Copy)]
178pub enum AbortReason {
179 LockConflict,
181 ValidationFailure,
183 Timeout,
185 NodeFailure,
187 Other,
189}
190
191pub struct TransactionLog {
193 entries: Vec<LogEntry>,
195
196 log_path: Option<String>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct LogEntry {
203 pub timestamp: SystemTime,
204 pub transaction_id: TransactionId,
205 pub event: LogEvent,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
210pub enum LogEvent {
211 Started,
213 PrepareStarted { participants: Vec<ShardId> },
215 ParticipantVoted { shard: ShardId, vote: bool },
217 GlobalDecision { commit: bool },
219 Completed,
221}
222
223#[derive(Debug)]
225pub enum ParticipantMessage {
226 Prepare {
228 transaction_id: TransactionId,
229 operations: Vec<TransactionOp>,
230 reply_tx: oneshot::Sender<Vote>,
231 },
232
233 Commit { transaction_id: TransactionId },
235
236 Abort { transaction_id: TransactionId },
238}
239
240pub struct LockManager {
242 transaction_locks: Arc<DashMap<TransactionId, HashSet<LockId>>>,
244
245 wait_graph: Arc<RwLock<HashMap<TransactionId, HashSet<TransactionId>>>>,
247
248 lock_table: Arc<DashMap<LockId, LockInfo>>,
250}
251
252#[derive(Debug, Clone, Hash, Eq, PartialEq)]
254pub struct LockId {
255 pub shard_id: ShardId,
256 pub resource: String,
257}
258
259#[derive(Debug, Clone)]
261pub struct LockInfo {
262 pub holder: Option<TransactionId>,
263 pub waiters: Vec<TransactionId>,
264 pub lock_type: LockType,
265}
266
267#[derive(Debug, Clone, Copy, PartialEq, Eq)]
269pub enum LockType {
270 Shared,
272 Exclusive,
274}
275
276impl TransactionCoordinator {
277 pub fn new(config: TransactionConfig, shard_manager: Arc<ShardManager>) -> Self {
279 let (participant_tx, _participant_rx) = mpsc::unbounded_channel();
280
281 Self {
282 config,
283 transactions: Arc::new(DashMap::new()),
284 shard_manager,
285 transaction_log: Arc::new(RwLock::new(TransactionLog::new())),
286 lock_manager: Arc::new(LockManager::new()),
287 participant_tx,
288 }
289 }
290
291 pub async fn begin_transaction(&self) -> Result<TransactionId> {
293 let transaction_id = Uuid::new_v4();
294 let (completion_tx, _completion_rx) = oneshot::channel();
295
296 let transaction = Transaction {
297 id: transaction_id,
298 state: Arc::new(RwLock::new(TransactionState::Active)),
299 operations: Arc::new(RwLock::new(Vec::new())),
300 participants: Arc::new(RwLock::new(HashSet::new())),
301 votes: Arc::new(DashMap::new()),
302 start_time: Instant::now(),
303 completion_tx: Some(completion_tx),
304 is_read_only: true, is_single_shard: true, };
307
308 self.transactions.insert(transaction_id, transaction);
309
310 self.log_event(LogEntry {
312 timestamp: SystemTime::now(),
313 transaction_id,
314 event: LogEvent::Started,
315 });
316
317 Ok(transaction_id)
318 }
319
320 pub async fn add_operation(
322 &self,
323 transaction_id: TransactionId,
324 operation: TransactionOp,
325 ) -> Result<()> {
326 let transaction = self
327 .transactions
328 .get(&transaction_id)
329 .ok_or_else(|| anyhow!("Transaction not found"))?;
330
331 if *transaction.state.read() != TransactionState::Active {
333 return Err(anyhow!("Transaction is not active"));
334 }
335
336 let affected_shards = self.get_affected_shards(&operation)?;
338
339 {
341 let mut ops = transaction.operations.write();
342 ops.push(operation.clone());
343
344 if matches!(
346 operation,
347 TransactionOp::Insert(_) | TransactionOp::Remove(_)
348 ) {
349 let state = transaction.state.write();
350 drop(state); }
353
354 let mut participants = transaction.participants.write();
356 for shard in affected_shards {
357 participants.insert(shard);
358 }
359
360 if participants.len() > 1 {
362 }
364 }
365
366 Ok(())
367 }
368
369 pub async fn commit_transaction(&self, transaction_id: TransactionId) -> Result<()> {
371 let transaction = self
372 .transactions
373 .get(&transaction_id)
374 .ok_or_else(|| anyhow!("Transaction not found"))?;
375
376 if transaction.start_time.elapsed() > self.config.timeout {
378 self.abort_transaction(transaction_id).await?;
379 return Err(anyhow!("Transaction timeout"));
380 }
381
382 if transaction.is_read_only && self.config.enable_read_only_optimization {
384 self.complete_transaction(transaction_id, true).await?;
386 return Ok(());
387 }
388
389 if transaction.is_single_shard && self.config.enable_single_shard_optimization {
390 return self.commit_single_shard(transaction_id).await;
392 }
393
394 self.two_phase_commit(transaction_id).await
396 }
397
398 async fn two_phase_commit(&self, transaction_id: TransactionId) -> Result<()> {
400 let prepare_result = self.prepare_phase(transaction_id).await?;
402
403 if prepare_result {
405 self.commit_phase(transaction_id).await
406 } else {
407 self.abort_phase(transaction_id).await
408 }
409 }
410
411 async fn prepare_phase(&self, transaction_id: TransactionId) -> Result<bool> {
413 let transaction = self
414 .transactions
415 .get(&transaction_id)
416 .ok_or_else(|| anyhow!("Transaction not found"))?;
417
418 *transaction.state.write() = TransactionState::Preparing;
420
421 let participants = transaction.participants.read().clone();
422 let operations = transaction.operations.read().clone();
423
424 self.log_event(LogEntry {
426 timestamp: SystemTime::now(),
427 transaction_id,
428 event: LogEvent::PrepareStarted {
429 participants: participants.iter().copied().collect(),
430 },
431 });
432
433 let mut prepare_futures = Vec::new();
435
436 for shard_id in participants {
437 let (reply_tx, _reply_rx) = oneshot::channel();
438
439 let _message = ParticipantMessage::Prepare {
440 transaction_id,
441 operations: self.filter_operations_for_shard(shard_id, &operations),
442 reply_tx,
443 };
444
445 let vote = self.simulate_participant_vote(shard_id, &operations);
448
449 let (sim_tx, sim_rx) = oneshot::channel();
451 let _ = sim_tx.send(vote);
452
453 prepare_futures
454 .push(async move { sim_rx.await.unwrap_or(Vote::No(AbortReason::NodeFailure)) });
455 }
456
457 let votes = if self.config.enable_parallel_prepare {
459 futures::future::join_all(prepare_futures).await
460 } else {
461 let mut votes = Vec::new();
462 for future in prepare_futures {
463 votes.push(future.await);
464 }
465 votes
466 };
467
468 let mut all_yes = true;
470 for (i, vote) in votes.iter().enumerate() {
471 let shard_id = *transaction
472 .participants
473 .read()
474 .iter()
475 .nth(i)
476 .expect("participant index should be valid");
477 transaction.votes.insert(shard_id, *vote);
478
479 self.log_event(LogEntry {
481 timestamp: SystemTime::now(),
482 transaction_id,
483 event: LogEvent::ParticipantVoted {
484 shard: shard_id,
485 vote: matches!(vote, Vote::Yes),
486 },
487 });
488
489 if !matches!(vote, Vote::Yes) {
490 all_yes = false;
491 }
492 }
493
494 if all_yes {
496 *transaction.state.write() = TransactionState::Prepared;
497 }
498
499 self.log_event(LogEntry {
501 timestamp: SystemTime::now(),
502 transaction_id,
503 event: LogEvent::GlobalDecision { commit: all_yes },
504 });
505
506 Ok(all_yes)
507 }
508
509 async fn commit_phase(&self, transaction_id: TransactionId) -> Result<()> {
511 let transaction = self
512 .transactions
513 .get(&transaction_id)
514 .ok_or_else(|| anyhow!("Transaction not found"))?;
515
516 *transaction.state.write() = TransactionState::Committing;
518
519 let participants = transaction.participants.read().clone();
520
521 for shard_id in participants {
523 let _message = ParticipantMessage::Commit { transaction_id };
524 self.simulate_participant_commit(shard_id, transaction_id)?;
526 }
527
528 self.complete_transaction(transaction_id, true).await
530 }
531
532 async fn abort_phase(&self, transaction_id: TransactionId) -> Result<()> {
534 let transaction = self
535 .transactions
536 .get(&transaction_id)
537 .ok_or_else(|| anyhow!("Transaction not found"))?;
538
539 let participants = transaction.participants.read().clone();
540
541 for shard_id in participants {
543 let _message = ParticipantMessage::Abort { transaction_id };
544 self.simulate_participant_abort(shard_id, transaction_id)?;
546 }
547
548 self.complete_transaction(transaction_id, false).await
550 }
551
552 pub async fn abort_transaction(&self, transaction_id: TransactionId) -> Result<()> {
554 self.abort_phase(transaction_id).await
555 }
556
557 async fn complete_transaction(
559 &self,
560 transaction_id: TransactionId,
561 committed: bool,
562 ) -> Result<()> {
563 if let Some((_, transaction)) = self.transactions.remove(&transaction_id) {
564 *transaction.state.write() = if committed {
566 TransactionState::Committed
567 } else {
568 TransactionState::Aborted
569 };
570
571 self.log_event(LogEntry {
573 timestamp: SystemTime::now(),
574 transaction_id,
575 event: LogEvent::Completed,
576 });
577
578 if let Some(tx) = transaction.completion_tx {
580 let _ = tx.send(Ok(()));
581 }
582
583 self.lock_manager.release_transaction_locks(transaction_id);
585 }
586
587 Ok(())
588 }
589
590 async fn commit_single_shard(&self, transaction_id: TransactionId) -> Result<()> {
592 let transaction = self
593 .transactions
594 .get(&transaction_id)
595 .ok_or_else(|| anyhow!("Transaction not found"))?;
596
597 let shard_id = *transaction
598 .participants
599 .read()
600 .iter()
601 .next()
602 .ok_or_else(|| anyhow!("No participants"))?;
603
604 let _message = ParticipantMessage::Commit { transaction_id };
606 self.simulate_participant_commit(shard_id, transaction_id)?;
607
608 self.complete_transaction(transaction_id, true).await
609 }
610
611 fn get_affected_shards(&self, operation: &TransactionOp) -> Result<Vec<ShardId>> {
613 match operation {
614 TransactionOp::Insert(triple) | TransactionOp::Remove(triple) => {
615 let t = self.deserialize_triple(triple)?;
616 Ok(vec![self.shard_manager.get_shard_for_triple(&t)])
617 }
618 TransactionOp::Read(_query) => {
619 Ok((0..16).collect()) }
623 }
624 }
625
626 fn filter_operations_for_shard(
628 &self,
629 shard_id: ShardId,
630 operations: &[TransactionOp],
631 ) -> Vec<TransactionOp> {
632 operations
633 .iter()
634 .filter(|op| match self.get_affected_shards(op) {
635 Ok(shards) => shards.contains(&shard_id),
636 Err(_) => false,
637 })
638 .cloned()
639 .collect()
640 }
641
642 fn simulate_participant_vote(&self, _shard_id: ShardId, _operations: &[TransactionOp]) -> Vote {
644 if {
646 let mut rng = Random::default();
647 rng.random::<f32>()
648 } < 0.95
649 {
650 Vote::Yes
651 } else {
652 Vote::No(AbortReason::LockConflict)
653 }
654 }
655
656 fn simulate_participant_commit(
658 &self,
659 _shard_id: ShardId,
660 _transaction_id: TransactionId,
661 ) -> Result<()> {
662 Ok(())
664 }
665
666 fn simulate_participant_abort(
668 &self,
669 _shard_id: ShardId,
670 _transaction_id: TransactionId,
671 ) -> Result<()> {
672 Ok(())
674 }
675
676 fn log_event(&self, entry: LogEntry) {
678 self.transaction_log.write().add_entry(entry);
679 }
680
681 fn deserialize_triple(&self, st: &SerializableTriple) -> Result<Triple> {
683 let subject = NamedNode::new(&st.subject)?;
684 let predicate = NamedNode::new(&st.predicate)?;
685
686 let object = match &st.object_type {
687 ObjectType::NamedNode => crate::model::Object::NamedNode(NamedNode::new(&st.object)?),
688 ObjectType::BlankNode => crate::model::Object::BlankNode(BlankNode::new(&st.object)?),
689 ObjectType::Literal { datatype, language } => {
690 if let Some(lang) = language {
691 crate::model::Object::Literal(Literal::new_language_tagged_literal(
692 &st.object, lang,
693 )?)
694 } else if let Some(dt) = datatype {
695 crate::model::Object::Literal(Literal::new_typed(
696 &st.object,
697 NamedNode::new(dt)?,
698 ))
699 } else {
700 crate::model::Object::Literal(Literal::new(&st.object))
701 }
702 }
703 };
704
705 Ok(Triple::new(subject, predicate, object))
706 }
707}
708
709impl Default for TransactionLog {
710 fn default() -> Self {
711 Self::new()
712 }
713}
714
715impl TransactionLog {
716 pub fn new() -> Self {
718 Self {
719 entries: Vec::new(),
720 log_path: None,
721 }
722 }
723
724 pub fn add_entry(&mut self, entry: LogEntry) {
726 self.entries.push(entry);
727
728 if let Some(_path) = &self.log_path {
730 }
732 }
733
734 pub fn get_transaction_entries(&self, transaction_id: TransactionId) -> Vec<&LogEntry> {
736 self.entries
737 .iter()
738 .filter(|e| e.transaction_id == transaction_id)
739 .collect()
740 }
741}
742
743impl Default for LockManager {
744 fn default() -> Self {
745 Self::new()
746 }
747}
748
749impl LockManager {
750 pub fn new() -> Self {
752 Self {
753 transaction_locks: Arc::new(DashMap::new()),
754 wait_graph: Arc::new(RwLock::new(HashMap::new())),
755 lock_table: Arc::new(DashMap::new()),
756 }
757 }
758
759 pub fn acquire_lock(
761 &self,
762 transaction_id: TransactionId,
763 lock_id: LockId,
764 lock_type: LockType,
765 ) -> Result<()> {
766 let mut lock_info = self.lock_table.entry(lock_id.clone()).or_insert(LockInfo {
767 holder: None,
768 waiters: Vec::new(),
769 lock_type: LockType::Shared,
770 });
771
772 let can_grant = match (&lock_info.holder, lock_type) {
774 (None, _) => true,
775 (Some(holder), LockType::Shared) if *holder == transaction_id => true,
776 (Some(_), LockType::Shared) if lock_info.lock_type == LockType::Shared => true,
777 _ => false,
778 };
779
780 if can_grant {
781 lock_info.holder = Some(transaction_id);
782 lock_info.lock_type = lock_type;
783
784 self.transaction_locks
786 .entry(transaction_id)
787 .or_default()
788 .insert(lock_id);
789
790 Ok(())
791 } else {
792 lock_info.waiters.push(transaction_id);
794
795 if let Some(holder) = lock_info.holder {
797 let mut wait_graph = self.wait_graph.write();
798 wait_graph.entry(transaction_id).or_default().insert(holder);
799 }
800
801 Err(anyhow!("Lock not available"))
802 }
803 }
804
805 pub fn release_transaction_locks(&self, transaction_id: TransactionId) {
807 if let Some((_, locks)) = self.transaction_locks.remove(&transaction_id) {
808 for lock_id in locks {
809 self.release_lock(transaction_id, &lock_id);
810 }
811 }
812
813 let mut wait_graph = self.wait_graph.write();
815 wait_graph.remove(&transaction_id);
816 for waiters in wait_graph.values_mut() {
817 waiters.remove(&transaction_id);
818 }
819 }
820
821 fn release_lock(&self, transaction_id: TransactionId, lock_id: &LockId) {
823 if let Some(mut lock_info) = self.lock_table.get_mut(lock_id) {
824 if lock_info.holder == Some(transaction_id) {
825 if let Some(next_holder) = lock_info.waiters.first().copied() {
827 lock_info.holder = Some(next_holder);
828 lock_info.waiters.remove(0);
829
830 let mut wait_graph = self.wait_graph.write();
832 if let Some(waiting_on) = wait_graph.get_mut(&next_holder) {
833 waiting_on.remove(&transaction_id);
834 }
835 } else {
836 lock_info.holder = None;
837 }
838 }
839 }
840 }
841
842 pub fn detect_deadlocks(&self) -> Vec<Vec<TransactionId>> {
844 let wait_graph = self.wait_graph.read();
845 let mut cycles = Vec::new();
846 let mut visited = HashSet::new();
847 let mut rec_stack = HashSet::new();
848
849 for &node in wait_graph.keys() {
850 if !visited.contains(&node) {
851 let mut path = Vec::new();
852 if Self::detect_cycle_dfs(
853 &wait_graph,
854 node,
855 &mut visited,
856 &mut rec_stack,
857 &mut path,
858 &mut cycles,
859 ) {
860 }
862 }
863 }
864
865 cycles
866 }
867
868 fn detect_cycle_dfs(
870 graph: &HashMap<TransactionId, HashSet<TransactionId>>,
871 node: TransactionId,
872 visited: &mut HashSet<TransactionId>,
873 rec_stack: &mut HashSet<TransactionId>,
874 path: &mut Vec<TransactionId>,
875 cycles: &mut Vec<Vec<TransactionId>>,
876 ) -> bool {
877 visited.insert(node);
878 rec_stack.insert(node);
879 path.push(node);
880
881 if let Some(neighbors) = graph.get(&node) {
882 for &neighbor in neighbors {
883 if !visited.contains(&neighbor) {
884 if Self::detect_cycle_dfs(graph, neighbor, visited, rec_stack, path, cycles) {
885 return true;
886 }
887 } else if rec_stack.contains(&neighbor) {
888 let cycle_start = path
890 .iter()
891 .position(|&n| n == neighbor)
892 .expect("neighbor should exist in path when cycle detected");
893 cycles.push(path[cycle_start..].to_vec());
894 return true;
895 }
896 }
897 }
898
899 path.pop();
900 rec_stack.remove(&node);
901 false
902 }
903}
904
905use futures;
907
908#[cfg(test)]
909mod tests {
910 use super::*;
911 use crate::distributed::sharding::ShardingStrategy;
912
913 #[tokio::test]
914 #[ignore] async fn test_basic_transaction() {
916 use tokio::time::{timeout, Duration};
917
918 let config = TransactionConfig {
919 timeout: Duration::from_secs(5),
920 ..Default::default()
921 };
922 let shard_config = crate::distributed::sharding::ShardingConfig::default();
923 let shard_manager = Arc::new(ShardManager::new(shard_config, ShardingStrategy::Hash));
924 let coordinator = TransactionCoordinator::new(config, shard_manager);
925
926 let tx_id = timeout(Duration::from_secs(2), coordinator.begin_transaction())
928 .await
929 .expect("begin_transaction timed out")
930 .expect("begin_transaction failed");
931
932 let op = TransactionOp::Insert(SerializableTriple {
934 subject: "http://example.org/s".to_string(),
935 predicate: "http://example.org/p".to_string(),
936 object: "value".to_string(),
937 object_type: ObjectType::Literal {
938 datatype: None,
939 language: None,
940 },
941 });
942
943 timeout(Duration::from_secs(2), coordinator.add_operation(tx_id, op))
944 .await
945 .expect("add_operation timed out")
946 .expect("add_operation failed");
947
948 let transaction = coordinator
950 .transactions
951 .get(&tx_id)
952 .expect("Transaction should exist");
953 {
954 let participants = transaction.participants.read();
955 assert!(
956 !participants.is_empty(),
957 "Transaction should have participants after adding operation"
958 );
959 println!("Participants: {:?}", *participants);
960 } timeout(
964 Duration::from_secs(2),
965 coordinator.commit_transaction(tx_id),
966 )
967 .await
968 .expect("commit_transaction timed out")
969 .expect("commit_transaction failed");
970 }
971
972 #[test]
973 fn test_lock_manager() {
974 let lock_manager = LockManager::new();
975 let tx1 = Uuid::new_v4();
976 let tx2 = Uuid::new_v4();
977
978 let lock1 = LockId {
979 shard_id: 0,
980 resource: "resource1".to_string(),
981 };
982
983 assert!(lock_manager
985 .acquire_lock(tx1, lock1.clone(), LockType::Exclusive)
986 .is_ok());
987
988 assert!(lock_manager
990 .acquire_lock(tx2, lock1.clone(), LockType::Shared)
991 .is_err());
992
993 lock_manager.release_transaction_locks(tx1);
995
996 assert!(lock_manager
998 .acquire_lock(tx2, lock1, LockType::Shared)
999 .is_ok());
1000 }
1001
1002 #[test]
1003 fn test_deadlock_detection() {
1004 let lock_manager = LockManager::new();
1005
1006 let mut wait_graph = lock_manager.wait_graph.write();
1008 let tx1 = Uuid::new_v4();
1009 let tx2 = Uuid::new_v4();
1010 let tx3 = Uuid::new_v4();
1011
1012 wait_graph.insert(tx1, vec![tx2].into_iter().collect());
1013 wait_graph.insert(tx2, vec![tx3].into_iter().collect());
1014 wait_graph.insert(tx3, vec![tx1].into_iter().collect());
1015 drop(wait_graph);
1016
1017 let cycles = lock_manager.detect_deadlocks();
1018 assert_eq!(cycles.len(), 1);
1019 assert_eq!(cycles[0].len(), 3);
1020 }
1021}