use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime};
/// Identifier of an idempotent producer (allocated or accepted by
/// `IdempotentProducerManager::init_producer`).
pub type ProducerId = u64;
/// Producer epoch; a request carrying an epoch lower than the producer's current
/// epoch is fenced.
pub type ProducerEpoch = u16;
/// Per-partition message sequence number; a partition's first accepted sequence is 0.
pub type SequenceNumber = i32;
/// Sentinel sequence value meaning "no sequence".
/// NOTE(review): not referenced by the validation logic in this module — confirm callers.
pub const NO_SEQUENCE: SequenceNumber = -1;
/// Outcome of validating a produce request's (producer id, epoch, sequence) triple.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SequenceResult {
/// The sequence is the next expected one; the partition state has been advanced.
Valid,
/// The sequence is at or below the last accepted sequence for the partition.
/// `cached_offset` is the offset recorded for the most recently accepted message on
/// that partition, which is not necessarily the offset of the duplicated message.
Duplicate { cached_offset: u64 },
/// The sequence skips past the expected value, or is non-zero for a partition
/// that has no recorded state yet (in which case `expected` is 0).
OutOfOrder {
expected: SequenceNumber,
received: SequenceNumber,
},
/// The request's epoch is older than the producer's current epoch.
Fenced {
current_epoch: ProducerEpoch,
received_epoch: ProducerEpoch,
},
/// The producer id is not registered with the manager.
UnknownProducer,
}
/// Sequence-tracking state for one producer on one partition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionProducerState {
/// Sequence number of the most recently accepted message.
pub last_sequence: SequenceNumber,
/// Offset supplied together with `last_sequence`; reported as `cached_offset`
/// when a duplicate is detected.
pub last_offset: u64,
/// Time of the last update; (de)serialized via `crate::serde_utils::system_time`.
#[serde(with = "crate::serde_utils::system_time")]
pub last_activity: SystemTime,
}
impl PartitionProducerState {
    /// Builds state whose most recently accepted message had `sequence` and the
    /// associated `offset`, stamped with the current time.
    pub fn new(sequence: SequenceNumber, offset: u64) -> Self {
        let last_activity = SystemTime::now();
        Self {
            last_offset: offset,
            last_sequence: sequence,
            last_activity,
        }
    }

    /// Replaces the tracked sequence and offset and refreshes the activity time.
    pub fn update(&mut self, sequence: SequenceNumber, offset: u64) {
        // Every field is overwritten, so rebuilding the value is equivalent to
        // assigning the three fields one by one.
        *self = Self::new(sequence, offset);
    }
}
/// Per-producer state: the current epoch plus sequence state for every partition
/// the producer has written to in that epoch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProducerMetadata {
/// Producer identifier.
pub producer_id: ProducerId,
/// Current epoch; requests carrying a lower epoch are fenced.
pub epoch: ProducerEpoch,
/// Sequence state keyed by partition number; cleared whenever the epoch changes.
pub partitions: HashMap<u32, PartitionProducerState>,
/// Time this metadata was created.
#[serde(with = "crate::serde_utils::system_time")]
pub created_at: SystemTime,
/// Time of the most recent epoch bump or sequence validation/recording; used by
/// `is_idle` for idle-producer cleanup.
#[serde(with = "crate::serde_utils::system_time")]
pub last_activity: SystemTime,
}
impl ProducerMetadata {
pub fn new(producer_id: ProducerId, epoch: ProducerEpoch) -> Self {
let now = SystemTime::now();
Self {
producer_id,
epoch,
partitions: HashMap::new(),
created_at: now,
last_activity: now,
}
}
pub fn bump_epoch(&mut self) -> ProducerEpoch {
self.epoch = self.epoch.wrapping_add(1);
self.last_activity = SystemTime::now();
self.partitions.clear();
self.epoch
}
pub fn validate_sequence(
&mut self,
partition: u32,
epoch: ProducerEpoch,
sequence: SequenceNumber,
offset: u64,
) -> SequenceResult {
if epoch < self.epoch {
return SequenceResult::Fenced {
current_epoch: self.epoch,
received_epoch: epoch,
};
}
if epoch > self.epoch {
self.epoch = epoch;
self.partitions.clear();
}
self.last_activity = SystemTime::now();
if let Some(state) = self.partitions.get_mut(&partition) {
let expected = state.last_sequence.wrapping_add(1);
if sequence == expected {
state.update(sequence, offset);
SequenceResult::Valid
} else if sequence <= state.last_sequence {
SequenceResult::Duplicate {
cached_offset: state.last_offset,
}
} else {
SequenceResult::OutOfOrder {
expected,
received: sequence,
}
}
} else {
if sequence == 0 {
self.partitions
.insert(partition, PartitionProducerState::new(sequence, offset));
SequenceResult::Valid
} else {
SequenceResult::OutOfOrder {
expected: 0,
received: sequence,
}
}
}
}
pub fn is_idle(&self, timeout: Duration) -> bool {
self.last_activity
.elapsed()
.map(|d| d > timeout)
.unwrap_or(false)
}
}
/// Registry of idempotent producers: assigns producer ids, manages epochs and
/// validates per-partition sequence numbers.
///
/// All producer state lives behind a single `RwLock`.
#[derive(Debug)]
pub struct IdempotentProducerManager {
/// Counter for ids handed to producers that did not supply one.
next_producer_id: AtomicU64,
/// Registered producers keyed by id.
producers: RwLock<HashMap<ProducerId, ProducerMetadata>>,
/// Inactivity period after which `cleanup_idle_producers` removes a producer.
idle_timeout: Duration,
}
impl Default for IdempotentProducerManager {
fn default() -> Self {
Self::new()
}
}
impl IdempotentProducerManager {
pub fn new() -> Self {
Self {
next_producer_id: AtomicU64::new(1), producers: RwLock::new(HashMap::new()),
idle_timeout: Duration::from_secs(7 * 24 * 60 * 60), }
}
pub fn with_idle_timeout(idle_timeout: Duration) -> Self {
Self {
next_producer_id: AtomicU64::new(1),
producers: RwLock::new(HashMap::new()),
idle_timeout,
}
}
pub fn init_producer(
&self,
existing_producer_id: Option<ProducerId>,
) -> (ProducerId, ProducerEpoch) {
let mut producers = self.producers.write();
if let Some(pid) = existing_producer_id {
if let Some(metadata) = producers.get_mut(&pid) {
let new_epoch = metadata.bump_epoch();
return (pid, new_epoch);
}
let metadata = ProducerMetadata::new(pid, 0);
producers.insert(pid, metadata);
(pid, 0)
} else {
let pid = self.next_producer_id.fetch_add(1, Ordering::SeqCst);
let metadata = ProducerMetadata::new(pid, 0);
producers.insert(pid, metadata);
(pid, 0)
}
}
pub fn validate_produce(
&self,
producer_id: ProducerId,
epoch: ProducerEpoch,
partition: u32,
sequence: SequenceNumber,
offset: u64,
) -> SequenceResult {
let mut producers = self.producers.write();
if let Some(metadata) = producers.get_mut(&producer_id) {
metadata.validate_sequence(partition, epoch, sequence, offset)
} else {
SequenceResult::UnknownProducer
}
}
pub fn record_produce(
&self,
producer_id: ProducerId,
epoch: ProducerEpoch,
partition: u32,
sequence: SequenceNumber,
offset: u64,
) {
let mut producers = self.producers.write();
if let Some(metadata) = producers.get_mut(&producer_id) {
if metadata.epoch == epoch {
if let Some(state) = metadata.partitions.get_mut(&partition) {
state.update(sequence, offset);
} else {
metadata
.partitions
.insert(partition, PartitionProducerState::new(sequence, offset));
}
metadata.last_activity = SystemTime::now();
}
}
}
pub fn get_producer(&self, producer_id: ProducerId) -> Option<ProducerMetadata> {
self.producers.read().get(&producer_id).cloned()
}
pub fn has_producer(&self, producer_id: ProducerId) -> bool {
self.producers.read().contains_key(&producer_id)
}
pub fn producer_count(&self) -> usize {
self.producers.read().len()
}
pub fn cleanup_idle_producers(&self) -> usize {
let mut producers = self.producers.write();
let before = producers.len();
producers.retain(|_, metadata| !metadata.is_idle(self.idle_timeout));
before - producers.len()
}
pub fn stats(&self) -> IdempotentProducerStats {
let producers = self.producers.read();
let mut total_partitions = 0;
let mut oldest_activity = SystemTime::now();
for metadata in producers.values() {
total_partitions += metadata.partitions.len();
if metadata.last_activity < oldest_activity {
oldest_activity = metadata.last_activity;
}
}
IdempotentProducerStats {
active_producers: producers.len(),
total_partition_states: total_partitions,
oldest_activity,
next_producer_id: self.next_producer_id.load(Ordering::Relaxed),
}
}
}
/// Point-in-time snapshot produced by `IdempotentProducerManager::stats`.
#[derive(Debug, Clone)]
pub struct IdempotentProducerStats {
/// Number of registered producers (idle ones included until cleaned up).
pub active_producers: usize,
/// Total number of per-partition sequence states across all producers.
pub total_partition_states: usize,
/// Earliest producer `last_activity`, or the snapshot time if there are no producers.
pub oldest_activity: SystemTime,
/// Value of the producer-id counter when the snapshot was taken.
pub next_producer_id: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default manager with one freshly registered producer.
    fn manager_with_producer() -> (IdempotentProducerManager, ProducerId, ProducerEpoch) {
        let manager = IdempotentProducerManager::new();
        let (pid, epoch) = manager.init_producer(None);
        (manager, pid, epoch)
    }

    #[test]
    fn test_init_new_producer() {
        let manager = IdempotentProducerManager::new();
        for expected_pid in 1..=2 {
            assert_eq!(manager.init_producer(None), (expected_pid, 0));
        }
    }

    #[test]
    fn test_reconnecting_producer_bumps_epoch() {
        let (manager, pid, first_epoch) = manager_with_producer();
        assert_eq!(first_epoch, 0);
        for expected_epoch in 1..=2 {
            assert_eq!(manager.init_producer(Some(pid)), (pid, expected_epoch));
        }
    }

    #[test]
    fn test_valid_sequence() {
        let (manager, pid, epoch) = manager_with_producer();
        for (sequence, offset) in [(0, 100), (1, 101), (2, 102)] {
            assert_eq!(
                manager.validate_produce(pid, epoch, 0, sequence, offset),
                SequenceResult::Valid
            );
        }
    }

    #[test]
    fn test_duplicate_detection() {
        let (manager, pid, epoch) = manager_with_producer();
        let send = |sequence: SequenceNumber, offset: u64| {
            manager.validate_produce(pid, epoch, 0, sequence, offset)
        };
        assert_eq!(send(0, 100), SequenceResult::Valid);
        assert_eq!(send(0, 999), SequenceResult::Duplicate { cached_offset: 100 });
        assert_eq!(send(1, 101), SequenceResult::Valid);
        assert_eq!(send(1, 999), SequenceResult::Duplicate { cached_offset: 101 });
        // Older duplicates report the most recently recorded offset.
        assert_eq!(send(0, 999), SequenceResult::Duplicate { cached_offset: 101 });
    }

    #[test]
    fn test_out_of_order_sequence() {
        let (manager, pid, epoch) = manager_with_producer();
        assert_eq!(
            manager.validate_produce(pid, epoch, 0, 0, 100),
            SequenceResult::Valid
        );
        let skipped = manager.validate_produce(pid, epoch, 0, 2, 101);
        assert_eq!(
            skipped,
            SequenceResult::OutOfOrder {
                expected: 1,
                received: 2
            }
        );
    }

    #[test]
    fn test_first_message_must_be_zero() {
        let (manager, pid, epoch) = manager_with_producer();
        let first = manager.validate_produce(pid, epoch, 0, 5, 100);
        assert_eq!(
            first,
            SequenceResult::OutOfOrder {
                expected: 0,
                received: 5
            }
        );
    }

    #[test]
    fn test_producer_fencing() {
        let (manager, pid, stale_epoch) = manager_with_producer();
        assert_eq!(
            manager.validate_produce(pid, stale_epoch, 0, 0, 100),
            SequenceResult::Valid
        );

        let (_, current_epoch) = manager.init_producer(Some(pid));
        assert_eq!(current_epoch, 1);

        assert_eq!(
            manager.validate_produce(pid, stale_epoch, 0, 1, 101),
            SequenceResult::Fenced {
                current_epoch: 1,
                received_epoch: 0
            }
        );
        assert_eq!(
            manager.validate_produce(pid, current_epoch, 0, 0, 101),
            SequenceResult::Valid
        );
    }

    #[test]
    fn test_multiple_partitions() {
        let (manager, pid, epoch) = manager_with_producer();
        // Interleave two partitions; each keeps its own sequence counter.
        let writes = [(0, 0, 100), (1, 0, 200), (0, 1, 101), (1, 1, 201)];
        for (partition, sequence, offset) in writes {
            assert_eq!(
                manager.validate_produce(pid, epoch, partition, sequence, offset),
                SequenceResult::Valid
            );
        }
    }

    #[test]
    fn test_unknown_producer() {
        let manager = IdempotentProducerManager::new();
        assert_eq!(
            manager.validate_produce(9999, 0, 0, 0, 100),
            SequenceResult::UnknownProducer
        );
    }

    #[test]
    fn test_cleanup_idle_producers() {
        let manager = IdempotentProducerManager::with_idle_timeout(Duration::from_millis(1));
        let (pid, _) = manager.init_producer(None);
        assert!(manager.has_producer(pid));
        // Sleep well past the 1 ms timeout so the producer is unambiguously idle.
        std::thread::sleep(Duration::from_millis(10));
        assert_eq!(manager.cleanup_idle_producers(), 1);
        assert!(!manager.has_producer(pid));
    }

    #[test]
    fn test_stats() {
        let manager = IdempotentProducerManager::new();
        let empty = manager.stats();
        assert_eq!(
            (empty.active_producers, empty.total_partition_states),
            (0, 0)
        );

        let (pid, epoch) = manager.init_producer(None);
        for (partition, offset) in [(0, 100), (1, 200)] {
            manager.validate_produce(pid, epoch, partition, 0, offset);
        }
        let populated = manager.stats();
        assert_eq!(
            (populated.active_producers, populated.total_partition_states),
            (1, 2)
        );
    }

    #[test]
    fn test_epoch_upgrade_clears_state() {
        let (manager, pid, initial_epoch) = manager_with_producer();
        for (sequence, offset) in [(0, 100), (1, 101)] {
            manager.validate_produce(pid, initial_epoch, 0, sequence, offset);
        }
        // A jump to a higher epoch is accepted and restarts the partition at sequence 0.
        let upgraded: ProducerEpoch = 5;
        assert_eq!(
            manager.validate_produce(pid, upgraded, 0, 0, 200),
            SequenceResult::Valid
        );
        assert_eq!(manager.get_producer(pid).unwrap().epoch, 5);
    }
}