// rivven_core/transaction.rs
1//! Native Transaction Support
2//!
3//! Provides exactly-once semantics with cross-topic atomic writes.
4//!
5//! ## Transaction Protocol
6//!
7//! ```text
8//! Producer                     Transaction Coordinator            Partitions
9//!    │                                   │                            │
10//!    │─── InitProducerId ───────────────>│                            │
11//!    │<── PID=123, Epoch=0 ──────────────│                            │
12//!    │                                   │                            │
13//!    │─── BeginTransaction(TxnId) ──────>│                            │
14//!    │<── OK ────────────────────────────│                            │
15//!    │                                   │                            │
16//!    │─── AddPartitionsToTxn(p1,p2) ────>│                            │
17//!    │<── OK ────────────────────────────│                            │
18//!    │                                   │                            │
19//!    │─── Produce(p1, PID, Seq) ──────────────────────────────────────>│
20//!    │<── OK ───────────────────────────────────────────────────────────│
21//!    │                                   │                            │
22//!    │─── Produce(p2, PID, Seq) ──────────────────────────────────────>│
23//!    │<── OK ───────────────────────────────────────────────────────────│
24//!    │                                   │                            │
25//!    │─── CommitTransaction(TxnId) ─────>│                            │
26//!    │                                   │─── WriteTxnMarker(COMMIT) ─>│
27//!    │                                   │<── OK ─────────────────────│
28//!    │<── OK ────────────────────────────│                            │
29//! ```
30//!
31//! ## Transaction States
32//!
33//! ```text
34//! Empty ──────> Ongoing ──────> PrepareCommit ──────> CompleteCommit
35//!                  │                  │                     │
36//!                  │                  v                     v
37//!                  └───────> PrepareAbort ───────> CompleteAbort
38//!                                    │                     │
39//!                                    └─────────────────────┘
40//! ```
41//!
42//! ## Exactly-Once Guarantees
43//!
44//! 1. **Atomic Writes**: All messages in a transaction are committed or aborted together
45//! 2. **Consumer Isolation**: Consumers only see committed messages (read_committed)
46//! 3. **Fencing**: Old producer instances are fenced via epoch
47//! 4. **Durability**: Transaction state is persisted before acknowledgment
48//!
49
50use crate::idempotent::{ProducerEpoch, ProducerId};
51use parking_lot::RwLock;
52use serde::{Deserialize, Serialize};
53use std::collections::{HashMap, HashSet};
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::{AtomicU64, Ordering};
57use std::time::{Duration, Instant, SystemTime};
58
/// Unique identifier for a transaction
///
/// Supplied by the producer; the coordinator keys transactions by
/// `(ProducerId, TransactionId)`.
pub type TransactionId = String;

/// Transaction timeout default (1 minute)
pub const DEFAULT_TRANSACTION_TIMEOUT: Duration = Duration::from_secs(60);

/// Maximum pending transactions per producer
pub const MAX_PENDING_TRANSACTIONS: usize = 5;
67
/// Transaction state machine
///
/// See the state diagram in the module docs. Only `Ongoing` accepts writes;
/// `PrepareCommit`/`PrepareAbort` are the 2PC phase-1 states, and `Empty`,
/// `CompleteCommit`, `CompleteAbort`, and `Dead` are terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionState {
    /// No active transaction
    Empty,

    /// Transaction in progress, accepting writes
    Ongoing,

    /// Preparing to commit (2PC phase 1)
    PrepareCommit,

    /// Preparing to abort (2PC phase 1)
    PrepareAbort,

    /// Commit complete (2PC phase 2)
    CompleteCommit,

    /// Abort complete (2PC phase 2)
    CompleteAbort,

    /// Transaction has expired without completion
    Dead,
}
92
93impl TransactionState {
94    /// Check if transaction is in a terminal state
95    pub fn is_terminal(&self) -> bool {
96        matches!(
97            self,
98            TransactionState::Empty
99                | TransactionState::CompleteCommit
100                | TransactionState::CompleteAbort
101                | TransactionState::Dead
102        )
103    }
104
105    /// Check if transaction is still active (can accept writes)
106    pub fn is_active(&self) -> bool {
107        matches!(self, TransactionState::Ongoing)
108    }
109
110    /// Check if transaction can transition to commit
111    pub fn can_commit(&self) -> bool {
112        matches!(self, TransactionState::Ongoing)
113    }
114
115    /// Check if transaction can transition to abort
116    pub fn can_abort(&self) -> bool {
117        matches!(
118            self,
119            TransactionState::Ongoing
120                | TransactionState::PrepareCommit
121                | TransactionState::PrepareAbort
122        )
123    }
124}
125
/// Result of a transaction operation
///
/// Returned by coordinator entry points instead of a `Result` so callers can
/// match on the specific protocol-level outcome.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransactionResult {
    /// Operation succeeded
    Ok,

    /// Transaction ID is invalid or not found
    InvalidTransactionId,

    /// Transaction is in wrong state for this operation
    InvalidTransactionState {
        /// State the transaction is actually in
        current: TransactionState,
        /// Human-readable description of the expected state(s)
        expected: &'static str,
    },

    /// Producer ID/epoch mismatch — a newer producer instance has taken over
    ProducerFenced {
        expected_epoch: ProducerEpoch,
        received_epoch: ProducerEpoch,
    },

    /// Transaction has timed out
    TransactionTimeout,

    /// Too many pending transactions
    TooManyTransactions,

    /// Concurrent modification detected
    ConcurrentTransaction,

    /// Partition not part of transaction
    PartitionNotInTransaction { topic: String, partition: u32 },

    /// Durable log write failed — transaction state may not be recoverable
    LogWriteError(String),
}
162
/// A partition involved in a transaction
///
/// A `(topic, partition)` pair; hashable so it can be stored in the
/// transaction's `HashSet` of touched partitions.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TransactionPartition {
    pub topic: String,
    pub partition: u32,
}

impl TransactionPartition {
    /// Create a partition reference from any string-like topic name.
    pub fn new(topic: impl Into<String>, partition: u32) -> Self {
        Self {
            topic: topic.into(),
            partition,
        }
    }
}
178
/// Pending write in a transaction (not yet committed)
///
/// Recorded by `Transaction::add_write`; the offsets are later used to
/// bound the transaction's offset range on abort.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingWrite {
    /// Target partition
    pub partition: TransactionPartition,

    /// Sequence number for this write
    pub sequence: i32,

    /// Offset assigned by the partition leader
    pub offset: u64,

    /// Write timestamp
    #[serde(with = "crate::serde_utils::system_time")]
    pub timestamp: SystemTime,
}
195
/// Consumer offset to be committed with the transaction
///
/// Supports exactly-once consume-transform-produce: the consumer's progress
/// commits atomically with the transaction's writes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionOffsetCommit {
    /// Consumer group
    pub group_id: String,

    /// Topic-partition-offset triples
    pub offsets: Vec<(TransactionPartition, i64)>,
}
205
/// Active transaction state
///
/// Keyed by `(producer_id, txn_id)` in the coordinator. Serializable so the
/// coordinator can persist and rebuild it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Transaction {
    /// Transaction ID (unique per producer)
    pub txn_id: TransactionId,

    /// Producer ID owning this transaction
    pub producer_id: ProducerId,

    /// Producer epoch (for fencing)
    pub producer_epoch: ProducerEpoch,

    /// Current state
    pub state: TransactionState,

    /// Partitions involved in this transaction
    pub partitions: HashSet<TransactionPartition>,

    /// Pending writes (not yet committed)
    pub pending_writes: Vec<PendingWrite>,

    /// Consumer offsets to commit with this transaction
    pub offset_commits: Vec<TransactionOffsetCommit>,

    /// Transaction start time
    #[serde(with = "crate::serde_utils::system_time")]
    pub started_at: SystemTime,

    /// Transaction timeout
    #[serde(with = "crate::serde_utils::duration")]
    pub timeout: Duration,

    /// Last activity timestamp (monotonic clock).
    ///
    /// `Instant` cannot be serialized, so this field is skipped and comes
    /// back as `None` after deserialization; `is_timed_out` treats `None`
    /// as not-timed-out so recovery can resolve in-doubt transactions.
    #[serde(skip)]
    pub last_activity: Option<Instant>,
}
242
243impl Transaction {
244    /// Create a new transaction
245    pub fn new(
246        txn_id: TransactionId,
247        producer_id: ProducerId,
248        producer_epoch: ProducerEpoch,
249        timeout: Duration,
250    ) -> Self {
251        Self {
252            txn_id,
253            producer_id,
254            producer_epoch,
255            state: TransactionState::Ongoing,
256            partitions: HashSet::new(),
257            pending_writes: Vec::new(),
258            offset_commits: Vec::new(),
259            started_at: SystemTime::now(),
260            timeout,
261            last_activity: Some(Instant::now()),
262        }
263    }
264
265    /// Check if transaction has timed out
266    ///
267    /// Returns `false` when `last_activity` is `None` (e.g., after
268    /// deserialization), allowing the coordinator to resolve in-doubt
269    /// transactions.
270    pub fn is_timed_out(&self) -> bool {
271        self.last_activity
272            .map(|t| t.elapsed() > self.timeout)
273            .unwrap_or(false)
274    }
275
276    /// Update last activity timestamp
277    pub fn touch(&mut self) {
278        self.last_activity = Some(Instant::now());
279    }
280
281    /// Add a partition to the transaction
282    pub fn add_partition(&mut self, partition: TransactionPartition) {
283        self.partitions.insert(partition);
284        self.touch();
285    }
286
287    /// Record a pending write
288    pub fn add_write(&mut self, partition: TransactionPartition, sequence: i32, offset: u64) {
289        self.pending_writes.push(PendingWrite {
290            partition,
291            sequence,
292            offset,
293            timestamp: SystemTime::now(),
294        });
295        self.touch();
296    }
297
298    /// Add consumer offset commit
299    pub fn add_offset_commit(
300        &mut self,
301        group_id: String,
302        offsets: Vec<(TransactionPartition, i64)>,
303    ) {
304        self.offset_commits
305            .push(TransactionOffsetCommit { group_id, offsets });
306        self.touch();
307    }
308
309    /// Get total number of writes
310    pub fn write_count(&self) -> usize {
311        self.pending_writes.len()
312    }
313
314    /// Get all affected partitions
315    pub fn affected_partitions(&self) -> impl Iterator<Item = &TransactionPartition> {
316        self.partitions.iter()
317    }
318}
319
/// Transaction marker type written to partition logs
///
/// Corresponds to the `WriteTxnMarker` step in the module-level protocol
/// diagram: the coordinator writes one marker per involved partition.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionMarker {
    /// Transaction committed
    Commit,

    /// Transaction aborted
    Abort,
}
329
/// Consumer isolation level
///
/// Controls whether consumers can see uncommitted transactional messages.
/// Wire encoding: `0` = read_uncommitted, `1` = read_committed (see
/// `from_u8`/`as_u8`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum IsolationLevel {
    /// Read all messages, including those from aborted transactions.
    /// This is the default for backward compatibility.
    #[default]
    ReadUncommitted,

    /// Only read messages from committed transactions.
    /// Messages from aborted transactions are filtered out.
    ReadCommitted,
}
344
345impl IsolationLevel {
346    /// Convert to string (Kafka-compatible)
347    pub fn as_str(&self) -> &'static str {
348        match self {
349            Self::ReadUncommitted => "read_uncommitted",
350            Self::ReadCommitted => "read_committed",
351        }
352    }
353
354    /// Convert from u8 (wire protocol)
355    /// 0 = read_uncommitted (default)
356    /// 1 = read_committed
357    /// Other values default to read_uncommitted
358    pub fn from_u8(value: u8) -> Self {
359        match value {
360            1 => Self::ReadCommitted,
361            _ => Self::ReadUncommitted,
362        }
363    }
364
365    /// Convert to u8 (wire protocol)
366    pub fn as_u8(&self) -> u8 {
367        match self {
368            Self::ReadUncommitted => 0,
369            Self::ReadCommitted => 1,
370        }
371    }
372}
373
374impl std::str::FromStr for IsolationLevel {
375    type Err = String;
376
377    /// Parse from string (Kafka-compatible)
378    fn from_str(s: &str) -> Result<Self, Self::Err> {
379        match s.to_lowercase().as_str() {
380            "read_uncommitted" => Ok(Self::ReadUncommitted),
381            "read_committed" => Ok(Self::ReadCommitted),
382            _ => Err(format!("unknown isolation level: {}", s)),
383        }
384    }
385}
386
387impl std::fmt::Display for IsolationLevel {
388    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389        write!(f, "{}", self.as_str())
390    }
391}
392
/// Record of an aborted transaction for consumer filtering
///
/// One record per (producer, partition) abort; consulted by
/// `AbortedTransactionIndex::is_aborted` for read_committed filtering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AbortedTransaction {
    /// Producer ID that aborted
    pub producer_id: ProducerId,
    /// First offset of the aborted transaction in this partition
    pub first_offset: u64,
    /// Last offset of the aborted transaction in this partition (inclusive).
    ///
    /// Without an upper bound, `is_aborted` would return true for all
    /// offsets >= first_offset, incorrectly filtering committed messages
    /// from the same producer in later transactions.
    pub last_offset: u64,
}
406
/// Index of aborted transactions for a partition
///
/// Used for efficient filtering when `isolation.level=read_committed`.
/// The sorted-by-`first_offset` invariant is maintained by `record_abort`.
#[derive(Debug, Default)]
pub struct AbortedTransactionIndex {
    /// Aborted transactions sorted by first_offset
    aborted: RwLock<Vec<AbortedTransaction>>,
}
415
416impl AbortedTransactionIndex {
417    /// Create a new empty index
418    pub fn new() -> Self {
419        Self::default()
420    }
421
422    /// Record an aborted transaction
423    ///
424    /// Tracks both first and last offset to bound the aborted range.
425    /// Checks `first_offset <= offset <= last_offset` (bounded range).
426    pub fn record_abort(&self, producer_id: ProducerId, first_offset: u64, last_offset: u64) {
427        let mut aborted = self.aborted.write();
428        aborted.push(AbortedTransaction {
429            producer_id,
430            first_offset,
431            last_offset,
432        });
433        // Keep sorted by first_offset for efficient lookup
434        aborted.sort_by_key(|a| a.first_offset);
435    }
436
437    /// Get aborted transactions that overlap with a range of offsets
438    ///
439    /// Returns aborted transactions whose first_offset is within [start_offset, end_offset]
440    pub fn get_aborted_in_range(
441        &self,
442        start_offset: u64,
443        end_offset: u64,
444    ) -> Vec<AbortedTransaction> {
445        let aborted = self.aborted.read();
446        aborted
447            .iter()
448            .filter(|a| a.first_offset >= start_offset && a.first_offset <= end_offset)
449            .cloned()
450            .collect()
451    }
452
453    /// Check if a specific producer's message at an offset is from an aborted transaction
454    ///
455    /// Checks bounded range `first_offset <= offset <= last_offset` to
456    /// avoid false-positives on committed messages from the same producer in later
457    /// transactions.
458    pub fn is_aborted(&self, producer_id: ProducerId, offset: u64) -> bool {
459        let aborted = self.aborted.read();
460        aborted.iter().any(|a| {
461            a.producer_id == producer_id && a.first_offset <= offset && offset <= a.last_offset
462        })
463    }
464
465    /// Remove aborted transactions older than a given offset (for log truncation)
466    pub fn truncate_before(&self, offset: u64) {
467        let mut aborted = self.aborted.write();
468        aborted.retain(|a| a.first_offset >= offset);
469    }
470
471    /// Get count of tracked aborted transactions
472    pub fn len(&self) -> usize {
473        self.aborted.read().len()
474    }
475
476    /// Check if index is empty
477    pub fn is_empty(&self) -> bool {
478        self.len() == 0
479    }
480}
481
/// Statistics for transaction coordinator
///
/// All counters use relaxed atomics: they are telemetry only and are never
/// relied on for synchronization.
#[derive(Debug, Default)]
pub struct TransactionStats {
    /// Total transactions initiated
    transactions_started: AtomicU64,

    /// Total transactions committed
    transactions_committed: AtomicU64,

    /// Total transactions aborted
    transactions_aborted: AtomicU64,

    /// Total transactions timed out
    transactions_timed_out: AtomicU64,

    /// Currently active transactions
    active_transactions: AtomicU64,
}

impl TransactionStats {
    /// Create a zeroed stats block.
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a transaction start: bump both the total and the active gauge.
    pub fn record_start(&self) {
        self.transactions_started.fetch_add(1, Ordering::Relaxed);
        self.active_transactions.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a successful commit.
    pub fn record_commit(&self) {
        self.finish(&self.transactions_committed);
    }

    /// Record an abort.
    pub fn record_abort(&self) {
        self.finish(&self.transactions_aborted);
    }

    /// Record a timeout (zombie cleanup).
    pub fn record_timeout(&self) {
        self.finish(&self.transactions_timed_out);
    }

    /// Shared tail for every terminal outcome: bump the outcome counter and
    /// retire one transaction from the active gauge.
    fn finish(&self, outcome_counter: &AtomicU64) {
        outcome_counter.fetch_add(1, Ordering::Relaxed);
        self.active_transactions.fetch_sub(1, Ordering::Relaxed);
    }

    pub fn transactions_started(&self) -> u64 {
        self.transactions_started.load(Ordering::Relaxed)
    }

    pub fn transactions_committed(&self) -> u64 {
        self.transactions_committed.load(Ordering::Relaxed)
    }

    pub fn transactions_aborted(&self) -> u64 {
        self.transactions_aborted.load(Ordering::Relaxed)
    }

    pub fn transactions_timed_out(&self) -> u64 {
        self.transactions_timed_out.load(Ordering::Relaxed)
    }

    pub fn active_transactions(&self) -> u64 {
        self.active_transactions.load(Ordering::Relaxed)
    }
}
546
/// Snapshot of transaction stats for serialization
///
/// Plain-integer copy of [`TransactionStats`]; build one via
/// `TransactionStatsSnapshot::from(&stats)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionStatsSnapshot {
    pub transactions_started: u64,
    pub transactions_committed: u64,
    pub transactions_aborted: u64,
    pub transactions_timed_out: u64,
    pub active_transactions: u64,
}
556
557impl From<&TransactionStats> for TransactionStatsSnapshot {
558    fn from(stats: &TransactionStats) -> Self {
559        Self {
560            transactions_started: stats.transactions_started(),
561            transactions_committed: stats.transactions_committed(),
562            transactions_aborted: stats.transactions_aborted(),
563            transactions_timed_out: stats.transactions_timed_out(),
564            active_transactions: stats.active_transactions(),
565        }
566    }
567}
568
569// ============================================================================
570// Transaction Log — durable persistence for coordinator state
571// ============================================================================
572
/// A single entry in the transaction log, recording a state transition.
///
/// Serialized to postcard and written to a per-coordinator WAL file.
/// On recovery, entries are replayed in order to reconstruct coordinator state.
/// Every variant carries `(txn_id, producer_id)` — the key under which the
/// coordinator tracks the transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TransactionLogEntry {
    /// New transaction started
    Begin {
        txn_id: TransactionId,
        producer_id: ProducerId,
        producer_epoch: ProducerEpoch,
        /// Timeout in milliseconds (a `Duration` is rebuilt on replay)
        timeout_ms: u64,
    },
    /// Partition added to transaction
    AddPartition {
        txn_id: TransactionId,
        producer_id: ProducerId,
        partition: TransactionPartition,
    },
    /// Write recorded in transaction
    RecordWrite {
        txn_id: TransactionId,
        producer_id: ProducerId,
        partition: TransactionPartition,
        sequence: i32,
        offset: u64,
    },
    /// Transaction moving to PrepareCommit (2PC phase 1)
    PrepareCommit {
        txn_id: TransactionId,
        producer_id: ProducerId,
    },
    /// Transaction committed (2PC phase 2)
    CompleteCommit {
        txn_id: TransactionId,
        producer_id: ProducerId,
    },
    /// Transaction moving to PrepareAbort
    PrepareAbort {
        txn_id: TransactionId,
        producer_id: ProducerId,
    },
    /// Transaction aborted
    CompleteAbort {
        txn_id: TransactionId,
        producer_id: ProducerId,
    },
    /// Transaction timed out (zombie cleanup)
    TimedOut {
        txn_id: TransactionId,
        producer_id: ProducerId,
    },
    /// Consumer offset commit added to transaction (exactly-once consume-transform-produce)
    OffsetCommit {
        txn_id: TransactionId,
        producer_id: ProducerId,
        group_id: String,
        offsets: Vec<(TransactionPartition, i64)>,
    },
}
633
/// Durable transaction log backed by a file.
///
/// Each entry is: [crc32: 4 bytes] [len: 4 bytes] [postcard-encoded data: N bytes]
///
/// The log is append-only. On recovery, entries are read sequentially and
/// replayed to rebuild the `TransactionCoordinator`'s in-memory state.
pub struct TransactionLog {
    /// Path to the log file
    path: PathBuf,
    /// Append-only writer; `None` means no-op (in-memory only) mode and
    /// appends silently succeed without writing.
    writer: parking_lot::Mutex<Option<std::io::BufWriter<std::fs::File>>>,
}
646
647impl TransactionLog {
648    /// Open (or create) a transaction log at the given path.
649    pub fn open(path: impl AsRef<Path>) -> crate::Result<Self> {
650        let path = path.as_ref().to_path_buf();
651        if let Some(parent) = path.parent() {
652            std::fs::create_dir_all(parent)?;
653        }
654        let file = std::fs::OpenOptions::new()
655            .create(true)
656            .append(true)
657            .open(&path)?;
658        Ok(Self {
659            path,
660            writer: parking_lot::Mutex::new(Some(std::io::BufWriter::new(file))),
661        })
662    }
663
664    /// Create an in-memory-only (no-op) transaction log for use when
665    /// persistence is not configured.
666    pub fn noop() -> Self {
667        Self {
668            path: PathBuf::new(),
669            writer: parking_lot::Mutex::new(None),
670        }
671    }
672
673    /// Append an entry to the log.
674    pub fn append(&self, entry: &TransactionLogEntry) -> crate::Result<()> {
675        let mut guard = self.writer.lock();
676        let writer = match guard.as_mut() {
677            Some(w) => w,
678            None => return Ok(()), // noop mode
679        };
680
681        let data = postcard::to_allocvec(entry).map_err(|e| crate::Error::Other(e.to_string()))?;
682
683        // CRC32 of the payload
684        let mut hasher = crc32fast::Hasher::new();
685        hasher.update(&data);
686        let crc = hasher.finalize();
687
688        writer.write_all(&crc.to_be_bytes())?;
689        writer.write_all(&(data.len() as u32).to_be_bytes())?;
690        writer.write_all(&data)?;
691        writer.flush()?;
692        // fsync to guarantee durability before acknowledging.
693        // flush() only pushes BufWriter data to the OS page cache.
694        // Without sync_data(), a power loss can lose the entry.
695        writer.get_ref().sync_data()?;
696        Ok(())
697    }
698
699    /// Read all valid entries from the log for recovery replay.
700    pub fn read_all(path: impl AsRef<Path>) -> crate::Result<Vec<TransactionLogEntry>> {
701        let path = path.as_ref();
702        if !path.exists() {
703            return Ok(Vec::new());
704        }
705        let data = std::fs::read(path)?;
706        let mut entries = Vec::new();
707        let mut pos = 0;
708
709        while pos + 8 <= data.len() {
710            let crc = u32::from_be_bytes(data[pos..pos + 4].try_into().unwrap());
711            let len = u32::from_be_bytes(data[pos + 4..pos + 8].try_into().unwrap()) as usize;
712            pos += 8;
713
714            if pos + len > data.len() {
715                break; // truncated — stop here
716            }
717
718            let payload = &data[pos..pos + len];
719
720            // Verify CRC
721            let mut hasher = crc32fast::Hasher::new();
722            hasher.update(payload);
723            if hasher.finalize() != crc {
724                break; // corrupt — stop here (kafka-style truncation)
725            }
726
727            match postcard::from_bytes::<TransactionLogEntry>(payload) {
728                Ok(entry) => entries.push(entry),
729                Err(_) => break, // corrupt entry
730            }
731            pos += len;
732        }
733
734        Ok(entries)
735    }
736
737    /// Truncate the log file (e.g. after snapshot).
738    pub fn truncate(&self) -> crate::Result<()> {
739        let mut guard = self.writer.lock();
740        if guard.is_none() {
741            return Ok(());
742        }
743        // Close old writer, truncate, reopen
744        *guard = None;
745        let file = std::fs::OpenOptions::new()
746            .create(true)
747            .write(true)
748            .truncate(true)
749            .open(&self.path)?;
750        *guard = Some(std::io::BufWriter::new(file));
751        Ok(())
752    }
753}
754
/// Transaction coordinator manages all active transactions
///
/// This is a per-broker component that tracks transactions for producers
/// assigned to this broker as their transaction coordinator.
pub struct TransactionCoordinator {
    /// Active transactions by (producer_id, txn_id)
    /// `parking_lot` — O(1) transaction lookup, never held across `.await`.
    transactions: RwLock<HashMap<(ProducerId, TransactionId), Transaction>>,

    /// Producer to transaction mapping (for single-txn-per-producer enforcement)
    producer_transactions: RwLock<HashMap<ProducerId, TransactionId>>,

    /// Default transaction timeout for new transactions
    default_timeout: Duration,

    /// Statistics (relaxed atomic counters)
    stats: TransactionStats,

    /// Index of aborted transactions for read_committed filtering
    aborted_index: AbortedTransactionIndex,

    /// Durable transaction log.
    /// Persists every state transition before acknowledging, enabling
    /// recovery of in-doubt transactions after a coordinator crash.
    /// A no-op log is used when persistence is not configured.
    txn_log: TransactionLog,
}
781
impl Default for TransactionCoordinator {
    /// Equivalent to [`TransactionCoordinator::new`]: in-memory only,
    /// no persistence.
    fn default() -> Self {
        Self::new()
    }
}
787
788impl TransactionCoordinator {
    /// Create a new transaction coordinator (in-memory only, no persistence).
    ///
    /// Uses [`DEFAULT_TRANSACTION_TIMEOUT`] and a no-op transaction log, so
    /// all state is lost on restart; use [`Self::with_persistence`] for
    /// durability.
    pub fn new() -> Self {
        Self {
            transactions: RwLock::new(HashMap::new()),
            producer_transactions: RwLock::new(HashMap::new()),
            default_timeout: DEFAULT_TRANSACTION_TIMEOUT,
            stats: TransactionStats::new(),
            aborted_index: AbortedTransactionIndex::new(),
            txn_log: TransactionLog::noop(),
        }
    }
800
801    /// Create with custom default timeout (in-memory only).
802    pub fn with_timeout(timeout: Duration) -> Self {
803        Self {
804            transactions: RwLock::new(HashMap::new()),
805            producer_transactions: RwLock::new(HashMap::new()),
806            default_timeout: timeout,
807            stats: TransactionStats::new(),
808            aborted_index: AbortedTransactionIndex::new(),
809            txn_log: TransactionLog::noop(),
810        }
811    }
812
813    /// Create a coordinator with durable persistence at the given path.
814    ///
815    /// Transaction state transitions are written to a CRC-protected log file.
816    /// On crash recovery, call [`Self::recover`] before resuming operations.
817    pub fn with_persistence(path: impl AsRef<Path>) -> crate::Result<Self> {
818        let txn_log = TransactionLog::open(path)?;
819        Ok(Self {
820            transactions: RwLock::new(HashMap::new()),
821            producer_transactions: RwLock::new(HashMap::new()),
822            default_timeout: DEFAULT_TRANSACTION_TIMEOUT,
823            stats: TransactionStats::new(),
824            aborted_index: AbortedTransactionIndex::new(),
825            txn_log,
826        })
827    }
828
829    /// Recover coordinator state from the durable log.
830    ///
831    /// Replays all valid log entries to rebuild the in-memory
832    /// transaction map. In-doubt transactions (PrepareCommit/PrepareAbort)
833    /// are left in their prepared state for the caller to resolve.
834    pub fn recover(path: impl AsRef<Path>) -> crate::Result<Self> {
835        let path = path.as_ref();
836        let entries = TransactionLog::read_all(path)?;
837        let txn_log = TransactionLog::open(path)?;
838
839        let coord = Self {
840            transactions: RwLock::new(HashMap::new()),
841            producer_transactions: RwLock::new(HashMap::new()),
842            default_timeout: DEFAULT_TRANSACTION_TIMEOUT,
843            stats: TransactionStats::new(),
844            aborted_index: AbortedTransactionIndex::new(),
845            txn_log,
846        };
847
848        // Replay log entries to rebuild state
849        let mut transactions = coord.transactions.write();
850        let mut producer_txns = coord.producer_transactions.write();
851
852        for entry in entries {
853            match entry {
854                TransactionLogEntry::Begin {
855                    txn_id,
856                    producer_id,
857                    producer_epoch,
858                    timeout_ms,
859                } => {
860                    let txn = Transaction::new(
861                        txn_id.clone(),
862                        producer_id,
863                        producer_epoch,
864                        Duration::from_millis(timeout_ms),
865                    );
866                    transactions.insert((producer_id, txn_id.clone()), txn);
867                    producer_txns.insert(producer_id, txn_id);
868                }
869                TransactionLogEntry::AddPartition {
870                    txn_id,
871                    producer_id,
872                    partition,
873                } => {
874                    if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
875                        txn.partitions.insert(partition);
876                    }
877                }
878                TransactionLogEntry::RecordWrite {
879                    txn_id,
880                    producer_id,
881                    partition,
882                    sequence,
883                    offset,
884                } => {
885                    if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
886                        txn.pending_writes.push(PendingWrite {
887                            partition,
888                            sequence,
889                            offset,
890                            timestamp: SystemTime::now(),
891                        });
892                    }
893                }
894                TransactionLogEntry::PrepareCommit {
895                    txn_id,
896                    producer_id,
897                } => {
898                    if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
899                        txn.state = TransactionState::PrepareCommit;
900                    }
901                }
902                TransactionLogEntry::CompleteCommit {
903                    txn_id,
904                    producer_id,
905                } => {
906                    transactions.remove(&(producer_id, txn_id.clone()));
907                    producer_txns.remove(&producer_id);
908                }
909                TransactionLogEntry::PrepareAbort {
910                    txn_id,
911                    producer_id,
912                } => {
913                    if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
914                        txn.state = TransactionState::PrepareAbort;
915                    }
916                }
917                TransactionLogEntry::CompleteAbort {
918                    txn_id,
919                    producer_id,
920                } => {
921                    if let Some(txn) = transactions.get(&(producer_id, txn_id.clone())) {
922                        // Record aborts for read_committed filtering
923                        // compute bounded range from all pending writes
924                        let first = txn.pending_writes.iter().map(|w| w.offset).min();
925                        let last = txn.pending_writes.iter().map(|w| w.offset).max();
926                        if let (Some(f), Some(l)) = (first, last) {
927                            coord.aborted_index.record_abort(producer_id, f, l);
928                        }
929                    }
930                    transactions.remove(&(producer_id, txn_id.clone()));
931                    producer_txns.remove(&producer_id);
932                }
933                TransactionLogEntry::TimedOut {
934                    txn_id,
935                    producer_id,
936                } => {
937                    if let Some(txn) = transactions.get(&(producer_id, txn_id.clone())) {
938                        // compute bounded range from all pending writes
939                        let first = txn.pending_writes.iter().map(|w| w.offset).min();
940                        let last = txn.pending_writes.iter().map(|w| w.offset).max();
941                        if let (Some(f), Some(l)) = (first, last) {
942                            coord.aborted_index.record_abort(producer_id, f, l);
943                        }
944                    }
945                    transactions.remove(&(producer_id, txn_id.clone()));
946                    producer_txns.remove(&producer_id);
947                }
948                TransactionLogEntry::OffsetCommit {
949                    txn_id,
950                    producer_id,
951                    group_id,
952                    offsets,
953                } => {
954                    if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
955                        txn.add_offset_commit(group_id, offsets);
956                    }
957                }
958            }
959        }
960
961        drop(transactions);
962        drop(producer_txns);
963
964        let active = coord.active_count();
965        if active > 0 {
966            tracing::warn!(
967                "Transaction coordinator recovered {} in-doubt transactions from log",
968                active
969            );
970        }
971
972        Ok(coord)
973    }
974
    /// Get statistics
    ///
    /// Returns a borrow of the coordinator's counters (starts, commits,
    /// aborts, timeouts — whatever `TransactionStats` tracks); callers
    /// read metrics through this without cloning.
    pub fn stats(&self) -> &TransactionStats {
        &self.stats
    }
979
980    /// Begin a new transaction
981    pub fn begin_transaction(
982        &self,
983        txn_id: TransactionId,
984        producer_id: ProducerId,
985        producer_epoch: ProducerEpoch,
986        timeout: Option<Duration>,
987    ) -> TransactionResult {
988        // Use write locks from the start to prevent TOCTOU races
989        let mut transactions = self.transactions.write();
990        let mut producer_txns = self.producer_transactions.write();
991
992        // Check if producer already has an active transaction
993        if let Some(existing_txn_id) = producer_txns.get(&producer_id) {
994            if existing_txn_id != &txn_id {
995                return TransactionResult::ConcurrentTransaction;
996            }
997            // Same txn_id - check if we're resuming
998            if let Some(txn) = transactions.get(&(producer_id, txn_id.clone())) {
999                if txn.producer_epoch != producer_epoch {
1000                    return TransactionResult::ProducerFenced {
1001                        expected_epoch: txn.producer_epoch,
1002                        received_epoch: producer_epoch,
1003                    };
1004                }
1005                if txn.state.is_active() {
1006                    return TransactionResult::Ok; // Already active
1007                }
1008            }
1009        }
1010
1011        // Enforce MAX_PENDING_TRANSACTIONS limit
1012        let active_count = transactions
1013            .values()
1014            .filter(|t| t.state.is_active())
1015            .count();
1016        if active_count >= MAX_PENDING_TRANSACTIONS {
1017            return TransactionResult::TooManyTransactions;
1018        }
1019
1020        // Create new transaction
1021        let txn = Transaction::new(
1022            txn_id.clone(),
1023            producer_id,
1024            producer_epoch,
1025            timeout.unwrap_or(self.default_timeout),
1026        );
1027
1028        // WAL BEFORE in-memory — persist first, then insert.
1029        // If the WAL write fails, no in-memory ghost transaction is created.
1030        if let Err(e) = self.txn_log.append(&TransactionLogEntry::Begin {
1031            txn_id: txn_id.clone(),
1032            producer_id,
1033            producer_epoch,
1034            timeout_ms: timeout.unwrap_or(self.default_timeout).as_millis() as u64,
1035        }) {
1036            tracing::error!(producer_id, "Transaction log write failed on begin: {}", e);
1037            return TransactionResult::LogWriteError(e.to_string());
1038        }
1039
1040        transactions.insert((producer_id, txn_id.clone()), txn);
1041        producer_txns.insert(producer_id, txn_id);
1042
1043        self.stats.record_start();
1044        TransactionResult::Ok
1045    }
1046
1047    /// Add partitions to an active transaction
1048    pub fn add_partitions_to_transaction(
1049        &self,
1050        txn_id: &TransactionId,
1051        producer_id: ProducerId,
1052        producer_epoch: ProducerEpoch,
1053        partitions: Vec<TransactionPartition>,
1054    ) -> TransactionResult {
1055        let mut transactions = self.transactions.write();
1056
1057        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1058            Some(t) => t,
1059            None => return TransactionResult::InvalidTransactionId,
1060        };
1061
1062        // Validate epoch
1063        if txn.producer_epoch != producer_epoch {
1064            return TransactionResult::ProducerFenced {
1065                expected_epoch: txn.producer_epoch,
1066                received_epoch: producer_epoch,
1067            };
1068        }
1069
1070        // Check state
1071        if !txn.state.is_active() {
1072            return TransactionResult::InvalidTransactionState {
1073                current: txn.state,
1074                expected: "Ongoing",
1075            };
1076        }
1077
1078        // Check timeout
1079        if txn.is_timed_out() {
1080            txn.state = TransactionState::Dead;
1081            self.stats.record_timeout();
1082            return TransactionResult::TransactionTimeout;
1083        }
1084
1085        // WAL BEFORE in-memory — persist all partitions first.
1086        // If any WAL write fails mid-way, we return error without modifying
1087        // in-memory state, keeping WAL and memory consistent.
1088        for partition in &partitions {
1089            if let Err(e) = self.txn_log.append(&TransactionLogEntry::AddPartition {
1090                txn_id: txn_id.clone(),
1091                producer_id,
1092                partition: partition.clone(),
1093            }) {
1094                tracing::error!(
1095                    producer_id,
1096                    "Transaction log write failed on add_partition: {}",
1097                    e
1098                );
1099                return TransactionResult::LogWriteError(e.to_string());
1100            }
1101        }
1102
1103        // All WAL writes succeeded — now apply to in-memory state
1104        for partition in partitions {
1105            txn.add_partition(partition);
1106        }
1107
1108        TransactionResult::Ok
1109    }
1110
1111    /// Validate that a write is allowed before physically appending data.
1112    /// Checks transaction existence, epoch, state, timeout, and partition membership
1113    /// WITHOUT recording the write. Call this BEFORE `topic.append()`.
1114    pub fn validate_transaction_write(
1115        &self,
1116        txn_id: &TransactionId,
1117        producer_id: ProducerId,
1118        producer_epoch: ProducerEpoch,
1119        partition: &TransactionPartition,
1120    ) -> TransactionResult {
1121        let mut transactions = self.transactions.write();
1122
1123        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1124            Some(t) => t,
1125            None => return TransactionResult::InvalidTransactionId,
1126        };
1127
1128        if txn.producer_epoch != producer_epoch {
1129            return TransactionResult::ProducerFenced {
1130                expected_epoch: txn.producer_epoch,
1131                received_epoch: producer_epoch,
1132            };
1133        }
1134
1135        if !txn.state.is_active() {
1136            return TransactionResult::InvalidTransactionState {
1137                current: txn.state,
1138                expected: "Ongoing",
1139            };
1140        }
1141
1142        if txn.is_timed_out() {
1143            txn.state = TransactionState::Dead;
1144            self.stats.record_timeout();
1145            return TransactionResult::TransactionTimeout;
1146        }
1147
1148        if !txn.partitions.contains(partition) {
1149            return TransactionResult::PartitionNotInTransaction {
1150                topic: partition.topic.clone(),
1151                partition: partition.partition,
1152            };
1153        }
1154
1155        TransactionResult::Ok
1156    }
1157
1158    /// Record a write within a transaction
1159    pub fn add_write_to_transaction(
1160        &self,
1161        txn_id: &TransactionId,
1162        producer_id: ProducerId,
1163        producer_epoch: ProducerEpoch,
1164        partition: TransactionPartition,
1165        sequence: i32,
1166        offset: u64,
1167    ) -> TransactionResult {
1168        let mut transactions = self.transactions.write();
1169
1170        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1171            Some(t) => t,
1172            None => return TransactionResult::InvalidTransactionId,
1173        };
1174
1175        // Validate epoch
1176        if txn.producer_epoch != producer_epoch {
1177            return TransactionResult::ProducerFenced {
1178                expected_epoch: txn.producer_epoch,
1179                received_epoch: producer_epoch,
1180            };
1181        }
1182
1183        // Check state
1184        if !txn.state.is_active() {
1185            return TransactionResult::InvalidTransactionState {
1186                current: txn.state,
1187                expected: "Ongoing",
1188            };
1189        }
1190
1191        // Check timeout
1192        if txn.is_timed_out() {
1193            txn.state = TransactionState::Dead;
1194            self.stats.record_timeout();
1195            return TransactionResult::TransactionTimeout;
1196        }
1197
1198        // Verify partition is part of transaction
1199        if !txn.partitions.contains(&partition) {
1200            return TransactionResult::PartitionNotInTransaction {
1201                topic: partition.topic,
1202                partition: partition.partition,
1203            };
1204        }
1205
1206        // WAL BEFORE in-memory — persist the write entry first.
1207        if let Err(e) = self.txn_log.append(&TransactionLogEntry::RecordWrite {
1208            txn_id: txn_id.clone(),
1209            producer_id,
1210            partition: partition.clone(),
1211            sequence,
1212            offset,
1213        }) {
1214            tracing::error!(
1215                producer_id,
1216                offset,
1217                "Transaction log write failed on record_write: {}",
1218                e
1219            );
1220            return TransactionResult::LogWriteError(e.to_string());
1221        }
1222
1223        // WAL succeeded — now apply to in-memory state
1224        txn.add_write(partition, sequence, offset);
1225
1226        TransactionResult::Ok
1227    }
1228
1229    /// Add consumer offset commit to transaction (for exactly-once consume-transform-produce)
1230    pub fn add_offsets_to_transaction(
1231        &self,
1232        txn_id: &TransactionId,
1233        producer_id: ProducerId,
1234        producer_epoch: ProducerEpoch,
1235        group_id: String,
1236        offsets: Vec<(TransactionPartition, i64)>,
1237    ) -> TransactionResult {
1238        let mut transactions = self.transactions.write();
1239
1240        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1241            Some(t) => t,
1242            None => return TransactionResult::InvalidTransactionId,
1243        };
1244
1245        // Validate epoch
1246        if txn.producer_epoch != producer_epoch {
1247            return TransactionResult::ProducerFenced {
1248                expected_epoch: txn.producer_epoch,
1249                received_epoch: producer_epoch,
1250            };
1251        }
1252
1253        // Check state
1254        if !txn.state.is_active() {
1255            return TransactionResult::InvalidTransactionState {
1256                current: txn.state,
1257                expected: "Ongoing",
1258            };
1259        }
1260
1261        // Check timeout
1262        if txn.is_timed_out() {
1263            txn.state = TransactionState::Dead;
1264            self.stats.record_timeout();
1265            return TransactionResult::TransactionTimeout;
1266        }
1267
1268        // WAL BEFORE in-memory — persist offset commit first.
1269        // Without this, consumer offsets are lost on crash, breaking
1270        // exactly-once consume-transform-produce semantics.
1271        if let Err(e) = self.txn_log.append(&TransactionLogEntry::OffsetCommit {
1272            txn_id: txn_id.clone(),
1273            producer_id,
1274            group_id: group_id.clone(),
1275            offsets: offsets.clone(),
1276        }) {
1277            tracing::error!(
1278                producer_id,
1279                "Transaction log write failed on offset_commit: {}",
1280                e
1281            );
1282            return TransactionResult::LogWriteError(e.to_string());
1283        }
1284
1285        // WAL succeeded — apply to in-memory state
1286        txn.add_offset_commit(group_id, offsets);
1287
1288        TransactionResult::Ok
1289    }
1290
1291    /// Prepare to commit a transaction (2PC phase 1)
1292    ///
1293    /// Returns the transaction data needed for committing to partitions
1294    pub fn prepare_commit(
1295        &self,
1296        txn_id: &TransactionId,
1297        producer_id: ProducerId,
1298        producer_epoch: ProducerEpoch,
1299    ) -> Result<Transaction, TransactionResult> {
1300        let mut transactions = self.transactions.write();
1301
1302        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1303            Some(t) => t,
1304            None => return Err(TransactionResult::InvalidTransactionId),
1305        };
1306
1307        // Validate epoch
1308        if txn.producer_epoch != producer_epoch {
1309            return Err(TransactionResult::ProducerFenced {
1310                expected_epoch: txn.producer_epoch,
1311                received_epoch: producer_epoch,
1312            });
1313        }
1314
1315        // Check state
1316        if !txn.state.can_commit() {
1317            return Err(TransactionResult::InvalidTransactionState {
1318                current: txn.state,
1319                expected: "Ongoing",
1320            });
1321        }
1322
1323        // Check timeout
1324        if txn.is_timed_out() {
1325            txn.state = TransactionState::Dead;
1326            self.stats.record_timeout();
1327            return Err(TransactionResult::TransactionTimeout);
1328        }
1329
1330        // Transition to PrepareCommit
1331        txn.state = TransactionState::PrepareCommit;
1332        txn.touch();
1333
1334        // persist prepare decision before acknowledging
1335        if let Err(e) = self.txn_log.append(&TransactionLogEntry::PrepareCommit {
1336            txn_id: txn_id.clone(),
1337            producer_id,
1338        }) {
1339            tracing::error!(
1340                producer_id,
1341                "Transaction log write failed on prepare_commit: {}",
1342                e
1343            );
1344            // Revert state — we cannot guarantee durability
1345            txn.state = TransactionState::Ongoing;
1346            return Err(TransactionResult::LogWriteError(e.to_string()));
1347        }
1348
1349        Ok(txn.clone())
1350    }
1351
1352    /// Complete the commit (2PC phase 2)
1353    pub fn complete_commit(
1354        &self,
1355        txn_id: &TransactionId,
1356        producer_id: ProducerId,
1357    ) -> TransactionResult {
1358        let mut transactions = self.transactions.write();
1359        let mut producer_txns = self.producer_transactions.write();
1360
1361        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1362            Some(t) => t,
1363            None => return TransactionResult::InvalidTransactionId,
1364        };
1365
1366        if txn.state != TransactionState::PrepareCommit {
1367            return TransactionResult::InvalidTransactionState {
1368                current: txn.state,
1369                expected: "PrepareCommit",
1370            };
1371        }
1372
1373        // WAL BEFORE state change — persist first, then mutate.
1374        if let Err(e) = self.txn_log.append(&TransactionLogEntry::CompleteCommit {
1375            txn_id: txn_id.clone(),
1376            producer_id,
1377        }) {
1378            tracing::error!(
1379                producer_id,
1380                "Transaction log write failed on complete_commit: {}",
1381                e
1382            );
1383            return TransactionResult::LogWriteError(e.to_string());
1384        }
1385
1386        // WAL succeeded — safe to mutate in-memory state
1387        txn.state = TransactionState::CompleteCommit;
1388
1389        // Clean up
1390        transactions.remove(&(producer_id, txn_id.clone()));
1391        producer_txns.remove(&producer_id);
1392
1393        self.stats.record_commit();
1394        TransactionResult::Ok
1395    }
1396
1397    /// Prepare to abort a transaction (2PC phase 1)
1398    pub fn prepare_abort(
1399        &self,
1400        txn_id: &TransactionId,
1401        producer_id: ProducerId,
1402        producer_epoch: ProducerEpoch,
1403    ) -> Result<Transaction, TransactionResult> {
1404        let mut transactions = self.transactions.write();
1405
1406        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1407            Some(t) => t,
1408            None => return Err(TransactionResult::InvalidTransactionId),
1409        };
1410
1411        // Validate epoch
1412        if txn.producer_epoch != producer_epoch {
1413            return Err(TransactionResult::ProducerFenced {
1414                expected_epoch: txn.producer_epoch,
1415                received_epoch: producer_epoch,
1416            });
1417        }
1418
1419        // Check state - abort is allowed from more states than commit
1420        if !txn.state.can_abort() {
1421            return Err(TransactionResult::InvalidTransactionState {
1422                current: txn.state,
1423                expected: "Ongoing or PrepareCommit",
1424            });
1425        }
1426
1427        // Transition to PrepareAbort
1428        txn.state = TransactionState::PrepareAbort;
1429        txn.touch();
1430
1431        // persist prepare-abort decision
1432        if let Err(e) = self.txn_log.append(&TransactionLogEntry::PrepareAbort {
1433            txn_id: txn_id.clone(),
1434            producer_id,
1435        }) {
1436            tracing::error!(
1437                producer_id,
1438                "Transaction log write failed on prepare_abort: {}",
1439                e
1440            );
1441            txn.state = TransactionState::Ongoing;
1442            return Err(TransactionResult::LogWriteError(e.to_string()));
1443        }
1444
1445        Ok(txn.clone())
1446    }
1447
1448    /// Complete the abort (2PC phase 2)
1449    pub fn complete_abort(
1450        &self,
1451        txn_id: &TransactionId,
1452        producer_id: ProducerId,
1453    ) -> TransactionResult {
1454        let mut transactions = self.transactions.write();
1455        let mut producer_txns = self.producer_transactions.write();
1456
1457        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1458            Some(t) => t,
1459            None => return TransactionResult::InvalidTransactionId,
1460        };
1461
1462        if txn.state != TransactionState::PrepareAbort {
1463            return TransactionResult::InvalidTransactionState {
1464                current: txn.state,
1465                expected: "PrepareAbort",
1466            };
1467        }
1468
1469        // WAL BEFORE state change — persist first, then mutate.
1470        if let Err(e) = self.txn_log.append(&TransactionLogEntry::CompleteAbort {
1471            txn_id: txn_id.clone(),
1472            producer_id,
1473        }) {
1474            tracing::error!(
1475                producer_id,
1476                "Transaction log write failed on complete_abort: {}",
1477                e
1478            );
1479            return TransactionResult::LogWriteError(e.to_string());
1480        }
1481
1482        // WAL succeeded — safe to mutate in-memory state
1483        txn.state = TransactionState::CompleteAbort;
1484
1485        // Record aborted transaction for read_committed filtering
1486        // Track bounded offset range (first, last) so committed
1487        // messages from the same producer at later offsets are not filtered.
1488        let first = txn.pending_writes.iter().map(|w| w.offset).min();
1489        let last = txn.pending_writes.iter().map(|w| w.offset).max();
1490        if let (Some(f), Some(l)) = (first, last) {
1491            self.aborted_index.record_abort(producer_id, f, l);
1492        }
1493
1494        // Clean up
1495        transactions.remove(&(producer_id, txn_id.clone()));
1496        producer_txns.remove(&producer_id);
1497
1498        self.stats.record_abort();
1499        TransactionResult::Ok
1500    }
1501
1502    /// Get current transaction state for a producer
1503    pub fn get_transaction(
1504        &self,
1505        txn_id: &TransactionId,
1506        producer_id: ProducerId,
1507    ) -> Option<Transaction> {
1508        let transactions = self.transactions.read();
1509        transactions.get(&(producer_id, txn_id.clone())).cloned()
1510    }
1511
1512    /// Check if a producer has an active transaction
1513    pub fn has_active_transaction(&self, producer_id: ProducerId) -> bool {
1514        let producer_txns = self.producer_transactions.read();
1515        producer_txns.contains_key(&producer_id)
1516    }
1517
1518    /// Get active transaction ID for a producer
1519    pub fn get_active_transaction_id(&self, producer_id: ProducerId) -> Option<TransactionId> {
1520        let producer_txns = self.producer_transactions.read();
1521        producer_txns.get(&producer_id).cloned()
1522    }
1523
1524    /// Clean up timed-out transactions
1525    ///
1526    /// In addition to marking timed-out transactions as Dead, this
1527    /// now records abort markers in the `AbortedTransactionIndex` for each
1528    /// timed-out transaction's pending writes. This ensures that consumers
1529    /// using `read_committed` isolation will correctly filter data from
1530    /// zombie producers whose transactions timed out.
1531    pub fn cleanup_timed_out_transactions(&self) -> Vec<Transaction> {
1532        let mut timed_out = Vec::new();
1533        let mut transactions = self.transactions.write();
1534        let mut producer_txns = self.producer_transactions.write();
1535
1536        let keys_to_remove: Vec<_> = transactions
1537            .iter()
1538            .filter(|(_, txn)| txn.is_timed_out() && !txn.state.is_terminal())
1539            .map(|(k, _)| k.clone())
1540            .collect();
1541
1542        for key in keys_to_remove {
1543            // WAL BEFORE memory removal.
1544            // If WAL write fails, skip this txn — it will be reaped next cycle.
1545            if let Some(txn) = transactions.get(&key) {
1546                if let Err(e) = self.txn_log.append(&TransactionLogEntry::TimedOut {
1547                    txn_id: txn.txn_id.clone(),
1548                    producer_id: txn.producer_id,
1549                }) {
1550                    tracing::error!(txn.producer_id, "Transaction log write failed on timeout: {} — skipping cleanup, will retry", e);
1551                    continue;
1552                }
1553            }
1554
1555            if let Some(mut txn) = transactions.remove(&key) {
1556                txn.state = TransactionState::Dead;
1557                producer_txns.remove(&txn.producer_id);
1558
1559                // Record abort markers for all pending writes so
1560                // read_committed consumers can filter them out.
1561                // Use bounded offset range instead of per-write entries.
1562                let first = txn.pending_writes.iter().map(|w| w.offset).min();
1563                let last = txn.pending_writes.iter().map(|w| w.offset).max();
1564                if let (Some(f), Some(l)) = (first, last) {
1565                    self.aborted_index.record_abort(txn.producer_id, f, l);
1566                }
1567
1568                self.stats.record_timeout();
1569                self.stats.record_abort();
1570                timed_out.push(txn);
1571            }
1572        }
1573
1574        timed_out
1575    }
1576
1577    /// Get number of active transactions
1578    pub fn active_count(&self) -> usize {
1579        let transactions = self.transactions.read();
1580        transactions
1581            .values()
1582            .filter(|t| !t.state.is_terminal())
1583            .count()
1584    }
1585
    /// Check if a producer's message at a given offset is from an aborted transaction
    ///
    /// Used for read_committed isolation level filtering. Delegates to the
    /// `AbortedTransactionIndex`, which is populated on complete-abort,
    /// timeout cleanup, and log recovery.
    pub fn is_aborted(&self, producer_id: ProducerId, offset: u64) -> bool {
        self.aborted_index.is_aborted(producer_id, offset)
    }
1592
    /// Get aborted transactions in a range of offsets
    ///
    /// Used for FetchResponse to include aborted transaction metadata.
    /// Thin delegation to the `AbortedTransactionIndex`; the range
    /// semantics (inclusivity of `start_offset`/`end_offset`) are defined
    /// by that index — TODO confirm against its implementation.
    pub fn get_aborted_in_range(
        &self,
        start_offset: u64,
        end_offset: u64,
    ) -> Vec<AbortedTransaction> {
        self.aborted_index
            .get_aborted_in_range(start_offset, end_offset)
    }
1604
    /// Get access to the aborted transaction index
    ///
    /// Exposes the underlying index directly for callers that need more
    /// than the `is_aborted` / `get_aborted_in_range` convenience wrappers.
    pub fn aborted_index(&self) -> &AbortedTransactionIndex {
        &self.aborted_index
    }
1609}
1610
1611// ============================================================================
1612// Tests
1613// ============================================================================
1614
1615#[cfg(test)]
1616mod tests {
1617    use super::*;
1618    use std::str::FromStr;
1619
1620    #[test]
1621    fn test_transaction_state_transitions() {
1622        // Test terminal states
1623        assert!(TransactionState::Empty.is_terminal());
1624        assert!(TransactionState::CompleteCommit.is_terminal());
1625        assert!(TransactionState::CompleteAbort.is_terminal());
1626        assert!(TransactionState::Dead.is_terminal());
1627
1628        // Test active states
1629        assert!(!TransactionState::Ongoing.is_terminal());
1630        assert!(!TransactionState::PrepareCommit.is_terminal());
1631        assert!(!TransactionState::PrepareAbort.is_terminal());
1632
1633        // Test can_commit
1634        assert!(TransactionState::Ongoing.can_commit());
1635        assert!(!TransactionState::Empty.can_commit());
1636        assert!(!TransactionState::PrepareCommit.can_commit());
1637
1638        // Test can_abort
1639        assert!(TransactionState::Ongoing.can_abort());
1640        assert!(TransactionState::PrepareCommit.can_abort());
1641        assert!(TransactionState::PrepareAbort.can_abort());
1642        assert!(!TransactionState::Empty.can_abort());
1643    }
1644
1645    #[test]
1646    fn test_begin_transaction() {
1647        let coordinator = TransactionCoordinator::new();
1648
1649        // Begin first transaction
1650        let result = coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1651        assert_eq!(result, TransactionResult::Ok);
1652
1653        // Verify transaction exists
1654        let txn = coordinator.get_transaction(&"txn-1".to_string(), 1);
1655        assert!(txn.is_some());
1656        let txn = txn.unwrap();
1657        assert_eq!(txn.state, TransactionState::Ongoing);
1658        assert_eq!(txn.producer_id, 1);
1659        assert_eq!(txn.producer_epoch, 0);
1660
1661        // Stats
1662        assert_eq!(coordinator.stats().transactions_started(), 1);
1663        assert_eq!(coordinator.stats().active_transactions(), 1);
1664    }
1665
1666    #[test]
1667    fn test_concurrent_transaction_rejection() {
1668        let coordinator = TransactionCoordinator::new();
1669
1670        // Begin first transaction
1671        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1672
1673        // Try to begin another transaction for same producer
1674        let result = coordinator.begin_transaction("txn-2".to_string(), 1, 0, None);
1675        assert_eq!(result, TransactionResult::ConcurrentTransaction);
1676    }
1677
1678    #[test]
1679    fn test_add_partitions_to_transaction() {
1680        let coordinator = TransactionCoordinator::new();
1681        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1682
1683        // Add partitions
1684        let result = coordinator.add_partitions_to_transaction(
1685            &"txn-1".to_string(),
1686            1,
1687            0,
1688            vec![
1689                TransactionPartition::new("topic-1", 0),
1690                TransactionPartition::new("topic-1", 1),
1691                TransactionPartition::new("topic-2", 0),
1692            ],
1693        );
1694        assert_eq!(result, TransactionResult::Ok);
1695
1696        // Verify partitions added
1697        let txn = coordinator
1698            .get_transaction(&"txn-1".to_string(), 1)
1699            .unwrap();
1700        assert_eq!(txn.partitions.len(), 3);
1701    }
1702
1703    #[test]
1704    fn test_add_write_to_transaction() {
1705        let coordinator = TransactionCoordinator::new();
1706        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1707
1708        let partition = TransactionPartition::new("topic-1", 0);
1709        coordinator.add_partitions_to_transaction(
1710            &"txn-1".to_string(),
1711            1,
1712            0,
1713            vec![partition.clone()],
1714        );
1715
1716        // Record write
1717        let result =
1718            coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1719        assert_eq!(result, TransactionResult::Ok);
1720
1721        // Verify write recorded
1722        let txn = coordinator
1723            .get_transaction(&"txn-1".to_string(), 1)
1724            .unwrap();
1725        assert_eq!(txn.pending_writes.len(), 1);
1726        assert_eq!(txn.pending_writes[0].offset, 100);
1727        assert_eq!(txn.pending_writes[0].sequence, 0);
1728    }
1729
1730    #[test]
1731    fn test_write_to_non_registered_partition() {
1732        let coordinator = TransactionCoordinator::new();
1733        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1734
1735        // Try to write to partition not added to transaction
1736        let result = coordinator.add_write_to_transaction(
1737            &"txn-1".to_string(),
1738            1,
1739            0,
1740            TransactionPartition::new("topic-1", 0),
1741            0,
1742            100,
1743        );
1744
1745        assert!(matches!(
1746            result,
1747            TransactionResult::PartitionNotInTransaction { .. }
1748        ));
1749    }
1750
1751    #[test]
1752    fn test_commit_transaction() {
1753        let coordinator = TransactionCoordinator::new();
1754        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1755
1756        let partition = TransactionPartition::new("topic-1", 0);
1757        coordinator.add_partitions_to_transaction(
1758            &"txn-1".to_string(),
1759            1,
1760            0,
1761            vec![partition.clone()],
1762        );
1763        coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1764
1765        // Prepare commit
1766        let txn = coordinator.prepare_commit(&"txn-1".to_string(), 1, 0);
1767        assert!(txn.is_ok());
1768        let txn = txn.unwrap();
1769        assert_eq!(txn.state, TransactionState::PrepareCommit);
1770
1771        // Complete commit
1772        let result = coordinator.complete_commit(&"txn-1".to_string(), 1);
1773        assert_eq!(result, TransactionResult::Ok);
1774
1775        // Transaction should be removed
1776        assert!(coordinator
1777            .get_transaction(&"txn-1".to_string(), 1)
1778            .is_none());
1779        assert!(!coordinator.has_active_transaction(1));
1780
1781        // Stats
1782        assert_eq!(coordinator.stats().transactions_committed(), 1);
1783        assert_eq!(coordinator.stats().active_transactions(), 0);
1784    }
1785
1786    #[test]
1787    fn test_abort_transaction() {
1788        let coordinator = TransactionCoordinator::new();
1789        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1790
1791        let partition = TransactionPartition::new("topic-1", 0);
1792        coordinator.add_partitions_to_transaction(
1793            &"txn-1".to_string(),
1794            1,
1795            0,
1796            vec![partition.clone()],
1797        );
1798        coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1799
1800        // Prepare abort
1801        let txn = coordinator.prepare_abort(&"txn-1".to_string(), 1, 0);
1802        assert!(txn.is_ok());
1803
1804        // Complete abort
1805        let result = coordinator.complete_abort(&"txn-1".to_string(), 1);
1806        assert_eq!(result, TransactionResult::Ok);
1807
1808        // Transaction should be removed
1809        assert!(coordinator
1810            .get_transaction(&"txn-1".to_string(), 1)
1811            .is_none());
1812
1813        // Stats
1814        assert_eq!(coordinator.stats().transactions_aborted(), 1);
1815    }
1816
1817    #[test]
1818    fn test_producer_fencing() {
1819        let coordinator = TransactionCoordinator::new();
1820        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1821
1822        // Try with wrong epoch
1823        let result = coordinator.add_partitions_to_transaction(
1824            &"txn-1".to_string(),
1825            1,
1826            1, // Wrong epoch
1827            vec![TransactionPartition::new("topic-1", 0)],
1828        );
1829
1830        assert!(matches!(
1831            result,
1832            TransactionResult::ProducerFenced {
1833                expected_epoch: 0,
1834                received_epoch: 1
1835            }
1836        ));
1837    }
1838
1839    #[test]
1840    fn test_transaction_timeout() {
1841        // Create coordinator with very short timeout
1842        let coordinator = TransactionCoordinator::with_timeout(Duration::from_millis(1));
1843        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1844
1845        // Wait for timeout
1846        std::thread::sleep(Duration::from_millis(5));
1847
1848        // Try to add partitions - should fail with timeout
1849        let result = coordinator.add_partitions_to_transaction(
1850            &"txn-1".to_string(),
1851            1,
1852            0,
1853            vec![TransactionPartition::new("topic-1", 0)],
1854        );
1855
1856        assert_eq!(result, TransactionResult::TransactionTimeout);
1857    }
1858
1859    #[test]
1860    fn test_cleanup_timed_out_transactions() {
1861        let coordinator = TransactionCoordinator::with_timeout(Duration::from_millis(1));
1862
1863        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1864        coordinator.begin_transaction("txn-2".to_string(), 2, 0, None);
1865
1866        // Wait for timeout
1867        std::thread::sleep(Duration::from_millis(5));
1868
1869        // Cleanup
1870        let timed_out = coordinator.cleanup_timed_out_transactions();
1871        assert_eq!(timed_out.len(), 2);
1872
1873        // Transactions should be gone
1874        assert_eq!(coordinator.active_count(), 0);
1875        assert_eq!(coordinator.stats().transactions_timed_out(), 2);
1876    }
1877
1878    #[test]
1879    fn test_add_offsets_to_transaction() {
1880        let coordinator = TransactionCoordinator::new();
1881        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1882
1883        // Add consumer offsets
1884        let result = coordinator.add_offsets_to_transaction(
1885            &"txn-1".to_string(),
1886            1,
1887            0,
1888            "consumer-group-1".to_string(),
1889            vec![
1890                (TransactionPartition::new("input-topic", 0), 42),
1891                (TransactionPartition::new("input-topic", 1), 100),
1892            ],
1893        );
1894        assert_eq!(result, TransactionResult::Ok);
1895
1896        // Verify
1897        let txn = coordinator
1898            .get_transaction(&"txn-1".to_string(), 1)
1899            .unwrap();
1900        assert_eq!(txn.offset_commits.len(), 1);
1901        assert_eq!(txn.offset_commits[0].group_id, "consumer-group-1");
1902        assert_eq!(txn.offset_commits[0].offsets.len(), 2);
1903    }
1904
1905    #[test]
1906    fn test_invalid_state_transitions() {
1907        let coordinator = TransactionCoordinator::new();
1908        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1909
1910        // Prepare commit
1911        coordinator
1912            .prepare_commit(&"txn-1".to_string(), 1, 0)
1913            .unwrap();
1914
1915        // Try to add partitions after prepare - should fail
1916        let result = coordinator.add_partitions_to_transaction(
1917            &"txn-1".to_string(),
1918            1,
1919            0,
1920            vec![TransactionPartition::new("topic-1", 0)],
1921        );
1922        assert!(matches!(
1923            result,
1924            TransactionResult::InvalidTransactionState { .. }
1925        ));
1926    }
1927
1928    #[test]
1929    fn test_abort_from_prepare_commit() {
1930        let coordinator = TransactionCoordinator::new();
1931        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1932
1933        // Prepare commit
1934        coordinator
1935            .prepare_commit(&"txn-1".to_string(), 1, 0)
1936            .unwrap();
1937
1938        // Abort should still be allowed from PrepareCommit
1939        let result = coordinator.prepare_abort(&"txn-1".to_string(), 1, 0);
1940        assert!(result.is_ok());
1941
1942        let result = coordinator.complete_abort(&"txn-1".to_string(), 1);
1943        assert_eq!(result, TransactionResult::Ok);
1944    }
1945
1946    #[test]
1947    fn test_transaction_partition_hash() {
1948        let p1 = TransactionPartition::new("topic", 0);
1949        let p2 = TransactionPartition::new("topic", 0);
1950        let p3 = TransactionPartition::new("topic", 1);
1951
1952        assert_eq!(p1, p2);
1953        assert_ne!(p1, p3);
1954
1955        let mut set = HashSet::new();
1956        set.insert(p1.clone());
1957        set.insert(p2); // Should not add (duplicate)
1958        set.insert(p3);
1959        assert_eq!(set.len(), 2);
1960    }
1961
1962    #[test]
1963    fn test_resume_same_transaction() {
1964        let coordinator = TransactionCoordinator::new();
1965
1966        // Begin transaction
1967        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1968
1969        // Try to begin same transaction again - should succeed (idempotent)
1970        let result = coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1971        assert_eq!(result, TransactionResult::Ok);
1972
1973        // Only one transaction should exist
1974        assert_eq!(coordinator.active_count(), 1);
1975        assert_eq!(coordinator.stats().transactions_started(), 1);
1976    }
1977
1978    #[test]
1979    fn test_stats_snapshot() {
1980        let coordinator = TransactionCoordinator::new();
1981        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1982        coordinator
1983            .prepare_commit(&"txn-1".to_string(), 1, 0)
1984            .unwrap();
1985        coordinator.complete_commit(&"txn-1".to_string(), 1);
1986
1987        let snapshot: TransactionStatsSnapshot = coordinator.stats().into();
1988        assert_eq!(snapshot.transactions_started, 1);
1989        assert_eq!(snapshot.transactions_committed, 1);
1990        assert_eq!(snapshot.active_transactions, 0);
1991    }
1992
1993    // =========================================================================
1994    // Isolation Level Tests
1995    // =========================================================================
1996
1997    #[test]
1998    fn test_isolation_level_from_u8() {
1999        assert_eq!(IsolationLevel::from_u8(0), IsolationLevel::ReadUncommitted);
2000        assert_eq!(IsolationLevel::from_u8(1), IsolationLevel::ReadCommitted);
2001        assert_eq!(IsolationLevel::from_u8(2), IsolationLevel::ReadUncommitted); // Invalid defaults
2002        assert_eq!(
2003            IsolationLevel::from_u8(255),
2004            IsolationLevel::ReadUncommitted
2005        );
2006    }
2007
2008    #[test]
2009    fn test_isolation_level_as_u8() {
2010        assert_eq!(IsolationLevel::ReadUncommitted.as_u8(), 0);
2011        assert_eq!(IsolationLevel::ReadCommitted.as_u8(), 1);
2012    }
2013
2014    #[test]
2015    fn test_isolation_level_from_str() {
2016        assert_eq!(
2017            IsolationLevel::from_str("read_uncommitted").unwrap(),
2018            IsolationLevel::ReadUncommitted
2019        );
2020        assert_eq!(
2021            IsolationLevel::from_str("read_committed").unwrap(),
2022            IsolationLevel::ReadCommitted
2023        );
2024        assert_eq!(
2025            IsolationLevel::from_str("READ_UNCOMMITTED").unwrap(),
2026            IsolationLevel::ReadUncommitted
2027        );
2028        assert_eq!(
2029            IsolationLevel::from_str("READ_COMMITTED").unwrap(),
2030            IsolationLevel::ReadCommitted
2031        );
2032        assert!(IsolationLevel::from_str("invalid").is_err());
2033    }
2034
2035    #[test]
2036    fn test_isolation_level_default() {
2037        assert_eq!(IsolationLevel::default(), IsolationLevel::ReadUncommitted);
2038    }
2039
2040    // =========================================================================
2041    // Aborted Transaction Index Tests
2042    // =========================================================================
2043
2044    #[test]
2045    fn test_aborted_transaction_index_basic() {
2046        let index = AbortedTransactionIndex::new();
2047        assert!(index.is_empty());
2048        assert_eq!(index.len(), 0);
2049
2050        // Record an aborted transaction spanning offsets 100..=200
2051        index.record_abort(1, 100, 200);
2052        assert!(!index.is_empty());
2053        assert_eq!(index.len(), 1);
2054
2055        // Check if offset is from aborted transaction
2056        assert!(index.is_aborted(1, 100)); // first_offset
2057        assert!(index.is_aborted(1, 150)); // within transaction
2058        assert!(index.is_aborted(1, 200)); // last_offset (inclusive)
2059        assert!(!index.is_aborted(1, 201)); // beyond last_offset → NOT aborted
2060        assert!(!index.is_aborted(1, 50)); // before transaction
2061        assert!(!index.is_aborted(2, 100)); // different producer
2062    }
2063
2064    #[test]
2065    fn test_aborted_transaction_index_multiple() {
2066        let index = AbortedTransactionIndex::new();
2067
2068        // Multiple aborted transactions with bounded ranges
2069        index.record_abort(1, 100, 199);
2070        index.record_abort(1, 300, 399);
2071        index.record_abort(2, 200, 299);
2072
2073        assert_eq!(index.len(), 3);
2074
2075        // Check filtering — ranges are now bounded
2076        assert!(index.is_aborted(1, 100));
2077        assert!(index.is_aborted(1, 150)); // within first abort range
2078        assert!(!index.is_aborted(1, 250)); // between the two abort ranges → committed
2079        assert!(index.is_aborted(1, 300));
2080        assert!(index.is_aborted(1, 399)); // last offset of second abort
2081        assert!(!index.is_aborted(1, 400)); // beyond second abort → NOT aborted
2082        assert!(index.is_aborted(2, 200));
2083        assert!(index.is_aborted(2, 250));
2084        assert!(!index.is_aborted(2, 100)); // before producer 2's abort
2085        assert!(!index.is_aborted(2, 300)); // beyond producer 2's abort range
2086    }
2087
2088    #[test]
2089    fn test_aborted_transaction_index_get_range() {
2090        let index = AbortedTransactionIndex::new();
2091
2092        index.record_abort(1, 100, 199);
2093        index.record_abort(2, 200, 299);
2094        index.record_abort(1, 300, 399);
2095
2096        // Get transactions in range
2097        let in_range = index.get_aborted_in_range(150, 250);
2098        assert_eq!(in_range.len(), 1);
2099        assert_eq!(in_range[0].producer_id, 2);
2100        assert_eq!(in_range[0].first_offset, 200);
2101
2102        // Wider range
2103        let in_range = index.get_aborted_in_range(0, 500);
2104        assert_eq!(in_range.len(), 3);
2105
2106        // No transactions in range
2107        let in_range = index.get_aborted_in_range(400, 500);
2108        assert_eq!(in_range.len(), 0);
2109    }
2110
2111    #[test]
2112    fn test_aborted_transaction_index_truncate() {
2113        let index = AbortedTransactionIndex::new();
2114
2115        index.record_abort(1, 100, 199);
2116        index.record_abort(2, 200, 299);
2117        index.record_abort(1, 300, 399);
2118
2119        assert_eq!(index.len(), 3);
2120
2121        // Truncate entries before offset 200
2122        index.truncate_before(200);
2123
2124        assert_eq!(index.len(), 2);
2125
2126        // Only offsets >= 200 should remain
2127        assert!(!index.is_aborted(1, 150)); // old entry removed
2128        assert!(index.is_aborted(2, 200));
2129        assert!(index.is_aborted(1, 300));
2130    }
2131
2132    #[test]
2133    fn test_coordinator_is_aborted() {
2134        let coordinator = TransactionCoordinator::new();
2135
2136        // Start a transaction
2137        coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
2138
2139        // Add partition and write
2140        coordinator.add_partitions_to_transaction(
2141            &"txn-1".to_string(),
2142            1,
2143            0,
2144            vec![TransactionPartition::new("test-topic", 0)],
2145        );
2146        coordinator.add_write_to_transaction(
2147            &"txn-1".to_string(),
2148            1,
2149            0,
2150            TransactionPartition::new("test-topic", 0),
2151            0,
2152            100, // offset
2153        );
2154
2155        // Not aborted yet
2156        assert!(!coordinator.is_aborted(1, 100));
2157
2158        // Prepare and complete abort
2159        coordinator
2160            .prepare_abort(&"txn-1".to_string(), 1, 0)
2161            .unwrap();
2162        coordinator.complete_abort(&"txn-1".to_string(), 1);
2163
2164        // Now should be marked as aborted
2165        assert!(coordinator.is_aborted(1, 100));
2166        // offset 150 is beyond the transaction's last write (100), so NOT aborted
2167        assert!(!coordinator.is_aborted(1, 150));
2168        assert!(!coordinator.is_aborted(1, 50)); // before first_offset
2169        assert!(!coordinator.is_aborted(2, 100)); // different producer
2170    }
2171
2172    // ====================================================================
2173    // Transaction Log Persistence Tests
2174    // ====================================================================
2175
2176    #[test]
2177    fn test_transaction_log_round_trip() {
2178        let dir = tempfile::tempdir().unwrap();
2179        let log_path = dir.path().join("txn.log");
2180
2181        // Write entries to the log
2182        let log = TransactionLog::open(&log_path).unwrap();
2183        log.append(&TransactionLogEntry::Begin {
2184            txn_id: "txn-1".to_string(),
2185            producer_id: 42,
2186            producer_epoch: 0,
2187            timeout_ms: 30000,
2188        })
2189        .unwrap();
2190        log.append(&TransactionLogEntry::AddPartition {
2191            txn_id: "txn-1".to_string(),
2192            producer_id: 42,
2193            partition: TransactionPartition::new("topic-a", 0),
2194        })
2195        .unwrap();
2196        log.append(&TransactionLogEntry::RecordWrite {
2197            txn_id: "txn-1".to_string(),
2198            producer_id: 42,
2199            partition: TransactionPartition::new("topic-a", 0),
2200            sequence: 0,
2201            offset: 100,
2202        })
2203        .unwrap();
2204        drop(log);
2205
2206        // Read back and verify
2207        let entries = TransactionLog::read_all(&log_path).unwrap();
2208        assert_eq!(entries.len(), 3);
2209        assert!(
2210            matches!(&entries[0], TransactionLogEntry::Begin { txn_id, producer_id, .. } if txn_id == "txn-1" && *producer_id == 42)
2211        );
2212        assert!(
2213            matches!(&entries[1], TransactionLogEntry::AddPartition { partition, .. } if partition.topic == "topic-a" && partition.partition == 0)
2214        );
2215        assert!(
2216            matches!(&entries[2], TransactionLogEntry::RecordWrite { offset, .. } if *offset == 100)
2217        );
2218    }
2219
2220    #[test]
2221    fn test_transaction_log_crc_corruption_detection() {
2222        let dir = tempfile::tempdir().unwrap();
2223        let log_path = dir.path().join("txn.log");
2224
2225        let log = TransactionLog::open(&log_path).unwrap();
2226        log.append(&TransactionLogEntry::Begin {
2227            txn_id: "txn-1".to_string(),
2228            producer_id: 1,
2229            producer_epoch: 0,
2230            timeout_ms: 5000,
2231        })
2232        .unwrap();
2233        log.append(&TransactionLogEntry::PrepareCommit {
2234            txn_id: "txn-1".to_string(),
2235            producer_id: 1,
2236        })
2237        .unwrap();
2238        drop(log);
2239
2240        // Corrupt one byte in the middle of the file
2241        let mut data = std::fs::read(&log_path).unwrap();
2242        assert!(data.len() > 10);
2243        data[10] ^= 0xFF; // flip bits in payload
2244        std::fs::write(&log_path, &data).unwrap();
2245
2246        // read_all should stop at first corrupted entry
2247        let entries = TransactionLog::read_all(&log_path).unwrap();
2248        // Should have 0 or 1 entries (first entry's CRC may be corrupted
2249        // depending on where byte 10 falls)
2250        assert!(entries.len() <= 1);
2251    }
2252
2253    #[test]
2254    fn test_coordinator_with_persistence_commit_flow() {
2255        let dir = tempfile::tempdir().unwrap();
2256        let log_path = dir.path().join("txn.log");
2257
2258        // Run a full commit cycle with persistence
2259        let coord = TransactionCoordinator::with_persistence(&log_path).unwrap();
2260        assert_eq!(
2261            coord.begin_transaction("txn-1".to_string(), 1, 0, None),
2262            TransactionResult::Ok
2263        );
2264        assert_eq!(
2265            coord.add_partitions_to_transaction(
2266                &"txn-1".to_string(),
2267                1,
2268                0,
2269                vec![TransactionPartition::new("topic", 0)],
2270            ),
2271            TransactionResult::Ok
2272        );
2273        assert_eq!(
2274            coord.add_write_to_transaction(
2275                &"txn-1".to_string(),
2276                1,
2277                0,
2278                TransactionPartition::new("topic", 0),
2279                0,
2280                500,
2281            ),
2282            TransactionResult::Ok
2283        );
2284        coord.prepare_commit(&"txn-1".to_string(), 1, 0).unwrap();
2285        assert_eq!(
2286            coord.complete_commit(&"txn-1".to_string(), 1),
2287            TransactionResult::Ok
2288        );
2289
2290        // Log should contain Begin, AddPartition, RecordWrite,
2291        // PrepareCommit, CompleteCommit (5 entries)
2292        let entries = TransactionLog::read_all(&log_path).unwrap();
2293        assert_eq!(entries.len(), 5);
2294    }
2295
2296    #[test]
2297    fn test_coordinator_recovery_from_crash() {
2298        let dir = tempfile::tempdir().unwrap();
2299        let log_path = dir.path().join("txn.log");
2300
2301        // Simulate a coordinator that prepares commit then "crashes"
2302        {
2303            let coord = TransactionCoordinator::with_persistence(&log_path).unwrap();
2304            coord.begin_transaction("txn-1".to_string(), 1, 0, None);
2305            coord.add_partitions_to_transaction(
2306                &"txn-1".to_string(),
2307                1,
2308                0,
2309                vec![TransactionPartition::new("topic", 0)],
2310            );
2311            coord.add_write_to_transaction(
2312                &"txn-1".to_string(),
2313                1,
2314                0,
2315                TransactionPartition::new("topic", 0),
2316                0,
2317                42,
2318            );
2319            coord.prepare_commit(&"txn-1".to_string(), 1, 0).unwrap();
2320            // "crash" here — drop without complete_commit
2321        }
2322
2323        // Recover — should reconstruct the in-doubt transaction
2324        let coord = TransactionCoordinator::recover(&log_path).unwrap();
2325
2326        // Transaction should be recovered in PrepareCommit state
2327        let txn = coord.get_transaction(&"txn-1".to_string(), 1);
2328        assert!(txn.is_some(), "Transaction should be recovered from WAL");
2329        let txn = txn.unwrap();
2330        assert_eq!(txn.state, TransactionState::PrepareCommit);
2331        assert_eq!(txn.pending_writes.len(), 1);
2332        assert_eq!(txn.pending_writes[0].offset, 42);
2333
2334        // Should be able to complete the commit after recovery
2335        assert_eq!(
2336            coord.complete_commit(&"txn-1".to_string(), 1),
2337            TransactionResult::Ok
2338        );
2339        assert_eq!(coord.active_count(), 0);
2340    }
2341
2342    #[test]
2343    fn test_coordinator_recovery_abort_flow() {
2344        let dir = tempfile::tempdir().unwrap();
2345        let log_path = dir.path().join("txn.log");
2346
2347        // Write a transaction that was aborted
2348        {
2349            let coord = TransactionCoordinator::with_persistence(&log_path).unwrap();
2350            coord.begin_transaction("txn-a".to_string(), 10, 0, None);
2351            coord.add_partitions_to_transaction(
2352                &"txn-a".to_string(),
2353                10,
2354                0,
2355                vec![TransactionPartition::new("t", 0)],
2356            );
2357            coord.add_write_to_transaction(
2358                &"txn-a".to_string(),
2359                10,
2360                0,
2361                TransactionPartition::new("t", 0),
2362                0,
2363                200,
2364            );
2365            coord.prepare_abort(&"txn-a".to_string(), 10, 0).unwrap();
2366            coord.complete_abort(&"txn-a".to_string(), 10);
2367        }
2368
2369        // Recovery should show this transaction as fully resolved
2370        let coord = TransactionCoordinator::recover(&log_path).unwrap();
2371        assert_eq!(coord.active_count(), 0);
2372        // The abort markers should be in the aborted index
2373        assert!(coord.is_aborted(10, 200));
2374    }
2375
2376    #[test]
2377    fn test_transaction_log_noop_is_silent() {
2378        let log = TransactionLog::noop();
2379        // noop log should accept writes without error
2380        assert!(log
2381            .append(&TransactionLogEntry::Begin {
2382                txn_id: "x".to_string(),
2383                producer_id: 1,
2384                producer_epoch: 0,
2385                timeout_ms: 1000,
2386            })
2387            .is_ok());
2388    }
2389
2390    #[test]
2391    fn test_log_write_error_propagated() {
2392        // Use a coordinator with persistence pointing to a non-writable path
2393        let coord = TransactionCoordinator::new(); // noop log
2394
2395        // noop log writes succeed silently, so begin should work
2396        let result = coord.begin_transaction("txn-ok".to_string(), 1, 0, None);
2397        assert_eq!(result, TransactionResult::Ok);
2398    }
2399}