1use crate::idempotent::{ProducerEpoch, ProducerId};
51use parking_lot::RwLock;
52use serde::{Deserialize, Serialize};
53use std::collections::{HashMap, HashSet};
54use std::io::Write;
55use std::path::{Path, PathBuf};
56use std::sync::atomic::{AtomicU64, Ordering};
57use std::time::{Duration, Instant, SystemTime};
58
/// Client-chosen identifier of a transaction, stored as an owned string.
pub type TransactionId = String;

/// Inactivity timeout used by coordinators that are built without an explicit
/// timeout (60 seconds).
pub const DEFAULT_TRANSACTION_TIMEOUT: Duration = Duration::from_secs(60);

/// Maximum number of `Ongoing` transactions the coordinator accepts at once;
/// `begin_transaction` returns `TooManyTransactions` once this many are active.
pub const MAX_PENDING_TRANSACTIONS: usize = 5;
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
70pub enum TransactionState {
71 Empty,
73
74 Ongoing,
76
77 PrepareCommit,
79
80 PrepareAbort,
82
83 CompleteCommit,
85
86 CompleteAbort,
88
89 Dead,
91}
92
93impl TransactionState {
94 pub fn is_terminal(&self) -> bool {
96 matches!(
97 self,
98 TransactionState::Empty
99 | TransactionState::CompleteCommit
100 | TransactionState::CompleteAbort
101 | TransactionState::Dead
102 )
103 }
104
105 pub fn is_active(&self) -> bool {
107 matches!(self, TransactionState::Ongoing)
108 }
109
110 pub fn can_commit(&self) -> bool {
112 matches!(self, TransactionState::Ongoing)
113 }
114
115 pub fn can_abort(&self) -> bool {
117 matches!(
118 self,
119 TransactionState::Ongoing
120 | TransactionState::PrepareCommit
121 | TransactionState::PrepareAbort
122 )
123 }
124}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
128pub enum TransactionResult {
129 Ok,
131
132 InvalidTransactionId,
134
135 InvalidTransactionState {
137 current: TransactionState,
138 expected: &'static str,
139 },
140
141 ProducerFenced {
143 expected_epoch: ProducerEpoch,
144 received_epoch: ProducerEpoch,
145 },
146
147 TransactionTimeout,
149
150 TooManyTransactions,
152
153 ConcurrentTransaction,
155
156 PartitionNotInTransaction { topic: String, partition: u32 },
158
159 LogWriteError(String),
161}
162
163#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
165pub struct TransactionPartition {
166 pub topic: String,
167 pub partition: u32,
168}
169
170impl TransactionPartition {
171 pub fn new(topic: impl Into<String>, partition: u32) -> Self {
172 Self {
173 topic: topic.into(),
174 partition,
175 }
176 }
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct PendingWrite {
182 pub partition: TransactionPartition,
184
185 pub sequence: i32,
187
188 pub offset: u64,
190
191 #[serde(with = "crate::serde_utils::system_time")]
193 pub timestamp: SystemTime,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct TransactionOffsetCommit {
199 pub group_id: String,
201
202 pub offsets: Vec<(TransactionPartition, i64)>,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct Transaction {
209 pub txn_id: TransactionId,
211
212 pub producer_id: ProducerId,
214
215 pub producer_epoch: ProducerEpoch,
217
218 pub state: TransactionState,
220
221 pub partitions: HashSet<TransactionPartition>,
223
224 pub pending_writes: Vec<PendingWrite>,
226
227 pub offset_commits: Vec<TransactionOffsetCommit>,
229
230 #[serde(with = "crate::serde_utils::system_time")]
232 pub started_at: SystemTime,
233
234 #[serde(with = "crate::serde_utils::duration")]
236 pub timeout: Duration,
237
238 #[serde(skip)]
240 pub last_activity: Option<Instant>,
241}
242
243impl Transaction {
244 pub fn new(
246 txn_id: TransactionId,
247 producer_id: ProducerId,
248 producer_epoch: ProducerEpoch,
249 timeout: Duration,
250 ) -> Self {
251 Self {
252 txn_id,
253 producer_id,
254 producer_epoch,
255 state: TransactionState::Ongoing,
256 partitions: HashSet::new(),
257 pending_writes: Vec::new(),
258 offset_commits: Vec::new(),
259 started_at: SystemTime::now(),
260 timeout,
261 last_activity: Some(Instant::now()),
262 }
263 }
264
265 pub fn is_timed_out(&self) -> bool {
271 self.last_activity
272 .map(|t| t.elapsed() > self.timeout)
273 .unwrap_or(false)
274 }
275
276 pub fn touch(&mut self) {
278 self.last_activity = Some(Instant::now());
279 }
280
281 pub fn add_partition(&mut self, partition: TransactionPartition) {
283 self.partitions.insert(partition);
284 self.touch();
285 }
286
287 pub fn add_write(&mut self, partition: TransactionPartition, sequence: i32, offset: u64) {
289 self.pending_writes.push(PendingWrite {
290 partition,
291 sequence,
292 offset,
293 timestamp: SystemTime::now(),
294 });
295 self.touch();
296 }
297
298 pub fn add_offset_commit(
300 &mut self,
301 group_id: String,
302 offsets: Vec<(TransactionPartition, i64)>,
303 ) {
304 self.offset_commits
305 .push(TransactionOffsetCommit { group_id, offsets });
306 self.touch();
307 }
308
309 pub fn write_count(&self) -> usize {
311 self.pending_writes.len()
312 }
313
314 pub fn affected_partitions(&self) -> impl Iterator<Item = &TransactionPartition> {
316 self.partitions.iter()
317 }
318}
319
320#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
322pub enum TransactionMarker {
323 Commit,
325
326 Abort,
328}
329
330#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
334pub enum IsolationLevel {
335 #[default]
338 ReadUncommitted,
339
340 ReadCommitted,
343}
344
345impl IsolationLevel {
346 pub fn as_str(&self) -> &'static str {
348 match self {
349 Self::ReadUncommitted => "read_uncommitted",
350 Self::ReadCommitted => "read_committed",
351 }
352 }
353
354 pub fn from_u8(value: u8) -> Self {
359 match value {
360 1 => Self::ReadCommitted,
361 _ => Self::ReadUncommitted,
362 }
363 }
364
365 pub fn as_u8(&self) -> u8 {
367 match self {
368 Self::ReadUncommitted => 0,
369 Self::ReadCommitted => 1,
370 }
371 }
372}
373
374impl std::str::FromStr for IsolationLevel {
375 type Err = String;
376
377 fn from_str(s: &str) -> Result<Self, Self::Err> {
379 match s.to_lowercase().as_str() {
380 "read_uncommitted" => Ok(Self::ReadUncommitted),
381 "read_committed" => Ok(Self::ReadCommitted),
382 _ => Err(format!("unknown isolation level: {}", s)),
383 }
384 }
385}
386
387impl std::fmt::Display for IsolationLevel {
388 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389 write!(f, "{}", self.as_str())
390 }
391}
392
393#[derive(Debug, Clone, Serialize, Deserialize)]
395pub struct AbortedTransaction {
396 pub producer_id: ProducerId,
398 pub first_offset: u64,
400 pub last_offset: u64,
405}
406
407#[derive(Debug, Default)]
411pub struct AbortedTransactionIndex {
412 aborted: RwLock<Vec<AbortedTransaction>>,
414}
415
416impl AbortedTransactionIndex {
417 pub fn new() -> Self {
419 Self::default()
420 }
421
422 pub fn record_abort(&self, producer_id: ProducerId, first_offset: u64, last_offset: u64) {
427 let mut aborted = self.aborted.write();
428 aborted.push(AbortedTransaction {
429 producer_id,
430 first_offset,
431 last_offset,
432 });
433 aborted.sort_by_key(|a| a.first_offset);
435 }
436
437 pub fn get_aborted_in_range(
441 &self,
442 start_offset: u64,
443 end_offset: u64,
444 ) -> Vec<AbortedTransaction> {
445 let aborted = self.aborted.read();
446 aborted
447 .iter()
448 .filter(|a| a.first_offset >= start_offset && a.first_offset <= end_offset)
449 .cloned()
450 .collect()
451 }
452
453 pub fn is_aborted(&self, producer_id: ProducerId, offset: u64) -> bool {
459 let aborted = self.aborted.read();
460 aborted.iter().any(|a| {
461 a.producer_id == producer_id && a.first_offset <= offset && offset <= a.last_offset
462 })
463 }
464
465 pub fn truncate_before(&self, offset: u64) {
467 let mut aborted = self.aborted.write();
468 aborted.retain(|a| a.first_offset >= offset);
469 }
470
471 pub fn len(&self) -> usize {
473 self.aborted.read().len()
474 }
475
476 pub fn is_empty(&self) -> bool {
478 self.len() == 0
479 }
480}
481
/// Lock-free counters describing coordinator activity.
///
/// All counters use relaxed atomics; they are independent statistics and are
/// not synchronized with each other.
#[derive(Debug, Default)]
pub struct TransactionStats {
    /// Total transactions started.
    transactions_started: AtomicU64,

    /// Total transactions committed.
    transactions_committed: AtomicU64,

    /// Total transactions aborted.
    transactions_aborted: AtomicU64,

    /// Total transactions that timed out.
    transactions_timed_out: AtomicU64,

    /// Transactions currently counted as active (gauge).
    active_transactions: AtomicU64,
}

impl TransactionStats {
    /// Creates a stats block with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Decrements the active gauge, saturating at zero.
    ///
    /// A plain `fetch_sub` would wrap to `u64::MAX` when a completion is
    /// recorded without a matching start.
    fn decrement_active(&self) {
        // `Err` only means the gauge was already zero; nothing to undo.
        let _ = self
            .active_transactions
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1));
    }

    /// Counts a started transaction and bumps the active gauge.
    pub fn record_start(&self) {
        self.transactions_started.fetch_add(1, Ordering::Relaxed);
        self.active_transactions.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a committed transaction and lowers the active gauge.
    pub fn record_commit(&self) {
        self.transactions_committed.fetch_add(1, Ordering::Relaxed);
        self.decrement_active();
    }

    /// Counts an aborted transaction and lowers the active gauge.
    pub fn record_abort(&self) {
        self.transactions_aborted.fetch_add(1, Ordering::Relaxed);
        self.decrement_active();
    }

    /// Counts a timed-out transaction and lowers the active gauge.
    pub fn record_timeout(&self) {
        self.transactions_timed_out.fetch_add(1, Ordering::Relaxed);
        self.decrement_active();
    }

    /// Total transactions started.
    pub fn transactions_started(&self) -> u64 {
        self.transactions_started.load(Ordering::Relaxed)
    }

    /// Total transactions committed.
    pub fn transactions_committed(&self) -> u64 {
        self.transactions_committed.load(Ordering::Relaxed)
    }

    /// Total transactions aborted.
    pub fn transactions_aborted(&self) -> u64 {
        self.transactions_aborted.load(Ordering::Relaxed)
    }

    /// Total transactions that timed out.
    pub fn transactions_timed_out(&self) -> u64 {
        self.transactions_timed_out.load(Ordering::Relaxed)
    }

    /// Current value of the active-transaction gauge.
    pub fn active_transactions(&self) -> u64 {
        self.active_transactions.load(Ordering::Relaxed)
    }
}
546
547#[derive(Debug, Clone, Serialize, Deserialize)]
549pub struct TransactionStatsSnapshot {
550 pub transactions_started: u64,
551 pub transactions_committed: u64,
552 pub transactions_aborted: u64,
553 pub transactions_timed_out: u64,
554 pub active_transactions: u64,
555}
556
557impl From<&TransactionStats> for TransactionStatsSnapshot {
558 fn from(stats: &TransactionStats) -> Self {
559 Self {
560 transactions_started: stats.transactions_started(),
561 transactions_committed: stats.transactions_committed(),
562 transactions_aborted: stats.transactions_aborted(),
563 transactions_timed_out: stats.transactions_timed_out(),
564 active_transactions: stats.active_transactions(),
565 }
566 }
567}
568
569#[derive(Debug, Clone, Serialize, Deserialize)]
578pub enum TransactionLogEntry {
579 Begin {
581 txn_id: TransactionId,
582 producer_id: ProducerId,
583 producer_epoch: ProducerEpoch,
584 timeout_ms: u64,
585 },
586 AddPartition {
588 txn_id: TransactionId,
589 producer_id: ProducerId,
590 partition: TransactionPartition,
591 },
592 RecordWrite {
594 txn_id: TransactionId,
595 producer_id: ProducerId,
596 partition: TransactionPartition,
597 sequence: i32,
598 offset: u64,
599 },
600 PrepareCommit {
602 txn_id: TransactionId,
603 producer_id: ProducerId,
604 },
605 CompleteCommit {
607 txn_id: TransactionId,
608 producer_id: ProducerId,
609 },
610 PrepareAbort {
612 txn_id: TransactionId,
613 producer_id: ProducerId,
614 },
615 CompleteAbort {
617 txn_id: TransactionId,
618 producer_id: ProducerId,
619 },
620 TimedOut {
622 txn_id: TransactionId,
623 producer_id: ProducerId,
624 },
625 OffsetCommit {
627 txn_id: TransactionId,
628 producer_id: ProducerId,
629 group_id: String,
630 offsets: Vec<(TransactionPartition, i64)>,
631 },
632}
633
634pub struct TransactionLog {
641 path: PathBuf,
643 writer: parking_lot::Mutex<Option<std::io::BufWriter<std::fs::File>>>,
645}
646
647impl TransactionLog {
648 pub fn open(path: impl AsRef<Path>) -> crate::Result<Self> {
650 let path = path.as_ref().to_path_buf();
651 if let Some(parent) = path.parent() {
652 std::fs::create_dir_all(parent)?;
653 }
654 let file = std::fs::OpenOptions::new()
655 .create(true)
656 .append(true)
657 .open(&path)?;
658 Ok(Self {
659 path,
660 writer: parking_lot::Mutex::new(Some(std::io::BufWriter::new(file))),
661 })
662 }
663
664 pub fn noop() -> Self {
667 Self {
668 path: PathBuf::new(),
669 writer: parking_lot::Mutex::new(None),
670 }
671 }
672
673 pub fn append(&self, entry: &TransactionLogEntry) -> crate::Result<()> {
675 let mut guard = self.writer.lock();
676 let writer = match guard.as_mut() {
677 Some(w) => w,
678 None => return Ok(()), };
680
681 let data = postcard::to_allocvec(entry).map_err(|e| crate::Error::Other(e.to_string()))?;
682
683 let mut hasher = crc32fast::Hasher::new();
685 hasher.update(&data);
686 let crc = hasher.finalize();
687
688 writer.write_all(&crc.to_be_bytes())?;
689 writer.write_all(&(data.len() as u32).to_be_bytes())?;
690 writer.write_all(&data)?;
691 writer.flush()?;
692 writer.get_ref().sync_data()?;
696 Ok(())
697 }
698
699 pub fn read_all(path: impl AsRef<Path>) -> crate::Result<Vec<TransactionLogEntry>> {
701 let path = path.as_ref();
702 if !path.exists() {
703 return Ok(Vec::new());
704 }
705 let data = std::fs::read(path)?;
706 let mut entries = Vec::new();
707 let mut pos = 0;
708
709 while pos + 8 <= data.len() {
710 let crc = u32::from_be_bytes(data[pos..pos + 4].try_into().unwrap());
711 let len = u32::from_be_bytes(data[pos + 4..pos + 8].try_into().unwrap()) as usize;
712 pos += 8;
713
714 if pos + len > data.len() {
715 break; }
717
718 let payload = &data[pos..pos + len];
719
720 let mut hasher = crc32fast::Hasher::new();
722 hasher.update(payload);
723 if hasher.finalize() != crc {
724 break; }
726
727 match postcard::from_bytes::<TransactionLogEntry>(payload) {
728 Ok(entry) => entries.push(entry),
729 Err(_) => break, }
731 pos += len;
732 }
733
734 Ok(entries)
735 }
736
737 pub fn truncate(&self) -> crate::Result<()> {
739 let mut guard = self.writer.lock();
740 if guard.is_none() {
741 return Ok(());
742 }
743 *guard = None;
745 let file = std::fs::OpenOptions::new()
746 .create(true)
747 .write(true)
748 .truncate(true)
749 .open(&self.path)?;
750 *guard = Some(std::io::BufWriter::new(file));
751 Ok(())
752 }
753}
754
755pub struct TransactionCoordinator {
760 transactions: RwLock<HashMap<(ProducerId, TransactionId), Transaction>>,
763
764 producer_transactions: RwLock<HashMap<ProducerId, TransactionId>>,
766
767 default_timeout: Duration,
769
770 stats: TransactionStats,
772
773 aborted_index: AbortedTransactionIndex,
775
776 txn_log: TransactionLog,
780}
781
782impl Default for TransactionCoordinator {
783 fn default() -> Self {
784 Self::new()
785 }
786}
787
788impl TransactionCoordinator {
789 pub fn new() -> Self {
791 Self {
792 transactions: RwLock::new(HashMap::new()),
793 producer_transactions: RwLock::new(HashMap::new()),
794 default_timeout: DEFAULT_TRANSACTION_TIMEOUT,
795 stats: TransactionStats::new(),
796 aborted_index: AbortedTransactionIndex::new(),
797 txn_log: TransactionLog::noop(),
798 }
799 }
800
801 pub fn with_timeout(timeout: Duration) -> Self {
803 Self {
804 transactions: RwLock::new(HashMap::new()),
805 producer_transactions: RwLock::new(HashMap::new()),
806 default_timeout: timeout,
807 stats: TransactionStats::new(),
808 aborted_index: AbortedTransactionIndex::new(),
809 txn_log: TransactionLog::noop(),
810 }
811 }
812
813 pub fn with_persistence(path: impl AsRef<Path>) -> crate::Result<Self> {
818 let txn_log = TransactionLog::open(path)?;
819 Ok(Self {
820 transactions: RwLock::new(HashMap::new()),
821 producer_transactions: RwLock::new(HashMap::new()),
822 default_timeout: DEFAULT_TRANSACTION_TIMEOUT,
823 stats: TransactionStats::new(),
824 aborted_index: AbortedTransactionIndex::new(),
825 txn_log,
826 })
827 }
828
829 pub fn recover(path: impl AsRef<Path>) -> crate::Result<Self> {
835 let path = path.as_ref();
836 let entries = TransactionLog::read_all(path)?;
837 let txn_log = TransactionLog::open(path)?;
838
839 let coord = Self {
840 transactions: RwLock::new(HashMap::new()),
841 producer_transactions: RwLock::new(HashMap::new()),
842 default_timeout: DEFAULT_TRANSACTION_TIMEOUT,
843 stats: TransactionStats::new(),
844 aborted_index: AbortedTransactionIndex::new(),
845 txn_log,
846 };
847
848 let mut transactions = coord.transactions.write();
850 let mut producer_txns = coord.producer_transactions.write();
851
852 for entry in entries {
853 match entry {
854 TransactionLogEntry::Begin {
855 txn_id,
856 producer_id,
857 producer_epoch,
858 timeout_ms,
859 } => {
860 let txn = Transaction::new(
861 txn_id.clone(),
862 producer_id,
863 producer_epoch,
864 Duration::from_millis(timeout_ms),
865 );
866 transactions.insert((producer_id, txn_id.clone()), txn);
867 producer_txns.insert(producer_id, txn_id);
868 }
869 TransactionLogEntry::AddPartition {
870 txn_id,
871 producer_id,
872 partition,
873 } => {
874 if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
875 txn.partitions.insert(partition);
876 }
877 }
878 TransactionLogEntry::RecordWrite {
879 txn_id,
880 producer_id,
881 partition,
882 sequence,
883 offset,
884 } => {
885 if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
886 txn.pending_writes.push(PendingWrite {
887 partition,
888 sequence,
889 offset,
890 timestamp: SystemTime::now(),
891 });
892 }
893 }
894 TransactionLogEntry::PrepareCommit {
895 txn_id,
896 producer_id,
897 } => {
898 if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
899 txn.state = TransactionState::PrepareCommit;
900 }
901 }
902 TransactionLogEntry::CompleteCommit {
903 txn_id,
904 producer_id,
905 } => {
906 transactions.remove(&(producer_id, txn_id.clone()));
907 producer_txns.remove(&producer_id);
908 }
909 TransactionLogEntry::PrepareAbort {
910 txn_id,
911 producer_id,
912 } => {
913 if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
914 txn.state = TransactionState::PrepareAbort;
915 }
916 }
917 TransactionLogEntry::CompleteAbort {
918 txn_id,
919 producer_id,
920 } => {
921 if let Some(txn) = transactions.get(&(producer_id, txn_id.clone())) {
922 let first = txn.pending_writes.iter().map(|w| w.offset).min();
925 let last = txn.pending_writes.iter().map(|w| w.offset).max();
926 if let (Some(f), Some(l)) = (first, last) {
927 coord.aborted_index.record_abort(producer_id, f, l);
928 }
929 }
930 transactions.remove(&(producer_id, txn_id.clone()));
931 producer_txns.remove(&producer_id);
932 }
933 TransactionLogEntry::TimedOut {
934 txn_id,
935 producer_id,
936 } => {
937 if let Some(txn) = transactions.get(&(producer_id, txn_id.clone())) {
938 let first = txn.pending_writes.iter().map(|w| w.offset).min();
940 let last = txn.pending_writes.iter().map(|w| w.offset).max();
941 if let (Some(f), Some(l)) = (first, last) {
942 coord.aborted_index.record_abort(producer_id, f, l);
943 }
944 }
945 transactions.remove(&(producer_id, txn_id.clone()));
946 producer_txns.remove(&producer_id);
947 }
948 TransactionLogEntry::OffsetCommit {
949 txn_id,
950 producer_id,
951 group_id,
952 offsets,
953 } => {
954 if let Some(txn) = transactions.get_mut(&(producer_id, txn_id)) {
955 txn.add_offset_commit(group_id, offsets);
956 }
957 }
958 }
959 }
960
961 drop(transactions);
962 drop(producer_txns);
963
964 let active = coord.active_count();
965 if active > 0 {
966 tracing::warn!(
967 "Transaction coordinator recovered {} in-doubt transactions from log",
968 active
969 );
970 }
971
972 Ok(coord)
973 }
974
975 pub fn stats(&self) -> &TransactionStats {
977 &self.stats
978 }
979
980 pub fn begin_transaction(
982 &self,
983 txn_id: TransactionId,
984 producer_id: ProducerId,
985 producer_epoch: ProducerEpoch,
986 timeout: Option<Duration>,
987 ) -> TransactionResult {
988 let mut transactions = self.transactions.write();
990 let mut producer_txns = self.producer_transactions.write();
991
992 if let Some(existing_txn_id) = producer_txns.get(&producer_id) {
994 if existing_txn_id != &txn_id {
995 return TransactionResult::ConcurrentTransaction;
996 }
997 if let Some(txn) = transactions.get(&(producer_id, txn_id.clone())) {
999 if txn.producer_epoch != producer_epoch {
1000 return TransactionResult::ProducerFenced {
1001 expected_epoch: txn.producer_epoch,
1002 received_epoch: producer_epoch,
1003 };
1004 }
1005 if txn.state.is_active() {
1006 return TransactionResult::Ok; }
1008 }
1009 }
1010
1011 let active_count = transactions
1013 .values()
1014 .filter(|t| t.state.is_active())
1015 .count();
1016 if active_count >= MAX_PENDING_TRANSACTIONS {
1017 return TransactionResult::TooManyTransactions;
1018 }
1019
1020 let txn = Transaction::new(
1022 txn_id.clone(),
1023 producer_id,
1024 producer_epoch,
1025 timeout.unwrap_or(self.default_timeout),
1026 );
1027
1028 if let Err(e) = self.txn_log.append(&TransactionLogEntry::Begin {
1031 txn_id: txn_id.clone(),
1032 producer_id,
1033 producer_epoch,
1034 timeout_ms: timeout.unwrap_or(self.default_timeout).as_millis() as u64,
1035 }) {
1036 tracing::error!(producer_id, "Transaction log write failed on begin: {}", e);
1037 return TransactionResult::LogWriteError(e.to_string());
1038 }
1039
1040 transactions.insert((producer_id, txn_id.clone()), txn);
1041 producer_txns.insert(producer_id, txn_id);
1042
1043 self.stats.record_start();
1044 TransactionResult::Ok
1045 }
1046
1047 pub fn add_partitions_to_transaction(
1049 &self,
1050 txn_id: &TransactionId,
1051 producer_id: ProducerId,
1052 producer_epoch: ProducerEpoch,
1053 partitions: Vec<TransactionPartition>,
1054 ) -> TransactionResult {
1055 let mut transactions = self.transactions.write();
1056
1057 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1058 Some(t) => t,
1059 None => return TransactionResult::InvalidTransactionId,
1060 };
1061
1062 if txn.producer_epoch != producer_epoch {
1064 return TransactionResult::ProducerFenced {
1065 expected_epoch: txn.producer_epoch,
1066 received_epoch: producer_epoch,
1067 };
1068 }
1069
1070 if !txn.state.is_active() {
1072 return TransactionResult::InvalidTransactionState {
1073 current: txn.state,
1074 expected: "Ongoing",
1075 };
1076 }
1077
1078 if txn.is_timed_out() {
1080 txn.state = TransactionState::Dead;
1081 self.stats.record_timeout();
1082 return TransactionResult::TransactionTimeout;
1083 }
1084
1085 for partition in &partitions {
1089 if let Err(e) = self.txn_log.append(&TransactionLogEntry::AddPartition {
1090 txn_id: txn_id.clone(),
1091 producer_id,
1092 partition: partition.clone(),
1093 }) {
1094 tracing::error!(
1095 producer_id,
1096 "Transaction log write failed on add_partition: {}",
1097 e
1098 );
1099 return TransactionResult::LogWriteError(e.to_string());
1100 }
1101 }
1102
1103 for partition in partitions {
1105 txn.add_partition(partition);
1106 }
1107
1108 TransactionResult::Ok
1109 }
1110
1111 pub fn validate_transaction_write(
1115 &self,
1116 txn_id: &TransactionId,
1117 producer_id: ProducerId,
1118 producer_epoch: ProducerEpoch,
1119 partition: &TransactionPartition,
1120 ) -> TransactionResult {
1121 let mut transactions = self.transactions.write();
1122
1123 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1124 Some(t) => t,
1125 None => return TransactionResult::InvalidTransactionId,
1126 };
1127
1128 if txn.producer_epoch != producer_epoch {
1129 return TransactionResult::ProducerFenced {
1130 expected_epoch: txn.producer_epoch,
1131 received_epoch: producer_epoch,
1132 };
1133 }
1134
1135 if !txn.state.is_active() {
1136 return TransactionResult::InvalidTransactionState {
1137 current: txn.state,
1138 expected: "Ongoing",
1139 };
1140 }
1141
1142 if txn.is_timed_out() {
1143 txn.state = TransactionState::Dead;
1144 self.stats.record_timeout();
1145 return TransactionResult::TransactionTimeout;
1146 }
1147
1148 if !txn.partitions.contains(partition) {
1149 return TransactionResult::PartitionNotInTransaction {
1150 topic: partition.topic.clone(),
1151 partition: partition.partition,
1152 };
1153 }
1154
1155 TransactionResult::Ok
1156 }
1157
1158 pub fn add_write_to_transaction(
1160 &self,
1161 txn_id: &TransactionId,
1162 producer_id: ProducerId,
1163 producer_epoch: ProducerEpoch,
1164 partition: TransactionPartition,
1165 sequence: i32,
1166 offset: u64,
1167 ) -> TransactionResult {
1168 let mut transactions = self.transactions.write();
1169
1170 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1171 Some(t) => t,
1172 None => return TransactionResult::InvalidTransactionId,
1173 };
1174
1175 if txn.producer_epoch != producer_epoch {
1177 return TransactionResult::ProducerFenced {
1178 expected_epoch: txn.producer_epoch,
1179 received_epoch: producer_epoch,
1180 };
1181 }
1182
1183 if !txn.state.is_active() {
1185 return TransactionResult::InvalidTransactionState {
1186 current: txn.state,
1187 expected: "Ongoing",
1188 };
1189 }
1190
1191 if txn.is_timed_out() {
1193 txn.state = TransactionState::Dead;
1194 self.stats.record_timeout();
1195 return TransactionResult::TransactionTimeout;
1196 }
1197
1198 if !txn.partitions.contains(&partition) {
1200 return TransactionResult::PartitionNotInTransaction {
1201 topic: partition.topic,
1202 partition: partition.partition,
1203 };
1204 }
1205
1206 if let Err(e) = self.txn_log.append(&TransactionLogEntry::RecordWrite {
1208 txn_id: txn_id.clone(),
1209 producer_id,
1210 partition: partition.clone(),
1211 sequence,
1212 offset,
1213 }) {
1214 tracing::error!(
1215 producer_id,
1216 offset,
1217 "Transaction log write failed on record_write: {}",
1218 e
1219 );
1220 return TransactionResult::LogWriteError(e.to_string());
1221 }
1222
1223 txn.add_write(partition, sequence, offset);
1225
1226 TransactionResult::Ok
1227 }
1228
1229 pub fn add_offsets_to_transaction(
1231 &self,
1232 txn_id: &TransactionId,
1233 producer_id: ProducerId,
1234 producer_epoch: ProducerEpoch,
1235 group_id: String,
1236 offsets: Vec<(TransactionPartition, i64)>,
1237 ) -> TransactionResult {
1238 let mut transactions = self.transactions.write();
1239
1240 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1241 Some(t) => t,
1242 None => return TransactionResult::InvalidTransactionId,
1243 };
1244
1245 if txn.producer_epoch != producer_epoch {
1247 return TransactionResult::ProducerFenced {
1248 expected_epoch: txn.producer_epoch,
1249 received_epoch: producer_epoch,
1250 };
1251 }
1252
1253 if !txn.state.is_active() {
1255 return TransactionResult::InvalidTransactionState {
1256 current: txn.state,
1257 expected: "Ongoing",
1258 };
1259 }
1260
1261 if txn.is_timed_out() {
1263 txn.state = TransactionState::Dead;
1264 self.stats.record_timeout();
1265 return TransactionResult::TransactionTimeout;
1266 }
1267
1268 if let Err(e) = self.txn_log.append(&TransactionLogEntry::OffsetCommit {
1272 txn_id: txn_id.clone(),
1273 producer_id,
1274 group_id: group_id.clone(),
1275 offsets: offsets.clone(),
1276 }) {
1277 tracing::error!(
1278 producer_id,
1279 "Transaction log write failed on offset_commit: {}",
1280 e
1281 );
1282 return TransactionResult::LogWriteError(e.to_string());
1283 }
1284
1285 txn.add_offset_commit(group_id, offsets);
1287
1288 TransactionResult::Ok
1289 }
1290
1291 pub fn prepare_commit(
1295 &self,
1296 txn_id: &TransactionId,
1297 producer_id: ProducerId,
1298 producer_epoch: ProducerEpoch,
1299 ) -> Result<Transaction, TransactionResult> {
1300 let mut transactions = self.transactions.write();
1301
1302 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1303 Some(t) => t,
1304 None => return Err(TransactionResult::InvalidTransactionId),
1305 };
1306
1307 if txn.producer_epoch != producer_epoch {
1309 return Err(TransactionResult::ProducerFenced {
1310 expected_epoch: txn.producer_epoch,
1311 received_epoch: producer_epoch,
1312 });
1313 }
1314
1315 if !txn.state.can_commit() {
1317 return Err(TransactionResult::InvalidTransactionState {
1318 current: txn.state,
1319 expected: "Ongoing",
1320 });
1321 }
1322
1323 if txn.is_timed_out() {
1325 txn.state = TransactionState::Dead;
1326 self.stats.record_timeout();
1327 return Err(TransactionResult::TransactionTimeout);
1328 }
1329
1330 txn.state = TransactionState::PrepareCommit;
1332 txn.touch();
1333
1334 if let Err(e) = self.txn_log.append(&TransactionLogEntry::PrepareCommit {
1336 txn_id: txn_id.clone(),
1337 producer_id,
1338 }) {
1339 tracing::error!(
1340 producer_id,
1341 "Transaction log write failed on prepare_commit: {}",
1342 e
1343 );
1344 txn.state = TransactionState::Ongoing;
1346 return Err(TransactionResult::LogWriteError(e.to_string()));
1347 }
1348
1349 Ok(txn.clone())
1350 }
1351
1352 pub fn complete_commit(
1354 &self,
1355 txn_id: &TransactionId,
1356 producer_id: ProducerId,
1357 ) -> TransactionResult {
1358 let mut transactions = self.transactions.write();
1359 let mut producer_txns = self.producer_transactions.write();
1360
1361 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1362 Some(t) => t,
1363 None => return TransactionResult::InvalidTransactionId,
1364 };
1365
1366 if txn.state != TransactionState::PrepareCommit {
1367 return TransactionResult::InvalidTransactionState {
1368 current: txn.state,
1369 expected: "PrepareCommit",
1370 };
1371 }
1372
1373 if let Err(e) = self.txn_log.append(&TransactionLogEntry::CompleteCommit {
1375 txn_id: txn_id.clone(),
1376 producer_id,
1377 }) {
1378 tracing::error!(
1379 producer_id,
1380 "Transaction log write failed on complete_commit: {}",
1381 e
1382 );
1383 return TransactionResult::LogWriteError(e.to_string());
1384 }
1385
1386 txn.state = TransactionState::CompleteCommit;
1388
1389 transactions.remove(&(producer_id, txn_id.clone()));
1391 producer_txns.remove(&producer_id);
1392
1393 self.stats.record_commit();
1394 TransactionResult::Ok
1395 }
1396
1397 pub fn prepare_abort(
1399 &self,
1400 txn_id: &TransactionId,
1401 producer_id: ProducerId,
1402 producer_epoch: ProducerEpoch,
1403 ) -> Result<Transaction, TransactionResult> {
1404 let mut transactions = self.transactions.write();
1405
1406 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1407 Some(t) => t,
1408 None => return Err(TransactionResult::InvalidTransactionId),
1409 };
1410
1411 if txn.producer_epoch != producer_epoch {
1413 return Err(TransactionResult::ProducerFenced {
1414 expected_epoch: txn.producer_epoch,
1415 received_epoch: producer_epoch,
1416 });
1417 }
1418
1419 if !txn.state.can_abort() {
1421 return Err(TransactionResult::InvalidTransactionState {
1422 current: txn.state,
1423 expected: "Ongoing or PrepareCommit",
1424 });
1425 }
1426
1427 txn.state = TransactionState::PrepareAbort;
1429 txn.touch();
1430
1431 if let Err(e) = self.txn_log.append(&TransactionLogEntry::PrepareAbort {
1433 txn_id: txn_id.clone(),
1434 producer_id,
1435 }) {
1436 tracing::error!(
1437 producer_id,
1438 "Transaction log write failed on prepare_abort: {}",
1439 e
1440 );
1441 txn.state = TransactionState::Ongoing;
1442 return Err(TransactionResult::LogWriteError(e.to_string()));
1443 }
1444
1445 Ok(txn.clone())
1446 }
1447
1448 pub fn complete_abort(
1450 &self,
1451 txn_id: &TransactionId,
1452 producer_id: ProducerId,
1453 ) -> TransactionResult {
1454 let mut transactions = self.transactions.write();
1455 let mut producer_txns = self.producer_transactions.write();
1456
1457 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
1458 Some(t) => t,
1459 None => return TransactionResult::InvalidTransactionId,
1460 };
1461
1462 if txn.state != TransactionState::PrepareAbort {
1463 return TransactionResult::InvalidTransactionState {
1464 current: txn.state,
1465 expected: "PrepareAbort",
1466 };
1467 }
1468
1469 if let Err(e) = self.txn_log.append(&TransactionLogEntry::CompleteAbort {
1471 txn_id: txn_id.clone(),
1472 producer_id,
1473 }) {
1474 tracing::error!(
1475 producer_id,
1476 "Transaction log write failed on complete_abort: {}",
1477 e
1478 );
1479 return TransactionResult::LogWriteError(e.to_string());
1480 }
1481
1482 txn.state = TransactionState::CompleteAbort;
1484
1485 let first = txn.pending_writes.iter().map(|w| w.offset).min();
1489 let last = txn.pending_writes.iter().map(|w| w.offset).max();
1490 if let (Some(f), Some(l)) = (first, last) {
1491 self.aborted_index.record_abort(producer_id, f, l);
1492 }
1493
1494 transactions.remove(&(producer_id, txn_id.clone()));
1496 producer_txns.remove(&producer_id);
1497
1498 self.stats.record_abort();
1499 TransactionResult::Ok
1500 }
1501
1502 pub fn get_transaction(
1504 &self,
1505 txn_id: &TransactionId,
1506 producer_id: ProducerId,
1507 ) -> Option<Transaction> {
1508 let transactions = self.transactions.read();
1509 transactions.get(&(producer_id, txn_id.clone())).cloned()
1510 }
1511
1512 pub fn has_active_transaction(&self, producer_id: ProducerId) -> bool {
1514 let producer_txns = self.producer_transactions.read();
1515 producer_txns.contains_key(&producer_id)
1516 }
1517
1518 pub fn get_active_transaction_id(&self, producer_id: ProducerId) -> Option<TransactionId> {
1520 let producer_txns = self.producer_transactions.read();
1521 producer_txns.get(&producer_id).cloned()
1522 }
1523
1524 pub fn cleanup_timed_out_transactions(&self) -> Vec<Transaction> {
1532 let mut timed_out = Vec::new();
1533 let mut transactions = self.transactions.write();
1534 let mut producer_txns = self.producer_transactions.write();
1535
1536 let keys_to_remove: Vec<_> = transactions
1537 .iter()
1538 .filter(|(_, txn)| txn.is_timed_out() && !txn.state.is_terminal())
1539 .map(|(k, _)| k.clone())
1540 .collect();
1541
1542 for key in keys_to_remove {
1543 if let Some(txn) = transactions.get(&key) {
1546 if let Err(e) = self.txn_log.append(&TransactionLogEntry::TimedOut {
1547 txn_id: txn.txn_id.clone(),
1548 producer_id: txn.producer_id,
1549 }) {
1550 tracing::error!(txn.producer_id, "Transaction log write failed on timeout: {} — skipping cleanup, will retry", e);
1551 continue;
1552 }
1553 }
1554
1555 if let Some(mut txn) = transactions.remove(&key) {
1556 txn.state = TransactionState::Dead;
1557 producer_txns.remove(&txn.producer_id);
1558
1559 let first = txn.pending_writes.iter().map(|w| w.offset).min();
1563 let last = txn.pending_writes.iter().map(|w| w.offset).max();
1564 if let (Some(f), Some(l)) = (first, last) {
1565 self.aborted_index.record_abort(txn.producer_id, f, l);
1566 }
1567
1568 self.stats.record_timeout();
1569 self.stats.record_abort();
1570 timed_out.push(txn);
1571 }
1572 }
1573
1574 timed_out
1575 }
1576
1577 pub fn active_count(&self) -> usize {
1579 let transactions = self.transactions.read();
1580 transactions
1581 .values()
1582 .filter(|t| !t.state.is_terminal())
1583 .count()
1584 }
1585
    /// Reports whether `offset`, written by `producer_id`, belongs to an aborted
    /// transaction according to the aborted-transaction index.
    ///
    /// This is a straight delegation to `AbortedTransactionIndex::is_aborted`.
    pub fn is_aborted(&self, producer_id: ProducerId, offset: u64) -> bool {
        self.aborted_index.is_aborted(producer_id, offset)
    }
1592
    /// Returns the aborted transactions the index reports for the offset range
    /// `start_offset..end_offset`.
    ///
    /// This is a straight delegation to
    /// `AbortedTransactionIndex::get_aborted_in_range`.
    // NOTE(review): exact bound inclusivity and whether matching is by first
    // offset or by overlap are decided by the index — confirm there.
    pub fn get_aborted_in_range(
        &self,
        start_offset: u64,
        end_offset: u64,
    ) -> Vec<AbortedTransaction> {
        self.aborted_index
            .get_aborted_in_range(start_offset, end_offset)
    }
1604
    /// Borrows the coordinator's aborted-transaction index.
    pub fn aborted_index(&self) -> &AbortedTransactionIndex {
        &self.aborted_index
    }
1609}
1610
1611#[cfg(test)]
1616mod tests {
1617 use super::*;
1618 use std::str::FromStr;
1619
1620 #[test]
1621 fn test_transaction_state_transitions() {
1622 assert!(TransactionState::Empty.is_terminal());
1624 assert!(TransactionState::CompleteCommit.is_terminal());
1625 assert!(TransactionState::CompleteAbort.is_terminal());
1626 assert!(TransactionState::Dead.is_terminal());
1627
1628 assert!(!TransactionState::Ongoing.is_terminal());
1630 assert!(!TransactionState::PrepareCommit.is_terminal());
1631 assert!(!TransactionState::PrepareAbort.is_terminal());
1632
1633 assert!(TransactionState::Ongoing.can_commit());
1635 assert!(!TransactionState::Empty.can_commit());
1636 assert!(!TransactionState::PrepareCommit.can_commit());
1637
1638 assert!(TransactionState::Ongoing.can_abort());
1640 assert!(TransactionState::PrepareCommit.can_abort());
1641 assert!(TransactionState::PrepareAbort.can_abort());
1642 assert!(!TransactionState::Empty.can_abort());
1643 }
1644
1645 #[test]
1646 fn test_begin_transaction() {
1647 let coordinator = TransactionCoordinator::new();
1648
1649 let result = coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1651 assert_eq!(result, TransactionResult::Ok);
1652
1653 let txn = coordinator.get_transaction(&"txn-1".to_string(), 1);
1655 assert!(txn.is_some());
1656 let txn = txn.unwrap();
1657 assert_eq!(txn.state, TransactionState::Ongoing);
1658 assert_eq!(txn.producer_id, 1);
1659 assert_eq!(txn.producer_epoch, 0);
1660
1661 assert_eq!(coordinator.stats().transactions_started(), 1);
1663 assert_eq!(coordinator.stats().active_transactions(), 1);
1664 }
1665
1666 #[test]
1667 fn test_concurrent_transaction_rejection() {
1668 let coordinator = TransactionCoordinator::new();
1669
1670 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1672
1673 let result = coordinator.begin_transaction("txn-2".to_string(), 1, 0, None);
1675 assert_eq!(result, TransactionResult::ConcurrentTransaction);
1676 }
1677
1678 #[test]
1679 fn test_add_partitions_to_transaction() {
1680 let coordinator = TransactionCoordinator::new();
1681 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1682
1683 let result = coordinator.add_partitions_to_transaction(
1685 &"txn-1".to_string(),
1686 1,
1687 0,
1688 vec![
1689 TransactionPartition::new("topic-1", 0),
1690 TransactionPartition::new("topic-1", 1),
1691 TransactionPartition::new("topic-2", 0),
1692 ],
1693 );
1694 assert_eq!(result, TransactionResult::Ok);
1695
1696 let txn = coordinator
1698 .get_transaction(&"txn-1".to_string(), 1)
1699 .unwrap();
1700 assert_eq!(txn.partitions.len(), 3);
1701 }
1702
1703 #[test]
1704 fn test_add_write_to_transaction() {
1705 let coordinator = TransactionCoordinator::new();
1706 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1707
1708 let partition = TransactionPartition::new("topic-1", 0);
1709 coordinator.add_partitions_to_transaction(
1710 &"txn-1".to_string(),
1711 1,
1712 0,
1713 vec![partition.clone()],
1714 );
1715
1716 let result =
1718 coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1719 assert_eq!(result, TransactionResult::Ok);
1720
1721 let txn = coordinator
1723 .get_transaction(&"txn-1".to_string(), 1)
1724 .unwrap();
1725 assert_eq!(txn.pending_writes.len(), 1);
1726 assert_eq!(txn.pending_writes[0].offset, 100);
1727 assert_eq!(txn.pending_writes[0].sequence, 0);
1728 }
1729
1730 #[test]
1731 fn test_write_to_non_registered_partition() {
1732 let coordinator = TransactionCoordinator::new();
1733 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1734
1735 let result = coordinator.add_write_to_transaction(
1737 &"txn-1".to_string(),
1738 1,
1739 0,
1740 TransactionPartition::new("topic-1", 0),
1741 0,
1742 100,
1743 );
1744
1745 assert!(matches!(
1746 result,
1747 TransactionResult::PartitionNotInTransaction { .. }
1748 ));
1749 }
1750
1751 #[test]
1752 fn test_commit_transaction() {
1753 let coordinator = TransactionCoordinator::new();
1754 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1755
1756 let partition = TransactionPartition::new("topic-1", 0);
1757 coordinator.add_partitions_to_transaction(
1758 &"txn-1".to_string(),
1759 1,
1760 0,
1761 vec![partition.clone()],
1762 );
1763 coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1764
1765 let txn = coordinator.prepare_commit(&"txn-1".to_string(), 1, 0);
1767 assert!(txn.is_ok());
1768 let txn = txn.unwrap();
1769 assert_eq!(txn.state, TransactionState::PrepareCommit);
1770
1771 let result = coordinator.complete_commit(&"txn-1".to_string(), 1);
1773 assert_eq!(result, TransactionResult::Ok);
1774
1775 assert!(coordinator
1777 .get_transaction(&"txn-1".to_string(), 1)
1778 .is_none());
1779 assert!(!coordinator.has_active_transaction(1));
1780
1781 assert_eq!(coordinator.stats().transactions_committed(), 1);
1783 assert_eq!(coordinator.stats().active_transactions(), 0);
1784 }
1785
1786 #[test]
1787 fn test_abort_transaction() {
1788 let coordinator = TransactionCoordinator::new();
1789 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1790
1791 let partition = TransactionPartition::new("topic-1", 0);
1792 coordinator.add_partitions_to_transaction(
1793 &"txn-1".to_string(),
1794 1,
1795 0,
1796 vec![partition.clone()],
1797 );
1798 coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1799
1800 let txn = coordinator.prepare_abort(&"txn-1".to_string(), 1, 0);
1802 assert!(txn.is_ok());
1803
1804 let result = coordinator.complete_abort(&"txn-1".to_string(), 1);
1806 assert_eq!(result, TransactionResult::Ok);
1807
1808 assert!(coordinator
1810 .get_transaction(&"txn-1".to_string(), 1)
1811 .is_none());
1812
1813 assert_eq!(coordinator.stats().transactions_aborted(), 1);
1815 }
1816
1817 #[test]
1818 fn test_producer_fencing() {
1819 let coordinator = TransactionCoordinator::new();
1820 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1821
1822 let result = coordinator.add_partitions_to_transaction(
1824 &"txn-1".to_string(),
1825 1,
1826 1, vec![TransactionPartition::new("topic-1", 0)],
1828 );
1829
1830 assert!(matches!(
1831 result,
1832 TransactionResult::ProducerFenced {
1833 expected_epoch: 0,
1834 received_epoch: 1
1835 }
1836 ));
1837 }
1838
1839 #[test]
1840 fn test_transaction_timeout() {
1841 let coordinator = TransactionCoordinator::with_timeout(Duration::from_millis(1));
1843 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1844
1845 std::thread::sleep(Duration::from_millis(5));
1847
1848 let result = coordinator.add_partitions_to_transaction(
1850 &"txn-1".to_string(),
1851 1,
1852 0,
1853 vec![TransactionPartition::new("topic-1", 0)],
1854 );
1855
1856 assert_eq!(result, TransactionResult::TransactionTimeout);
1857 }
1858
1859 #[test]
1860 fn test_cleanup_timed_out_transactions() {
1861 let coordinator = TransactionCoordinator::with_timeout(Duration::from_millis(1));
1862
1863 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1864 coordinator.begin_transaction("txn-2".to_string(), 2, 0, None);
1865
1866 std::thread::sleep(Duration::from_millis(5));
1868
1869 let timed_out = coordinator.cleanup_timed_out_transactions();
1871 assert_eq!(timed_out.len(), 2);
1872
1873 assert_eq!(coordinator.active_count(), 0);
1875 assert_eq!(coordinator.stats().transactions_timed_out(), 2);
1876 }
1877
1878 #[test]
1879 fn test_add_offsets_to_transaction() {
1880 let coordinator = TransactionCoordinator::new();
1881 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1882
1883 let result = coordinator.add_offsets_to_transaction(
1885 &"txn-1".to_string(),
1886 1,
1887 0,
1888 "consumer-group-1".to_string(),
1889 vec![
1890 (TransactionPartition::new("input-topic", 0), 42),
1891 (TransactionPartition::new("input-topic", 1), 100),
1892 ],
1893 );
1894 assert_eq!(result, TransactionResult::Ok);
1895
1896 let txn = coordinator
1898 .get_transaction(&"txn-1".to_string(), 1)
1899 .unwrap();
1900 assert_eq!(txn.offset_commits.len(), 1);
1901 assert_eq!(txn.offset_commits[0].group_id, "consumer-group-1");
1902 assert_eq!(txn.offset_commits[0].offsets.len(), 2);
1903 }
1904
1905 #[test]
1906 fn test_invalid_state_transitions() {
1907 let coordinator = TransactionCoordinator::new();
1908 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1909
1910 coordinator
1912 .prepare_commit(&"txn-1".to_string(), 1, 0)
1913 .unwrap();
1914
1915 let result = coordinator.add_partitions_to_transaction(
1917 &"txn-1".to_string(),
1918 1,
1919 0,
1920 vec![TransactionPartition::new("topic-1", 0)],
1921 );
1922 assert!(matches!(
1923 result,
1924 TransactionResult::InvalidTransactionState { .. }
1925 ));
1926 }
1927
1928 #[test]
1929 fn test_abort_from_prepare_commit() {
1930 let coordinator = TransactionCoordinator::new();
1931 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1932
1933 coordinator
1935 .prepare_commit(&"txn-1".to_string(), 1, 0)
1936 .unwrap();
1937
1938 let result = coordinator.prepare_abort(&"txn-1".to_string(), 1, 0);
1940 assert!(result.is_ok());
1941
1942 let result = coordinator.complete_abort(&"txn-1".to_string(), 1);
1943 assert_eq!(result, TransactionResult::Ok);
1944 }
1945
1946 #[test]
1947 fn test_transaction_partition_hash() {
1948 let p1 = TransactionPartition::new("topic", 0);
1949 let p2 = TransactionPartition::new("topic", 0);
1950 let p3 = TransactionPartition::new("topic", 1);
1951
1952 assert_eq!(p1, p2);
1953 assert_ne!(p1, p3);
1954
1955 let mut set = HashSet::new();
1956 set.insert(p1.clone());
1957 set.insert(p2); set.insert(p3);
1959 assert_eq!(set.len(), 2);
1960 }
1961
1962 #[test]
1963 fn test_resume_same_transaction() {
1964 let coordinator = TransactionCoordinator::new();
1965
1966 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1968
1969 let result = coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1971 assert_eq!(result, TransactionResult::Ok);
1972
1973 assert_eq!(coordinator.active_count(), 1);
1975 assert_eq!(coordinator.stats().transactions_started(), 1);
1976 }
1977
1978 #[test]
1979 fn test_stats_snapshot() {
1980 let coordinator = TransactionCoordinator::new();
1981 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1982 coordinator
1983 .prepare_commit(&"txn-1".to_string(), 1, 0)
1984 .unwrap();
1985 coordinator.complete_commit(&"txn-1".to_string(), 1);
1986
1987 let snapshot: TransactionStatsSnapshot = coordinator.stats().into();
1988 assert_eq!(snapshot.transactions_started, 1);
1989 assert_eq!(snapshot.transactions_committed, 1);
1990 assert_eq!(snapshot.active_transactions, 0);
1991 }
1992
1993 #[test]
1998 fn test_isolation_level_from_u8() {
1999 assert_eq!(IsolationLevel::from_u8(0), IsolationLevel::ReadUncommitted);
2000 assert_eq!(IsolationLevel::from_u8(1), IsolationLevel::ReadCommitted);
2001 assert_eq!(IsolationLevel::from_u8(2), IsolationLevel::ReadUncommitted); assert_eq!(
2003 IsolationLevel::from_u8(255),
2004 IsolationLevel::ReadUncommitted
2005 );
2006 }
2007
2008 #[test]
2009 fn test_isolation_level_as_u8() {
2010 assert_eq!(IsolationLevel::ReadUncommitted.as_u8(), 0);
2011 assert_eq!(IsolationLevel::ReadCommitted.as_u8(), 1);
2012 }
2013
2014 #[test]
2015 fn test_isolation_level_from_str() {
2016 assert_eq!(
2017 IsolationLevel::from_str("read_uncommitted").unwrap(),
2018 IsolationLevel::ReadUncommitted
2019 );
2020 assert_eq!(
2021 IsolationLevel::from_str("read_committed").unwrap(),
2022 IsolationLevel::ReadCommitted
2023 );
2024 assert_eq!(
2025 IsolationLevel::from_str("READ_UNCOMMITTED").unwrap(),
2026 IsolationLevel::ReadUncommitted
2027 );
2028 assert_eq!(
2029 IsolationLevel::from_str("READ_COMMITTED").unwrap(),
2030 IsolationLevel::ReadCommitted
2031 );
2032 assert!(IsolationLevel::from_str("invalid").is_err());
2033 }
2034
2035 #[test]
2036 fn test_isolation_level_default() {
2037 assert_eq!(IsolationLevel::default(), IsolationLevel::ReadUncommitted);
2038 }
2039
2040 #[test]
2045 fn test_aborted_transaction_index_basic() {
2046 let index = AbortedTransactionIndex::new();
2047 assert!(index.is_empty());
2048 assert_eq!(index.len(), 0);
2049
2050 index.record_abort(1, 100, 200);
2052 assert!(!index.is_empty());
2053 assert_eq!(index.len(), 1);
2054
2055 assert!(index.is_aborted(1, 100)); assert!(index.is_aborted(1, 150)); assert!(index.is_aborted(1, 200)); assert!(!index.is_aborted(1, 201)); assert!(!index.is_aborted(1, 50)); assert!(!index.is_aborted(2, 100)); }
2063
2064 #[test]
2065 fn test_aborted_transaction_index_multiple() {
2066 let index = AbortedTransactionIndex::new();
2067
2068 index.record_abort(1, 100, 199);
2070 index.record_abort(1, 300, 399);
2071 index.record_abort(2, 200, 299);
2072
2073 assert_eq!(index.len(), 3);
2074
2075 assert!(index.is_aborted(1, 100));
2077 assert!(index.is_aborted(1, 150)); assert!(!index.is_aborted(1, 250)); assert!(index.is_aborted(1, 300));
2080 assert!(index.is_aborted(1, 399)); assert!(!index.is_aborted(1, 400)); assert!(index.is_aborted(2, 200));
2083 assert!(index.is_aborted(2, 250));
2084 assert!(!index.is_aborted(2, 100)); assert!(!index.is_aborted(2, 300)); }
2087
2088 #[test]
2089 fn test_aborted_transaction_index_get_range() {
2090 let index = AbortedTransactionIndex::new();
2091
2092 index.record_abort(1, 100, 199);
2093 index.record_abort(2, 200, 299);
2094 index.record_abort(1, 300, 399);
2095
2096 let in_range = index.get_aborted_in_range(150, 250);
2098 assert_eq!(in_range.len(), 1);
2099 assert_eq!(in_range[0].producer_id, 2);
2100 assert_eq!(in_range[0].first_offset, 200);
2101
2102 let in_range = index.get_aborted_in_range(0, 500);
2104 assert_eq!(in_range.len(), 3);
2105
2106 let in_range = index.get_aborted_in_range(400, 500);
2108 assert_eq!(in_range.len(), 0);
2109 }
2110
2111 #[test]
2112 fn test_aborted_transaction_index_truncate() {
2113 let index = AbortedTransactionIndex::new();
2114
2115 index.record_abort(1, 100, 199);
2116 index.record_abort(2, 200, 299);
2117 index.record_abort(1, 300, 399);
2118
2119 assert_eq!(index.len(), 3);
2120
2121 index.truncate_before(200);
2123
2124 assert_eq!(index.len(), 2);
2125
2126 assert!(!index.is_aborted(1, 150)); assert!(index.is_aborted(2, 200));
2129 assert!(index.is_aborted(1, 300));
2130 }
2131
2132 #[test]
2133 fn test_coordinator_is_aborted() {
2134 let coordinator = TransactionCoordinator::new();
2135
2136 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
2138
2139 coordinator.add_partitions_to_transaction(
2141 &"txn-1".to_string(),
2142 1,
2143 0,
2144 vec![TransactionPartition::new("test-topic", 0)],
2145 );
2146 coordinator.add_write_to_transaction(
2147 &"txn-1".to_string(),
2148 1,
2149 0,
2150 TransactionPartition::new("test-topic", 0),
2151 0,
2152 100, );
2154
2155 assert!(!coordinator.is_aborted(1, 100));
2157
2158 coordinator
2160 .prepare_abort(&"txn-1".to_string(), 1, 0)
2161 .unwrap();
2162 coordinator.complete_abort(&"txn-1".to_string(), 1);
2163
2164 assert!(coordinator.is_aborted(1, 100));
2166 assert!(!coordinator.is_aborted(1, 150));
2168 assert!(!coordinator.is_aborted(1, 50)); assert!(!coordinator.is_aborted(2, 100)); }
2171
2172 #[test]
2177 fn test_transaction_log_round_trip() {
2178 let dir = tempfile::tempdir().unwrap();
2179 let log_path = dir.path().join("txn.log");
2180
2181 let log = TransactionLog::open(&log_path).unwrap();
2183 log.append(&TransactionLogEntry::Begin {
2184 txn_id: "txn-1".to_string(),
2185 producer_id: 42,
2186 producer_epoch: 0,
2187 timeout_ms: 30000,
2188 })
2189 .unwrap();
2190 log.append(&TransactionLogEntry::AddPartition {
2191 txn_id: "txn-1".to_string(),
2192 producer_id: 42,
2193 partition: TransactionPartition::new("topic-a", 0),
2194 })
2195 .unwrap();
2196 log.append(&TransactionLogEntry::RecordWrite {
2197 txn_id: "txn-1".to_string(),
2198 producer_id: 42,
2199 partition: TransactionPartition::new("topic-a", 0),
2200 sequence: 0,
2201 offset: 100,
2202 })
2203 .unwrap();
2204 drop(log);
2205
2206 let entries = TransactionLog::read_all(&log_path).unwrap();
2208 assert_eq!(entries.len(), 3);
2209 assert!(
2210 matches!(&entries[0], TransactionLogEntry::Begin { txn_id, producer_id, .. } if txn_id == "txn-1" && *producer_id == 42)
2211 );
2212 assert!(
2213 matches!(&entries[1], TransactionLogEntry::AddPartition { partition, .. } if partition.topic == "topic-a" && partition.partition == 0)
2214 );
2215 assert!(
2216 matches!(&entries[2], TransactionLogEntry::RecordWrite { offset, .. } if *offset == 100)
2217 );
2218 }
2219
2220 #[test]
2221 fn test_transaction_log_crc_corruption_detection() {
2222 let dir = tempfile::tempdir().unwrap();
2223 let log_path = dir.path().join("txn.log");
2224
2225 let log = TransactionLog::open(&log_path).unwrap();
2226 log.append(&TransactionLogEntry::Begin {
2227 txn_id: "txn-1".to_string(),
2228 producer_id: 1,
2229 producer_epoch: 0,
2230 timeout_ms: 5000,
2231 })
2232 .unwrap();
2233 log.append(&TransactionLogEntry::PrepareCommit {
2234 txn_id: "txn-1".to_string(),
2235 producer_id: 1,
2236 })
2237 .unwrap();
2238 drop(log);
2239
2240 let mut data = std::fs::read(&log_path).unwrap();
2242 assert!(data.len() > 10);
2243 data[10] ^= 0xFF; std::fs::write(&log_path, &data).unwrap();
2245
2246 let entries = TransactionLog::read_all(&log_path).unwrap();
2248 assert!(entries.len() <= 1);
2251 }
2252
2253 #[test]
2254 fn test_coordinator_with_persistence_commit_flow() {
2255 let dir = tempfile::tempdir().unwrap();
2256 let log_path = dir.path().join("txn.log");
2257
2258 let coord = TransactionCoordinator::with_persistence(&log_path).unwrap();
2260 assert_eq!(
2261 coord.begin_transaction("txn-1".to_string(), 1, 0, None),
2262 TransactionResult::Ok
2263 );
2264 assert_eq!(
2265 coord.add_partitions_to_transaction(
2266 &"txn-1".to_string(),
2267 1,
2268 0,
2269 vec![TransactionPartition::new("topic", 0)],
2270 ),
2271 TransactionResult::Ok
2272 );
2273 assert_eq!(
2274 coord.add_write_to_transaction(
2275 &"txn-1".to_string(),
2276 1,
2277 0,
2278 TransactionPartition::new("topic", 0),
2279 0,
2280 500,
2281 ),
2282 TransactionResult::Ok
2283 );
2284 coord.prepare_commit(&"txn-1".to_string(), 1, 0).unwrap();
2285 assert_eq!(
2286 coord.complete_commit(&"txn-1".to_string(), 1),
2287 TransactionResult::Ok
2288 );
2289
2290 let entries = TransactionLog::read_all(&log_path).unwrap();
2293 assert_eq!(entries.len(), 5);
2294 }
2295
2296 #[test]
2297 fn test_coordinator_recovery_from_crash() {
2298 let dir = tempfile::tempdir().unwrap();
2299 let log_path = dir.path().join("txn.log");
2300
2301 {
2303 let coord = TransactionCoordinator::with_persistence(&log_path).unwrap();
2304 coord.begin_transaction("txn-1".to_string(), 1, 0, None);
2305 coord.add_partitions_to_transaction(
2306 &"txn-1".to_string(),
2307 1,
2308 0,
2309 vec![TransactionPartition::new("topic", 0)],
2310 );
2311 coord.add_write_to_transaction(
2312 &"txn-1".to_string(),
2313 1,
2314 0,
2315 TransactionPartition::new("topic", 0),
2316 0,
2317 42,
2318 );
2319 coord.prepare_commit(&"txn-1".to_string(), 1, 0).unwrap();
2320 }
2322
2323 let coord = TransactionCoordinator::recover(&log_path).unwrap();
2325
2326 let txn = coord.get_transaction(&"txn-1".to_string(), 1);
2328 assert!(txn.is_some(), "Transaction should be recovered from WAL");
2329 let txn = txn.unwrap();
2330 assert_eq!(txn.state, TransactionState::PrepareCommit);
2331 assert_eq!(txn.pending_writes.len(), 1);
2332 assert_eq!(txn.pending_writes[0].offset, 42);
2333
2334 assert_eq!(
2336 coord.complete_commit(&"txn-1".to_string(), 1),
2337 TransactionResult::Ok
2338 );
2339 assert_eq!(coord.active_count(), 0);
2340 }
2341
2342 #[test]
2343 fn test_coordinator_recovery_abort_flow() {
2344 let dir = tempfile::tempdir().unwrap();
2345 let log_path = dir.path().join("txn.log");
2346
2347 {
2349 let coord = TransactionCoordinator::with_persistence(&log_path).unwrap();
2350 coord.begin_transaction("txn-a".to_string(), 10, 0, None);
2351 coord.add_partitions_to_transaction(
2352 &"txn-a".to_string(),
2353 10,
2354 0,
2355 vec![TransactionPartition::new("t", 0)],
2356 );
2357 coord.add_write_to_transaction(
2358 &"txn-a".to_string(),
2359 10,
2360 0,
2361 TransactionPartition::new("t", 0),
2362 0,
2363 200,
2364 );
2365 coord.prepare_abort(&"txn-a".to_string(), 10, 0).unwrap();
2366 coord.complete_abort(&"txn-a".to_string(), 10);
2367 }
2368
2369 let coord = TransactionCoordinator::recover(&log_path).unwrap();
2371 assert_eq!(coord.active_count(), 0);
2372 assert!(coord.is_aborted(10, 200));
2374 }
2375
2376 #[test]
2377 fn test_transaction_log_noop_is_silent() {
2378 let log = TransactionLog::noop();
2379 assert!(log
2381 .append(&TransactionLogEntry::Begin {
2382 txn_id: "x".to_string(),
2383 producer_id: 1,
2384 producer_epoch: 0,
2385 timeout_ms: 1000,
2386 })
2387 .is_ok());
2388 }
2389
2390 #[test]
2391 fn test_log_write_error_propagated() {
2392 let coord = TransactionCoordinator::new(); let result = coord.begin_transaction("txn-ok".to_string(), 1, 0, None);
2397 assert_eq!(result, TransactionResult::Ok);
2398 }
2399}