use chrono::{DateTime, Duration, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct ExactlyOnceConfig {
pub idempotency_ttl: Duration,
pub max_idempotency_keys: usize,
}
impl Default for ExactlyOnceConfig {
fn default() -> Self {
Self {
idempotency_ttl: Duration::hours(24),
max_idempotency_keys: 1_000_000,
}
}
}
#[derive(Debug, Clone, Serialize)]
struct IdempotencyRecord {
event_id: Uuid,
created_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerOffset {
pub pipeline_id: String,
pub stream_id: String,
pub offset: usize,
pub last_event_id: Uuid,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct ProcessingReceipt {
pipeline_id: String,
event_id: Uuid,
}
#[derive(Debug, Clone, Serialize)]
pub enum IdempotencyResult {
New,
Duplicate { original_event_id: Uuid },
}
pub struct ExactlyOnceRegistry {
config: ExactlyOnceConfig,
idempotency_keys: DashMap<String, IdempotencyRecord>,
consumer_offsets: DashMap<(String, String), ConsumerOffset>,
processing_receipts: DashMap<ProcessingReceipt, DateTime<Utc>>,
}
impl ExactlyOnceRegistry {
pub fn new(config: ExactlyOnceConfig) -> Self {
Self {
config,
idempotency_keys: DashMap::new(),
consumer_offsets: DashMap::new(),
processing_receipts: DashMap::new(),
}
}
pub fn check_idempotency(&self, key: &str, event_id: Uuid) -> IdempotencyResult {
let now = Utc::now();
if let Some(existing) = self.idempotency_keys.get(key) {
let age = now - existing.created_at;
if age < self.config.idempotency_ttl {
return IdempotencyResult::Duplicate {
original_event_id: existing.event_id,
};
}
drop(existing);
self.idempotency_keys.remove(key);
}
if self.idempotency_keys.len() >= self.config.max_idempotency_keys {
self.evict_expired_keys();
}
self.idempotency_keys.insert(
key.to_string(),
IdempotencyRecord {
event_id,
created_at: now,
},
);
IdempotencyResult::New
}
pub fn commit_offset(&self, pipeline_id: &str, stream_id: &str, offset: usize, event_id: Uuid) {
let key = (pipeline_id.to_string(), stream_id.to_string());
self.consumer_offsets.insert(
key,
ConsumerOffset {
pipeline_id: pipeline_id.to_string(),
stream_id: stream_id.to_string(),
offset,
last_event_id: event_id,
updated_at: Utc::now(),
},
);
}
pub fn get_offset(&self, pipeline_id: &str, stream_id: &str) -> Option<ConsumerOffset> {
let key = (pipeline_id.to_string(), stream_id.to_string());
self.consumer_offsets.get(&key).map(|v| v.clone())
}
pub fn record_processing(&self, pipeline_id: &str, event_id: Uuid) {
let receipt = ProcessingReceipt {
pipeline_id: pipeline_id.to_string(),
event_id,
};
self.processing_receipts.insert(receipt, Utc::now());
}
pub fn was_processed(&self, pipeline_id: &str, event_id: Uuid) -> bool {
let receipt = ProcessingReceipt {
pipeline_id: pipeline_id.to_string(),
event_id,
};
self.processing_receipts.contains_key(&receipt)
}
fn evict_expired_keys(&self) {
let now = Utc::now();
let ttl = self.config.idempotency_ttl;
self.idempotency_keys
.retain(|_, record| now - record.created_at < ttl);
}
pub fn stats(&self) -> ExactlyOnceStats {
ExactlyOnceStats {
idempotency_keys: self.idempotency_keys.len(),
consumer_offsets: self.consumer_offsets.len(),
processing_receipts: self.processing_receipts.len(),
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct ExactlyOnceStats {
pub idempotency_keys: usize,
pub consumer_offsets: usize,
pub processing_receipts: usize,
}
pub fn extract_idempotency_key(metadata: &Option<serde_json::Value>) -> Option<String> {
metadata
.as_ref()?
.get("idempotency_key")
.and_then(|v| v.as_str())
.map(std::string::ToString::to_string)
}
#[cfg(test)]
mod tests {
use super::*;
fn registry() -> ExactlyOnceRegistry {
ExactlyOnceRegistry::new(ExactlyOnceConfig::default())
}
#[test]
fn test_new_idempotency_key() {
let reg = registry();
let id = Uuid::new_v4();
match reg.check_idempotency("key-1", id) {
IdempotencyResult::New => {}
IdempotencyResult::Duplicate { .. } => panic!("Expected New, got Duplicate"),
}
}
#[test]
fn test_duplicate_idempotency_key() {
let reg = registry();
let id1 = Uuid::new_v4();
let id2 = Uuid::new_v4();
reg.check_idempotency("key-1", id1);
match reg.check_idempotency("key-1", id2) {
IdempotencyResult::Duplicate { original_event_id } => {
assert_eq!(original_event_id, id1);
}
IdempotencyResult::New => panic!("Expected Duplicate, got New"),
}
}
#[test]
fn test_expired_key_treated_as_new() {
let reg = ExactlyOnceRegistry::new(ExactlyOnceConfig {
idempotency_ttl: Duration::seconds(-1), ..Default::default()
});
let id1 = Uuid::new_v4();
let id2 = Uuid::new_v4();
reg.check_idempotency("key-1", id1);
match reg.check_idempotency("key-1", id2) {
IdempotencyResult::New => {}
IdempotencyResult::Duplicate { .. } => panic!("Expected New (expired), got Duplicate"),
}
}
#[test]
fn test_consumer_offset_commit_and_get() {
let reg = registry();
let eid = Uuid::new_v4();
reg.commit_offset("pipeline-1", "stream-a", 42, eid);
let offset = reg.get_offset("pipeline-1", "stream-a").unwrap();
assert_eq!(offset.offset, 42);
assert_eq!(offset.last_event_id, eid);
}
#[test]
fn test_consumer_offset_not_found() {
let reg = registry();
assert!(reg.get_offset("nonexistent", "stream").is_none());
}
#[test]
fn test_processing_receipts() {
let reg = registry();
let eid = Uuid::new_v4();
assert!(!reg.was_processed("pipeline-1", eid));
reg.record_processing("pipeline-1", eid);
assert!(reg.was_processed("pipeline-1", eid));
}
#[test]
fn test_processing_receipt_different_pipeline() {
let reg = registry();
let eid = Uuid::new_v4();
reg.record_processing("pipeline-1", eid);
assert!(!reg.was_processed("pipeline-2", eid));
}
#[test]
fn test_extract_idempotency_key() {
let meta = Some(serde_json::json!({"idempotency_key": "abc-123"}));
assert_eq!(extract_idempotency_key(&meta), Some("abc-123".to_string()));
let no_key = Some(serde_json::json!({"source": "web"}));
assert_eq!(extract_idempotency_key(&no_key), None);
assert_eq!(extract_idempotency_key(&None), None);
}
#[test]
fn test_stats() {
let reg = registry();
let id = Uuid::new_v4();
reg.check_idempotency("k1", id);
reg.commit_offset("p1", "s1", 0, id);
reg.record_processing("p1", id);
let stats = reg.stats();
assert_eq!(stats.idempotency_keys, 1);
assert_eq!(stats.consumer_offsets, 1);
assert_eq!(stats.processing_receipts, 1);
}
#[test]
fn test_eviction_on_capacity() {
let reg = ExactlyOnceRegistry::new(ExactlyOnceConfig {
idempotency_ttl: Duration::seconds(-1), max_idempotency_keys: 2,
});
reg.idempotency_keys.insert(
"old-1".to_string(),
IdempotencyRecord {
event_id: Uuid::new_v4(),
created_at: Utc::now() - Duration::hours(48),
},
);
reg.idempotency_keys.insert(
"old-2".to_string(),
IdempotencyRecord {
event_id: Uuid::new_v4(),
created_at: Utc::now() - Duration::hours(48),
},
);
let id = Uuid::new_v4();
reg.check_idempotency("new-key", id);
assert!(reg.idempotency_keys.len() <= 2);
}
}