#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PartitionPosition {
pub offset_epoch: i32,
pub leader_id: i32,
pub leader_epoch: i32,
pub awaiting_validation: bool,
}
impl Default for PartitionPosition {
fn default() -> Self {
Self {
offset_epoch: -1,
leader_id: -1,
leader_epoch: -1,
awaiting_validation: false,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ValidationOutcome {
Valid { leader_epoch: i32 },
Truncated { safe_offset: i64 },
}
pub(crate) fn classify(
offset: i64,
offset_epoch: i32,
leader_epoch: i32,
leader_end_offset: i64,
) -> ValidationOutcome {
if leader_end_offset < 0 || leader_epoch < offset_epoch || leader_end_offset < offset {
ValidationOutcome::Truncated {
safe_offset: if leader_end_offset < 0 {
0
} else {
leader_end_offset
},
}
} else {
ValidationOutcome::Valid { leader_epoch }
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn consistent_position_is_valid() {
assert!(classify(100, 2, 2, 150) == ValidationOutcome::Valid { leader_epoch: 2 });
}
#[test]
fn leader_end_below_position_is_truncation() {
assert!(classify(100, 2, 2, 80) == ValidationOutcome::Truncated { safe_offset: 80 });
}
#[test]
fn older_leader_epoch_is_truncation() {
assert!(classify(100, 2, 1, 60) == ValidationOutcome::Truncated { safe_offset: 60 });
}
#[test]
fn undefined_leader_offset_truncates_to_zero() {
assert!(classify(100, 2, -1, -1) == ValidationOutcome::Truncated { safe_offset: 0 });
}
}