Skip to main content

laminar_core/sink/
two_phase.rs

1//! Two-Phase Commit (2PC) Protocol Implementation (F024)
2//!
3//! Provides distributed transaction coordination across multiple sinks with:
4//! - Coordinator/participant protocol
5//! - Transaction log for crash recovery
6//! - Presumed abort semantics
7//! - Timeout handling
8//!
9//! # Protocol Overview
10//!
11//! ```text
12//! Coordinator                         Participants
13//!     │                                   │
14//!     │──────── PREPARE ─────────────────▶│
15//!     │                                   │
16//!     │◀─────── VOTE (Yes/No) ───────────│
17//!     │                                   │
18//!     │ (Log decision)                    │
19//!     │                                   │
20//!     │──────── COMMIT/ABORT ────────────▶│
21//!     │                                   │
22//!     │◀─────── ACK ─────────────────────│
23//!     │                                   │
24//! ```
25//!
26//! # Presumed Abort
27//!
28//! If the coordinator crashes before logging a COMMIT decision, all participants
29//! will abort the transaction on recovery. This is the "presumed abort" protocol.
30//!
31//! # Recovery
32//!
33//! On coordinator restart:
34//! 1. Read transaction log
35//! 2. For COMMITTED transactions: re-send commit to participants
36//! 3. For PREPARING transactions: abort (presumed abort)
37//! 4. Clean up completed transactions
38
39use std::collections::HashMap;
40use std::sync::atomic::{AtomicU64, Ordering};
41use std::time::{Duration, Instant};
42
43use super::error::SinkError;
44use super::traits::TransactionId;
45
/// Decision made by the coordinator for a transaction
///
/// The explicit discriminants are the single-byte encoding used by
/// [`CoordinatorDecision::to_byte`] and [`CoordinatorDecision::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CoordinatorDecision {
    /// PREPARE sent to all participants; outcome not decided yet
    Preparing = 0,
    /// All participants voted yes and the commit was logged
    Committed = 1,
    /// At least one participant voted no, or a timeout occurred
    Aborted = 2,
}

impl CoordinatorDecision {
    /// Every variant, stored at the index equal to its byte encoding.
    const BY_BYTE: [Self; 3] = [Self::Preparing, Self::Committed, Self::Aborted];

    /// Encode as a single byte (the variant's discriminant).
    #[must_use]
    pub fn to_byte(&self) -> u8 {
        *self as u8
    }

    /// Decode a byte produced by `to_byte`; returns `None` for unknown values.
    #[must_use]
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_BYTE.get(usize::from(b)).copied()
    }
}
79
/// A participant's answer to PREPARE
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ParticipantVote {
    /// The participant is ready to commit
    Yes,
    /// The participant refuses to commit
    No,
    /// No vote arrived before the prepare deadline
    Timeout,
}
90
91/// State of a participant in a transaction
92#[derive(Debug, Clone)]
93pub struct ParticipantState {
94    /// Participant identifier
95    pub id: String,
96    /// Current vote (if received)
97    pub vote: Option<ParticipantVote>,
98    /// Whether prepare was sent
99    pub prepare_sent: bool,
100    /// Whether commit/abort was acknowledged
101    pub acknowledged: bool,
102    /// Last activity timestamp
103    pub last_activity: Instant,
104}
105
106impl ParticipantState {
107    /// Create a new participant state
108    #[must_use]
109    pub fn new(id: impl Into<String>) -> Self {
110        Self {
111            id: id.into(),
112            vote: None,
113            prepare_sent: false,
114            acknowledged: false,
115            last_activity: Instant::now(),
116        }
117    }
118
119    /// Record that prepare was sent
120    pub fn mark_prepare_sent(&mut self) {
121        self.prepare_sent = true;
122        self.last_activity = Instant::now();
123    }
124
125    /// Record participant's vote
126    pub fn record_vote(&mut self, vote: ParticipantVote) {
127        self.vote = Some(vote);
128        self.last_activity = Instant::now();
129    }
130
131    /// Record acknowledgment
132    pub fn mark_acknowledged(&mut self) {
133        self.acknowledged = true;
134        self.last_activity = Instant::now();
135    }
136
137    /// Check if participant has timed out
138    #[must_use]
139    pub fn is_timed_out(&self, timeout: Duration) -> bool {
140        self.last_activity.elapsed() > timeout
141    }
142
143    /// Check if participant voted yes
144    #[must_use]
145    pub fn voted_yes(&self) -> bool {
146        matches!(self.vote, Some(ParticipantVote::Yes))
147    }
148
149    /// Check if participant voted no or timed out
150    #[must_use]
151    pub fn voted_no_or_timeout(&self) -> bool {
152        matches!(
153            self.vote,
154            Some(ParticipantVote::No | ParticipantVote::Timeout)
155        )
156    }
157}
158
159/// Transaction state in the coordinator
160#[derive(Debug, Clone)]
161pub struct TransactionRecord {
162    /// Transaction ID
163    pub tx_id: TransactionId,
164    /// Current decision
165    pub decision: CoordinatorDecision,
166    /// Participant states
167    pub participants: HashMap<String, ParticipantState>,
168    /// Transaction start time
169    pub started_at: Instant,
170    /// When decision was made
171    pub decision_at: Option<Instant>,
172}
173
174impl TransactionRecord {
175    /// Create a new transaction record
176    #[must_use]
177    pub fn new(tx_id: TransactionId, participant_ids: &[String]) -> Self {
178        let participants = participant_ids
179            .iter()
180            .map(|id| (id.clone(), ParticipantState::new(id)))
181            .collect();
182
183        Self {
184            tx_id,
185            decision: CoordinatorDecision::Preparing,
186            participants,
187            started_at: Instant::now(),
188            decision_at: None,
189        }
190    }
191
192    /// Check if all participants have voted yes
193    #[must_use]
194    pub fn all_voted_yes(&self) -> bool {
195        self.participants.values().all(ParticipantState::voted_yes)
196    }
197
198    /// Check if any participant voted no or timed out
199    #[must_use]
200    pub fn any_voted_no_or_timeout(&self) -> bool {
201        self.participants
202            .values()
203            .any(ParticipantState::voted_no_or_timeout)
204    }
205
206    /// Check if all participants have acknowledged
207    #[must_use]
208    pub fn all_acknowledged(&self) -> bool {
209        self.participants.values().all(|p| p.acknowledged)
210    }
211
212    /// Get participants that haven't voted yet
213    #[must_use]
214    pub fn pending_voters(&self) -> Vec<&str> {
215        self.participants
216            .iter()
217            .filter(|(_, p)| p.vote.is_none())
218            .map(|(id, _)| id.as_str())
219            .collect()
220    }
221
222    /// Get participants that haven't acknowledged yet
223    #[must_use]
224    pub fn pending_acknowledgments(&self) -> Vec<&str> {
225        self.participants
226            .iter()
227            .filter(|(_, p)| !p.acknowledged)
228            .map(|(id, _)| id.as_str())
229            .collect()
230    }
231
232    /// Mark decision and timestamp
233    pub fn make_decision(&mut self, decision: CoordinatorDecision) {
234        self.decision = decision;
235        self.decision_at = Some(Instant::now());
236    }
237}
238
239/// Entry in the transaction log
240#[derive(Debug, Clone)]
241pub struct TransactionLogEntry {
242    /// Transaction ID
243    pub tx_id: TransactionId,
244    /// Decision
245    pub decision: CoordinatorDecision,
246    /// Participant IDs
247    pub participants: Vec<String>,
248    /// Timestamp (epoch millis)
249    pub timestamp: u64,
250}
251
252impl TransactionLogEntry {
253    /// Serialize to bytes
254    #[must_use]
255    #[allow(clippy::cast_possible_truncation)] // Wire format uses u32 for string/collection lengths
256    pub fn to_bytes(&self) -> Vec<u8> {
257        let mut bytes = Vec::new();
258
259        // Version
260        bytes.push(1u8);
261
262        // Transaction ID
263        let tx_bytes = self.tx_id.to_bytes();
264        bytes.extend_from_slice(&(tx_bytes.len() as u32).to_le_bytes());
265        bytes.extend_from_slice(&tx_bytes);
266
267        // Decision
268        bytes.push(self.decision.to_byte());
269
270        // Timestamp
271        bytes.extend_from_slice(&self.timestamp.to_le_bytes());
272
273        // Participants
274        bytes.extend_from_slice(&(self.participants.len() as u32).to_le_bytes());
275        for participant in &self.participants {
276            bytes.extend_from_slice(&(participant.len() as u32).to_le_bytes());
277            bytes.extend_from_slice(participant.as_bytes());
278        }
279
280        bytes
281    }
282
283    /// Deserialize from bytes
284    ///
285    /// # Panics
286    ///
287    /// Will not panic — all slice indexing is bounds-checked via `Option` returns.
288    #[must_use]
289    #[allow(clippy::missing_panics_doc)]
290    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
291        if bytes.is_empty() {
292            return None;
293        }
294
295        let mut pos = 0;
296
297        // Version
298        let version = bytes[pos];
299        pos += 1;
300        if version != 1 {
301            return None;
302        }
303
304        // Transaction ID
305        if pos + 4 > bytes.len() {
306            return None;
307        }
308        let tx_len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().ok()?) as usize;
309        pos += 4;
310        if pos + tx_len > bytes.len() {
311            return None;
312        }
313        let tx_id = TransactionId::from_bytes(&bytes[pos..pos + tx_len])?;
314        pos += tx_len;
315
316        // Decision
317        if pos >= bytes.len() {
318            return None;
319        }
320        let decision = CoordinatorDecision::from_byte(bytes[pos])?;
321        pos += 1;
322
323        // Timestamp
324        if pos + 8 > bytes.len() {
325            return None;
326        }
327        let timestamp = u64::from_le_bytes(bytes[pos..pos + 8].try_into().ok()?);
328        pos += 8;
329
330        // Participants
331        if pos + 4 > bytes.len() {
332            return None;
333        }
334        let num_participants = u32::from_le_bytes(bytes[pos..pos + 4].try_into().ok()?) as usize;
335        pos += 4;
336
337        let mut participants = Vec::with_capacity(num_participants);
338        for _ in 0..num_participants {
339            if pos + 4 > bytes.len() {
340                return None;
341            }
342            let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().ok()?) as usize;
343            pos += 4;
344            if pos + len > bytes.len() {
345                return None;
346            }
347            let participant = String::from_utf8_lossy(&bytes[pos..pos + len]).to_string();
348            pos += len;
349            participants.push(participant);
350        }
351
352        Some(Self {
353            tx_id,
354            decision,
355            participants,
356            timestamp,
357        })
358    }
359}
360
361/// Durable transaction log for the coordinator
362///
363/// Stores coordinator decisions to enable crash recovery with presumed abort semantics.
364#[derive(Debug)]
365pub struct TransactionLog {
366    /// Log entries (in-memory, should be persisted to storage)
367    entries: Vec<TransactionLogEntry>,
368    /// Index by transaction ID
369    index: HashMap<u64, usize>,
370}
371
372impl TransactionLog {
373    /// Create a new transaction log
374    #[must_use]
375    pub fn new() -> Self {
376        Self {
377            entries: Vec::new(),
378            index: HashMap::new(),
379        }
380    }
381
382    /// Log a coordinator decision
383    #[allow(clippy::cast_possible_truncation)] // Timestamp ms fits i64 for ~292 years from epoch
384    pub fn log_decision(
385        &mut self,
386        tx_id: &TransactionId,
387        decision: CoordinatorDecision,
388        participants: &[String],
389    ) {
390        let entry = TransactionLogEntry {
391            tx_id: tx_id.clone(),
392            decision,
393            participants: participants.to_vec(),
394            timestamp: std::time::SystemTime::now()
395                .duration_since(std::time::UNIX_EPOCH)
396                .unwrap_or_default()
397                .as_millis() as u64,
398        };
399
400        let idx = self.entries.len();
401        self.entries.push(entry);
402        self.index.insert(tx_id.id(), idx);
403    }
404
405    /// Update an existing decision
406    pub fn update_decision(&mut self, tx_id: &TransactionId, decision: CoordinatorDecision) {
407        if let Some(&idx) = self.index.get(&tx_id.id()) {
408            if let Some(entry) = self.entries.get_mut(idx) {
409                entry.decision = decision;
410            }
411        }
412    }
413
414    /// Get the decision for a transaction
415    #[must_use]
416    pub fn get_decision(&self, tx_id: &TransactionId) -> Option<CoordinatorDecision> {
417        self.index
418            .get(&tx_id.id())
419            .and_then(|&idx| self.entries.get(idx))
420            .map(|e| e.decision)
421    }
422
423    /// Get an entry by transaction ID
424    #[must_use]
425    pub fn get_entry(&self, tx_id: &TransactionId) -> Option<&TransactionLogEntry> {
426        self.index
427            .get(&tx_id.id())
428            .and_then(|&idx| self.entries.get(idx))
429    }
430
431    /// Get all entries
432    #[must_use]
433    pub fn entries(&self) -> &[TransactionLogEntry] {
434        &self.entries
435    }
436
437    /// Get transactions that need recovery action.
438    ///
439    /// Returns a tuple of committed and to-abort entries:
440    /// - Committed: need to re-send commit
441    /// - To abort: were in PREPARING state (presumed abort)
442    #[must_use]
443    pub fn get_pending_for_recovery(
444        &self,
445    ) -> (Vec<&TransactionLogEntry>, Vec<&TransactionLogEntry>) {
446        let mut committed = Vec::new();
447        let mut to_abort = Vec::new();
448
449        for entry in &self.entries {
450            match entry.decision {
451                CoordinatorDecision::Committed => committed.push(entry),
452                CoordinatorDecision::Preparing => to_abort.push(entry),
453                CoordinatorDecision::Aborted => {
454                    // Already aborted, no action needed
455                }
456            }
457        }
458
459        (committed, to_abort)
460    }
461
462    /// Remove completed transactions from the log
463    pub fn prune_completed(&mut self, tx_ids: &[TransactionId]) {
464        let ids_to_remove: std::collections::HashSet<u64> =
465            tx_ids.iter().map(TransactionId::id).collect();
466
467        self.entries
468            .retain(|e| !ids_to_remove.contains(&e.tx_id.id()));
469
470        // Rebuild index
471        self.index.clear();
472        for (idx, entry) in self.entries.iter().enumerate() {
473            self.index.insert(entry.tx_id.id(), idx);
474        }
475    }
476
477    /// Serialize the entire log to bytes
478    #[must_use]
479    #[allow(clippy::cast_possible_truncation)] // Wire format uses u32 for entry/string lengths
480    pub fn to_bytes(&self) -> Vec<u8> {
481        let mut bytes = Vec::new();
482
483        // Version
484        bytes.push(1u8);
485
486        // Number of entries
487        bytes.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
488
489        // Each entry
490        for entry in &self.entries {
491            let entry_bytes = entry.to_bytes();
492            bytes.extend_from_slice(&(entry_bytes.len() as u32).to_le_bytes());
493            bytes.extend_from_slice(&entry_bytes);
494        }
495
496        bytes
497    }
498
499    /// Deserialize from bytes
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if the bytes are malformed.
504    #[allow(clippy::missing_panics_doc)]
505    pub fn from_bytes(bytes: &[u8]) -> Result<Self, SinkError> {
506        if bytes.is_empty() {
507            return Err(SinkError::CheckpointError(
508                "Empty transaction log".to_string(),
509            ));
510        }
511
512        let mut pos = 0;
513
514        // Version
515        let version = bytes[pos];
516        pos += 1;
517        if version != 1 {
518            return Err(SinkError::CheckpointError(format!(
519                "Unsupported log version: {version}"
520            )));
521        }
522
523        // Number of entries
524        if pos + 4 > bytes.len() {
525            return Err(SinkError::CheckpointError(
526                "Unexpected end of log".to_string(),
527            ));
528        }
529        // SAFETY: Bounds check above guarantees slice is exactly 4 bytes
530        let num_entries = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
531        pos += 4;
532
533        let mut entries = Vec::with_capacity(num_entries);
534        let mut index = HashMap::new();
535
536        for i in 0..num_entries {
537            if pos + 4 > bytes.len() {
538                return Err(SinkError::CheckpointError(
539                    "Unexpected end of log".to_string(),
540                ));
541            }
542            // SAFETY: Bounds check above guarantees slice is exactly 4 bytes
543            let entry_len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
544            pos += 4;
545
546            if pos + entry_len > bytes.len() {
547                return Err(SinkError::CheckpointError(
548                    "Invalid entry length".to_string(),
549                ));
550            }
551            let entry = TransactionLogEntry::from_bytes(&bytes[pos..pos + entry_len])
552                .ok_or_else(|| SinkError::CheckpointError("Invalid log entry".to_string()))?;
553            pos += entry_len;
554
555            index.insert(entry.tx_id.id(), i);
556            entries.push(entry);
557        }
558
559        Ok(Self { entries, index })
560    }
561}
562
563impl Default for TransactionLog {
564    fn default() -> Self {
565        Self::new()
566    }
567}
568
/// Tunables for the 2PC coordinator: timeouts and retry policy
#[derive(Debug, Clone)]
pub struct TwoPhaseConfig {
    /// How long a participant may take to answer PREPARE
    pub prepare_timeout: Duration,
    /// How long a participant may take to acknowledge COMMIT/ABORT
    pub commit_timeout: Duration,
    /// Upper bound on commit/abort retries
    pub max_retries: u32,
    /// Pause between successive retries
    pub retry_delay: Duration,
}

impl Default for TwoPhaseConfig {
    /// 30 s prepare timeout, 60 s commit timeout, 3 retries spaced 100 ms apart.
    fn default() -> Self {
        let prepare_timeout = Duration::from_secs(30);
        let commit_timeout = Duration::from_secs(60);
        let retry_delay = Duration::from_millis(100);
        Self {
            prepare_timeout,
            commit_timeout,
            max_retries: 3,
            retry_delay,
        }
    }
}
592
/// Final outcome of a 2PC transaction
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TwoPhaseResult {
    /// The transaction committed successfully
    Committed,
    /// The transaction was aborted; the payload carries the reason
    Aborted(String),
    /// The transaction did not finish in time
    Timeout,
}
603
604/// Two-Phase Commit Coordinator
605///
606/// Coordinates distributed transactions across multiple sinks using the
607/// two-phase commit protocol with presumed abort semantics.
608pub struct TwoPhaseCoordinator {
609    /// Configuration
610    config: TwoPhaseConfig,
611    /// Transaction log for durability
612    log: TransactionLog,
613    /// Active transactions
614    active: HashMap<u64, TransactionRecord>,
615    /// Registered participant IDs
616    participants: Vec<String>,
617    /// Transaction ID counter
618    next_tx_id: AtomicU64,
619}
620
621impl TwoPhaseCoordinator {
622    /// Create a new 2PC coordinator
623    #[must_use]
624    pub fn new(config: TwoPhaseConfig) -> Self {
625        Self {
626            config,
627            log: TransactionLog::new(),
628            active: HashMap::new(),
629            participants: Vec::new(),
630            next_tx_id: AtomicU64::new(1),
631        }
632    }
633
634    /// Create with default configuration
635    #[must_use]
636    pub fn with_defaults() -> Self {
637        Self::new(TwoPhaseConfig::default())
638    }
639
640    /// Register a participant sink
641    pub fn register_participant(&mut self, sink_id: impl Into<String>) {
642        let id = sink_id.into();
643        if !self.participants.contains(&id) {
644            self.participants.push(id);
645        }
646    }
647
648    /// Unregister a participant
649    pub fn unregister_participant(&mut self, sink_id: &str) {
650        self.participants.retain(|id| id != sink_id);
651    }
652
653    /// Get registered participants
654    #[must_use]
655    pub fn participants(&self) -> &[String] {
656        &self.participants
657    }
658
659    /// Begin a new 2PC transaction
660    ///
661    /// # Errors
662    ///
663    /// Returns an error if no participants are registered.
664    pub fn begin_transaction(&mut self) -> Result<TransactionId, SinkError> {
665        if self.participants.is_empty() {
666            return Err(SinkError::ConfigurationError(
667                "No participants registered".to_string(),
668            ));
669        }
670
671        let tx_id = TransactionId::new(self.next_tx_id.fetch_add(1, Ordering::SeqCst));
672        let record = TransactionRecord::new(tx_id.clone(), &self.participants);
673
674        // Log the preparing state
675        self.log
676            .log_decision(&tx_id, CoordinatorDecision::Preparing, &self.participants);
677
678        self.active.insert(tx_id.id(), record);
679
680        Ok(tx_id)
681    }
682
683    /// Get participants that need prepare sent
684    #[must_use]
685    pub fn get_prepare_targets(&self, tx_id: &TransactionId) -> Vec<String> {
686        self.active
687            .get(&tx_id.id())
688            .map(|record| {
689                record
690                    .participants
691                    .iter()
692                    .filter(|(_, p)| !p.prepare_sent)
693                    .map(|(id, _)| id.clone())
694                    .collect()
695            })
696            .unwrap_or_default()
697    }
698
699    /// Mark that prepare was sent to a participant
700    pub fn mark_prepare_sent(&mut self, tx_id: &TransactionId, participant_id: &str) {
701        if let Some(record) = self.active.get_mut(&tx_id.id()) {
702            if let Some(participant) = record.participants.get_mut(participant_id) {
703                participant.mark_prepare_sent();
704            }
705        }
706    }
707
708    /// Record a participant's vote
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if the transaction is not found.
713    pub fn record_vote(
714        &mut self,
715        tx_id: &TransactionId,
716        participant_id: &str,
717        vote: ParticipantVote,
718    ) -> Result<(), SinkError> {
719        let record = self
720            .active
721            .get_mut(&tx_id.id())
722            .ok_or_else(|| SinkError::Internal(format!("Transaction not found: {}", tx_id.id())))?;
723
724        if let Some(participant) = record.participants.get_mut(participant_id) {
725            participant.record_vote(vote);
726        }
727
728        Ok(())
729    }
730
731    /// Check if we can make a decision (all votes received or any no/timeout)
732    #[must_use]
733    pub fn can_decide(&self, tx_id: &TransactionId) -> bool {
734        self.active
735            .get(&tx_id.id())
736            .is_some_and(|record| record.all_voted_yes() || record.any_voted_no_or_timeout())
737    }
738
739    /// Make the commit/abort decision
740    ///
741    /// Returns the decision and logs it durably.
742    ///
743    /// # Errors
744    ///
745    /// Returns an error if the transaction is not found.
746    pub fn decide(&mut self, tx_id: &TransactionId) -> Result<CoordinatorDecision, SinkError> {
747        let record = self
748            .active
749            .get_mut(&tx_id.id())
750            .ok_or_else(|| SinkError::Internal(format!("Transaction not found: {}", tx_id.id())))?;
751
752        let decision = if record.all_voted_yes() {
753            CoordinatorDecision::Committed
754        } else {
755            CoordinatorDecision::Aborted
756        };
757
758        // Update in-memory state
759        record.make_decision(decision);
760
761        // Log the decision durably (CRITICAL for recovery)
762        self.log.update_decision(tx_id, decision);
763
764        Ok(decision)
765    }
766
767    /// Mark that a participant acknowledged the commit/abort
768    pub fn mark_acknowledged(&mut self, tx_id: &TransactionId, participant_id: &str) {
769        if let Some(record) = self.active.get_mut(&tx_id.id()) {
770            if let Some(participant) = record.participants.get_mut(participant_id) {
771                participant.mark_acknowledged();
772            }
773        }
774    }
775
776    /// Check if transaction is complete (all participants acknowledged)
777    #[must_use]
778    pub fn is_complete(&self, tx_id: &TransactionId) -> bool {
779        self.active
780            .get(&tx_id.id())
781            .is_some_and(TransactionRecord::all_acknowledged)
782    }
783
784    /// Complete a transaction and clean up
785    pub fn complete(&mut self, tx_id: &TransactionId) {
786        self.active.remove(&tx_id.id());
787        self.log.prune_completed(std::slice::from_ref(tx_id));
788    }
789
790    /// Get the current decision for a transaction
791    #[must_use]
792    pub fn get_decision(&self, tx_id: &TransactionId) -> Option<CoordinatorDecision> {
793        self.active
794            .get(&tx_id.id())
795            .map(|r| r.decision)
796            .or_else(|| self.log.get_decision(tx_id))
797    }
798
799    /// Check and mark timed out participants
800    pub fn check_timeouts(&mut self, tx_id: &TransactionId) {
801        if let Some(record) = self.active.get_mut(&tx_id.id()) {
802            for participant in record.participants.values_mut() {
803                if participant.vote.is_none()
804                    && participant.is_timed_out(self.config.prepare_timeout)
805                {
806                    participant.record_vote(ParticipantVote::Timeout);
807                }
808            }
809        }
810    }
811
812    /// Get participants that need commit sent
813    #[must_use]
814    pub fn get_commit_targets(&self, tx_id: &TransactionId) -> Vec<String> {
815        self.active
816            .get(&tx_id.id())
817            .map(|record| {
818                record
819                    .participants
820                    .iter()
821                    .filter(|(_, p)| !p.acknowledged)
822                    .map(|(id, _)| id.clone())
823                    .collect()
824            })
825            .unwrap_or_default()
826    }
827
828    /// Get the transaction log for persistence
829    #[must_use]
830    pub fn transaction_log(&self) -> &TransactionLog {
831        &self.log
832    }
833
834    /// Get mutable reference to transaction log
835    pub fn transaction_log_mut(&mut self) -> &mut TransactionLog {
836        &mut self.log
837    }
838
839    /// Get active transaction record
840    #[must_use]
841    pub fn get_transaction(&self, tx_id: &TransactionId) -> Option<&TransactionRecord> {
842        self.active.get(&tx_id.id())
843    }
844
845    /// Recover from a transaction log after restart.
846    ///
847    /// Returns a tuple of committed and to-abort entries that need action.
848    #[must_use]
849    pub fn recover(
850        &mut self,
851        log_bytes: &[u8],
852    ) -> (Vec<TransactionLogEntry>, Vec<TransactionLogEntry>) {
853        match TransactionLog::from_bytes(log_bytes) {
854            Ok(log) => {
855                let (committed, to_abort) = log.get_pending_for_recovery();
856                let committed_owned: Vec<_> = committed.iter().map(|e| (*e).clone()).collect();
857                let abort_owned: Vec<_> = to_abort.iter().map(|e| (*e).clone()).collect();
858                self.log = log;
859                (committed_owned, abort_owned)
860            }
861            Err(_) => (Vec::new(), Vec::new()),
862        }
863    }
864
865    /// Force abort a transaction (for timeout or error)
866    ///
867    /// # Errors
868    ///
869    /// Returns an error if the transaction is not found.
870    pub fn force_abort(&mut self, tx_id: &TransactionId) -> Result<(), SinkError> {
871        if let Some(record) = self.active.get_mut(&tx_id.id()) {
872            record.make_decision(CoordinatorDecision::Aborted);
873            self.log
874                .update_decision(tx_id, CoordinatorDecision::Aborted);
875            Ok(())
876        } else {
877            Err(SinkError::Internal(format!(
878                "Transaction not found: {}",
879                tx_id.id()
880            )))
881        }
882    }
883
884    /// Get configuration
885    #[must_use]
886    pub fn config(&self) -> &TwoPhaseConfig {
887        &self.config
888    }
889}
890
891impl Default for TwoPhaseCoordinator {
892    fn default() -> Self {
893        Self::with_defaults()
894    }
895}
896
897#[cfg(test)]
898mod tests {
899    use super::*;
900
901    #[test]
902    fn test_coordinator_decision_serialization() {
903        assert_eq!(
904            CoordinatorDecision::from_byte(CoordinatorDecision::Preparing.to_byte()),
905            Some(CoordinatorDecision::Preparing)
906        );
907        assert_eq!(
908            CoordinatorDecision::from_byte(CoordinatorDecision::Committed.to_byte()),
909            Some(CoordinatorDecision::Committed)
910        );
911        assert_eq!(
912            CoordinatorDecision::from_byte(CoordinatorDecision::Aborted.to_byte()),
913            Some(CoordinatorDecision::Aborted)
914        );
915        assert_eq!(CoordinatorDecision::from_byte(99), None);
916    }
917
918    #[test]
919    fn test_participant_state() {
920        let mut state = ParticipantState::new("sink-1");
921        assert!(state.vote.is_none());
922        assert!(!state.prepare_sent);
923        assert!(!state.acknowledged);
924
925        state.mark_prepare_sent();
926        assert!(state.prepare_sent);
927
928        state.record_vote(ParticipantVote::Yes);
929        assert!(state.voted_yes());
930        assert!(!state.voted_no_or_timeout());
931
932        state.mark_acknowledged();
933        assert!(state.acknowledged);
934    }
935
936    #[test]
937    fn test_participant_timeout_vote() {
938        let mut state = ParticipantState::new("sink-1");
939        state.record_vote(ParticipantVote::Timeout);
940        assert!(state.voted_no_or_timeout());
941        assert!(!state.voted_yes());
942    }
943
944    #[test]
945    fn test_transaction_record() {
946        let tx_id = TransactionId::new(1);
947        let participants = vec!["sink-1".to_string(), "sink-2".to_string()];
948        let mut record = TransactionRecord::new(tx_id, &participants);
949
950        assert!(!record.all_voted_yes());
951        assert!(!record.any_voted_no_or_timeout());
952        assert_eq!(record.pending_voters().len(), 2);
953
954        // Vote yes from sink-1
955        record
956            .participants
957            .get_mut("sink-1")
958            .unwrap()
959            .record_vote(ParticipantVote::Yes);
960        assert!(!record.all_voted_yes());
961        assert_eq!(record.pending_voters().len(), 1);
962
963        // Vote yes from sink-2
964        record
965            .participants
966            .get_mut("sink-2")
967            .unwrap()
968            .record_vote(ParticipantVote::Yes);
969        assert!(record.all_voted_yes());
970        assert!(record.pending_voters().is_empty());
971    }
972
973    #[test]
974    fn test_transaction_record_with_no_vote() {
975        let tx_id = TransactionId::new(1);
976        let participants = vec!["sink-1".to_string(), "sink-2".to_string()];
977        let mut record = TransactionRecord::new(tx_id, &participants);
978
979        record
980            .participants
981            .get_mut("sink-1")
982            .unwrap()
983            .record_vote(ParticipantVote::Yes);
984        record
985            .participants
986            .get_mut("sink-2")
987            .unwrap()
988            .record_vote(ParticipantVote::No);
989
990        assert!(!record.all_voted_yes());
991        assert!(record.any_voted_no_or_timeout());
992    }
993
994    #[test]
995    fn test_transaction_log_entry_serialization() {
996        let entry = TransactionLogEntry {
997            tx_id: TransactionId::new(123),
998            decision: CoordinatorDecision::Committed,
999            participants: vec!["sink-1".to_string(), "sink-2".to_string()],
1000            timestamp: 1_706_000_000_000,
1001        };
1002
1003        let bytes = entry.to_bytes();
1004        let restored = TransactionLogEntry::from_bytes(&bytes).unwrap();
1005
1006        assert_eq!(restored.tx_id.id(), 123);
1007        assert_eq!(restored.decision, CoordinatorDecision::Committed);
1008        assert_eq!(restored.participants, vec!["sink-1", "sink-2"]);
1009        assert_eq!(restored.timestamp, 1_706_000_000_000);
1010    }
1011
1012    #[test]
1013    fn test_transaction_log() {
1014        let mut log = TransactionLog::new();
1015
1016        let tx1 = TransactionId::new(1);
1017        let tx2 = TransactionId::new(2);
1018        let participants = vec!["sink-1".to_string()];
1019
1020        log.log_decision(&tx1, CoordinatorDecision::Preparing, &participants);
1021        log.log_decision(&tx2, CoordinatorDecision::Committed, &participants);
1022
1023        assert_eq!(log.get_decision(&tx1), Some(CoordinatorDecision::Preparing));
1024        assert_eq!(log.get_decision(&tx2), Some(CoordinatorDecision::Committed));
1025
1026        // Update decision
1027        log.update_decision(&tx1, CoordinatorDecision::Committed);
1028        assert_eq!(log.get_decision(&tx1), Some(CoordinatorDecision::Committed));
1029    }
1030
1031    #[test]
1032    fn test_transaction_log_recovery() {
1033        let mut log = TransactionLog::new();
1034
1035        let tx1 = TransactionId::new(1);
1036        let tx2 = TransactionId::new(2);
1037        let tx3 = TransactionId::new(3);
1038        let participants = vec!["sink-1".to_string()];
1039
1040        log.log_decision(&tx1, CoordinatorDecision::Preparing, &participants);
1041        log.log_decision(&tx2, CoordinatorDecision::Committed, &participants);
1042        log.log_decision(&tx3, CoordinatorDecision::Aborted, &participants);
1043
1044        let (committed, to_abort) = log.get_pending_for_recovery();
1045
1046        assert_eq!(committed.len(), 1);
1047        assert_eq!(committed[0].tx_id.id(), 2);
1048
1049        assert_eq!(to_abort.len(), 1);
1050        assert_eq!(to_abort[0].tx_id.id(), 1);
1051    }
1052
1053    #[test]
1054    fn test_transaction_log_serialization() {
1055        let mut log = TransactionLog::new();
1056
1057        let tx1 = TransactionId::new(1);
1058        let participants = vec!["sink-1".to_string(), "sink-2".to_string()];
1059
1060        log.log_decision(&tx1, CoordinatorDecision::Committed, &participants);
1061
1062        let bytes = log.to_bytes();
1063        let restored = TransactionLog::from_bytes(&bytes).unwrap();
1064
1065        assert_eq!(
1066            restored.get_decision(&tx1),
1067            Some(CoordinatorDecision::Committed)
1068        );
1069        assert_eq!(restored.entries().len(), 1);
1070    }
1071
1072    #[test]
1073    fn test_coordinator_begin_transaction() {
1074        let mut coord = TwoPhaseCoordinator::with_defaults();
1075        coord.register_participant("sink-1");
1076        coord.register_participant("sink-2");
1077
1078        let tx_id = coord.begin_transaction().unwrap();
1079        assert!(coord.get_transaction(&tx_id).is_some());
1080        assert_eq!(
1081            coord.get_decision(&tx_id),
1082            Some(CoordinatorDecision::Preparing)
1083        );
1084    }
1085
1086    #[test]
1087    fn test_coordinator_no_participants() {
1088        let mut coord = TwoPhaseCoordinator::with_defaults();
1089        let result = coord.begin_transaction();
1090        assert!(matches!(result, Err(SinkError::ConfigurationError(_))));
1091    }
1092
1093    #[test]
1094    fn test_coordinator_full_commit_flow() {
1095        let mut coord = TwoPhaseCoordinator::with_defaults();
1096        coord.register_participant("sink-1");
1097        coord.register_participant("sink-2");
1098
1099        // Begin transaction
1100        let tx_id = coord.begin_transaction().unwrap();
1101
1102        // Mark prepare sent
1103        coord.mark_prepare_sent(&tx_id, "sink-1");
1104        coord.mark_prepare_sent(&tx_id, "sink-2");
1105
1106        // Record votes
1107        coord
1108            .record_vote(&tx_id, "sink-1", ParticipantVote::Yes)
1109            .unwrap();
1110        assert!(!coord.can_decide(&tx_id));
1111
1112        coord
1113            .record_vote(&tx_id, "sink-2", ParticipantVote::Yes)
1114            .unwrap();
1115        assert!(coord.can_decide(&tx_id));
1116
1117        // Make decision
1118        let decision = coord.decide(&tx_id).unwrap();
1119        assert_eq!(decision, CoordinatorDecision::Committed);
1120
1121        // Record acknowledgments
1122        coord.mark_acknowledged(&tx_id, "sink-1");
1123        assert!(!coord.is_complete(&tx_id));
1124
1125        coord.mark_acknowledged(&tx_id, "sink-2");
1126        assert!(coord.is_complete(&tx_id));
1127
1128        // Complete
1129        coord.complete(&tx_id);
1130        assert!(coord.get_transaction(&tx_id).is_none());
1131    }
1132
1133    #[test]
1134    fn test_coordinator_abort_on_no_vote() {
1135        let mut coord = TwoPhaseCoordinator::with_defaults();
1136        coord.register_participant("sink-1");
1137        coord.register_participant("sink-2");
1138
1139        let tx_id = coord.begin_transaction().unwrap();
1140
1141        coord
1142            .record_vote(&tx_id, "sink-1", ParticipantVote::Yes)
1143            .unwrap();
1144        coord
1145            .record_vote(&tx_id, "sink-2", ParticipantVote::No)
1146            .unwrap();
1147
1148        assert!(coord.can_decide(&tx_id));
1149
1150        let decision = coord.decide(&tx_id).unwrap();
1151        assert_eq!(decision, CoordinatorDecision::Aborted);
1152    }
1153
1154    #[test]
1155    fn test_coordinator_abort_on_timeout() {
1156        let mut coord = TwoPhaseCoordinator::new(TwoPhaseConfig {
1157            prepare_timeout: Duration::from_millis(1),
1158            ..Default::default()
1159        });
1160        coord.register_participant("sink-1");
1161
1162        let tx_id = coord.begin_transaction().unwrap();
1163        coord.mark_prepare_sent(&tx_id, "sink-1");
1164
1165        // Wait for timeout
1166        std::thread::sleep(Duration::from_millis(5));
1167        coord.check_timeouts(&tx_id);
1168
1169        assert!(coord.can_decide(&tx_id));
1170
1171        let decision = coord.decide(&tx_id).unwrap();
1172        assert_eq!(decision, CoordinatorDecision::Aborted);
1173    }
1174
1175    #[test]
1176    fn test_coordinator_force_abort() {
1177        let mut coord = TwoPhaseCoordinator::with_defaults();
1178        coord.register_participant("sink-1");
1179
1180        let tx_id = coord.begin_transaction().unwrap();
1181        coord.force_abort(&tx_id).unwrap();
1182
1183        assert_eq!(
1184            coord.get_decision(&tx_id),
1185            Some(CoordinatorDecision::Aborted)
1186        );
1187    }
1188
1189    #[test]
1190    fn test_coordinator_recovery() {
1191        // Create coordinator and complete a transaction
1192        let mut coord1 = TwoPhaseCoordinator::with_defaults();
1193        coord1.register_participant("sink-1");
1194
1195        let tx_id = coord1.begin_transaction().unwrap();
1196        coord1
1197            .record_vote(&tx_id, "sink-1", ParticipantVote::Yes)
1198            .unwrap();
1199        coord1.decide(&tx_id).unwrap();
1200
1201        // Serialize the log
1202        let log_bytes = coord1.transaction_log().to_bytes();
1203
1204        // Create new coordinator and recover
1205        let mut coord2 = TwoPhaseCoordinator::with_defaults();
1206        let (committed, to_abort) = coord2.recover(&log_bytes);
1207
1208        assert_eq!(committed.len(), 1);
1209        assert!(to_abort.is_empty());
1210    }
1211
1212    #[test]
1213    fn test_coordinator_recovery_presumed_abort() {
1214        // Create coordinator with preparing transaction
1215        let mut coord1 = TwoPhaseCoordinator::with_defaults();
1216        coord1.register_participant("sink-1");
1217
1218        let _tx_id = coord1.begin_transaction().unwrap();
1219        // Don't complete - simulates crash during prepare
1220
1221        // Serialize the log
1222        let log_bytes = coord1.transaction_log().to_bytes();
1223
1224        // Create new coordinator and recover
1225        let mut coord2 = TwoPhaseCoordinator::with_defaults();
1226        let (committed, to_abort) = coord2.recover(&log_bytes);
1227
1228        // Presumed abort: preparing transactions should be aborted
1229        assert!(committed.is_empty());
1230        assert_eq!(to_abort.len(), 1);
1231    }
1232
1233    #[test]
1234    fn test_get_prepare_and_commit_targets() {
1235        let mut coord = TwoPhaseCoordinator::with_defaults();
1236        coord.register_participant("sink-1");
1237        coord.register_participant("sink-2");
1238
1239        let tx_id = coord.begin_transaction().unwrap();
1240
1241        // Initially all need prepare
1242        let targets = coord.get_prepare_targets(&tx_id);
1243        assert_eq!(targets.len(), 2);
1244
1245        // Mark one as sent
1246        coord.mark_prepare_sent(&tx_id, "sink-1");
1247        let targets = coord.get_prepare_targets(&tx_id);
1248        assert_eq!(targets.len(), 1);
1249        assert_eq!(targets[0], "sink-2");
1250
1251        // After voting and decision, get commit targets
1252        coord.mark_prepare_sent(&tx_id, "sink-2");
1253        coord
1254            .record_vote(&tx_id, "sink-1", ParticipantVote::Yes)
1255            .unwrap();
1256        coord
1257            .record_vote(&tx_id, "sink-2", ParticipantVote::Yes)
1258            .unwrap();
1259        coord.decide(&tx_id).unwrap();
1260
1261        let commit_targets = coord.get_commit_targets(&tx_id);
1262        assert_eq!(commit_targets.len(), 2);
1263
1264        coord.mark_acknowledged(&tx_id, "sink-1");
1265        let commit_targets = coord.get_commit_targets(&tx_id);
1266        assert_eq!(commit_targets.len(), 1);
1267        assert_eq!(commit_targets[0], "sink-2");
1268    }
1269
1270    #[test]
1271    fn test_config_values() {
1272        let config = TwoPhaseConfig::default();
1273        assert_eq!(config.prepare_timeout, Duration::from_secs(30));
1274        assert_eq!(config.commit_timeout, Duration::from_secs(60));
1275        assert_eq!(config.max_retries, 3);
1276    }
1277
1278    #[test]
1279    fn test_participant_registration() {
1280        let mut coord = TwoPhaseCoordinator::with_defaults();
1281
1282        coord.register_participant("sink-1");
1283        coord.register_participant("sink-2");
1284        coord.register_participant("sink-1"); // Duplicate
1285
1286        assert_eq!(coord.participants().len(), 2);
1287
1288        coord.unregister_participant("sink-1");
1289        assert_eq!(coord.participants().len(), 1);
1290        assert_eq!(coord.participants()[0], "sink-2");
1291    }
1292}