1use std::collections::HashMap;
40use std::sync::atomic::{AtomicU64, Ordering};
41use std::time::{Duration, Instant};
42
43use super::error::SinkError;
44use super::traits::TransactionId;
45
/// Decision state the coordinator records for a transaction.
///
/// Every variant maps to a fixed one-byte tag, see [`CoordinatorDecision::to_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CoordinatorDecision {
    /// No final decision has been recorded yet; serialized as `0`.
    Preparing,
    /// The transaction was committed; serialized as `1`.
    Committed,
    /// The transaction was aborted; serialized as `2`.
    Aborted,
}

impl CoordinatorDecision {
    /// Every variant, used as the lookup table when decoding a tag.
    const ALL: [Self; 3] = [Self::Preparing, Self::Committed, Self::Aborted];

    /// Returns the one-byte tag that represents this decision.
    #[must_use]
    pub fn to_byte(&self) -> u8 {
        match *self {
            Self::Aborted => 2,
            Self::Committed => 1,
            Self::Preparing => 0,
        }
    }

    /// Decodes a tag produced by [`Self::to_byte`].
    ///
    /// Returns `None` when `b` is not the tag of any variant.
    #[must_use]
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.to_byte() == b)
    }
}
79
/// Vote recorded for a participant during the prepare phase.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ParticipantVote {
    /// The participant voted to commit.
    Yes,
    /// The participant voted against committing.
    No,
    /// No vote arrived in time; counted together with `No` when deciding.
    Timeout,
}
90
91#[derive(Debug, Clone)]
93pub struct ParticipantState {
94 pub id: String,
96 pub vote: Option<ParticipantVote>,
98 pub prepare_sent: bool,
100 pub acknowledged: bool,
102 pub last_activity: Instant,
104}
105
106impl ParticipantState {
107 #[must_use]
109 pub fn new(id: impl Into<String>) -> Self {
110 Self {
111 id: id.into(),
112 vote: None,
113 prepare_sent: false,
114 acknowledged: false,
115 last_activity: Instant::now(),
116 }
117 }
118
119 pub fn mark_prepare_sent(&mut self) {
121 self.prepare_sent = true;
122 self.last_activity = Instant::now();
123 }
124
125 pub fn record_vote(&mut self, vote: ParticipantVote) {
127 self.vote = Some(vote);
128 self.last_activity = Instant::now();
129 }
130
131 pub fn mark_acknowledged(&mut self) {
133 self.acknowledged = true;
134 self.last_activity = Instant::now();
135 }
136
137 #[must_use]
139 pub fn is_timed_out(&self, timeout: Duration) -> bool {
140 self.last_activity.elapsed() > timeout
141 }
142
143 #[must_use]
145 pub fn voted_yes(&self) -> bool {
146 matches!(self.vote, Some(ParticipantVote::Yes))
147 }
148
149 #[must_use]
151 pub fn voted_no_or_timeout(&self) -> bool {
152 matches!(
153 self.vote,
154 Some(ParticipantVote::No | ParticipantVote::Timeout)
155 )
156 }
157}
158
159#[derive(Debug, Clone)]
161pub struct TransactionRecord {
162 pub tx_id: TransactionId,
164 pub decision: CoordinatorDecision,
166 pub participants: HashMap<String, ParticipantState>,
168 pub started_at: Instant,
170 pub decision_at: Option<Instant>,
172}
173
174impl TransactionRecord {
175 #[must_use]
177 pub fn new(tx_id: TransactionId, participant_ids: &[String]) -> Self {
178 let participants = participant_ids
179 .iter()
180 .map(|id| (id.clone(), ParticipantState::new(id)))
181 .collect();
182
183 Self {
184 tx_id,
185 decision: CoordinatorDecision::Preparing,
186 participants,
187 started_at: Instant::now(),
188 decision_at: None,
189 }
190 }
191
192 #[must_use]
194 pub fn all_voted_yes(&self) -> bool {
195 self.participants.values().all(ParticipantState::voted_yes)
196 }
197
198 #[must_use]
200 pub fn any_voted_no_or_timeout(&self) -> bool {
201 self.participants
202 .values()
203 .any(ParticipantState::voted_no_or_timeout)
204 }
205
206 #[must_use]
208 pub fn all_acknowledged(&self) -> bool {
209 self.participants.values().all(|p| p.acknowledged)
210 }
211
212 #[must_use]
214 pub fn pending_voters(&self) -> Vec<&str> {
215 self.participants
216 .iter()
217 .filter(|(_, p)| p.vote.is_none())
218 .map(|(id, _)| id.as_str())
219 .collect()
220 }
221
222 #[must_use]
224 pub fn pending_acknowledgments(&self) -> Vec<&str> {
225 self.participants
226 .iter()
227 .filter(|(_, p)| !p.acknowledged)
228 .map(|(id, _)| id.as_str())
229 .collect()
230 }
231
232 pub fn make_decision(&mut self, decision: CoordinatorDecision) {
234 self.decision = decision;
235 self.decision_at = Some(Instant::now());
236 }
237}
238
239#[derive(Debug, Clone)]
241pub struct TransactionLogEntry {
242 pub tx_id: TransactionId,
244 pub decision: CoordinatorDecision,
246 pub participants: Vec<String>,
248 pub timestamp: u64,
250}
251
252impl TransactionLogEntry {
253 #[must_use]
255 #[allow(clippy::cast_possible_truncation)] pub fn to_bytes(&self) -> Vec<u8> {
257 let mut bytes = Vec::new();
258
259 bytes.push(1u8);
261
262 let tx_bytes = self.tx_id.to_bytes();
264 bytes.extend_from_slice(&(tx_bytes.len() as u32).to_le_bytes());
265 bytes.extend_from_slice(&tx_bytes);
266
267 bytes.push(self.decision.to_byte());
269
270 bytes.extend_from_slice(&self.timestamp.to_le_bytes());
272
273 bytes.extend_from_slice(&(self.participants.len() as u32).to_le_bytes());
275 for participant in &self.participants {
276 bytes.extend_from_slice(&(participant.len() as u32).to_le_bytes());
277 bytes.extend_from_slice(participant.as_bytes());
278 }
279
280 bytes
281 }
282
283 #[must_use]
289 #[allow(clippy::missing_panics_doc)]
290 pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
291 if bytes.is_empty() {
292 return None;
293 }
294
295 let mut pos = 0;
296
297 let version = bytes[pos];
299 pos += 1;
300 if version != 1 {
301 return None;
302 }
303
304 if pos + 4 > bytes.len() {
306 return None;
307 }
308 let tx_len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().ok()?) as usize;
309 pos += 4;
310 if pos + tx_len > bytes.len() {
311 return None;
312 }
313 let tx_id = TransactionId::from_bytes(&bytes[pos..pos + tx_len])?;
314 pos += tx_len;
315
316 if pos >= bytes.len() {
318 return None;
319 }
320 let decision = CoordinatorDecision::from_byte(bytes[pos])?;
321 pos += 1;
322
323 if pos + 8 > bytes.len() {
325 return None;
326 }
327 let timestamp = u64::from_le_bytes(bytes[pos..pos + 8].try_into().ok()?);
328 pos += 8;
329
330 if pos + 4 > bytes.len() {
332 return None;
333 }
334 let num_participants = u32::from_le_bytes(bytes[pos..pos + 4].try_into().ok()?) as usize;
335 pos += 4;
336
337 let mut participants = Vec::with_capacity(num_participants);
338 for _ in 0..num_participants {
339 if pos + 4 > bytes.len() {
340 return None;
341 }
342 let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().ok()?) as usize;
343 pos += 4;
344 if pos + len > bytes.len() {
345 return None;
346 }
347 let participant = String::from_utf8_lossy(&bytes[pos..pos + len]).to_string();
348 pos += len;
349 participants.push(participant);
350 }
351
352 Some(Self {
353 tx_id,
354 decision,
355 participants,
356 timestamp,
357 })
358 }
359}
360
361#[derive(Debug)]
365pub struct TransactionLog {
366 entries: Vec<TransactionLogEntry>,
368 index: HashMap<u64, usize>,
370}
371
372impl TransactionLog {
373 #[must_use]
375 pub fn new() -> Self {
376 Self {
377 entries: Vec::new(),
378 index: HashMap::new(),
379 }
380 }
381
382 #[allow(clippy::cast_possible_truncation)] pub fn log_decision(
385 &mut self,
386 tx_id: &TransactionId,
387 decision: CoordinatorDecision,
388 participants: &[String],
389 ) {
390 let entry = TransactionLogEntry {
391 tx_id: tx_id.clone(),
392 decision,
393 participants: participants.to_vec(),
394 timestamp: std::time::SystemTime::now()
395 .duration_since(std::time::UNIX_EPOCH)
396 .unwrap_or_default()
397 .as_millis() as u64,
398 };
399
400 let idx = self.entries.len();
401 self.entries.push(entry);
402 self.index.insert(tx_id.id(), idx);
403 }
404
405 pub fn update_decision(&mut self, tx_id: &TransactionId, decision: CoordinatorDecision) {
407 if let Some(&idx) = self.index.get(&tx_id.id()) {
408 if let Some(entry) = self.entries.get_mut(idx) {
409 entry.decision = decision;
410 }
411 }
412 }
413
414 #[must_use]
416 pub fn get_decision(&self, tx_id: &TransactionId) -> Option<CoordinatorDecision> {
417 self.index
418 .get(&tx_id.id())
419 .and_then(|&idx| self.entries.get(idx))
420 .map(|e| e.decision)
421 }
422
423 #[must_use]
425 pub fn get_entry(&self, tx_id: &TransactionId) -> Option<&TransactionLogEntry> {
426 self.index
427 .get(&tx_id.id())
428 .and_then(|&idx| self.entries.get(idx))
429 }
430
431 #[must_use]
433 pub fn entries(&self) -> &[TransactionLogEntry] {
434 &self.entries
435 }
436
437 #[must_use]
443 pub fn get_pending_for_recovery(
444 &self,
445 ) -> (Vec<&TransactionLogEntry>, Vec<&TransactionLogEntry>) {
446 let mut committed = Vec::new();
447 let mut to_abort = Vec::new();
448
449 for entry in &self.entries {
450 match entry.decision {
451 CoordinatorDecision::Committed => committed.push(entry),
452 CoordinatorDecision::Preparing => to_abort.push(entry),
453 CoordinatorDecision::Aborted => {
454 }
456 }
457 }
458
459 (committed, to_abort)
460 }
461
462 pub fn prune_completed(&mut self, tx_ids: &[TransactionId]) {
464 let ids_to_remove: std::collections::HashSet<u64> =
465 tx_ids.iter().map(TransactionId::id).collect();
466
467 self.entries
468 .retain(|e| !ids_to_remove.contains(&e.tx_id.id()));
469
470 self.index.clear();
472 for (idx, entry) in self.entries.iter().enumerate() {
473 self.index.insert(entry.tx_id.id(), idx);
474 }
475 }
476
477 #[must_use]
479 #[allow(clippy::cast_possible_truncation)] pub fn to_bytes(&self) -> Vec<u8> {
481 let mut bytes = Vec::new();
482
483 bytes.push(1u8);
485
486 bytes.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
488
489 for entry in &self.entries {
491 let entry_bytes = entry.to_bytes();
492 bytes.extend_from_slice(&(entry_bytes.len() as u32).to_le_bytes());
493 bytes.extend_from_slice(&entry_bytes);
494 }
495
496 bytes
497 }
498
499 #[allow(clippy::missing_panics_doc)]
505 pub fn from_bytes(bytes: &[u8]) -> Result<Self, SinkError> {
506 if bytes.is_empty() {
507 return Err(SinkError::CheckpointError(
508 "Empty transaction log".to_string(),
509 ));
510 }
511
512 let mut pos = 0;
513
514 let version = bytes[pos];
516 pos += 1;
517 if version != 1 {
518 return Err(SinkError::CheckpointError(format!(
519 "Unsupported log version: {version}"
520 )));
521 }
522
523 if pos + 4 > bytes.len() {
525 return Err(SinkError::CheckpointError(
526 "Unexpected end of log".to_string(),
527 ));
528 }
529 let num_entries = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
531 pos += 4;
532
533 let mut entries = Vec::with_capacity(num_entries);
534 let mut index = HashMap::new();
535
536 for i in 0..num_entries {
537 if pos + 4 > bytes.len() {
538 return Err(SinkError::CheckpointError(
539 "Unexpected end of log".to_string(),
540 ));
541 }
542 let entry_len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
544 pos += 4;
545
546 if pos + entry_len > bytes.len() {
547 return Err(SinkError::CheckpointError(
548 "Invalid entry length".to_string(),
549 ));
550 }
551 let entry = TransactionLogEntry::from_bytes(&bytes[pos..pos + entry_len])
552 .ok_or_else(|| SinkError::CheckpointError("Invalid log entry".to_string()))?;
553 pos += entry_len;
554
555 index.insert(entry.tx_id.id(), i);
556 entries.push(entry);
557 }
558
559 Ok(Self { entries, index })
560 }
561}
562
563impl Default for TransactionLog {
564 fn default() -> Self {
565 Self::new()
566 }
567}
568
/// Tunables for the two-phase commit coordinator.
#[derive(Debug, Clone)]
pub struct TwoPhaseConfig {
    /// How long a participant may stay silent before its vote is recorded as a timeout.
    pub prepare_timeout: Duration,
    /// Time budget for the commit phase.
    pub commit_timeout: Duration,
    /// Maximum number of retries.
    pub max_retries: u32,
    /// Pause between retries.
    pub retry_delay: Duration,
}

impl Default for TwoPhaseConfig {
    /// Defaults: 30 s prepare timeout, 60 s commit timeout, 3 retries,
    /// 100 ms between retries.
    fn default() -> Self {
        let prepare_timeout = Duration::from_secs(30);
        let commit_timeout = Duration::from_secs(60);
        let retry_delay = Duration::from_millis(100);

        Self {
            prepare_timeout,
            commit_timeout,
            max_retries: 3,
            retry_delay,
        }
    }
}
592
/// Final outcome of a two-phase commit round.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TwoPhaseResult {
    /// The transaction committed.
    Committed,
    /// The transaction aborted; the payload carries a message describing why.
    Aborted(String),
    /// The round timed out.
    Timeout,
}
603
604pub struct TwoPhaseCoordinator {
609 config: TwoPhaseConfig,
611 log: TransactionLog,
613 active: HashMap<u64, TransactionRecord>,
615 participants: Vec<String>,
617 next_tx_id: AtomicU64,
619}
620
621impl TwoPhaseCoordinator {
622 #[must_use]
624 pub fn new(config: TwoPhaseConfig) -> Self {
625 Self {
626 config,
627 log: TransactionLog::new(),
628 active: HashMap::new(),
629 participants: Vec::new(),
630 next_tx_id: AtomicU64::new(1),
631 }
632 }
633
634 #[must_use]
636 pub fn with_defaults() -> Self {
637 Self::new(TwoPhaseConfig::default())
638 }
639
640 pub fn register_participant(&mut self, sink_id: impl Into<String>) {
642 let id = sink_id.into();
643 if !self.participants.contains(&id) {
644 self.participants.push(id);
645 }
646 }
647
648 pub fn unregister_participant(&mut self, sink_id: &str) {
650 self.participants.retain(|id| id != sink_id);
651 }
652
653 #[must_use]
655 pub fn participants(&self) -> &[String] {
656 &self.participants
657 }
658
659 pub fn begin_transaction(&mut self) -> Result<TransactionId, SinkError> {
665 if self.participants.is_empty() {
666 return Err(SinkError::ConfigurationError(
667 "No participants registered".to_string(),
668 ));
669 }
670
671 let tx_id = TransactionId::new(self.next_tx_id.fetch_add(1, Ordering::SeqCst));
672 let record = TransactionRecord::new(tx_id.clone(), &self.participants);
673
674 self.log
676 .log_decision(&tx_id, CoordinatorDecision::Preparing, &self.participants);
677
678 self.active.insert(tx_id.id(), record);
679
680 Ok(tx_id)
681 }
682
683 #[must_use]
685 pub fn get_prepare_targets(&self, tx_id: &TransactionId) -> Vec<String> {
686 self.active
687 .get(&tx_id.id())
688 .map(|record| {
689 record
690 .participants
691 .iter()
692 .filter(|(_, p)| !p.prepare_sent)
693 .map(|(id, _)| id.clone())
694 .collect()
695 })
696 .unwrap_or_default()
697 }
698
699 pub fn mark_prepare_sent(&mut self, tx_id: &TransactionId, participant_id: &str) {
701 if let Some(record) = self.active.get_mut(&tx_id.id()) {
702 if let Some(participant) = record.participants.get_mut(participant_id) {
703 participant.mark_prepare_sent();
704 }
705 }
706 }
707
708 pub fn record_vote(
714 &mut self,
715 tx_id: &TransactionId,
716 participant_id: &str,
717 vote: ParticipantVote,
718 ) -> Result<(), SinkError> {
719 let record = self
720 .active
721 .get_mut(&tx_id.id())
722 .ok_or_else(|| SinkError::Internal(format!("Transaction not found: {}", tx_id.id())))?;
723
724 if let Some(participant) = record.participants.get_mut(participant_id) {
725 participant.record_vote(vote);
726 }
727
728 Ok(())
729 }
730
731 #[must_use]
733 pub fn can_decide(&self, tx_id: &TransactionId) -> bool {
734 self.active
735 .get(&tx_id.id())
736 .is_some_and(|record| record.all_voted_yes() || record.any_voted_no_or_timeout())
737 }
738
739 pub fn decide(&mut self, tx_id: &TransactionId) -> Result<CoordinatorDecision, SinkError> {
747 let record = self
748 .active
749 .get_mut(&tx_id.id())
750 .ok_or_else(|| SinkError::Internal(format!("Transaction not found: {}", tx_id.id())))?;
751
752 let decision = if record.all_voted_yes() {
753 CoordinatorDecision::Committed
754 } else {
755 CoordinatorDecision::Aborted
756 };
757
758 record.make_decision(decision);
760
761 self.log.update_decision(tx_id, decision);
763
764 Ok(decision)
765 }
766
767 pub fn mark_acknowledged(&mut self, tx_id: &TransactionId, participant_id: &str) {
769 if let Some(record) = self.active.get_mut(&tx_id.id()) {
770 if let Some(participant) = record.participants.get_mut(participant_id) {
771 participant.mark_acknowledged();
772 }
773 }
774 }
775
776 #[must_use]
778 pub fn is_complete(&self, tx_id: &TransactionId) -> bool {
779 self.active
780 .get(&tx_id.id())
781 .is_some_and(TransactionRecord::all_acknowledged)
782 }
783
784 pub fn complete(&mut self, tx_id: &TransactionId) {
786 self.active.remove(&tx_id.id());
787 self.log.prune_completed(std::slice::from_ref(tx_id));
788 }
789
790 #[must_use]
792 pub fn get_decision(&self, tx_id: &TransactionId) -> Option<CoordinatorDecision> {
793 self.active
794 .get(&tx_id.id())
795 .map(|r| r.decision)
796 .or_else(|| self.log.get_decision(tx_id))
797 }
798
799 pub fn check_timeouts(&mut self, tx_id: &TransactionId) {
801 if let Some(record) = self.active.get_mut(&tx_id.id()) {
802 for participant in record.participants.values_mut() {
803 if participant.vote.is_none()
804 && participant.is_timed_out(self.config.prepare_timeout)
805 {
806 participant.record_vote(ParticipantVote::Timeout);
807 }
808 }
809 }
810 }
811
812 #[must_use]
814 pub fn get_commit_targets(&self, tx_id: &TransactionId) -> Vec<String> {
815 self.active
816 .get(&tx_id.id())
817 .map(|record| {
818 record
819 .participants
820 .iter()
821 .filter(|(_, p)| !p.acknowledged)
822 .map(|(id, _)| id.clone())
823 .collect()
824 })
825 .unwrap_or_default()
826 }
827
828 #[must_use]
830 pub fn transaction_log(&self) -> &TransactionLog {
831 &self.log
832 }
833
834 pub fn transaction_log_mut(&mut self) -> &mut TransactionLog {
836 &mut self.log
837 }
838
839 #[must_use]
841 pub fn get_transaction(&self, tx_id: &TransactionId) -> Option<&TransactionRecord> {
842 self.active.get(&tx_id.id())
843 }
844
845 #[must_use]
849 pub fn recover(
850 &mut self,
851 log_bytes: &[u8],
852 ) -> (Vec<TransactionLogEntry>, Vec<TransactionLogEntry>) {
853 match TransactionLog::from_bytes(log_bytes) {
854 Ok(log) => {
855 let (committed, to_abort) = log.get_pending_for_recovery();
856 let committed_owned: Vec<_> = committed.iter().map(|e| (*e).clone()).collect();
857 let abort_owned: Vec<_> = to_abort.iter().map(|e| (*e).clone()).collect();
858 self.log = log;
859 (committed_owned, abort_owned)
860 }
861 Err(_) => (Vec::new(), Vec::new()),
862 }
863 }
864
865 pub fn force_abort(&mut self, tx_id: &TransactionId) -> Result<(), SinkError> {
871 if let Some(record) = self.active.get_mut(&tx_id.id()) {
872 record.make_decision(CoordinatorDecision::Aborted);
873 self.log
874 .update_decision(tx_id, CoordinatorDecision::Aborted);
875 Ok(())
876 } else {
877 Err(SinkError::Internal(format!(
878 "Transaction not found: {}",
879 tx_id.id()
880 )))
881 }
882 }
883
884 #[must_use]
886 pub fn config(&self) -> &TwoPhaseConfig {
887 &self.config
888 }
889}
890
891impl Default for TwoPhaseCoordinator {
892 fn default() -> Self {
893 Self::with_defaults()
894 }
895}
896
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds owned participant ids from string literals.
    fn sink_ids(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_string()).collect()
    }

    /// Default-configured coordinator with `names` registered in order.
    fn coordinator_with(names: &[&str]) -> TwoPhaseCoordinator {
        let mut coord = TwoPhaseCoordinator::with_defaults();
        for name in names {
            coord.register_participant(*name);
        }
        coord
    }

    /// Records `vote` directly on a participant of `record`; panics if it is unknown.
    fn cast(record: &mut TransactionRecord, sink: &str, vote: ParticipantVote) {
        record
            .participants
            .get_mut(sink)
            .unwrap()
            .record_vote(vote);
    }

    #[test]
    fn test_coordinator_decision_serialization() {
        let all = [
            CoordinatorDecision::Preparing,
            CoordinatorDecision::Committed,
            CoordinatorDecision::Aborted,
        ];
        for decision in all.iter().copied() {
            assert_eq!(
                CoordinatorDecision::from_byte(decision.to_byte()),
                Some(decision)
            );
        }
        assert_eq!(CoordinatorDecision::from_byte(99), None);
    }

    #[test]
    fn test_participant_state() {
        let mut state = ParticipantState::new("sink-1");
        assert!(state.vote.is_none());
        assert!(!state.prepare_sent);
        assert!(!state.acknowledged);

        state.mark_prepare_sent();
        assert!(state.prepare_sent);

        state.record_vote(ParticipantVote::Yes);
        assert!(state.voted_yes());
        assert!(!state.voted_no_or_timeout());

        state.mark_acknowledged();
        assert!(state.acknowledged);
    }

    #[test]
    fn test_participant_timeout_vote() {
        let mut state = ParticipantState::new("sink-1");
        state.record_vote(ParticipantVote::Timeout);
        assert!(state.voted_no_or_timeout());
        assert!(!state.voted_yes());
    }

    #[test]
    fn test_transaction_record() {
        let participants = sink_ids(&["sink-1", "sink-2"]);
        let mut record = TransactionRecord::new(TransactionId::new(1), &participants);

        assert!(!record.all_voted_yes());
        assert!(!record.any_voted_no_or_timeout());
        assert_eq!(record.pending_voters().len(), 2);

        cast(&mut record, "sink-1", ParticipantVote::Yes);
        assert!(!record.all_voted_yes());
        assert_eq!(record.pending_voters().len(), 1);

        cast(&mut record, "sink-2", ParticipantVote::Yes);
        assert!(record.all_voted_yes());
        assert!(record.pending_voters().is_empty());
    }

    #[test]
    fn test_transaction_record_with_no_vote() {
        let participants = sink_ids(&["sink-1", "sink-2"]);
        let mut record = TransactionRecord::new(TransactionId::new(1), &participants);

        cast(&mut record, "sink-1", ParticipantVote::Yes);
        cast(&mut record, "sink-2", ParticipantVote::No);

        assert!(!record.all_voted_yes());
        assert!(record.any_voted_no_or_timeout());
    }

    #[test]
    fn test_transaction_log_entry_serialization() {
        let entry = TransactionLogEntry {
            tx_id: TransactionId::new(123),
            decision: CoordinatorDecision::Committed,
            participants: sink_ids(&["sink-1", "sink-2"]),
            timestamp: 1_706_000_000_000,
        };

        let restored = TransactionLogEntry::from_bytes(&entry.to_bytes()).unwrap();

        assert_eq!(restored.tx_id.id(), 123);
        assert_eq!(restored.decision, CoordinatorDecision::Committed);
        assert_eq!(restored.participants, vec!["sink-1", "sink-2"]);
        assert_eq!(restored.timestamp, 1_706_000_000_000);
    }

    #[test]
    fn test_transaction_log() {
        let mut log = TransactionLog::new();
        let participants = sink_ids(&["sink-1"]);
        let tx1 = TransactionId::new(1);
        let tx2 = TransactionId::new(2);

        log.log_decision(&tx1, CoordinatorDecision::Preparing, &participants);
        log.log_decision(&tx2, CoordinatorDecision::Committed, &participants);

        assert_eq!(log.get_decision(&tx1), Some(CoordinatorDecision::Preparing));
        assert_eq!(log.get_decision(&tx2), Some(CoordinatorDecision::Committed));

        log.update_decision(&tx1, CoordinatorDecision::Committed);
        assert_eq!(log.get_decision(&tx1), Some(CoordinatorDecision::Committed));
    }

    #[test]
    fn test_transaction_log_recovery() {
        let mut log = TransactionLog::new();
        let participants = sink_ids(&["sink-1"]);

        let logged = [
            (1, CoordinatorDecision::Preparing),
            (2, CoordinatorDecision::Committed),
            (3, CoordinatorDecision::Aborted),
        ];
        for (id, decision) in logged.iter().copied() {
            log.log_decision(&TransactionId::new(id), decision, &participants);
        }

        let (committed, to_abort) = log.get_pending_for_recovery();

        assert_eq!(committed.len(), 1);
        assert_eq!(committed[0].tx_id.id(), 2);

        assert_eq!(to_abort.len(), 1);
        assert_eq!(to_abort[0].tx_id.id(), 1);
    }

    #[test]
    fn test_transaction_log_serialization() {
        let mut log = TransactionLog::new();
        let tx1 = TransactionId::new(1);
        let participants = sink_ids(&["sink-1", "sink-2"]);

        log.log_decision(&tx1, CoordinatorDecision::Committed, &participants);

        let restored = TransactionLog::from_bytes(&log.to_bytes()).unwrap();

        assert_eq!(
            restored.get_decision(&tx1),
            Some(CoordinatorDecision::Committed)
        );
        assert_eq!(restored.entries().len(), 1);
    }

    #[test]
    fn test_coordinator_begin_transaction() {
        let mut coord = coordinator_with(&["sink-1", "sink-2"]);

        let tx_id = coord.begin_transaction().unwrap();
        assert!(coord.get_transaction(&tx_id).is_some());
        assert_eq!(
            coord.get_decision(&tx_id),
            Some(CoordinatorDecision::Preparing)
        );
    }

    #[test]
    fn test_coordinator_no_participants() {
        let mut coord = coordinator_with(&[]);
        let result = coord.begin_transaction();
        assert!(matches!(result, Err(SinkError::ConfigurationError(_))));
    }

    #[test]
    fn test_coordinator_full_commit_flow() {
        let mut coord = coordinator_with(&["sink-1", "sink-2"]);
        let tx_id = coord.begin_transaction().unwrap();

        // Phase one: send prepare and collect votes.
        coord.mark_prepare_sent(&tx_id, "sink-1");
        coord.mark_prepare_sent(&tx_id, "sink-2");

        coord
            .record_vote(&tx_id, "sink-1", ParticipantVote::Yes)
            .unwrap();
        assert!(!coord.can_decide(&tx_id));

        coord
            .record_vote(&tx_id, "sink-2", ParticipantVote::Yes)
            .unwrap();
        assert!(coord.can_decide(&tx_id));

        // Phase two: decide and collect acknowledgments.
        let decision = coord.decide(&tx_id).unwrap();
        assert_eq!(decision, CoordinatorDecision::Committed);

        coord.mark_acknowledged(&tx_id, "sink-1");
        assert!(!coord.is_complete(&tx_id));

        coord.mark_acknowledged(&tx_id, "sink-2");
        assert!(coord.is_complete(&tx_id));

        coord.complete(&tx_id);
        assert!(coord.get_transaction(&tx_id).is_none());
    }

    #[test]
    fn test_coordinator_abort_on_no_vote() {
        let mut coord = coordinator_with(&["sink-1", "sink-2"]);
        let tx_id = coord.begin_transaction().unwrap();

        coord
            .record_vote(&tx_id, "sink-1", ParticipantVote::Yes)
            .unwrap();
        coord
            .record_vote(&tx_id, "sink-2", ParticipantVote::No)
            .unwrap();

        assert!(coord.can_decide(&tx_id));

        let decision = coord.decide(&tx_id).unwrap();
        assert_eq!(decision, CoordinatorDecision::Aborted);
    }

    #[test]
    fn test_coordinator_abort_on_timeout() {
        let config = TwoPhaseConfig {
            prepare_timeout: Duration::from_millis(1),
            ..Default::default()
        };
        let mut coord = TwoPhaseCoordinator::new(config);
        coord.register_participant("sink-1");

        let tx_id = coord.begin_transaction().unwrap();
        coord.mark_prepare_sent(&tx_id, "sink-1");

        // Let the 1 ms prepare timeout expire before checking.
        std::thread::sleep(Duration::from_millis(5));
        coord.check_timeouts(&tx_id);

        assert!(coord.can_decide(&tx_id));

        let decision = coord.decide(&tx_id).unwrap();
        assert_eq!(decision, CoordinatorDecision::Aborted);
    }

    #[test]
    fn test_coordinator_force_abort() {
        let mut coord = coordinator_with(&["sink-1"]);

        let tx_id = coord.begin_transaction().unwrap();
        coord.force_abort(&tx_id).unwrap();

        assert_eq!(
            coord.get_decision(&tx_id),
            Some(CoordinatorDecision::Aborted)
        );
    }

    #[test]
    fn test_coordinator_recovery() {
        let mut coord1 = coordinator_with(&["sink-1"]);

        let tx_id = coord1.begin_transaction().unwrap();
        coord1
            .record_vote(&tx_id, "sink-1", ParticipantVote::Yes)
            .unwrap();
        coord1.decide(&tx_id).unwrap();

        let log_bytes = coord1.transaction_log().to_bytes();

        let mut coord2 = TwoPhaseCoordinator::with_defaults();
        let (committed, to_abort) = coord2.recover(&log_bytes);

        assert_eq!(committed.len(), 1);
        assert!(to_abort.is_empty());
    }

    #[test]
    fn test_coordinator_recovery_presumed_abort() {
        let mut coord1 = coordinator_with(&["sink-1"]);

        // The transaction never gets past `Preparing` before the log is captured.
        let _tx_id = coord1.begin_transaction().unwrap();
        let log_bytes = coord1.transaction_log().to_bytes();

        let mut coord2 = TwoPhaseCoordinator::with_defaults();
        let (committed, to_abort) = coord2.recover(&log_bytes);

        assert!(committed.is_empty());
        assert_eq!(to_abort.len(), 1);
    }

    #[test]
    fn test_get_prepare_and_commit_targets() {
        let mut coord = coordinator_with(&["sink-1", "sink-2"]);
        let tx_id = coord.begin_transaction().unwrap();

        let targets = coord.get_prepare_targets(&tx_id);
        assert_eq!(targets.len(), 2);

        coord.mark_prepare_sent(&tx_id, "sink-1");
        let targets = coord.get_prepare_targets(&tx_id);
        assert_eq!(targets.len(), 1);
        assert_eq!(targets[0], "sink-2");

        coord.mark_prepare_sent(&tx_id, "sink-2");
        coord
            .record_vote(&tx_id, "sink-1", ParticipantVote::Yes)
            .unwrap();
        coord
            .record_vote(&tx_id, "sink-2", ParticipantVote::Yes)
            .unwrap();
        coord.decide(&tx_id).unwrap();

        let commit_targets = coord.get_commit_targets(&tx_id);
        assert_eq!(commit_targets.len(), 2);

        coord.mark_acknowledged(&tx_id, "sink-1");
        let commit_targets = coord.get_commit_targets(&tx_id);
        assert_eq!(commit_targets.len(), 1);
        assert_eq!(commit_targets[0], "sink-2");
    }

    #[test]
    fn test_config_values() {
        let config = TwoPhaseConfig::default();
        assert_eq!(config.prepare_timeout, Duration::from_secs(30));
        assert_eq!(config.commit_timeout, Duration::from_secs(60));
        assert_eq!(config.max_retries, 3);
    }

    #[test]
    fn test_participant_registration() {
        // The repeated "sink-1" must not create a second registration.
        let mut coord = coordinator_with(&["sink-1", "sink-2", "sink-1"]);
        assert_eq!(coord.participants().len(), 2);

        coord.unregister_participant("sink-1");
        assert_eq!(coord.participants().len(), 1);
        assert_eq!(coord.participants()[0], "sink-2");
    }
}