#![allow(dead_code)]
use std::collections::HashSet;
/// Lifecycle state of a transaction, as stored in [`TxnEntry::state`].
///
/// The legal moves between states are defined by [`TxnState::can_transition_to`];
/// the numeric wire codes are defined by [`TxnState::to_kafka_status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// No transaction in progress. This is the state [`TxnEntry::new_empty`] starts in.
    Empty,
    /// A transaction is in progress. Re-entering `Ongoing` from `Ongoing` is allowed.
    Ongoing,
    /// Commit has been requested; reachable only from `Ongoing`, leads only to
    /// `CompleteCommit`.
    PrepareCommit,
    /// Abort has been requested; reachable only from `Ongoing`, leads only to
    /// `CompleteAbort`.
    PrepareAbort,
    /// The commit finished; the entry may be reused (`Empty`/`Ongoing`) or retired (`Dead`).
    CompleteCommit,
    /// The abort finished; the entry may be reused (`Empty`/`Ongoing`) or retired (`Dead`).
    CompleteAbort,
    /// Terminal state: no transition out of `Dead` is permitted.
    Dead,
}

impl TxnState {
    /// Returns `true` if moving from `self` to `other` is a legal state transition.
    ///
    /// The table is written per *target* state, listing the states it may be entered
    /// from. The `match` is exhaustive on purpose: adding a variant to [`TxnState`]
    /// will not compile until its incoming transitions are decided here.
    ///
    /// `Dead` may be entered from `Empty` as well as from the two `Complete*` states,
    /// so that an entry which never started a transaction can still be retired.
    #[must_use]
    pub fn can_transition_to(self, other: TxnState) -> bool {
        use TxnState::{
            CompleteAbort, CompleteCommit, Dead, Empty, Ongoing, PrepareAbort, PrepareCommit,
        };
        match other {
            // Reset for reuse, or a no-op on an already idle entry.
            Empty => matches!(self, Empty | CompleteCommit | CompleteAbort),
            // Start a transaction, or keep extending one that is already running.
            Ongoing => matches!(self, Empty | Ongoing | CompleteCommit | CompleteAbort),
            // Both prepare phases can only follow a running transaction.
            PrepareCommit | PrepareAbort => self == Ongoing,
            // Completion must follow the matching prepare phase.
            CompleteCommit => self == PrepareCommit,
            CompleteAbort => self == PrepareAbort,
            // Only idle entries (never used, or fully completed) may be retired.
            Dead => matches!(self, Empty | CompleteCommit | CompleteAbort),
        }
    }

    /// Returns the numeric status code for this state (`0..=6`, in declaration order).
    ///
    /// This is the exact inverse of [`TxnState::from_kafka_status`].
    #[must_use]
    pub fn to_kafka_status(self) -> i8 {
        match self {
            TxnState::Empty => 0,
            TxnState::Ongoing => 1,
            TxnState::PrepareCommit => 2,
            TxnState::PrepareAbort => 3,
            TxnState::CompleteCommit => 4,
            TxnState::CompleteAbort => 5,
            TxnState::Dead => 6,
        }
    }

    /// Decodes a numeric status code produced by [`TxnState::to_kafka_status`].
    ///
    /// Returns `None` for any code outside `0..=6` (including negative values).
    #[must_use]
    pub fn from_kafka_status(id: i8) -> Option<TxnState> {
        match id {
            0 => Some(TxnState::Empty),
            1 => Some(TxnState::Ongoing),
            2 => Some(TxnState::PrepareCommit),
            3 => Some(TxnState::PrepareAbort),
            4 => Some(TxnState::CompleteCommit),
            5 => Some(TxnState::CompleteAbort),
            6 => Some(TxnState::Dead),
            _ => None,
        }
    }
}
/// A (topic name, partition number) pair.
///
/// Equality and hashing cover both fields, so values can be used as `HashSet`
/// members or `HashMap` keys.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct TopicPartition {
    /// Name of the topic.
    pub topic: String,
    /// Partition number within `topic`.
    pub partition: i32,
}
/// Bookkeeping record for one transactional id.
#[derive(Clone, Debug)]
pub struct TxnEntry {
    /// Transactional id this record is kept for.
    pub transactional_id: String,
    /// Producer id associated with this record.
    pub producer_id: i64,
    /// Producer epoch associated with this record.
    pub producer_epoch: i16,
    /// Current lifecycle state; `TxnState::Empty` right after [`TxnEntry::new_empty`].
    pub state: TxnState,
    /// Transaction timeout (milliseconds, per the field name).
    pub txn_timeout_ms: i32,
    /// Topic partitions recorded on this entry; empty on creation.
    // NOTE(review): presumably the partitions enlisted in the current transaction — confirm.
    pub partitions: HashSet<TopicPartition>,
    /// Initialised to `-1` by [`TxnEntry::new_empty`].
    // NOTE(review): `-1` looks like a "no previous producer id" marker — confirm with callers.
    pub prev_producer_id: i64,
    /// Initialised to `-1` by [`TxnEntry::new_empty`].
    // NOTE(review): `-1` looks like a "no next producer id" marker — confirm with callers.
    pub next_producer_id: i64,
    /// Timestamp of the last update; set to the creation time by [`TxnEntry::new_empty`].
    pub last_update_ms: i64,
    /// Start timestamp; set to the creation time by [`TxnEntry::new_empty`].
    pub start_ms: i64,
}

impl TxnEntry {
    /// Builds a fresh entry in the `TxnState::Empty` state.
    ///
    /// The identity and timeout arguments are stored as given. The partition set
    /// starts empty, both `prev_producer_id` and `next_producer_id` start at `-1`,
    /// and `now_ms` is used for both `last_update_ms` and `start_ms`.
    pub fn new_empty(
        transactional_id: String,
        producer_id: i64,
        producer_epoch: i16,
        txn_timeout_ms: i32,
        now_ms: i64,
    ) -> Self {
        // Initial value for both neighbouring-producer-id slots.
        const UNSET_PRODUCER_ID: i64 = -1;

        TxnEntry {
            state: TxnState::Empty,
            partitions: HashSet::new(),
            prev_producer_id: UNSET_PRODUCER_ID,
            next_producer_id: UNSET_PRODUCER_ID,
            start_ms: now_ms,
            last_update_ms: now_ms,
            transactional_id,
            producer_id,
            producer_epoch,
            txn_timeout_ms,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Every state, listed in status-code order.
    const ALL_STATES: [TxnState; 7] = [
        TxnState::Empty,
        TxnState::Ongoing,
        TxnState::PrepareCommit,
        TxnState::PrepareAbort,
        TxnState::CompleteCommit,
        TxnState::CompleteAbort,
        TxnState::Dead,
    ];

    /// A fresh entry may start a transaction.
    #[test]
    fn empty_to_ongoing_allowed() {
        let from = TxnState::Empty;
        assert!(from.can_transition_to(TxnState::Ongoing));
    }

    /// A prepare phase cannot be entered without a running transaction.
    #[test]
    fn empty_to_prepare_commit_disallowed() {
        let from = TxnState::Empty;
        assert!(!from.can_transition_to(TxnState::PrepareCommit));
    }

    /// Completion must go through the prepare phase first.
    #[test]
    fn ongoing_to_complete_commit_disallowed_without_prepare() {
        let from = TxnState::Ongoing;
        assert!(!from.can_transition_to(TxnState::CompleteCommit));
    }

    /// A committed entry can be reset for reuse.
    #[test]
    fn complete_commit_to_empty_for_reuse() {
        let from = TxnState::CompleteCommit;
        assert!(from.can_transition_to(TxnState::Empty));
    }

    /// Encoding then decoding yields the original state for every variant.
    #[test]
    fn kafka_status_round_trips_all_states() {
        for state in ALL_STATES {
            let code = state.to_kafka_status();
            let decoded = TxnState::from_kafka_status(code);
            assert!(decoded == Some(state));
        }
    }

    /// Spot-checks fixed codes and rejects codes outside the known range.
    #[test]
    fn kafka_status_values_match_kafka() {
        let expected_codes = [
            (TxnState::Ongoing, 1),
            (TxnState::Empty, 0),
            (TxnState::Dead, 6),
        ];
        for (state, code) in expected_codes {
            assert!(state.to_kafka_status() == code);
        }

        for unknown in [7, -1] {
            assert!(TxnState::from_kafka_status(unknown).is_none());
        }
    }
}