Skip to main content

rivven_core/
transaction.rs

//! Native Transaction Support
//!
//! Provides exactly-once semantics with cross-topic atomic writes.
//!
//! ## Transaction Protocol
//!
//! ```text
//! Producer                     Transaction Coordinator            Partitions
//!    │                                   │                            │
//!    │─── InitProducerId ───────────────>│                            │
//!    │<── PID=123, Epoch=0 ──────────────│                            │
//!    │                                   │                            │
//!    │─── BeginTransaction(TxnId) ──────>│                            │
//!    │<── OK ────────────────────────────│                            │
//!    │                                   │                            │
//!    │─── AddPartitionsToTxn(p1,p2) ────>│                            │
//!    │<── OK ────────────────────────────│                            │
//!    │                                   │                            │
//!    │─── Produce(p1, PID, Seq) ──────────────────────────────────────>│
//!    │<── OK ───────────────────────────────────────────────────────────│
//!    │                                   │                            │
//!    │─── Produce(p2, PID, Seq) ──────────────────────────────────────>│
//!    │<── OK ───────────────────────────────────────────────────────────│
//!    │                                   │                            │
//!    │─── CommitTransaction(TxnId) ─────>│                            │
//!    │                                   │─── WriteTxnMarker(COMMIT) ─>│
//!    │                                   │<── OK ─────────────────────│
//!    │<── OK ────────────────────────────│                            │
//! ```
//!
//! ## Transaction States
//!
//! ```text
//! Empty ──────> Ongoing ──────> PrepareCommit ──────> CompleteCommit
//!                  │                  │                     │
//!                  │                  v                     v
//!                  └───────> PrepareAbort ───────> CompleteAbort
//!                                    │                     │
//!                                    └─────────────────────┘
//! ```
//!
//! ## Exactly-Once Guarantees
//!
//! 1. **Atomic Writes**: All messages in a transaction are committed or aborted together
//! 2. **Consumer Isolation**: Consumers reading with `read_committed` only see committed
//!    messages (the default, `read_uncommitted`, also sees aborted ones)
//! 3. **Fencing**: Old producer instances are fenced via epoch
//! 4. **Durability**: Transaction state is persisted before acknowledgment
//!
49
50use crate::idempotent::{ProducerEpoch, ProducerId};
51use parking_lot::RwLock;
52use serde::{Deserialize, Serialize};
53use std::collections::{HashMap, HashSet};
54use std::sync::atomic::{AtomicU64, Ordering};
55use std::time::{Duration, Instant, SystemTime};
56
/// Unique identifier for a transaction.
pub type TransactionId = String;

/// Default transaction timeout (1 minute).
///
/// `TransactionCoordinator::new` uses this as the coordinator's default; it
/// applies whenever `begin_transaction` is called without an explicit timeout.
pub const DEFAULT_TRANSACTION_TIMEOUT: Duration = Duration::from_secs(60);

/// Maximum number of concurrently `Ongoing` transactions.
///
/// `TransactionCoordinator::begin_transaction` enforces this as a cap across
/// *all* producers tracked by the coordinator: it counts every transaction in
/// the `Ongoing` state, not just the caller's. It rejects a new transaction
/// with `TransactionResult::TooManyTransactions` once this many are in flight.
/// (Each producer can only hold one transaction at a time anyway.)
pub const MAX_PENDING_TRANSACTIONS: usize = 5;
65
66/// Transaction state machine
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
68pub enum TransactionState {
69    /// No active transaction
70    Empty,
71
72    /// Transaction in progress, accepting writes
73    Ongoing,
74
75    /// Preparing to commit (2PC phase 1)
76    PrepareCommit,
77
78    /// Preparing to abort (2PC phase 1)
79    PrepareAbort,
80
81    /// Commit complete (2PC phase 2)
82    CompleteCommit,
83
84    /// Abort complete (2PC phase 2)
85    CompleteAbort,
86
87    /// Transaction has expired without completion
88    Dead,
89}
90
91impl TransactionState {
92    /// Check if transaction is in a terminal state
93    pub fn is_terminal(&self) -> bool {
94        matches!(
95            self,
96            TransactionState::Empty
97                | TransactionState::CompleteCommit
98                | TransactionState::CompleteAbort
99                | TransactionState::Dead
100        )
101    }
102
103    /// Check if transaction is still active (can accept writes)
104    pub fn is_active(&self) -> bool {
105        matches!(self, TransactionState::Ongoing)
106    }
107
108    /// Check if transaction can transition to commit
109    pub fn can_commit(&self) -> bool {
110        matches!(self, TransactionState::Ongoing)
111    }
112
113    /// Check if transaction can transition to abort
114    pub fn can_abort(&self) -> bool {
115        matches!(
116            self,
117            TransactionState::Ongoing
118                | TransactionState::PrepareCommit
119                | TransactionState::PrepareAbort
120        )
121    }
122}
123
124/// Result of a transaction operation
125#[derive(Debug, Clone, PartialEq, Eq)]
126pub enum TransactionResult {
127    /// Operation succeeded
128    Ok,
129
130    /// Transaction ID is invalid or not found
131    InvalidTransactionId,
132
133    /// Transaction is in wrong state for this operation
134    InvalidTransactionState {
135        current: TransactionState,
136        expected: &'static str,
137    },
138
139    /// Producer ID/epoch mismatch
140    ProducerFenced {
141        expected_epoch: ProducerEpoch,
142        received_epoch: ProducerEpoch,
143    },
144
145    /// Transaction has timed out
146    TransactionTimeout,
147
148    /// Too many pending transactions
149    TooManyTransactions,
150
151    /// Concurrent modification detected
152    ConcurrentTransaction,
153
154    /// Partition not part of transaction
155    PartitionNotInTransaction { topic: String, partition: u32 },
156}
157
158/// A partition involved in a transaction
159#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
160pub struct TransactionPartition {
161    pub topic: String,
162    pub partition: u32,
163}
164
165impl TransactionPartition {
166    pub fn new(topic: impl Into<String>, partition: u32) -> Self {
167        Self {
168            topic: topic.into(),
169            partition,
170        }
171    }
172}
173
174/// Pending write in a transaction (not yet committed)
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct PendingWrite {
177    /// Target partition
178    pub partition: TransactionPartition,
179
180    /// Sequence number for this write
181    pub sequence: i32,
182
183    /// Offset assigned by the partition leader
184    pub offset: u64,
185
186    /// Write timestamp
187    #[serde(with = "crate::serde_utils::system_time")]
188    pub timestamp: SystemTime,
189}
190
191/// Consumer offset to be committed with the transaction
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct TransactionOffsetCommit {
194    /// Consumer group
195    pub group_id: String,
196
197    /// Topic-partition-offset triples
198    pub offsets: Vec<(TransactionPartition, i64)>,
199}
200
201/// Active transaction state
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct Transaction {
204    /// Transaction ID (unique per producer)
205    pub txn_id: TransactionId,
206
207    /// Producer ID owning this transaction
208    pub producer_id: ProducerId,
209
210    /// Producer epoch (for fencing)
211    pub producer_epoch: ProducerEpoch,
212
213    /// Current state
214    pub state: TransactionState,
215
216    /// Partitions involved in this transaction
217    pub partitions: HashSet<TransactionPartition>,
218
219    /// Pending writes (not yet committed)
220    pub pending_writes: Vec<PendingWrite>,
221
222    /// Consumer offsets to commit with this transaction
223    pub offset_commits: Vec<TransactionOffsetCommit>,
224
225    /// Transaction start time
226    #[serde(with = "crate::serde_utils::system_time")]
227    pub started_at: SystemTime,
228
229    /// Transaction timeout
230    #[serde(with = "crate::serde_utils::duration")]
231    pub timeout: Duration,
232
233    /// Last activity timestamp
234    #[serde(skip)]
235    pub last_activity: Option<Instant>,
236}
237
238impl Transaction {
239    /// Create a new transaction
240    pub fn new(
241        txn_id: TransactionId,
242        producer_id: ProducerId,
243        producer_epoch: ProducerEpoch,
244        timeout: Duration,
245    ) -> Self {
246        Self {
247            txn_id,
248            producer_id,
249            producer_epoch,
250            state: TransactionState::Ongoing,
251            partitions: HashSet::new(),
252            pending_writes: Vec::new(),
253            offset_commits: Vec::new(),
254            started_at: SystemTime::now(),
255            timeout,
256            last_activity: Some(Instant::now()),
257        }
258    }
259
260    /// Check if transaction has timed out
261    pub fn is_timed_out(&self) -> bool {
262        self.last_activity
263            .map(|t| t.elapsed() > self.timeout)
264            .unwrap_or(true)
265    }
266
267    /// Update last activity timestamp
268    pub fn touch(&mut self) {
269        self.last_activity = Some(Instant::now());
270    }
271
272    /// Add a partition to the transaction
273    pub fn add_partition(&mut self, partition: TransactionPartition) {
274        self.partitions.insert(partition);
275        self.touch();
276    }
277
278    /// Record a pending write
279    pub fn add_write(&mut self, partition: TransactionPartition, sequence: i32, offset: u64) {
280        self.pending_writes.push(PendingWrite {
281            partition,
282            sequence,
283            offset,
284            timestamp: SystemTime::now(),
285        });
286        self.touch();
287    }
288
289    /// Add consumer offset commit
290    pub fn add_offset_commit(
291        &mut self,
292        group_id: String,
293        offsets: Vec<(TransactionPartition, i64)>,
294    ) {
295        self.offset_commits
296            .push(TransactionOffsetCommit { group_id, offsets });
297        self.touch();
298    }
299
300    /// Get total number of writes
301    pub fn write_count(&self) -> usize {
302        self.pending_writes.len()
303    }
304
305    /// Get all affected partitions
306    pub fn affected_partitions(&self) -> impl Iterator<Item = &TransactionPartition> {
307        self.partitions.iter()
308    }
309}
310
311/// Transaction marker type written to partition logs
312#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
313pub enum TransactionMarker {
314    /// Transaction committed
315    Commit,
316
317    /// Transaction aborted
318    Abort,
319}
320
321/// Consumer isolation level
322///
323/// Controls whether consumers can see uncommitted transactional messages.
324#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
325pub enum IsolationLevel {
326    /// Read all messages, including those from aborted transactions.
327    /// This is the default for backward compatibility.
328    #[default]
329    ReadUncommitted,
330
331    /// Only read messages from committed transactions.
332    /// Messages from aborted transactions are filtered out.
333    ReadCommitted,
334}
335
336impl IsolationLevel {
337    /// Convert to string (Kafka-compatible)
338    pub fn as_str(&self) -> &'static str {
339        match self {
340            Self::ReadUncommitted => "read_uncommitted",
341            Self::ReadCommitted => "read_committed",
342        }
343    }
344
345    /// Convert from u8 (wire protocol)
346    /// 0 = read_uncommitted (default)
347    /// 1 = read_committed
348    /// Other values default to read_uncommitted
349    pub fn from_u8(value: u8) -> Self {
350        match value {
351            1 => Self::ReadCommitted,
352            _ => Self::ReadUncommitted,
353        }
354    }
355
356    /// Convert to u8 (wire protocol)
357    pub fn as_u8(&self) -> u8 {
358        match self {
359            Self::ReadUncommitted => 0,
360            Self::ReadCommitted => 1,
361        }
362    }
363}
364
365impl std::str::FromStr for IsolationLevel {
366    type Err = String;
367
368    /// Parse from string (Kafka-compatible)
369    fn from_str(s: &str) -> Result<Self, Self::Err> {
370        match s.to_lowercase().as_str() {
371            "read_uncommitted" => Ok(Self::ReadUncommitted),
372            "read_committed" => Ok(Self::ReadCommitted),
373            _ => Err(format!("unknown isolation level: {}", s)),
374        }
375    }
376}
377
378impl std::fmt::Display for IsolationLevel {
379    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380        write!(f, "{}", self.as_str())
381    }
382}
383
384/// Record of an aborted transaction for consumer filtering
385#[derive(Debug, Clone, Serialize, Deserialize)]
386pub struct AbortedTransaction {
387    /// Producer ID that aborted
388    pub producer_id: ProducerId,
389    /// First offset of the aborted transaction in this partition
390    pub first_offset: u64,
391}
392
393/// Index of aborted transactions for a partition
394///
395/// Used for efficient filtering when `isolation.level=read_committed`
396#[derive(Debug, Default)]
397pub struct AbortedTransactionIndex {
398    /// Aborted transactions sorted by first_offset
399    aborted: RwLock<Vec<AbortedTransaction>>,
400}
401
402impl AbortedTransactionIndex {
403    /// Create a new empty index
404    pub fn new() -> Self {
405        Self::default()
406    }
407
408    /// Record an aborted transaction
409    pub fn record_abort(&self, producer_id: ProducerId, first_offset: u64) {
410        let mut aborted = self.aborted.write();
411        aborted.push(AbortedTransaction {
412            producer_id,
413            first_offset,
414        });
415        // Keep sorted by first_offset for efficient lookup
416        aborted.sort_by_key(|a| a.first_offset);
417    }
418
419    /// Get aborted transactions that overlap with a range of offsets
420    ///
421    /// Returns aborted transactions whose first_offset is within [start_offset, end_offset]
422    pub fn get_aborted_in_range(
423        &self,
424        start_offset: u64,
425        end_offset: u64,
426    ) -> Vec<AbortedTransaction> {
427        let aborted = self.aborted.read();
428        aborted
429            .iter()
430            .filter(|a| a.first_offset >= start_offset && a.first_offset <= end_offset)
431            .cloned()
432            .collect()
433    }
434
435    /// Check if a specific producer's message at an offset is from an aborted transaction
436    pub fn is_aborted(&self, producer_id: ProducerId, offset: u64) -> bool {
437        let aborted = self.aborted.read();
438        aborted
439            .iter()
440            .any(|a| a.producer_id == producer_id && a.first_offset <= offset)
441    }
442
443    /// Remove aborted transactions older than a given offset (for log truncation)
444    pub fn truncate_before(&self, offset: u64) {
445        let mut aborted = self.aborted.write();
446        aborted.retain(|a| a.first_offset >= offset);
447    }
448
449    /// Get count of tracked aborted transactions
450    pub fn len(&self) -> usize {
451        self.aborted.read().len()
452    }
453
454    /// Check if index is empty
455    pub fn is_empty(&self) -> bool {
456        self.len() == 0
457    }
458}
459
/// Statistics for the transaction coordinator.
///
/// Four monotonically increasing counters plus one gauge
/// (`active_transactions`), all updated with `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct TransactionStats {
    /// Total transactions initiated
    transactions_started: AtomicU64,

    /// Total transactions committed
    transactions_committed: AtomicU64,

    /// Total transactions aborted
    transactions_aborted: AtomicU64,

    /// Total transactions timed out
    transactions_timed_out: AtomicU64,

    /// Currently active transactions (saturates at zero, never wraps)
    active_transactions: AtomicU64,
}

impl TransactionStats {
    /// Create a stats block with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Count a newly started transaction and raise the active gauge.
    pub fn record_start(&self) {
        self.transactions_started.fetch_add(1, Ordering::Relaxed);
        self.active_transactions.fetch_add(1, Ordering::Relaxed);
    }

    /// Count a commit and lower the active gauge.
    pub fn record_commit(&self) {
        self.transactions_committed.fetch_add(1, Ordering::Relaxed);
        self.decrement_active();
    }

    /// Count an abort and lower the active gauge.
    pub fn record_abort(&self) {
        self.transactions_aborted.fetch_add(1, Ordering::Relaxed);
        self.decrement_active();
    }

    /// Count a timeout and lower the active gauge.
    pub fn record_timeout(&self) {
        self.transactions_timed_out.fetch_add(1, Ordering::Relaxed);
        self.decrement_active();
    }

    /// Lower the active gauge by one, saturating at zero.
    ///
    /// A plain `fetch_sub` would wrap to `u64::MAX` if a completion were ever
    /// recorded without a matching `record_start`, leaving the gauge meaningless.
    fn decrement_active(&self) {
        // `checked_sub` yields `None` at zero, so `fetch_update` leaves the
        // value untouched; the resulting `Err` is deliberately ignored.
        let _ = self
            .active_transactions
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1));
    }

    /// Total transactions started.
    pub fn transactions_started(&self) -> u64 {
        self.transactions_started.load(Ordering::Relaxed)
    }

    /// Total transactions committed.
    pub fn transactions_committed(&self) -> u64 {
        self.transactions_committed.load(Ordering::Relaxed)
    }

    /// Total transactions aborted.
    pub fn transactions_aborted(&self) -> u64 {
        self.transactions_aborted.load(Ordering::Relaxed)
    }

    /// Total transactions timed out.
    pub fn transactions_timed_out(&self) -> u64 {
        self.transactions_timed_out.load(Ordering::Relaxed)
    }

    /// Current value of the active-transactions gauge.
    pub fn active_transactions(&self) -> u64 {
        self.active_transactions.load(Ordering::Relaxed)
    }
}
524
525/// Snapshot of transaction stats for serialization
526#[derive(Debug, Clone, Serialize, Deserialize)]
527pub struct TransactionStatsSnapshot {
528    pub transactions_started: u64,
529    pub transactions_committed: u64,
530    pub transactions_aborted: u64,
531    pub transactions_timed_out: u64,
532    pub active_transactions: u64,
533}
534
535impl From<&TransactionStats> for TransactionStatsSnapshot {
536    fn from(stats: &TransactionStats) -> Self {
537        Self {
538            transactions_started: stats.transactions_started(),
539            transactions_committed: stats.transactions_committed(),
540            transactions_aborted: stats.transactions_aborted(),
541            transactions_timed_out: stats.transactions_timed_out(),
542            active_transactions: stats.active_transactions(),
543        }
544    }
545}
546
547/// Transaction coordinator manages all active transactions
548///
549/// This is a per-broker component that tracks transactions for producers
550/// assigned to this broker as their transaction coordinator.
551pub struct TransactionCoordinator {
552    /// Active transactions by (producer_id, txn_id)
553    transactions: RwLock<HashMap<(ProducerId, TransactionId), Transaction>>,
554
555    /// Producer to transaction mapping (for single-txn-per-producer enforcement)
556    producer_transactions: RwLock<HashMap<ProducerId, TransactionId>>,
557
558    /// Default transaction timeout
559    default_timeout: Duration,
560
561    /// Statistics
562    stats: TransactionStats,
563
564    /// Index of aborted transactions for read_committed filtering
565    aborted_index: AbortedTransactionIndex,
566}
567
568impl Default for TransactionCoordinator {
569    fn default() -> Self {
570        Self::new()
571    }
572}
573
574impl TransactionCoordinator {
575    /// Create a new transaction coordinator
576    pub fn new() -> Self {
577        Self {
578            transactions: RwLock::new(HashMap::new()),
579            producer_transactions: RwLock::new(HashMap::new()),
580            default_timeout: DEFAULT_TRANSACTION_TIMEOUT,
581            stats: TransactionStats::new(),
582            aborted_index: AbortedTransactionIndex::new(),
583        }
584    }
585
586    /// Create with custom default timeout
587    pub fn with_timeout(timeout: Duration) -> Self {
588        Self {
589            transactions: RwLock::new(HashMap::new()),
590            producer_transactions: RwLock::new(HashMap::new()),
591            default_timeout: timeout,
592            stats: TransactionStats::new(),
593            aborted_index: AbortedTransactionIndex::new(),
594        }
595    }
596
597    /// Get statistics
598    pub fn stats(&self) -> &TransactionStats {
599        &self.stats
600    }
601
602    /// Begin a new transaction
603    pub fn begin_transaction(
604        &self,
605        txn_id: TransactionId,
606        producer_id: ProducerId,
607        producer_epoch: ProducerEpoch,
608        timeout: Option<Duration>,
609    ) -> TransactionResult {
610        // Use write locks from the start to prevent TOCTOU races
611        let mut transactions = self.transactions.write();
612        let mut producer_txns = self.producer_transactions.write();
613
614        // Check if producer already has an active transaction
615        if let Some(existing_txn_id) = producer_txns.get(&producer_id) {
616            if existing_txn_id != &txn_id {
617                return TransactionResult::ConcurrentTransaction;
618            }
619            // Same txn_id - check if we're resuming
620            if let Some(txn) = transactions.get(&(producer_id, txn_id.clone())) {
621                if txn.producer_epoch != producer_epoch {
622                    return TransactionResult::ProducerFenced {
623                        expected_epoch: txn.producer_epoch,
624                        received_epoch: producer_epoch,
625                    };
626                }
627                if txn.state.is_active() {
628                    return TransactionResult::Ok; // Already active
629                }
630            }
631        }
632
633        // Enforce MAX_PENDING_TRANSACTIONS limit
634        let active_count = transactions
635            .values()
636            .filter(|t| t.state.is_active())
637            .count();
638        if active_count >= MAX_PENDING_TRANSACTIONS {
639            return TransactionResult::TooManyTransactions;
640        }
641
642        // Create new transaction
643        let txn = Transaction::new(
644            txn_id.clone(),
645            producer_id,
646            producer_epoch,
647            timeout.unwrap_or(self.default_timeout),
648        );
649
650        transactions.insert((producer_id, txn_id.clone()), txn);
651        producer_txns.insert(producer_id, txn_id);
652
653        self.stats.record_start();
654        TransactionResult::Ok
655    }
656
657    /// Add partitions to an active transaction
658    pub fn add_partitions_to_transaction(
659        &self,
660        txn_id: &TransactionId,
661        producer_id: ProducerId,
662        producer_epoch: ProducerEpoch,
663        partitions: Vec<TransactionPartition>,
664    ) -> TransactionResult {
665        let mut transactions = self.transactions.write();
666
667        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
668            Some(t) => t,
669            None => return TransactionResult::InvalidTransactionId,
670        };
671
672        // Validate epoch
673        if txn.producer_epoch != producer_epoch {
674            return TransactionResult::ProducerFenced {
675                expected_epoch: txn.producer_epoch,
676                received_epoch: producer_epoch,
677            };
678        }
679
680        // Check state
681        if !txn.state.is_active() {
682            return TransactionResult::InvalidTransactionState {
683                current: txn.state,
684                expected: "Ongoing",
685            };
686        }
687
688        // Check timeout
689        if txn.is_timed_out() {
690            txn.state = TransactionState::Dead;
691            self.stats.record_timeout();
692            return TransactionResult::TransactionTimeout;
693        }
694
695        // Add partitions
696        for partition in partitions {
697            txn.add_partition(partition);
698        }
699
700        TransactionResult::Ok
701    }
702
703    /// Record a write within a transaction
704    pub fn add_write_to_transaction(
705        &self,
706        txn_id: &TransactionId,
707        producer_id: ProducerId,
708        producer_epoch: ProducerEpoch,
709        partition: TransactionPartition,
710        sequence: i32,
711        offset: u64,
712    ) -> TransactionResult {
713        let mut transactions = self.transactions.write();
714
715        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
716            Some(t) => t,
717            None => return TransactionResult::InvalidTransactionId,
718        };
719
720        // Validate epoch
721        if txn.producer_epoch != producer_epoch {
722            return TransactionResult::ProducerFenced {
723                expected_epoch: txn.producer_epoch,
724                received_epoch: producer_epoch,
725            };
726        }
727
728        // Check state
729        if !txn.state.is_active() {
730            return TransactionResult::InvalidTransactionState {
731                current: txn.state,
732                expected: "Ongoing",
733            };
734        }
735
736        // Check timeout
737        if txn.is_timed_out() {
738            txn.state = TransactionState::Dead;
739            self.stats.record_timeout();
740            return TransactionResult::TransactionTimeout;
741        }
742
743        // Verify partition is part of transaction
744        if !txn.partitions.contains(&partition) {
745            return TransactionResult::PartitionNotInTransaction {
746                topic: partition.topic,
747                partition: partition.partition,
748            };
749        }
750
751        // Record the write
752        txn.add_write(partition, sequence, offset);
753
754        TransactionResult::Ok
755    }
756
757    /// Add consumer offset commit to transaction (for exactly-once consume-transform-produce)
758    pub fn add_offsets_to_transaction(
759        &self,
760        txn_id: &TransactionId,
761        producer_id: ProducerId,
762        producer_epoch: ProducerEpoch,
763        group_id: String,
764        offsets: Vec<(TransactionPartition, i64)>,
765    ) -> TransactionResult {
766        let mut transactions = self.transactions.write();
767
768        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
769            Some(t) => t,
770            None => return TransactionResult::InvalidTransactionId,
771        };
772
773        // Validate epoch
774        if txn.producer_epoch != producer_epoch {
775            return TransactionResult::ProducerFenced {
776                expected_epoch: txn.producer_epoch,
777                received_epoch: producer_epoch,
778            };
779        }
780
781        // Check state
782        if !txn.state.is_active() {
783            return TransactionResult::InvalidTransactionState {
784                current: txn.state,
785                expected: "Ongoing",
786            };
787        }
788
789        // Check timeout
790        if txn.is_timed_out() {
791            txn.state = TransactionState::Dead;
792            self.stats.record_timeout();
793            return TransactionResult::TransactionTimeout;
794        }
795
796        // Add offset commit
797        txn.add_offset_commit(group_id, offsets);
798
799        TransactionResult::Ok
800    }
801
802    /// Prepare to commit a transaction (2PC phase 1)
803    ///
804    /// Returns the transaction data needed for committing to partitions
805    pub fn prepare_commit(
806        &self,
807        txn_id: &TransactionId,
808        producer_id: ProducerId,
809        producer_epoch: ProducerEpoch,
810    ) -> Result<Transaction, TransactionResult> {
811        let mut transactions = self.transactions.write();
812
813        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
814            Some(t) => t,
815            None => return Err(TransactionResult::InvalidTransactionId),
816        };
817
818        // Validate epoch
819        if txn.producer_epoch != producer_epoch {
820            return Err(TransactionResult::ProducerFenced {
821                expected_epoch: txn.producer_epoch,
822                received_epoch: producer_epoch,
823            });
824        }
825
826        // Check state
827        if !txn.state.can_commit() {
828            return Err(TransactionResult::InvalidTransactionState {
829                current: txn.state,
830                expected: "Ongoing",
831            });
832        }
833
834        // Check timeout
835        if txn.is_timed_out() {
836            txn.state = TransactionState::Dead;
837            self.stats.record_timeout();
838            return Err(TransactionResult::TransactionTimeout);
839        }
840
841        // Transition to PrepareCommit
842        txn.state = TransactionState::PrepareCommit;
843        txn.touch();
844
845        Ok(txn.clone())
846    }
847
848    /// Complete the commit (2PC phase 2)
849    pub fn complete_commit(
850        &self,
851        txn_id: &TransactionId,
852        producer_id: ProducerId,
853    ) -> TransactionResult {
854        let mut transactions = self.transactions.write();
855        let mut producer_txns = self.producer_transactions.write();
856
857        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
858            Some(t) => t,
859            None => return TransactionResult::InvalidTransactionId,
860        };
861
862        if txn.state != TransactionState::PrepareCommit {
863            return TransactionResult::InvalidTransactionState {
864                current: txn.state,
865                expected: "PrepareCommit",
866            };
867        }
868
869        txn.state = TransactionState::CompleteCommit;
870
871        // Clean up
872        transactions.remove(&(producer_id, txn_id.clone()));
873        producer_txns.remove(&producer_id);
874
875        self.stats.record_commit();
876        TransactionResult::Ok
877    }
878
879    /// Prepare to abort a transaction (2PC phase 1)
880    pub fn prepare_abort(
881        &self,
882        txn_id: &TransactionId,
883        producer_id: ProducerId,
884        producer_epoch: ProducerEpoch,
885    ) -> Result<Transaction, TransactionResult> {
886        let mut transactions = self.transactions.write();
887
888        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
889            Some(t) => t,
890            None => return Err(TransactionResult::InvalidTransactionId),
891        };
892
893        // Validate epoch
894        if txn.producer_epoch != producer_epoch {
895            return Err(TransactionResult::ProducerFenced {
896                expected_epoch: txn.producer_epoch,
897                received_epoch: producer_epoch,
898            });
899        }
900
901        // Check state - abort is allowed from more states than commit
902        if !txn.state.can_abort() {
903            return Err(TransactionResult::InvalidTransactionState {
904                current: txn.state,
905                expected: "Ongoing or PrepareCommit",
906            });
907        }
908
909        // Transition to PrepareAbort
910        txn.state = TransactionState::PrepareAbort;
911        txn.touch();
912
913        Ok(txn.clone())
914    }
915
916    /// Complete the abort (2PC phase 2)
917    pub fn complete_abort(
918        &self,
919        txn_id: &TransactionId,
920        producer_id: ProducerId,
921    ) -> TransactionResult {
922        let mut transactions = self.transactions.write();
923        let mut producer_txns = self.producer_transactions.write();
924
925        let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
926            Some(t) => t,
927            None => return TransactionResult::InvalidTransactionId,
928        };
929
930        if txn.state != TransactionState::PrepareAbort {
931            return TransactionResult::InvalidTransactionState {
932                current: txn.state,
933                expected: "PrepareAbort",
934            };
935        }
936
937        txn.state = TransactionState::CompleteAbort;
938
939        // Record aborted transaction for read_committed filtering
940        // Use the minimum offset from pending writes as the first_offset
941        if let Some(first_offset) = txn.pending_writes.iter().map(|w| w.offset).min() {
942            self.aborted_index.record_abort(producer_id, first_offset);
943        }
944
945        // Clean up
946        transactions.remove(&(producer_id, txn_id.clone()));
947        producer_txns.remove(&producer_id);
948
949        self.stats.record_abort();
950        TransactionResult::Ok
951    }
952
953    /// Get current transaction state for a producer
954    pub fn get_transaction(
955        &self,
956        txn_id: &TransactionId,
957        producer_id: ProducerId,
958    ) -> Option<Transaction> {
959        let transactions = self.transactions.read();
960        transactions.get(&(producer_id, txn_id.clone())).cloned()
961    }
962
963    /// Check if a producer has an active transaction
964    pub fn has_active_transaction(&self, producer_id: ProducerId) -> bool {
965        let producer_txns = self.producer_transactions.read();
966        producer_txns.contains_key(&producer_id)
967    }
968
969    /// Get active transaction ID for a producer
970    pub fn get_active_transaction_id(&self, producer_id: ProducerId) -> Option<TransactionId> {
971        let producer_txns = self.producer_transactions.read();
972        producer_txns.get(&producer_id).cloned()
973    }
974
975    /// Clean up timed-out transactions
976    pub fn cleanup_timed_out_transactions(&self) -> Vec<Transaction> {
977        let mut timed_out = Vec::new();
978        let mut transactions = self.transactions.write();
979        let mut producer_txns = self.producer_transactions.write();
980
981        let keys_to_remove: Vec<_> = transactions
982            .iter()
983            .filter(|(_, txn)| txn.is_timed_out() && !txn.state.is_terminal())
984            .map(|(k, _)| k.clone())
985            .collect();
986
987        for key in keys_to_remove {
988            if let Some(mut txn) = transactions.remove(&key) {
989                txn.state = TransactionState::Dead;
990                producer_txns.remove(&txn.producer_id);
991                self.stats.record_timeout();
992                timed_out.push(txn);
993            }
994        }
995
996        timed_out
997    }
998
999    /// Get number of active transactions
1000    pub fn active_count(&self) -> usize {
1001        let transactions = self.transactions.read();
1002        transactions
1003            .values()
1004            .filter(|t| !t.state.is_terminal())
1005            .count()
1006    }
1007
1008    /// Check if a producer's message at a given offset is from an aborted transaction
1009    ///
1010    /// Used for read_committed isolation level filtering
1011    pub fn is_aborted(&self, producer_id: ProducerId, offset: u64) -> bool {
1012        self.aborted_index.is_aborted(producer_id, offset)
1013    }
1014
1015    /// Get aborted transactions in a range of offsets
1016    ///
1017    /// Used for FetchResponse to include aborted transaction metadata
1018    pub fn get_aborted_in_range(
1019        &self,
1020        start_offset: u64,
1021        end_offset: u64,
1022    ) -> Vec<AbortedTransaction> {
1023        self.aborted_index
1024            .get_aborted_in_range(start_offset, end_offset)
1025    }
1026
1027    /// Get access to the aborted transaction index
1028    pub fn aborted_index(&self) -> &AbortedTransactionIndex {
1029        &self.aborted_index
1030    }
1031}
1032
1033// ============================================================================
1034// Tests
1035// ============================================================================
1036
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Assert that a setup step returned `TransactionResult::Ok`.
    ///
    /// Setup calls used to discard their result, so a broken precondition only
    /// surfaced later as a confusing failure in an unrelated assertion (or not
    /// at all). With `#[track_caller]` the panic points at the failing setup line.
    #[track_caller]
    fn assert_setup_ok(result: TransactionResult, step: &str) {
        assert_eq!(result, TransactionResult::Ok, "setup step failed: {}", step);
    }

    #[test]
    fn test_transaction_state_transitions() {
        // Test terminal states
        assert!(TransactionState::Empty.is_terminal());
        assert!(TransactionState::CompleteCommit.is_terminal());
        assert!(TransactionState::CompleteAbort.is_terminal());
        assert!(TransactionState::Dead.is_terminal());

        // Test active states
        assert!(!TransactionState::Ongoing.is_terminal());
        assert!(!TransactionState::PrepareCommit.is_terminal());
        assert!(!TransactionState::PrepareAbort.is_terminal());

        // Test can_commit
        assert!(TransactionState::Ongoing.can_commit());
        assert!(!TransactionState::Empty.can_commit());
        assert!(!TransactionState::PrepareCommit.can_commit());

        // Test can_abort
        assert!(TransactionState::Ongoing.can_abort());
        assert!(TransactionState::PrepareCommit.can_abort());
        assert!(TransactionState::PrepareAbort.can_abort());
        assert!(!TransactionState::Empty.can_abort());
    }

    #[test]
    fn test_begin_transaction() {
        let coordinator = TransactionCoordinator::new();

        // Begin first transaction
        let result = coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
        assert_eq!(result, TransactionResult::Ok);

        // Verify transaction exists
        let txn = coordinator.get_transaction(&"txn-1".to_string(), 1);
        assert!(txn.is_some());
        let txn = txn.unwrap();
        assert_eq!(txn.state, TransactionState::Ongoing);
        assert_eq!(txn.producer_id, 1);
        assert_eq!(txn.producer_epoch, 0);

        // Stats
        assert_eq!(coordinator.stats().transactions_started(), 1);
        assert_eq!(coordinator.stats().active_transactions(), 1);
    }

    #[test]
    fn test_concurrent_transaction_rejection() {
        let coordinator = TransactionCoordinator::new();

        // Begin first transaction
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Try to begin another transaction for same producer
        let result = coordinator.begin_transaction("txn-2".to_string(), 1, 0, None);
        assert_eq!(result, TransactionResult::ConcurrentTransaction);
    }

    #[test]
    fn test_add_partitions_to_transaction() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Add partitions
        let result = coordinator.add_partitions_to_transaction(
            &"txn-1".to_string(),
            1,
            0,
            vec![
                TransactionPartition::new("topic-1", 0),
                TransactionPartition::new("topic-1", 1),
                TransactionPartition::new("topic-2", 0),
            ],
        );
        assert_eq!(result, TransactionResult::Ok);

        // Verify partitions added
        let txn = coordinator
            .get_transaction(&"txn-1".to_string(), 1)
            .unwrap();
        assert_eq!(txn.partitions.len(), 3);
    }

    #[test]
    fn test_add_write_to_transaction() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        let partition = TransactionPartition::new("topic-1", 0);
        assert_setup_ok(
            coordinator.add_partitions_to_transaction(
                &"txn-1".to_string(),
                1,
                0,
                vec![partition.clone()],
            ),
            "add topic-1/0 to txn-1",
        );

        // Record write
        let result =
            coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
        assert_eq!(result, TransactionResult::Ok);

        // Verify write recorded
        let txn = coordinator
            .get_transaction(&"txn-1".to_string(), 1)
            .unwrap();
        assert_eq!(txn.pending_writes.len(), 1);
        assert_eq!(txn.pending_writes[0].offset, 100);
        assert_eq!(txn.pending_writes[0].sequence, 0);
    }

    #[test]
    fn test_write_to_non_registered_partition() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Try to write to partition not added to transaction
        let result = coordinator.add_write_to_transaction(
            &"txn-1".to_string(),
            1,
            0,
            TransactionPartition::new("topic-1", 0),
            0,
            100,
        );

        assert!(matches!(
            result,
            TransactionResult::PartitionNotInTransaction { .. }
        ));
    }

    #[test]
    fn test_commit_transaction() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        let partition = TransactionPartition::new("topic-1", 0);
        assert_setup_ok(
            coordinator.add_partitions_to_transaction(
                &"txn-1".to_string(),
                1,
                0,
                vec![partition.clone()],
            ),
            "add topic-1/0 to txn-1",
        );
        assert_setup_ok(
            coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100),
            "write offset 100 in txn-1",
        );

        // Prepare commit
        let txn = coordinator.prepare_commit(&"txn-1".to_string(), 1, 0);
        assert!(txn.is_ok());
        let txn = txn.unwrap();
        assert_eq!(txn.state, TransactionState::PrepareCommit);

        // Complete commit
        let result = coordinator.complete_commit(&"txn-1".to_string(), 1);
        assert_eq!(result, TransactionResult::Ok);

        // Transaction should be removed
        assert!(coordinator
            .get_transaction(&"txn-1".to_string(), 1)
            .is_none());
        assert!(!coordinator.has_active_transaction(1));

        // Stats
        assert_eq!(coordinator.stats().transactions_committed(), 1);
        assert_eq!(coordinator.stats().active_transactions(), 0);
    }

    #[test]
    fn test_abort_transaction() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        let partition = TransactionPartition::new("topic-1", 0);
        assert_setup_ok(
            coordinator.add_partitions_to_transaction(
                &"txn-1".to_string(),
                1,
                0,
                vec![partition.clone()],
            ),
            "add topic-1/0 to txn-1",
        );
        assert_setup_ok(
            coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100),
            "write offset 100 in txn-1",
        );

        // Prepare abort
        let txn = coordinator.prepare_abort(&"txn-1".to_string(), 1, 0);
        assert!(txn.is_ok());

        // Complete abort
        let result = coordinator.complete_abort(&"txn-1".to_string(), 1);
        assert_eq!(result, TransactionResult::Ok);

        // Transaction should be removed
        assert!(coordinator
            .get_transaction(&"txn-1".to_string(), 1)
            .is_none());

        // Stats
        assert_eq!(coordinator.stats().transactions_aborted(), 1);
    }

    #[test]
    fn test_producer_fencing() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Try with wrong epoch
        let result = coordinator.add_partitions_to_transaction(
            &"txn-1".to_string(),
            1,
            1, // Wrong epoch
            vec![TransactionPartition::new("topic-1", 0)],
        );

        assert!(matches!(
            result,
            TransactionResult::ProducerFenced {
                expected_epoch: 0,
                received_epoch: 1
            }
        ));
    }

    #[test]
    fn test_transaction_timeout() {
        // Create coordinator with very short timeout
        let coordinator = TransactionCoordinator::with_timeout(Duration::from_millis(1));
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Wait for timeout
        std::thread::sleep(Duration::from_millis(5));

        // Try to add partitions - should fail with timeout
        let result = coordinator.add_partitions_to_transaction(
            &"txn-1".to_string(),
            1,
            0,
            vec![TransactionPartition::new("topic-1", 0)],
        );

        assert_eq!(result, TransactionResult::TransactionTimeout);
    }

    #[test]
    fn test_cleanup_timed_out_transactions() {
        let coordinator = TransactionCoordinator::with_timeout(Duration::from_millis(1));

        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );
        assert_setup_ok(
            coordinator.begin_transaction("txn-2".to_string(), 2, 0, None),
            "begin txn-2",
        );

        // Wait for timeout
        std::thread::sleep(Duration::from_millis(5));

        // Cleanup
        let timed_out = coordinator.cleanup_timed_out_transactions();
        assert_eq!(timed_out.len(), 2);

        // Transactions should be gone
        assert_eq!(coordinator.active_count(), 0);
        assert_eq!(coordinator.stats().transactions_timed_out(), 2);
    }

    #[test]
    fn test_add_offsets_to_transaction() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Add consumer offsets
        let result = coordinator.add_offsets_to_transaction(
            &"txn-1".to_string(),
            1,
            0,
            "consumer-group-1".to_string(),
            vec![
                (TransactionPartition::new("input-topic", 0), 42),
                (TransactionPartition::new("input-topic", 1), 100),
            ],
        );
        assert_eq!(result, TransactionResult::Ok);

        // Verify
        let txn = coordinator
            .get_transaction(&"txn-1".to_string(), 1)
            .unwrap();
        assert_eq!(txn.offset_commits.len(), 1);
        assert_eq!(txn.offset_commits[0].group_id, "consumer-group-1");
        assert_eq!(txn.offset_commits[0].offsets.len(), 2);
    }

    #[test]
    fn test_invalid_state_transitions() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Prepare commit
        coordinator
            .prepare_commit(&"txn-1".to_string(), 1, 0)
            .unwrap();

        // Try to add partitions after prepare - should fail
        let result = coordinator.add_partitions_to_transaction(
            &"txn-1".to_string(),
            1,
            0,
            vec![TransactionPartition::new("topic-1", 0)],
        );
        assert!(matches!(
            result,
            TransactionResult::InvalidTransactionState { .. }
        ));
    }

    #[test]
    fn test_abort_from_prepare_commit() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Prepare commit
        coordinator
            .prepare_commit(&"txn-1".to_string(), 1, 0)
            .unwrap();

        // Abort should still be allowed from PrepareCommit
        let result = coordinator.prepare_abort(&"txn-1".to_string(), 1, 0);
        assert!(result.is_ok());

        let result = coordinator.complete_abort(&"txn-1".to_string(), 1);
        assert_eq!(result, TransactionResult::Ok);
    }

    #[test]
    fn test_transaction_partition_hash() {
        let p1 = TransactionPartition::new("topic", 0);
        let p2 = TransactionPartition::new("topic", 0);
        let p3 = TransactionPartition::new("topic", 1);

        assert_eq!(p1, p2);
        assert_ne!(p1, p3);

        let mut set = HashSet::new();
        set.insert(p1.clone());
        set.insert(p2); // Should not add (duplicate)
        set.insert(p3);
        assert_eq!(set.len(), 2);
    }

    #[test]
    fn test_resume_same_transaction() {
        let coordinator = TransactionCoordinator::new();

        // Begin transaction
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Try to begin same transaction again - should succeed (idempotent)
        let result = coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
        assert_eq!(result, TransactionResult::Ok);

        // Only one transaction should exist
        assert_eq!(coordinator.active_count(), 1);
        assert_eq!(coordinator.stats().transactions_started(), 1);
    }

    #[test]
    fn test_stats_snapshot() {
        let coordinator = TransactionCoordinator::new();
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );
        coordinator
            .prepare_commit(&"txn-1".to_string(), 1, 0)
            .unwrap();
        assert_setup_ok(
            coordinator.complete_commit(&"txn-1".to_string(), 1),
            "complete_commit txn-1",
        );

        let snapshot: TransactionStatsSnapshot = coordinator.stats().into();
        assert_eq!(snapshot.transactions_started, 1);
        assert_eq!(snapshot.transactions_committed, 1);
        assert_eq!(snapshot.active_transactions, 0);
    }

    // =========================================================================
    // Isolation Level Tests
    // =========================================================================

    #[test]
    fn test_isolation_level_from_u8() {
        assert_eq!(IsolationLevel::from_u8(0), IsolationLevel::ReadUncommitted);
        assert_eq!(IsolationLevel::from_u8(1), IsolationLevel::ReadCommitted);
        assert_eq!(IsolationLevel::from_u8(2), IsolationLevel::ReadUncommitted); // Invalid defaults
        assert_eq!(
            IsolationLevel::from_u8(255),
            IsolationLevel::ReadUncommitted
        );
    }

    #[test]
    fn test_isolation_level_as_u8() {
        assert_eq!(IsolationLevel::ReadUncommitted.as_u8(), 0);
        assert_eq!(IsolationLevel::ReadCommitted.as_u8(), 1);
    }

    #[test]
    fn test_isolation_level_from_str() {
        assert_eq!(
            IsolationLevel::from_str("read_uncommitted").unwrap(),
            IsolationLevel::ReadUncommitted
        );
        assert_eq!(
            IsolationLevel::from_str("read_committed").unwrap(),
            IsolationLevel::ReadCommitted
        );
        assert_eq!(
            IsolationLevel::from_str("READ_UNCOMMITTED").unwrap(),
            IsolationLevel::ReadUncommitted
        );
        assert_eq!(
            IsolationLevel::from_str("READ_COMMITTED").unwrap(),
            IsolationLevel::ReadCommitted
        );
        assert!(IsolationLevel::from_str("invalid").is_err());
    }

    #[test]
    fn test_isolation_level_default() {
        assert_eq!(IsolationLevel::default(), IsolationLevel::ReadUncommitted);
    }

    // =========================================================================
    // Aborted Transaction Index Tests
    // =========================================================================

    #[test]
    fn test_aborted_transaction_index_basic() {
        let index = AbortedTransactionIndex::new();
        assert!(index.is_empty());
        assert_eq!(index.len(), 0);

        // Record an aborted transaction
        index.record_abort(1, 100);
        assert!(!index.is_empty());
        assert_eq!(index.len(), 1);

        // Check if offset is from aborted transaction
        assert!(index.is_aborted(1, 100)); // first_offset
        assert!(index.is_aborted(1, 150)); // within transaction
        assert!(!index.is_aborted(1, 50)); // before transaction
        assert!(!index.is_aborted(2, 100)); // different producer
    }

    #[test]
    fn test_aborted_transaction_index_multiple() {
        let index = AbortedTransactionIndex::new();

        // Multiple aborted transactions
        index.record_abort(1, 100);
        index.record_abort(1, 300);
        index.record_abort(2, 200);

        assert_eq!(index.len(), 3);

        // Check filtering
        assert!(index.is_aborted(1, 100));
        assert!(index.is_aborted(1, 150)); // between 100 and 300 for producer 1
        assert!(index.is_aborted(1, 300));
        assert!(index.is_aborted(1, 400)); // after second abort
        assert!(index.is_aborted(2, 200));
        assert!(index.is_aborted(2, 250));
        assert!(!index.is_aborted(2, 100)); // before producer 2's abort
    }

    #[test]
    fn test_aborted_transaction_index_get_range() {
        let index = AbortedTransactionIndex::new();

        index.record_abort(1, 100);
        index.record_abort(2, 200);
        index.record_abort(1, 300);

        // Get transactions in range
        let in_range = index.get_aborted_in_range(150, 250);
        assert_eq!(in_range.len(), 1);
        assert_eq!(in_range[0].producer_id, 2);
        assert_eq!(in_range[0].first_offset, 200);

        // Wider range
        let in_range = index.get_aborted_in_range(0, 500);
        assert_eq!(in_range.len(), 3);

        // No transactions in range
        let in_range = index.get_aborted_in_range(400, 500);
        assert_eq!(in_range.len(), 0);
    }

    #[test]
    fn test_aborted_transaction_index_truncate() {
        let index = AbortedTransactionIndex::new();

        index.record_abort(1, 100);
        index.record_abort(2, 200);
        index.record_abort(1, 300);

        assert_eq!(index.len(), 3);

        // Truncate entries before offset 200
        index.truncate_before(200);

        assert_eq!(index.len(), 2);

        // Only offsets >= 200 should remain
        assert!(!index.is_aborted(1, 150)); // old entry removed
        assert!(index.is_aborted(2, 200));
        assert!(index.is_aborted(1, 300));
    }

    #[test]
    fn test_coordinator_is_aborted() {
        let coordinator = TransactionCoordinator::new();

        // Start a transaction
        assert_setup_ok(
            coordinator.begin_transaction("txn-1".to_string(), 1, 0, None),
            "begin txn-1",
        );

        // Add partition and write
        assert_setup_ok(
            coordinator.add_partitions_to_transaction(
                &"txn-1".to_string(),
                1,
                0,
                vec![TransactionPartition::new("test-topic", 0)],
            ),
            "add test-topic/0 to txn-1",
        );
        assert_setup_ok(
            coordinator.add_write_to_transaction(
                &"txn-1".to_string(),
                1,
                0,
                TransactionPartition::new("test-topic", 0),
                0,
                100, // offset
            ),
            "write offset 100 in txn-1",
        );

        // Not aborted yet
        assert!(!coordinator.is_aborted(1, 100));

        // Prepare and complete abort
        coordinator
            .prepare_abort(&"txn-1".to_string(), 1, 0)
            .unwrap();
        assert_setup_ok(
            coordinator.complete_abort(&"txn-1".to_string(), 1),
            "complete_abort txn-1",
        );

        // Now should be marked as aborted
        assert!(coordinator.is_aborted(1, 100));
        assert!(coordinator.is_aborted(1, 150)); // messages after first_offset too
        assert!(!coordinator.is_aborted(1, 50)); // before first_offset
        assert!(!coordinator.is_aborted(2, 100)); // different producer
    }
}