1use crate::idempotent::{ProducerEpoch, ProducerId};
51use parking_lot::RwLock;
52use serde::{Deserialize, Serialize};
53use std::collections::{HashMap, HashSet};
54use std::sync::atomic::{AtomicU64, Ordering};
55use std::time::{Duration, Instant, SystemTime};
56
/// Identifier supplied by a producer to name a transaction.
pub type TransactionId = String;

/// Timeout applied to a transaction when the caller does not provide one (60 seconds).
pub const DEFAULT_TRANSACTION_TIMEOUT: Duration = Duration::from_secs(60);

/// Cap on the number of `Ongoing` transactions a coordinator holds at once;
/// `TransactionCoordinator::begin_transaction` answers
/// `TransactionResult::TooManyTransactions` once this many are open.
pub const MAX_PENDING_TRANSACTIONS: usize = 5;
65
/// Lifecycle state of a [`Transaction`].
///
/// Variant order is part of the derived serde representation; do not reorder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionState {
    /// No transaction in progress. Counted as terminal by `is_terminal`.
    Empty,

    /// Transaction is open; this is the state `Transaction::new` starts in and the
    /// only state in which partitions, writes and offsets may be added.
    Ongoing,

    /// Commit has been requested (`prepare_commit`) but not yet completed.
    PrepareCommit,

    /// Abort has been requested (`prepare_abort`) but not yet completed.
    PrepareAbort,

    /// Commit finished. Terminal.
    CompleteCommit,

    /// Abort finished. Terminal.
    CompleteAbort,

    /// Transaction was found to have exceeded its timeout. Terminal.
    Dead,
}
90
91impl TransactionState {
92 pub fn is_terminal(&self) -> bool {
94 matches!(
95 self,
96 TransactionState::Empty
97 | TransactionState::CompleteCommit
98 | TransactionState::CompleteAbort
99 | TransactionState::Dead
100 )
101 }
102
103 pub fn is_active(&self) -> bool {
105 matches!(self, TransactionState::Ongoing)
106 }
107
108 pub fn can_commit(&self) -> bool {
110 matches!(self, TransactionState::Ongoing)
111 }
112
113 pub fn can_abort(&self) -> bool {
115 matches!(
116 self,
117 TransactionState::Ongoing
118 | TransactionState::PrepareCommit
119 | TransactionState::PrepareAbort
120 )
121 }
122}
123
/// Outcome of a coordinator operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransactionResult {
    /// The operation succeeded.
    Ok,

    /// No transaction is registered for the given `(producer id, transaction id)` pair.
    InvalidTransactionId,

    /// The transaction exists but is in a state that does not allow the operation.
    InvalidTransactionState {
        /// State the transaction was found in.
        current: TransactionState,
        /// Human-readable description of the state(s) the operation requires.
        expected: &'static str,
    },

    /// The caller's producer epoch differs from the epoch stored on the transaction.
    ProducerFenced {
        /// Epoch recorded on the transaction.
        expected_epoch: ProducerEpoch,
        /// Epoch supplied by the caller.
        received_epoch: ProducerEpoch,
    },

    /// The transaction exceeded its timeout and was marked `Dead`.
    TransactionTimeout,

    /// `MAX_PENDING_TRANSACTIONS` ongoing transactions already exist.
    TooManyTransactions,

    /// The producer already has a transaction registered under a different id.
    ConcurrentTransaction,

    /// A write targeted a partition that was never added to the transaction.
    PartitionNotInTransaction { topic: String, partition: u32 },
}
157
/// A `(topic, partition)` pair taking part in a transaction.
///
/// Hashable and comparable so it can be stored in the transaction's partition set.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TransactionPartition {
    /// Topic name.
    pub topic: String,
    /// Partition number within `topic`.
    pub partition: u32,
}
164
165impl TransactionPartition {
166 pub fn new(topic: impl Into<String>, partition: u32) -> Self {
167 Self {
168 topic: topic.into(),
169 partition,
170 }
171 }
172}
173
/// A single write recorded against an open transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingWrite {
    /// Partition the write went to.
    pub partition: TransactionPartition,

    /// Sequence number supplied by the caller for this write.
    pub sequence: i32,

    /// Offset supplied by the caller for this write.
    pub offset: u64,

    /// Wall-clock time at which the write was recorded (set in `Transaction::add_write`).
    #[serde(with = "crate::serde_utils::system_time")]
    pub timestamp: SystemTime,
}
190
/// Consumer-group offsets attached to a transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionOffsetCommit {
    /// Consumer group the offsets belong to.
    pub group_id: String,

    /// `(partition, offset)` pairs to commit for that group.
    pub offsets: Vec<(TransactionPartition, i64)>,
}
200
/// State tracked by the coordinator for one transaction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Transaction {
    /// Identifier the producer gave this transaction.
    pub txn_id: TransactionId,

    /// Producer that owns the transaction.
    pub producer_id: ProducerId,

    /// Producer epoch captured when the transaction began; later calls must match it.
    pub producer_epoch: ProducerEpoch,

    /// Current lifecycle state.
    pub state: TransactionState,

    /// Partitions registered with the transaction.
    pub partitions: HashSet<TransactionPartition>,

    /// Writes recorded so far, in the order they were added.
    pub pending_writes: Vec<PendingWrite>,

    /// Consumer offset commits recorded so far, in the order they were added.
    pub offset_commits: Vec<TransactionOffsetCommit>,

    /// Wall-clock time at which the transaction was created.
    #[serde(with = "crate::serde_utils::system_time")]
    pub started_at: SystemTime,

    /// Maximum idle time before `is_timed_out` reports the transaction as expired.
    #[serde(with = "crate::serde_utils::duration")]
    pub timeout: Duration,

    /// Monotonic timestamp of the last mutation. Skipped by serde, so it is `None`
    /// after deserialization, which `is_timed_out` treats as already timed out.
    #[serde(skip)]
    pub last_activity: Option<Instant>,
}
237
238impl Transaction {
239 pub fn new(
241 txn_id: TransactionId,
242 producer_id: ProducerId,
243 producer_epoch: ProducerEpoch,
244 timeout: Duration,
245 ) -> Self {
246 Self {
247 txn_id,
248 producer_id,
249 producer_epoch,
250 state: TransactionState::Ongoing,
251 partitions: HashSet::new(),
252 pending_writes: Vec::new(),
253 offset_commits: Vec::new(),
254 started_at: SystemTime::now(),
255 timeout,
256 last_activity: Some(Instant::now()),
257 }
258 }
259
260 pub fn is_timed_out(&self) -> bool {
262 self.last_activity
263 .map(|t| t.elapsed() > self.timeout)
264 .unwrap_or(true)
265 }
266
267 pub fn touch(&mut self) {
269 self.last_activity = Some(Instant::now());
270 }
271
272 pub fn add_partition(&mut self, partition: TransactionPartition) {
274 self.partitions.insert(partition);
275 self.touch();
276 }
277
278 pub fn add_write(&mut self, partition: TransactionPartition, sequence: i32, offset: u64) {
280 self.pending_writes.push(PendingWrite {
281 partition,
282 sequence,
283 offset,
284 timestamp: SystemTime::now(),
285 });
286 self.touch();
287 }
288
289 pub fn add_offset_commit(
291 &mut self,
292 group_id: String,
293 offsets: Vec<(TransactionPartition, i64)>,
294 ) {
295 self.offset_commits
296 .push(TransactionOffsetCommit { group_id, offsets });
297 self.touch();
298 }
299
300 pub fn write_count(&self) -> usize {
302 self.pending_writes.len()
303 }
304
305 pub fn affected_partitions(&self) -> impl Iterator<Item = &TransactionPartition> {
307 self.partitions.iter()
308 }
309}
310
/// Marker describing how a transaction ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionMarker {
    /// The transaction was committed.
    Commit,

    /// The transaction was aborted.
    Abort,
}
320
/// Read isolation level requested by a consumer.
///
/// Encoded as `0` / `1` by `as_u8` and as `"read_uncommitted"` /
/// `"read_committed"` by `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum IsolationLevel {
    /// Default level; wire value `0`.
    #[default]
    ReadUncommitted,

    /// Wire value `1`.
    ReadCommitted,
}
335
336impl IsolationLevel {
337 pub fn as_str(&self) -> &'static str {
339 match self {
340 Self::ReadUncommitted => "read_uncommitted",
341 Self::ReadCommitted => "read_committed",
342 }
343 }
344
345 pub fn from_u8(value: u8) -> Self {
350 match value {
351 1 => Self::ReadCommitted,
352 _ => Self::ReadUncommitted,
353 }
354 }
355
356 pub fn as_u8(&self) -> u8 {
358 match self {
359 Self::ReadUncommitted => 0,
360 Self::ReadCommitted => 1,
361 }
362 }
363}
364
365impl std::str::FromStr for IsolationLevel {
366 type Err = String;
367
368 fn from_str(s: &str) -> Result<Self, Self::Err> {
370 match s.to_lowercase().as_str() {
371 "read_uncommitted" => Ok(Self::ReadUncommitted),
372 "read_committed" => Ok(Self::ReadCommitted),
373 _ => Err(format!("unknown isolation level: {}", s)),
374 }
375 }
376}
377
378impl std::fmt::Display for IsolationLevel {
379 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
380 write!(f, "{}", self.as_str())
381 }
382}
383
/// Record of an aborted transaction kept in the [`AbortedTransactionIndex`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AbortedTransaction {
    /// Producer whose transaction was aborted.
    pub producer_id: ProducerId,
    /// Lowest offset written by the aborted transaction.
    pub first_offset: u64,
}
392
/// Index of aborted transactions, guarded by a read-write lock.
#[derive(Debug, Default)]
pub struct AbortedTransactionIndex {
    /// Aborted entries, kept sorted by ascending `first_offset` by `record_abort`.
    aborted: RwLock<Vec<AbortedTransaction>>,
}
401
402impl AbortedTransactionIndex {
403 pub fn new() -> Self {
405 Self::default()
406 }
407
408 pub fn record_abort(&self, producer_id: ProducerId, first_offset: u64) {
410 let mut aborted = self.aborted.write();
411 aborted.push(AbortedTransaction {
412 producer_id,
413 first_offset,
414 });
415 aborted.sort_by_key(|a| a.first_offset);
417 }
418
419 pub fn get_aborted_in_range(
423 &self,
424 start_offset: u64,
425 end_offset: u64,
426 ) -> Vec<AbortedTransaction> {
427 let aborted = self.aborted.read();
428 aborted
429 .iter()
430 .filter(|a| a.first_offset >= start_offset && a.first_offset <= end_offset)
431 .cloned()
432 .collect()
433 }
434
435 pub fn is_aborted(&self, producer_id: ProducerId, offset: u64) -> bool {
437 let aborted = self.aborted.read();
438 aborted
439 .iter()
440 .any(|a| a.producer_id == producer_id && a.first_offset <= offset)
441 }
442
443 pub fn truncate_before(&self, offset: u64) {
445 let mut aborted = self.aborted.write();
446 aborted.retain(|a| a.first_offset >= offset);
447 }
448
449 pub fn len(&self) -> usize {
451 self.aborted.read().len()
452 }
453
454 pub fn is_empty(&self) -> bool {
456 self.len() == 0
457 }
458}
459
/// Lock-free counters describing coordinator activity.
#[derive(Debug, Default)]
pub struct TransactionStats {
    /// Total transactions started.
    transactions_started: AtomicU64,

    /// Total transactions committed.
    transactions_committed: AtomicU64,

    /// Total transactions aborted.
    transactions_aborted: AtomicU64,

    /// Total transactions that hit their timeout.
    transactions_timed_out: AtomicU64,

    /// Gauge: started minus (committed + aborted + timed out).
    active_transactions: AtomicU64,
}
478
479impl TransactionStats {
480 pub fn new() -> Self {
481 Self::default()
482 }
483
484 pub fn record_start(&self) {
485 self.transactions_started.fetch_add(1, Ordering::Relaxed);
486 self.active_transactions.fetch_add(1, Ordering::Relaxed);
487 }
488
489 pub fn record_commit(&self) {
490 self.transactions_committed.fetch_add(1, Ordering::Relaxed);
491 self.active_transactions.fetch_sub(1, Ordering::Relaxed);
492 }
493
494 pub fn record_abort(&self) {
495 self.transactions_aborted.fetch_add(1, Ordering::Relaxed);
496 self.active_transactions.fetch_sub(1, Ordering::Relaxed);
497 }
498
499 pub fn record_timeout(&self) {
500 self.transactions_timed_out.fetch_add(1, Ordering::Relaxed);
501 self.active_transactions.fetch_sub(1, Ordering::Relaxed);
502 }
503
504 pub fn transactions_started(&self) -> u64 {
505 self.transactions_started.load(Ordering::Relaxed)
506 }
507
508 pub fn transactions_committed(&self) -> u64 {
509 self.transactions_committed.load(Ordering::Relaxed)
510 }
511
512 pub fn transactions_aborted(&self) -> u64 {
513 self.transactions_aborted.load(Ordering::Relaxed)
514 }
515
516 pub fn transactions_timed_out(&self) -> u64 {
517 self.transactions_timed_out.load(Ordering::Relaxed)
518 }
519
520 pub fn active_transactions(&self) -> u64 {
521 self.active_transactions.load(Ordering::Relaxed)
522 }
523}
524
/// Plain-value, serializable copy of [`TransactionStats`].
///
/// Each field is read separately, so the values are not one atomic view.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionStatsSnapshot {
    /// Total transactions started.
    pub transactions_started: u64,
    /// Total transactions committed.
    pub transactions_committed: u64,
    /// Total transactions aborted.
    pub transactions_aborted: u64,
    /// Total transactions that timed out.
    pub transactions_timed_out: u64,
    /// Active-transaction gauge at the time of the snapshot.
    pub active_transactions: u64,
}
534
535impl From<&TransactionStats> for TransactionStatsSnapshot {
536 fn from(stats: &TransactionStats) -> Self {
537 Self {
538 transactions_started: stats.transactions_started(),
539 transactions_committed: stats.transactions_committed(),
540 transactions_aborted: stats.transactions_aborted(),
541 transactions_timed_out: stats.transactions_timed_out(),
542 active_transactions: stats.active_transactions(),
543 }
544 }
545}
546
/// Coordinates producer transactions: begin, registration of partitions, writes
/// and offsets, two-step commit/abort, timeout handling and the aborted index.
///
/// Methods that need both maps lock `transactions` first and
/// `producer_transactions` second.
pub struct TransactionCoordinator {
    /// Transactions keyed by `(producer id, transaction id)`.
    transactions: RwLock<HashMap<(ProducerId, TransactionId), Transaction>>,

    /// Transaction id currently registered for each producer (at most one).
    producer_transactions: RwLock<HashMap<ProducerId, TransactionId>>,

    /// Timeout given to transactions started without an explicit one.
    default_timeout: Duration,

    /// Activity counters.
    stats: TransactionStats,

    /// Index of aborted transactions, filled by `complete_abort`.
    aborted_index: AbortedTransactionIndex,
}
567
568impl Default for TransactionCoordinator {
569 fn default() -> Self {
570 Self::new()
571 }
572}
573
574impl TransactionCoordinator {
575 pub fn new() -> Self {
577 Self {
578 transactions: RwLock::new(HashMap::new()),
579 producer_transactions: RwLock::new(HashMap::new()),
580 default_timeout: DEFAULT_TRANSACTION_TIMEOUT,
581 stats: TransactionStats::new(),
582 aborted_index: AbortedTransactionIndex::new(),
583 }
584 }
585
586 pub fn with_timeout(timeout: Duration) -> Self {
588 Self {
589 transactions: RwLock::new(HashMap::new()),
590 producer_transactions: RwLock::new(HashMap::new()),
591 default_timeout: timeout,
592 stats: TransactionStats::new(),
593 aborted_index: AbortedTransactionIndex::new(),
594 }
595 }
596
    /// Borrows the coordinator's activity counters.
    pub fn stats(&self) -> &TransactionStats {
        &self.stats
    }
601
602 pub fn begin_transaction(
604 &self,
605 txn_id: TransactionId,
606 producer_id: ProducerId,
607 producer_epoch: ProducerEpoch,
608 timeout: Option<Duration>,
609 ) -> TransactionResult {
610 let mut transactions = self.transactions.write();
612 let mut producer_txns = self.producer_transactions.write();
613
614 if let Some(existing_txn_id) = producer_txns.get(&producer_id) {
616 if existing_txn_id != &txn_id {
617 return TransactionResult::ConcurrentTransaction;
618 }
619 if let Some(txn) = transactions.get(&(producer_id, txn_id.clone())) {
621 if txn.producer_epoch != producer_epoch {
622 return TransactionResult::ProducerFenced {
623 expected_epoch: txn.producer_epoch,
624 received_epoch: producer_epoch,
625 };
626 }
627 if txn.state.is_active() {
628 return TransactionResult::Ok; }
630 }
631 }
632
633 let active_count = transactions
635 .values()
636 .filter(|t| t.state.is_active())
637 .count();
638 if active_count >= MAX_PENDING_TRANSACTIONS {
639 return TransactionResult::TooManyTransactions;
640 }
641
642 let txn = Transaction::new(
644 txn_id.clone(),
645 producer_id,
646 producer_epoch,
647 timeout.unwrap_or(self.default_timeout),
648 );
649
650 transactions.insert((producer_id, txn_id.clone()), txn);
651 producer_txns.insert(producer_id, txn_id);
652
653 self.stats.record_start();
654 TransactionResult::Ok
655 }
656
657 pub fn add_partitions_to_transaction(
659 &self,
660 txn_id: &TransactionId,
661 producer_id: ProducerId,
662 producer_epoch: ProducerEpoch,
663 partitions: Vec<TransactionPartition>,
664 ) -> TransactionResult {
665 let mut transactions = self.transactions.write();
666
667 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
668 Some(t) => t,
669 None => return TransactionResult::InvalidTransactionId,
670 };
671
672 if txn.producer_epoch != producer_epoch {
674 return TransactionResult::ProducerFenced {
675 expected_epoch: txn.producer_epoch,
676 received_epoch: producer_epoch,
677 };
678 }
679
680 if !txn.state.is_active() {
682 return TransactionResult::InvalidTransactionState {
683 current: txn.state,
684 expected: "Ongoing",
685 };
686 }
687
688 if txn.is_timed_out() {
690 txn.state = TransactionState::Dead;
691 self.stats.record_timeout();
692 return TransactionResult::TransactionTimeout;
693 }
694
695 for partition in partitions {
697 txn.add_partition(partition);
698 }
699
700 TransactionResult::Ok
701 }
702
703 pub fn add_write_to_transaction(
705 &self,
706 txn_id: &TransactionId,
707 producer_id: ProducerId,
708 producer_epoch: ProducerEpoch,
709 partition: TransactionPartition,
710 sequence: i32,
711 offset: u64,
712 ) -> TransactionResult {
713 let mut transactions = self.transactions.write();
714
715 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
716 Some(t) => t,
717 None => return TransactionResult::InvalidTransactionId,
718 };
719
720 if txn.producer_epoch != producer_epoch {
722 return TransactionResult::ProducerFenced {
723 expected_epoch: txn.producer_epoch,
724 received_epoch: producer_epoch,
725 };
726 }
727
728 if !txn.state.is_active() {
730 return TransactionResult::InvalidTransactionState {
731 current: txn.state,
732 expected: "Ongoing",
733 };
734 }
735
736 if txn.is_timed_out() {
738 txn.state = TransactionState::Dead;
739 self.stats.record_timeout();
740 return TransactionResult::TransactionTimeout;
741 }
742
743 if !txn.partitions.contains(&partition) {
745 return TransactionResult::PartitionNotInTransaction {
746 topic: partition.topic,
747 partition: partition.partition,
748 };
749 }
750
751 txn.add_write(partition, sequence, offset);
753
754 TransactionResult::Ok
755 }
756
757 pub fn add_offsets_to_transaction(
759 &self,
760 txn_id: &TransactionId,
761 producer_id: ProducerId,
762 producer_epoch: ProducerEpoch,
763 group_id: String,
764 offsets: Vec<(TransactionPartition, i64)>,
765 ) -> TransactionResult {
766 let mut transactions = self.transactions.write();
767
768 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
769 Some(t) => t,
770 None => return TransactionResult::InvalidTransactionId,
771 };
772
773 if txn.producer_epoch != producer_epoch {
775 return TransactionResult::ProducerFenced {
776 expected_epoch: txn.producer_epoch,
777 received_epoch: producer_epoch,
778 };
779 }
780
781 if !txn.state.is_active() {
783 return TransactionResult::InvalidTransactionState {
784 current: txn.state,
785 expected: "Ongoing",
786 };
787 }
788
789 if txn.is_timed_out() {
791 txn.state = TransactionState::Dead;
792 self.stats.record_timeout();
793 return TransactionResult::TransactionTimeout;
794 }
795
796 txn.add_offset_commit(group_id, offsets);
798
799 TransactionResult::Ok
800 }
801
802 pub fn prepare_commit(
806 &self,
807 txn_id: &TransactionId,
808 producer_id: ProducerId,
809 producer_epoch: ProducerEpoch,
810 ) -> Result<Transaction, TransactionResult> {
811 let mut transactions = self.transactions.write();
812
813 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
814 Some(t) => t,
815 None => return Err(TransactionResult::InvalidTransactionId),
816 };
817
818 if txn.producer_epoch != producer_epoch {
820 return Err(TransactionResult::ProducerFenced {
821 expected_epoch: txn.producer_epoch,
822 received_epoch: producer_epoch,
823 });
824 }
825
826 if !txn.state.can_commit() {
828 return Err(TransactionResult::InvalidTransactionState {
829 current: txn.state,
830 expected: "Ongoing",
831 });
832 }
833
834 if txn.is_timed_out() {
836 txn.state = TransactionState::Dead;
837 self.stats.record_timeout();
838 return Err(TransactionResult::TransactionTimeout);
839 }
840
841 txn.state = TransactionState::PrepareCommit;
843 txn.touch();
844
845 Ok(txn.clone())
846 }
847
848 pub fn complete_commit(
850 &self,
851 txn_id: &TransactionId,
852 producer_id: ProducerId,
853 ) -> TransactionResult {
854 let mut transactions = self.transactions.write();
855 let mut producer_txns = self.producer_transactions.write();
856
857 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
858 Some(t) => t,
859 None => return TransactionResult::InvalidTransactionId,
860 };
861
862 if txn.state != TransactionState::PrepareCommit {
863 return TransactionResult::InvalidTransactionState {
864 current: txn.state,
865 expected: "PrepareCommit",
866 };
867 }
868
869 txn.state = TransactionState::CompleteCommit;
870
871 transactions.remove(&(producer_id, txn_id.clone()));
873 producer_txns.remove(&producer_id);
874
875 self.stats.record_commit();
876 TransactionResult::Ok
877 }
878
879 pub fn prepare_abort(
881 &self,
882 txn_id: &TransactionId,
883 producer_id: ProducerId,
884 producer_epoch: ProducerEpoch,
885 ) -> Result<Transaction, TransactionResult> {
886 let mut transactions = self.transactions.write();
887
888 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
889 Some(t) => t,
890 None => return Err(TransactionResult::InvalidTransactionId),
891 };
892
893 if txn.producer_epoch != producer_epoch {
895 return Err(TransactionResult::ProducerFenced {
896 expected_epoch: txn.producer_epoch,
897 received_epoch: producer_epoch,
898 });
899 }
900
901 if !txn.state.can_abort() {
903 return Err(TransactionResult::InvalidTransactionState {
904 current: txn.state,
905 expected: "Ongoing or PrepareCommit",
906 });
907 }
908
909 txn.state = TransactionState::PrepareAbort;
911 txn.touch();
912
913 Ok(txn.clone())
914 }
915
916 pub fn complete_abort(
918 &self,
919 txn_id: &TransactionId,
920 producer_id: ProducerId,
921 ) -> TransactionResult {
922 let mut transactions = self.transactions.write();
923 let mut producer_txns = self.producer_transactions.write();
924
925 let txn = match transactions.get_mut(&(producer_id, txn_id.clone())) {
926 Some(t) => t,
927 None => return TransactionResult::InvalidTransactionId,
928 };
929
930 if txn.state != TransactionState::PrepareAbort {
931 return TransactionResult::InvalidTransactionState {
932 current: txn.state,
933 expected: "PrepareAbort",
934 };
935 }
936
937 txn.state = TransactionState::CompleteAbort;
938
939 if let Some(first_offset) = txn.pending_writes.iter().map(|w| w.offset).min() {
942 self.aborted_index.record_abort(producer_id, first_offset);
943 }
944
945 transactions.remove(&(producer_id, txn_id.clone()));
947 producer_txns.remove(&producer_id);
948
949 self.stats.record_abort();
950 TransactionResult::Ok
951 }
952
953 pub fn get_transaction(
955 &self,
956 txn_id: &TransactionId,
957 producer_id: ProducerId,
958 ) -> Option<Transaction> {
959 let transactions = self.transactions.read();
960 transactions.get(&(producer_id, txn_id.clone())).cloned()
961 }
962
963 pub fn has_active_transaction(&self, producer_id: ProducerId) -> bool {
965 let producer_txns = self.producer_transactions.read();
966 producer_txns.contains_key(&producer_id)
967 }
968
969 pub fn get_active_transaction_id(&self, producer_id: ProducerId) -> Option<TransactionId> {
971 let producer_txns = self.producer_transactions.read();
972 producer_txns.get(&producer_id).cloned()
973 }
974
975 pub fn cleanup_timed_out_transactions(&self) -> Vec<Transaction> {
977 let mut timed_out = Vec::new();
978 let mut transactions = self.transactions.write();
979 let mut producer_txns = self.producer_transactions.write();
980
981 let keys_to_remove: Vec<_> = transactions
982 .iter()
983 .filter(|(_, txn)| txn.is_timed_out() && !txn.state.is_terminal())
984 .map(|(k, _)| k.clone())
985 .collect();
986
987 for key in keys_to_remove {
988 if let Some(mut txn) = transactions.remove(&key) {
989 txn.state = TransactionState::Dead;
990 producer_txns.remove(&txn.producer_id);
991 self.stats.record_timeout();
992 timed_out.push(txn);
993 }
994 }
995
996 timed_out
997 }
998
999 pub fn active_count(&self) -> usize {
1001 let transactions = self.transactions.read();
1002 transactions
1003 .values()
1004 .filter(|t| !t.state.is_terminal())
1005 .count()
1006 }
1007
    /// Returns whether `offset` is reported as aborted for `producer_id`;
    /// delegates to `AbortedTransactionIndex::is_aborted`.
    pub fn is_aborted(&self, producer_id: ProducerId, offset: u64) -> bool {
        self.aborted_index.is_aborted(producer_id, offset)
    }
1014
    /// Returns the aborted-transaction entries whose first offset lies between
    /// `start_offset` and `end_offset` (both inclusive); delegates to
    /// `AbortedTransactionIndex::get_aborted_in_range`.
    pub fn get_aborted_in_range(
        &self,
        start_offset: u64,
        end_offset: u64,
    ) -> Vec<AbortedTransaction> {
        self.aborted_index
            .get_aborted_in_range(start_offset, end_offset)
    }
1026
    /// Borrows the coordinator's aborted-transaction index.
    pub fn aborted_index(&self) -> &AbortedTransactionIndex {
        &self.aborted_index
    }
1031}
1032
1033#[cfg(test)]
1038mod tests {
1039 use super::*;
1040 use std::str::FromStr;
1041
1042 #[test]
1043 fn test_transaction_state_transitions() {
1044 assert!(TransactionState::Empty.is_terminal());
1046 assert!(TransactionState::CompleteCommit.is_terminal());
1047 assert!(TransactionState::CompleteAbort.is_terminal());
1048 assert!(TransactionState::Dead.is_terminal());
1049
1050 assert!(!TransactionState::Ongoing.is_terminal());
1052 assert!(!TransactionState::PrepareCommit.is_terminal());
1053 assert!(!TransactionState::PrepareAbort.is_terminal());
1054
1055 assert!(TransactionState::Ongoing.can_commit());
1057 assert!(!TransactionState::Empty.can_commit());
1058 assert!(!TransactionState::PrepareCommit.can_commit());
1059
1060 assert!(TransactionState::Ongoing.can_abort());
1062 assert!(TransactionState::PrepareCommit.can_abort());
1063 assert!(TransactionState::PrepareAbort.can_abort());
1064 assert!(!TransactionState::Empty.can_abort());
1065 }
1066
1067 #[test]
1068 fn test_begin_transaction() {
1069 let coordinator = TransactionCoordinator::new();
1070
1071 let result = coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1073 assert_eq!(result, TransactionResult::Ok);
1074
1075 let txn = coordinator.get_transaction(&"txn-1".to_string(), 1);
1077 assert!(txn.is_some());
1078 let txn = txn.unwrap();
1079 assert_eq!(txn.state, TransactionState::Ongoing);
1080 assert_eq!(txn.producer_id, 1);
1081 assert_eq!(txn.producer_epoch, 0);
1082
1083 assert_eq!(coordinator.stats().transactions_started(), 1);
1085 assert_eq!(coordinator.stats().active_transactions(), 1);
1086 }
1087
1088 #[test]
1089 fn test_concurrent_transaction_rejection() {
1090 let coordinator = TransactionCoordinator::new();
1091
1092 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1094
1095 let result = coordinator.begin_transaction("txn-2".to_string(), 1, 0, None);
1097 assert_eq!(result, TransactionResult::ConcurrentTransaction);
1098 }
1099
1100 #[test]
1101 fn test_add_partitions_to_transaction() {
1102 let coordinator = TransactionCoordinator::new();
1103 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1104
1105 let result = coordinator.add_partitions_to_transaction(
1107 &"txn-1".to_string(),
1108 1,
1109 0,
1110 vec![
1111 TransactionPartition::new("topic-1", 0),
1112 TransactionPartition::new("topic-1", 1),
1113 TransactionPartition::new("topic-2", 0),
1114 ],
1115 );
1116 assert_eq!(result, TransactionResult::Ok);
1117
1118 let txn = coordinator
1120 .get_transaction(&"txn-1".to_string(), 1)
1121 .unwrap();
1122 assert_eq!(txn.partitions.len(), 3);
1123 }
1124
1125 #[test]
1126 fn test_add_write_to_transaction() {
1127 let coordinator = TransactionCoordinator::new();
1128 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1129
1130 let partition = TransactionPartition::new("topic-1", 0);
1131 coordinator.add_partitions_to_transaction(
1132 &"txn-1".to_string(),
1133 1,
1134 0,
1135 vec![partition.clone()],
1136 );
1137
1138 let result =
1140 coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1141 assert_eq!(result, TransactionResult::Ok);
1142
1143 let txn = coordinator
1145 .get_transaction(&"txn-1".to_string(), 1)
1146 .unwrap();
1147 assert_eq!(txn.pending_writes.len(), 1);
1148 assert_eq!(txn.pending_writes[0].offset, 100);
1149 assert_eq!(txn.pending_writes[0].sequence, 0);
1150 }
1151
1152 #[test]
1153 fn test_write_to_non_registered_partition() {
1154 let coordinator = TransactionCoordinator::new();
1155 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1156
1157 let result = coordinator.add_write_to_transaction(
1159 &"txn-1".to_string(),
1160 1,
1161 0,
1162 TransactionPartition::new("topic-1", 0),
1163 0,
1164 100,
1165 );
1166
1167 assert!(matches!(
1168 result,
1169 TransactionResult::PartitionNotInTransaction { .. }
1170 ));
1171 }
1172
1173 #[test]
1174 fn test_commit_transaction() {
1175 let coordinator = TransactionCoordinator::new();
1176 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1177
1178 let partition = TransactionPartition::new("topic-1", 0);
1179 coordinator.add_partitions_to_transaction(
1180 &"txn-1".to_string(),
1181 1,
1182 0,
1183 vec![partition.clone()],
1184 );
1185 coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1186
1187 let txn = coordinator.prepare_commit(&"txn-1".to_string(), 1, 0);
1189 assert!(txn.is_ok());
1190 let txn = txn.unwrap();
1191 assert_eq!(txn.state, TransactionState::PrepareCommit);
1192
1193 let result = coordinator.complete_commit(&"txn-1".to_string(), 1);
1195 assert_eq!(result, TransactionResult::Ok);
1196
1197 assert!(coordinator
1199 .get_transaction(&"txn-1".to_string(), 1)
1200 .is_none());
1201 assert!(!coordinator.has_active_transaction(1));
1202
1203 assert_eq!(coordinator.stats().transactions_committed(), 1);
1205 assert_eq!(coordinator.stats().active_transactions(), 0);
1206 }
1207
1208 #[test]
1209 fn test_abort_transaction() {
1210 let coordinator = TransactionCoordinator::new();
1211 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1212
1213 let partition = TransactionPartition::new("topic-1", 0);
1214 coordinator.add_partitions_to_transaction(
1215 &"txn-1".to_string(),
1216 1,
1217 0,
1218 vec![partition.clone()],
1219 );
1220 coordinator.add_write_to_transaction(&"txn-1".to_string(), 1, 0, partition, 0, 100);
1221
1222 let txn = coordinator.prepare_abort(&"txn-1".to_string(), 1, 0);
1224 assert!(txn.is_ok());
1225
1226 let result = coordinator.complete_abort(&"txn-1".to_string(), 1);
1228 assert_eq!(result, TransactionResult::Ok);
1229
1230 assert!(coordinator
1232 .get_transaction(&"txn-1".to_string(), 1)
1233 .is_none());
1234
1235 assert_eq!(coordinator.stats().transactions_aborted(), 1);
1237 }
1238
1239 #[test]
1240 fn test_producer_fencing() {
1241 let coordinator = TransactionCoordinator::new();
1242 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1243
1244 let result = coordinator.add_partitions_to_transaction(
1246 &"txn-1".to_string(),
1247 1,
1248 1, vec![TransactionPartition::new("topic-1", 0)],
1250 );
1251
1252 assert!(matches!(
1253 result,
1254 TransactionResult::ProducerFenced {
1255 expected_epoch: 0,
1256 received_epoch: 1
1257 }
1258 ));
1259 }
1260
1261 #[test]
1262 fn test_transaction_timeout() {
1263 let coordinator = TransactionCoordinator::with_timeout(Duration::from_millis(1));
1265 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1266
1267 std::thread::sleep(Duration::from_millis(5));
1269
1270 let result = coordinator.add_partitions_to_transaction(
1272 &"txn-1".to_string(),
1273 1,
1274 0,
1275 vec![TransactionPartition::new("topic-1", 0)],
1276 );
1277
1278 assert_eq!(result, TransactionResult::TransactionTimeout);
1279 }
1280
1281 #[test]
1282 fn test_cleanup_timed_out_transactions() {
1283 let coordinator = TransactionCoordinator::with_timeout(Duration::from_millis(1));
1284
1285 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1286 coordinator.begin_transaction("txn-2".to_string(), 2, 0, None);
1287
1288 std::thread::sleep(Duration::from_millis(5));
1290
1291 let timed_out = coordinator.cleanup_timed_out_transactions();
1293 assert_eq!(timed_out.len(), 2);
1294
1295 assert_eq!(coordinator.active_count(), 0);
1297 assert_eq!(coordinator.stats().transactions_timed_out(), 2);
1298 }
1299
1300 #[test]
1301 fn test_add_offsets_to_transaction() {
1302 let coordinator = TransactionCoordinator::new();
1303 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1304
1305 let result = coordinator.add_offsets_to_transaction(
1307 &"txn-1".to_string(),
1308 1,
1309 0,
1310 "consumer-group-1".to_string(),
1311 vec![
1312 (TransactionPartition::new("input-topic", 0), 42),
1313 (TransactionPartition::new("input-topic", 1), 100),
1314 ],
1315 );
1316 assert_eq!(result, TransactionResult::Ok);
1317
1318 let txn = coordinator
1320 .get_transaction(&"txn-1".to_string(), 1)
1321 .unwrap();
1322 assert_eq!(txn.offset_commits.len(), 1);
1323 assert_eq!(txn.offset_commits[0].group_id, "consumer-group-1");
1324 assert_eq!(txn.offset_commits[0].offsets.len(), 2);
1325 }
1326
1327 #[test]
1328 fn test_invalid_state_transitions() {
1329 let coordinator = TransactionCoordinator::new();
1330 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1331
1332 coordinator
1334 .prepare_commit(&"txn-1".to_string(), 1, 0)
1335 .unwrap();
1336
1337 let result = coordinator.add_partitions_to_transaction(
1339 &"txn-1".to_string(),
1340 1,
1341 0,
1342 vec![TransactionPartition::new("topic-1", 0)],
1343 );
1344 assert!(matches!(
1345 result,
1346 TransactionResult::InvalidTransactionState { .. }
1347 ));
1348 }
1349
1350 #[test]
1351 fn test_abort_from_prepare_commit() {
1352 let coordinator = TransactionCoordinator::new();
1353 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1354
1355 coordinator
1357 .prepare_commit(&"txn-1".to_string(), 1, 0)
1358 .unwrap();
1359
1360 let result = coordinator.prepare_abort(&"txn-1".to_string(), 1, 0);
1362 assert!(result.is_ok());
1363
1364 let result = coordinator.complete_abort(&"txn-1".to_string(), 1);
1365 assert_eq!(result, TransactionResult::Ok);
1366 }
1367
1368 #[test]
1369 fn test_transaction_partition_hash() {
1370 let p1 = TransactionPartition::new("topic", 0);
1371 let p2 = TransactionPartition::new("topic", 0);
1372 let p3 = TransactionPartition::new("topic", 1);
1373
1374 assert_eq!(p1, p2);
1375 assert_ne!(p1, p3);
1376
1377 let mut set = HashSet::new();
1378 set.insert(p1.clone());
1379 set.insert(p2); set.insert(p3);
1381 assert_eq!(set.len(), 2);
1382 }
1383
1384 #[test]
1385 fn test_resume_same_transaction() {
1386 let coordinator = TransactionCoordinator::new();
1387
1388 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1390
1391 let result = coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1393 assert_eq!(result, TransactionResult::Ok);
1394
1395 assert_eq!(coordinator.active_count(), 1);
1397 assert_eq!(coordinator.stats().transactions_started(), 1);
1398 }
1399
1400 #[test]
1401 fn test_stats_snapshot() {
1402 let coordinator = TransactionCoordinator::new();
1403 coordinator.begin_transaction("txn-1".to_string(), 1, 0, None);
1404 coordinator
1405 .prepare_commit(&"txn-1".to_string(), 1, 0)
1406 .unwrap();
1407 coordinator.complete_commit(&"txn-1".to_string(), 1);
1408
1409 let snapshot: TransactionStatsSnapshot = coordinator.stats().into();
1410 assert_eq!(snapshot.transactions_started, 1);
1411 assert_eq!(snapshot.transactions_committed, 1);
1412 assert_eq!(snapshot.active_transactions, 0);
1413 }
1414
/// `from_u8` maps 0 and 1 to their dedicated levels; the other values probed
/// here (2 and 255) come back as `ReadUncommitted`.
#[test]
fn test_isolation_level_from_u8() {
    let cases = [
        (0, IsolationLevel::ReadUncommitted),
        (1, IsolationLevel::ReadCommitted),
        // Values without a dedicated level.
        (2, IsolationLevel::ReadUncommitted),
        (255, IsolationLevel::ReadUncommitted),
    ];

    for (raw, expected) in cases {
        assert_eq!(IsolationLevel::from_u8(raw), expected, "from_u8({})", raw);
    }
}
1429
/// `as_u8` yields 0 for `ReadUncommitted` and 1 for `ReadCommitted`.
#[test]
fn test_isolation_level_as_u8() {
    let uncommitted = IsolationLevel::ReadUncommitted.as_u8();
    let committed = IsolationLevel::ReadCommitted.as_u8();

    assert_eq!(uncommitted, 0);
    assert_eq!(committed, 1);
}
1435
/// `from_str` accepts the lower-case and upper-case spellings of both levels
/// and returns an error for an unrecognised name.
#[test]
fn test_isolation_level_from_str() {
    let accepted = [
        ("read_uncommitted", IsolationLevel::ReadUncommitted),
        ("read_committed", IsolationLevel::ReadCommitted),
        ("READ_UNCOMMITTED", IsolationLevel::ReadUncommitted),
        ("READ_COMMITTED", IsolationLevel::ReadCommitted),
    ];

    for (text, expected) in accepted {
        let parsed = IsolationLevel::from_str(text).unwrap();
        assert_eq!(parsed, expected, "from_str({:?})", text);
    }

    // Anything else must be rejected rather than silently defaulted.
    assert!(IsolationLevel::from_str("invalid").is_err());
}
1456
/// The default isolation level is `ReadUncommitted`.
#[test]
fn test_isolation_level_default() {
    let level = IsolationLevel::default();
    assert_eq!(level, IsolationLevel::ReadUncommitted);
}
1461
/// A fresh index is empty; after recording one abort for producer 1 at
/// offset 100, that producer is reported aborted at 100 and 150 but not at
/// 50, and producer 2 is not reported at all.
#[test]
fn test_aborted_transaction_index_basic() {
    let index = AbortedTransactionIndex::new();
    assert!(index.is_empty());
    assert_eq!(index.len(), 0);

    index.record_abort(1, 100);
    assert!(!index.is_empty());
    assert_eq!(index.len(), 1);

    // Offsets at and after the recorded abort, same producer.
    for (producer, offset) in [(1, 100), (1, 150)] {
        assert!(
            index.is_aborted(producer, offset),
            "expected producer {} at offset {} to be aborted",
            producer,
            offset
        );
    }

    // An earlier offset for the same producer, and a producer with no record.
    for (producer, offset) in [(1, 50), (2, 100)] {
        assert!(
            !index.is_aborted(producer, offset),
            "expected producer {} at offset {} to not be aborted",
            producer,
            offset
        );
    }
}
1483
/// With several aborts recorded across two producers, each producer is
/// reported aborted at the probed offsets at or after one of its own records,
/// and not at an offset before its first record.
#[test]
fn test_aborted_transaction_index_multiple() {
    let index = AbortedTransactionIndex::new();

    index.record_abort(1, 100);
    index.record_abort(1, 300);
    index.record_abort(2, 200);
    assert_eq!(index.len(), 3);

    // (producer, offset, expected `is_aborted` answer)
    let expectations = [
        (1, 100, true),
        (1, 150, true),
        (1, 300, true),
        (1, 400, true),
        (2, 200, true),
        (2, 250, true),
        // Before producer 2's only recorded abort.
        (2, 100, false),
    ];

    for (producer, offset, aborted) in expectations {
        assert_eq!(
            index.is_aborted(producer, offset),
            aborted,
            "is_aborted({}, {})",
            producer,
            offset
        );
    }
}
1504
/// `get_aborted_in_range` returns only the recorded aborts whose first offset
/// lies within the queried window.
#[test]
fn test_aborted_transaction_index_get_range() {
    let index = AbortedTransactionIndex::new();

    index.record_abort(1, 100);
    index.record_abort(2, 200);
    index.record_abort(1, 300);

    // Window containing only producer 2's abort at offset 200.
    let hits = index.get_aborted_in_range(150, 250);
    assert_eq!(hits.len(), 1);
    let only = &hits[0];
    assert_eq!(only.producer_id, 2);
    assert_eq!(only.first_offset, 200);

    // Window wide enough to contain every record.
    let everything = index.get_aborted_in_range(0, 500);
    assert_eq!(everything.len(), 3);

    // Window that starts after the last record.
    let nothing = index.get_aborted_in_range(400, 500);
    assert_eq!(nothing.len(), 0);
}
1527
/// `truncate_before(200)` drops the record at offset 100 and keeps the
/// records at offsets 200 and 300.
#[test]
fn test_aborted_transaction_index_truncate() {
    let index = AbortedTransactionIndex::new();

    for (producer, first_offset) in [(1, 100), (2, 200), (1, 300)] {
        index.record_abort(producer, first_offset);
    }
    assert_eq!(index.len(), 3);

    index.truncate_before(200);
    assert_eq!(index.len(), 2);

    // Producer 1's abort at 100 is gone, so offset 150 is no longer covered.
    assert!(!index.is_aborted(1, 150));

    // The records at 200 and 300 survive the truncation.
    assert!(index.is_aborted(2, 200));
    assert!(index.is_aborted(1, 300));
}
1548
/// The coordinator reports a producer's writes as aborted only once the
/// transaction has gone through prepare-abort and complete-abort.
#[test]
fn test_coordinator_is_aborted() {
    let coordinator = TransactionCoordinator::new();
    let txn_id = String::from("txn-1");
    let partition = TransactionPartition::new("test-topic", 0);

    // Open the transaction, register the partition, then record a write
    // whose last argument is 100.
    coordinator.begin_transaction(txn_id.clone(), 1, 0, None);
    coordinator.add_partitions_to_transaction(&txn_id, 1, 0, vec![partition.clone()]);
    coordinator.add_write_to_transaction(&txn_id, 1, 0, partition, 0, 100);

    // Nothing is reported as aborted before the abort has happened.
    assert!(!coordinator.is_aborted(1, 100));

    coordinator.prepare_abort(&txn_id, 1, 0).unwrap();
    coordinator.complete_abort(&txn_id, 1);

    // After the abort: offsets 100 and 150 for producer 1 are aborted...
    assert!(coordinator.is_aborted(1, 100));
    assert!(coordinator.is_aborted(1, 150));

    // ...while an earlier offset and an unrelated producer are not.
    assert!(!coordinator.is_aborted(1, 50));
    assert!(!coordinator.is_aborted(2, 100));
}
1587}