Skip to main content

oxirs_stream/
idempotent_delivery.rs

1//! # Idempotent Delivery Manager
2//!
3//! Provides exactly-once delivery guarantees with idempotency keys for stream
4//! events. Prevents duplicate processing by tracking idempotency keys with
5//! configurable TTL-based expiration and storage backends.
6//!
7//! ## Features
8//!
9//! - **Idempotency key tracking**: SHA-256 based key generation from event content
10//! - **TTL-based expiration**: Keys expire after configurable time-to-live
11//! - **Outcome caching**: Cache delivery results for idempotent retries
12//! - **Storage backends**: In-memory (default) or pluggable persistent backends
13//! - **Batch processing**: Efficient batch idempotency checks
14//! - **Metrics and statistics**: Track duplicate rates, key cardinality, etc.
15//! - **Partition-aware**: Per-partition idempotency tracking for ordered delivery
16
17use crate::error::StreamError;
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use sha2::{Digest, Sha256};
21use std::collections::{BTreeMap, HashMap};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::RwLock;
25use tracing::{debug, info};
26
27// ─────────────────────────────────────────────
28// Configuration
29// ─────────────────────────────────────────────
30
/// Configuration for idempotent delivery.
///
/// Every field is public and the struct is (de)serializable. The `Default`
/// implementation supplies the values quoted on each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdempotentDeliveryConfig {
    /// Time-to-live for idempotency keys (default: 1 hour).
    ///
    /// Measured from the moment a key is first registered. Once a key is older
    /// than this it is treated as unknown and may be registered again.
    pub key_ttl: Duration,
    /// Maximum number of tracked keys before forced eviction (default: 1M).
    ///
    /// When the limit is reached, registering a new key first evicts the key
    /// with the earliest expiry.
    pub max_keys: usize,
    /// How often to run the eviction sweep (default: 60s).
    ///
    /// There is no background timer: a sweep is triggered from
    /// `check_and_register` / `check_batch` once this interval has elapsed.
    pub eviction_interval: Duration,
    /// Whether to cache delivery outcomes for fast retries (default: true).
    ///
    /// When `false`, `record_outcome` returns immediately without storing anything.
    pub cache_outcomes: bool,
    /// Maximum outcome cache size (default: 100K).
    ///
    /// NOTE(review): no code visible in this file reads this limit; outcomes are
    /// stored on the tracked key itself, so they are bounded by `max_keys`.
    pub max_cached_outcomes: usize,
    /// Whether to enable per-partition tracking (default: true).
    ///
    /// NOTE(review): not consulted by the code visible in this file; partition
    /// scoping is decided per key via `IdempotencyKey::partition`.
    pub partition_aware: bool,
    /// Hash algorithm used by the manager's key generators (default: SHA-256).
    pub hash_algorithm: HashAlgorithm,
}
49
50impl Default for IdempotentDeliveryConfig {
51    fn default() -> Self {
52        Self {
53            key_ttl: Duration::from_secs(3600),
54            max_keys: 1_000_000,
55            eviction_interval: Duration::from_secs(60),
56            cache_outcomes: true,
57            max_cached_outcomes: 100_000,
58            partition_aware: true,
59            hash_algorithm: HashAlgorithm::Sha256,
60        }
61    }
62}
63
/// Hash algorithm for idempotency key generation.
///
/// Selects how `IdempotencyKey::from_content` turns event bytes into the
/// hex digest stored in the key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HashAlgorithm {
    /// SHA-256 (default, recommended).
    Sha256,
    /// Fast non-cryptographic hash (FNV-1a style, for high throughput).
    ///
    /// Produces a 64-bit digest (16 hex characters). Two different payloads
    /// that collide on those 64 bits yield the same key and are therefore
    /// treated as duplicates of each other.
    FastHash,
}
72
73// ─────────────────────────────────────────────
74// Idempotency Key
75// ─────────────────────────────────────────────
76
/// A unique idempotency key derived from event content.
///
/// The manager looks keys up by `composite_key()`, which is built from
/// `partition` and `digest` only.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IdempotencyKey {
    /// The hex-encoded hash digest.
    ///
    /// When the key is built with `from_string`, this is the caller's string
    /// stored verbatim (it is not hashed).
    pub digest: String,
    /// Optional partition the key belongs to.
    ///
    /// When set, it is prefixed to the digest in `composite_key()`, so the same
    /// content in two partitions is tracked as two distinct keys.
    pub partition: Option<u32>,
    /// Source producer id (if available).
    ///
    /// Participates in the derived `Eq`/`Hash`, but is not part of
    /// `composite_key()` and so does not influence duplicate detection.
    pub producer_id: Option<String>,
}
87
88impl IdempotencyKey {
89    /// Create a new idempotency key from raw content bytes.
90    pub fn from_content(content: &[u8], algorithm: HashAlgorithm) -> Self {
91        let digest = match algorithm {
92            HashAlgorithm::Sha256 => {
93                let mut hasher = Sha256::new();
94                hasher.update(content);
95                hex::encode(hasher.finalize())
96            }
97            HashAlgorithm::FastHash => {
98                let hash = fnv1a_hash(content);
99                format!("{hash:016x}")
100            }
101        };
102        Self {
103            digest,
104            partition: None,
105            producer_id: None,
106        }
107    }
108
109    /// Create with explicit partition.
110    pub fn with_partition(mut self, partition: u32) -> Self {
111        self.partition = Some(partition);
112        self
113    }
114
115    /// Create with explicit producer id.
116    pub fn with_producer(mut self, producer: String) -> Self {
117        self.producer_id = Some(producer);
118        self
119    }
120
121    /// Create from a pre-computed string key.
122    pub fn from_string(key: String) -> Self {
123        Self {
124            digest: key,
125            partition: None,
126            producer_id: None,
127        }
128    }
129
130    /// Return the composite key used for lookups (includes partition if set).
131    pub fn composite_key(&self) -> String {
132        match self.partition {
133            Some(p) => format!("{}:{}", p, self.digest),
134            None => self.digest.clone(),
135        }
136    }
137}
138
/// 64-bit FNV-1a: a fast, non-cryptographic hash of `data`.
///
/// Starting from the offset basis, each input byte is XOR-ed into the state,
/// which is then multiplied by the FNV prime with wrapping arithmetic. Empty
/// input returns the offset basis unchanged.
fn fnv1a_hash(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    data.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
148
149// ─────────────────────────────────────────────
150// Delivery Outcome
151// ─────────────────────────────────────────────
152
/// The result of a delivery attempt.
///
/// Stored per idempotency key by `record_outcome` and handed back to callers
/// that submit the same key again.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeliveryOutcome {
    /// Event was successfully processed.
    Success {
        /// Serialized result (if any).
        result: Option<String>,
        /// When the event was processed.
        processed_at: DateTime<Utc>,
    },
    /// Event processing failed.
    Failure {
        /// Error message.
        error: String,
        /// Whether the failure is retryable.
        ///
        /// The flag is only recorded; the key stays registered until it
        /// expires or is removed with `remove_key`.
        retryable: bool,
        /// When the failure occurred.
        failed_at: DateTime<Utc>,
    },
    /// Event is currently being processed.
    InProgress {
        /// When processing started.
        started_at: DateTime<Utc>,
    },
}
178
179// ─────────────────────────────────────────────
180// Tracked Key Entry
181// ─────────────────────────────────────────────
182
/// Internal entry for a tracked idempotency key.
///
/// One entry exists per composite key held by the manager.
#[derive(Debug, Clone)]
struct TrackedKey {
    /// When this key was first seen.
    ///
    /// This is the reference point for the TTL: the key counts as expired once
    /// more than `key_ttl` has elapsed since this instant.
    first_seen: Instant,
    /// When this key was last accessed.
    ///
    /// Refreshed on every duplicate submission. It does not extend the TTL.
    last_accessed: Instant,
    /// How many times this key was submitted.
    ///
    /// Starts at 1 on registration and grows by one per detected duplicate.
    submission_count: u64,
    /// Cached outcome (if enabled).
    ///
    /// `None` until `record_outcome` stores a value for this key.
    outcome: Option<DeliveryOutcome>,
}
195
196// ─────────────────────────────────────────────
197// Statistics
198// ─────────────────────────────────────────────
199
/// Statistics for the idempotent delivery manager.
///
/// A snapshot is returned by `IdempotentDeliveryManager::stats`. All counters
/// start at zero (`Default`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IdempotentDeliveryStats {
    /// Total events submitted.
    ///
    /// Counts `check_and_register` calls; `check_batch` does not contribute.
    pub total_submitted: u64,
    /// Events accepted (new, non-duplicate).
    pub accepted: u64,
    /// Events rejected as duplicates.
    pub duplicates_rejected: u64,
    /// Events served from outcome cache.
    ///
    /// Incremented by `get_outcome` whenever it returns a stored outcome.
    pub cache_hits: u64,
    /// Current number of tracked keys.
    ///
    /// Refreshed when keys are registered, removed, swept or cleared, so it
    /// can include keys that have expired but have not been swept yet.
    pub active_keys: usize,
    /// Total keys evicted (TTL or capacity).
    pub keys_evicted: u64,
    /// Eviction sweeps performed.
    pub eviction_sweeps: u64,
    /// Duplicate rate (duplicates / total).
    pub duplicate_rate: f64,
    /// Average key lifetime in seconds.
    ///
    /// NOTE(review): no code visible in this file updates this field, so it
    /// appears to stay at its default of 0.0 — confirm before relying on it.
    pub avg_key_lifetime_secs: f64,
    /// Per-partition key counts.
    ///
    /// Incremented each time a key carrying a partition is accepted.
    /// NOTE(review): nothing visible in this file decrements or resets it, so it
    /// behaves as a cumulative count rather than a live one.
    pub partition_key_counts: HashMap<u32, usize>,
}
224
225// ─────────────────────────────────────────────
226// Batch check result
227// ─────────────────────────────────────────────
228
/// Result of checking a single key in a batch.
///
/// Produced by `check_batch`, one per input key and in input order.
#[derive(Debug, Clone)]
pub struct KeyCheckResult {
    /// The idempotency key.
    pub key: IdempotencyKey,
    /// Whether this is a duplicate.
    ///
    /// `true` only when the key is currently tracked and has not outlived its TTL.
    pub is_duplicate: bool,
    /// Cached outcome (if available).
    ///
    /// Always `None` when `is_duplicate` is `false`.
    pub cached_outcome: Option<DeliveryOutcome>,
    /// How many times this key has been seen.
    ///
    /// `0` when the key is unknown or expired.
    pub submission_count: u64,
}
241
242// ─────────────────────────────────────────────
243// Idempotent Delivery Manager
244// ─────────────────────────────────────────────
245
/// Manages idempotent delivery guarantees for stream events.
///
/// Tracks idempotency keys in a sliding TTL window with configurable
/// eviction, outcome caching, and partition-aware tracking.
///
/// All state lives in memory behind `tokio` read/write locks, so the async
/// methods take `&self`.
pub struct IdempotentDeliveryManager {
    /// Settings fixed at construction time.
    config: IdempotentDeliveryConfig,
    /// Map from composite key -> tracked entry.
    keys: Arc<RwLock<HashMap<String, TrackedKey>>>,
    /// Ordered set for TTL eviction: (expiry_instant, composite_key).
    ///
    /// A `BTreeMap` with unit values is used as a sorted set, so the entry
    /// with the earliest expiry is always first.
    expiry_queue: Arc<RwLock<BTreeMap<(Instant, String), ()>>>,
    /// Running statistics.
    stats: Arc<RwLock<IdempotentDeliveryStats>>,
    /// When the last eviction sweep ran.
    last_eviction: Arc<RwLock<Instant>>,
}
261
262impl IdempotentDeliveryManager {
263    /// Create a new idempotent delivery manager.
264    pub fn new(config: IdempotentDeliveryConfig) -> Self {
265        Self {
266            config,
267            keys: Arc::new(RwLock::new(HashMap::new())),
268            expiry_queue: Arc::new(RwLock::new(BTreeMap::new())),
269            stats: Arc::new(RwLock::new(IdempotentDeliveryStats::default())),
270            last_eviction: Arc::new(RwLock::new(Instant::now())),
271        }
272    }
273
274    /// Create with default configuration.
275    pub fn with_defaults() -> Self {
276        Self::new(IdempotentDeliveryConfig::default())
277    }
278
279    /// Generate an idempotency key from raw content.
280    pub fn generate_key(&self, content: &[u8]) -> IdempotencyKey {
281        IdempotencyKey::from_content(content, self.config.hash_algorithm)
282    }
283
284    /// Generate a partitioned idempotency key.
285    pub fn generate_partitioned_key(&self, content: &[u8], partition: u32) -> IdempotencyKey {
286        IdempotencyKey::from_content(content, self.config.hash_algorithm).with_partition(partition)
287    }
288
289    /// Check if a key is a duplicate and register it atomically.
290    ///
291    /// Returns `Ok(false)` if this is a new key (event should be processed).
292    /// Returns `Ok(true)` if this is a duplicate (event should be skipped).
293    pub async fn check_and_register(&self, key: &IdempotencyKey) -> Result<bool, StreamError> {
294        self.maybe_evict().await;
295
296        let composite = key.composite_key();
297        let now = Instant::now();
298
299        let mut keys = self.keys.write().await;
300        let mut stats = self.stats.write().await;
301        stats.total_submitted += 1;
302
303        if let Some(entry) = keys.get_mut(&composite) {
304            // Check if expired
305            if now.duration_since(entry.first_seen) > self.config.key_ttl {
306                // Key expired — remove and treat as new
307                keys.remove(&composite);
308                let mut expiry = self.expiry_queue.write().await;
309                // Remove from expiry queue (best effort — key might not match exactly)
310                expiry.retain(|(_t, k), _| k != &composite);
311                // Fall through to insert below
312            } else {
313                entry.submission_count += 1;
314                entry.last_accessed = now;
315                stats.duplicates_rejected += 1;
316                stats.duplicate_rate = if stats.total_submitted > 0 {
317                    stats.duplicates_rejected as f64 / stats.total_submitted as f64
318                } else {
319                    0.0
320                };
321                debug!(key = %composite, count = entry.submission_count, "Duplicate key detected");
322                return Ok(true);
323            }
324        }
325
326        // Capacity check
327        if keys.len() >= self.config.max_keys {
328            // Evict oldest key
329            let mut expiry = self.expiry_queue.write().await;
330            if let Some(((_, oldest_key), _)) = expiry.pop_first() {
331                keys.remove(&oldest_key);
332                stats.keys_evicted += 1;
333            }
334        }
335
336        // Insert new key
337        let entry = TrackedKey {
338            first_seen: now,
339            last_accessed: now,
340            submission_count: 1,
341            outcome: None,
342        };
343        keys.insert(composite.clone(), entry);
344
345        // Add to expiry queue
346        let expiry_time = now + self.config.key_ttl;
347        let mut expiry = self.expiry_queue.write().await;
348        expiry.insert((expiry_time, composite), ());
349
350        stats.accepted += 1;
351        stats.active_keys = keys.len();
352
353        // Update partition stats
354        if let Some(p) = key.partition {
355            *stats.partition_key_counts.entry(p).or_insert(0) += 1;
356        }
357
358        stats.duplicate_rate = if stats.total_submitted > 0 {
359            stats.duplicates_rejected as f64 / stats.total_submitted as f64
360        } else {
361            0.0
362        };
363
364        Ok(false)
365    }
366
367    /// Record the outcome of processing an event.
368    pub async fn record_outcome(
369        &self,
370        key: &IdempotencyKey,
371        outcome: DeliveryOutcome,
372    ) -> Result<(), StreamError> {
373        if !self.config.cache_outcomes {
374            return Ok(());
375        }
376
377        let composite = key.composite_key();
378        let mut keys = self.keys.write().await;
379
380        if let Some(entry) = keys.get_mut(&composite) {
381            entry.outcome = Some(outcome);
382            Ok(())
383        } else {
384            Err(StreamError::NotFound(format!("Key not found: {composite}")))
385        }
386    }
387
388    /// Get the cached outcome for a key (if any).
389    pub async fn get_outcome(
390        &self,
391        key: &IdempotencyKey,
392    ) -> Result<Option<DeliveryOutcome>, StreamError> {
393        let composite = key.composite_key();
394        let keys = self.keys.read().await;
395        let mut stats = self.stats.write().await;
396
397        if let Some(entry) = keys.get(&composite) {
398            if entry.outcome.is_some() {
399                stats.cache_hits += 1;
400            }
401            Ok(entry.outcome.clone())
402        } else {
403            Ok(None)
404        }
405    }
406
407    /// Check multiple keys in batch.
408    pub async fn check_batch(
409        &self,
410        keys_to_check: &[IdempotencyKey],
411    ) -> Result<Vec<KeyCheckResult>, StreamError> {
412        self.maybe_evict().await;
413
414        let stored_keys = self.keys.read().await;
415        let mut results = Vec::with_capacity(keys_to_check.len());
416
417        for key in keys_to_check {
418            let composite = key.composite_key();
419            let (is_duplicate, cached_outcome, submission_count) =
420                if let Some(entry) = stored_keys.get(&composite) {
421                    let now = Instant::now();
422                    if now.duration_since(entry.first_seen) > self.config.key_ttl {
423                        (false, None, 0)
424                    } else {
425                        (true, entry.outcome.clone(), entry.submission_count)
426                    }
427                } else {
428                    (false, None, 0)
429                };
430
431            results.push(KeyCheckResult {
432                key: key.clone(),
433                is_duplicate,
434                cached_outcome,
435                submission_count,
436            });
437        }
438
439        Ok(results)
440    }
441
442    /// Manually evict expired keys.
443    pub async fn evict_expired(&self) -> usize {
444        let now = Instant::now();
445        let mut keys = self.keys.write().await;
446        let mut expiry = self.expiry_queue.write().await;
447        let mut stats = self.stats.write().await;
448        let mut evicted = 0;
449
450        // Remove all entries with expiry <= now
451        let cutoff = (now, String::new());
452        let expired: Vec<(Instant, String)> = expiry
453            .range(..=cutoff)
454            .map(|((t, k), _)| (*t, k.clone()))
455            .collect();
456
457        for (t, k) in &expired {
458            expiry.remove(&(*t, k.clone()));
459            if keys.remove(k).is_some() {
460                evicted += 1;
461            }
462        }
463
464        stats.keys_evicted += evicted as u64;
465        stats.eviction_sweeps += 1;
466        stats.active_keys = keys.len();
467
468        if evicted > 0 {
469            info!(
470                evicted,
471                remaining = keys.len(),
472                "Evicted expired idempotency keys"
473            );
474        }
475
476        evicted
477    }
478
479    /// Check if a key exists (without registering or counting it).
480    pub async fn contains_key(&self, key: &IdempotencyKey) -> bool {
481        let composite = key.composite_key();
482        let keys = self.keys.read().await;
483        if let Some(entry) = keys.get(&composite) {
484            Instant::now().duration_since(entry.first_seen) <= self.config.key_ttl
485        } else {
486            false
487        }
488    }
489
490    /// Remove a specific key (e.g., after a failed delivery that should be retried).
491    pub async fn remove_key(&self, key: &IdempotencyKey) -> bool {
492        let composite = key.composite_key();
493        let mut keys = self.keys.write().await;
494        let removed = keys.remove(&composite).is_some();
495        if removed {
496            let mut stats = self.stats.write().await;
497            stats.active_keys = keys.len();
498        }
499        removed
500    }
501
502    /// Clear all tracked keys.
503    pub async fn clear(&self) {
504        let mut keys = self.keys.write().await;
505        let mut expiry = self.expiry_queue.write().await;
506        let mut stats = self.stats.write().await;
507        keys.clear();
508        expiry.clear();
509        stats.active_keys = 0;
510        info!("Cleared all idempotency keys");
511    }
512
513    /// Get current statistics.
514    pub async fn stats(&self) -> IdempotentDeliveryStats {
515        let stats = self.stats.read().await;
516        stats.clone()
517    }
518
519    /// Get the current configuration.
520    pub fn config(&self) -> &IdempotentDeliveryConfig {
521        &self.config
522    }
523
524    /// Internal: run eviction if interval has elapsed.
525    async fn maybe_evict(&self) {
526        let should_evict = {
527            let last = self.last_eviction.read().await;
528            last.elapsed() >= self.config.eviction_interval
529        };
530
531        if should_evict {
532            let mut last = self.last_eviction.write().await;
533            if last.elapsed() >= self.config.eviction_interval {
534                *last = Instant::now();
535                // Drop the lock before the expensive eviction
536                drop(last);
537                self.evict_expired().await;
538            }
539        }
540    }
541
542    /// Get number of active (non-expired) keys.
543    pub async fn active_key_count(&self) -> usize {
544        let keys = self.keys.read().await;
545        keys.len()
546    }
547
548    /// Get the submission count for a specific key.
549    pub async fn submission_count(&self, key: &IdempotencyKey) -> u64 {
550        let composite = key.composite_key();
551        let keys = self.keys.read().await;
552        keys.get(&composite).map_or(0, |e| e.submission_count)
553    }
554
555    /// Check if the outcome cache is enabled.
556    pub fn is_cache_enabled(&self) -> bool {
557        self.config.cache_outcomes
558    }
559}
560
561// ─────────────────────────────────────────────
562// Idempotent Producer Wrapper
563// ─────────────────────────────────────────────
564
/// Wraps a delivery function with idempotency guarantees.
///
/// Usage pattern:
/// 1. Compute idempotency key from event content
/// 2. Check against the manager
/// 3. If new, process the event and record the outcome
/// 4. If duplicate, return the cached outcome or skip
///
/// The producer does not run the delivery itself: the caller processes the
/// event and reports back through `mark_success` / `mark_failure`.
pub struct IdempotentProducer {
    /// Shared manager that tracks keys and stores outcomes.
    manager: Arc<IdempotentDeliveryManager>,
}
575
576impl IdempotentProducer {
577    /// Create an idempotent producer wrapping the given manager.
578    pub fn new(manager: Arc<IdempotentDeliveryManager>) -> Self {
579        Self { manager }
580    }
581
582    /// Attempt to deliver an event with idempotency key.
583    ///
584    /// Returns `Ok(Some(outcome))` if the event was already processed.
585    /// Returns `Ok(None)` if the event is new and should be processed by the caller.
586    pub async fn try_deliver(
587        &self,
588        key: &IdempotencyKey,
589    ) -> Result<Option<DeliveryOutcome>, StreamError> {
590        let is_dup = self.manager.check_and_register(key).await?;
591        if is_dup {
592            // Return cached outcome if available
593            let outcome = self.manager.get_outcome(key).await?;
594            Ok(outcome.or(Some(DeliveryOutcome::Success {
595                result: None,
596                processed_at: Utc::now(),
597            })))
598        } else {
599            Ok(None)
600        }
601    }
602
603    /// Record that delivery succeeded.
604    pub async fn mark_success(
605        &self,
606        key: &IdempotencyKey,
607        result: Option<String>,
608    ) -> Result<(), StreamError> {
609        self.manager
610            .record_outcome(
611                key,
612                DeliveryOutcome::Success {
613                    result,
614                    processed_at: Utc::now(),
615                },
616            )
617            .await
618    }
619
620    /// Record that delivery failed.
621    pub async fn mark_failure(
622        &self,
623        key: &IdempotencyKey,
624        error: String,
625        retryable: bool,
626    ) -> Result<(), StreamError> {
627        self.manager
628            .record_outcome(
629                key,
630                DeliveryOutcome::Failure {
631                    error,
632                    retryable,
633                    failed_at: Utc::now(),
634                },
635            )
636            .await
637    }
638
639    /// Get the underlying manager.
640    pub fn manager(&self) -> &IdempotentDeliveryManager {
641        &self.manager
642    }
643}
644
645// ─────────────────────────────────────────────
646// Tests
647// ─────────────────────────────────────────────
648
649#[cfg(test)]
650mod tests {
651    use super::*;
652    use std::time::Duration;
653
654    fn default_manager() -> IdempotentDeliveryManager {
655        IdempotentDeliveryManager::new(IdempotentDeliveryConfig::default())
656    }
657
658    fn fast_ttl_manager(ttl_ms: u64) -> IdempotentDeliveryManager {
659        IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
660            key_ttl: Duration::from_millis(ttl_ms),
661            eviction_interval: Duration::from_millis(10),
662            ..Default::default()
663        })
664    }
665
666    #[tokio::test]
667    async fn test_new_key_is_not_duplicate() {
668        let mgr = default_manager();
669        let key = mgr.generate_key(b"event-1");
670        let is_dup = mgr.check_and_register(&key).await.expect("check failed");
671        assert!(!is_dup, "First submission should not be duplicate");
672    }
673
674    #[tokio::test]
675    async fn test_same_key_is_duplicate() {
676        let mgr = default_manager();
677        let key = mgr.generate_key(b"event-1");
678        mgr.check_and_register(&key).await.expect("check failed");
679        let is_dup = mgr.check_and_register(&key).await.expect("check failed");
680        assert!(is_dup, "Second submission should be duplicate");
681    }
682
683    #[tokio::test]
684    async fn test_different_keys_not_duplicate() {
685        let mgr = default_manager();
686        let k1 = mgr.generate_key(b"event-1");
687        let k2 = mgr.generate_key(b"event-2");
688        mgr.check_and_register(&k1).await.expect("check failed");
689        let is_dup = mgr.check_and_register(&k2).await.expect("check failed");
690        assert!(!is_dup, "Different events should not be duplicates");
691    }
692
693    #[tokio::test]
694    async fn test_stats_tracking() {
695        let mgr = default_manager();
696        let k1 = mgr.generate_key(b"event-1");
697        mgr.check_and_register(&k1).await.expect("check failed");
698        mgr.check_and_register(&k1).await.expect("check failed");
699        mgr.check_and_register(&k1).await.expect("check failed");
700
701        let stats = mgr.stats().await;
702        assert_eq!(stats.total_submitted, 3);
703        assert_eq!(stats.accepted, 1);
704        assert_eq!(stats.duplicates_rejected, 2);
705        assert!((stats.duplicate_rate - 2.0 / 3.0).abs() < 1e-10);
706    }
707
708    #[tokio::test]
709    async fn test_outcome_caching() {
710        let mgr = default_manager();
711        let key = mgr.generate_key(b"event-1");
712        mgr.check_and_register(&key).await.expect("check failed");
713
714        mgr.record_outcome(
715            &key,
716            DeliveryOutcome::Success {
717                result: Some("ok".into()),
718                processed_at: Utc::now(),
719            },
720        )
721        .await
722        .expect("record failed");
723
724        let outcome = mgr.get_outcome(&key).await.expect("get failed");
725        assert!(outcome.is_some());
726        if let Some(DeliveryOutcome::Success { result, .. }) = outcome {
727            assert_eq!(result, Some("ok".into()));
728        } else {
729            panic!("Expected Success outcome");
730        }
731    }
732
733    #[tokio::test]
734    async fn test_outcome_cache_disabled() {
735        let mgr = IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
736            cache_outcomes: false,
737            ..Default::default()
738        });
739        let key = mgr.generate_key(b"event-1");
740        mgr.check_and_register(&key).await.expect("check failed");
741
742        // Recording should be a no-op when cache is disabled
743        mgr.record_outcome(
744            &key,
745            DeliveryOutcome::Success {
746                result: None,
747                processed_at: Utc::now(),
748            },
749        )
750        .await
751        .expect("record failed");
752
753        let outcome = mgr.get_outcome(&key).await.expect("get failed");
754        assert!(outcome.is_none());
755    }
756
757    #[tokio::test]
758    async fn test_ttl_expiration() {
759        let mgr = fast_ttl_manager(50);
760        let key = mgr.generate_key(b"event-1");
761        mgr.check_and_register(&key).await.expect("check failed");
762
763        // Wait for TTL to expire
764        tokio::time::sleep(Duration::from_millis(100)).await;
765
766        // After expiry, the same key should be accepted as new
767        let is_dup = mgr.check_and_register(&key).await.expect("check failed");
768        assert!(!is_dup, "Expired key should be accepted as new");
769    }
770
771    #[tokio::test]
772    async fn test_evict_expired() {
773        let mgr = fast_ttl_manager(50);
774        let k1 = mgr.generate_key(b"event-1");
775        let k2 = mgr.generate_key(b"event-2");
776        mgr.check_and_register(&k1).await.expect("check failed");
777        mgr.check_and_register(&k2).await.expect("check failed");
778
779        assert_eq!(mgr.active_key_count().await, 2);
780
781        tokio::time::sleep(Duration::from_millis(100)).await;
782        let evicted = mgr.evict_expired().await;
783        assert!(evicted >= 1, "Should have evicted at least one key");
784    }
785
786    #[tokio::test]
787    async fn test_max_keys_eviction() {
788        let mgr = IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
789            max_keys: 5,
790            ..Default::default()
791        });
792
793        for i in 0..10 {
794            let key = mgr.generate_key(format!("event-{i}").as_bytes());
795            mgr.check_and_register(&key).await.expect("check failed");
796        }
797
798        assert!(mgr.active_key_count().await <= 6); // May be up to max_keys + 1 due to insertion order
799    }
800
801    #[tokio::test]
802    async fn test_partitioned_keys() {
803        let mgr = default_manager();
804        let content = b"event-1";
805        let k1 = mgr.generate_partitioned_key(content, 0);
806        let k2 = mgr.generate_partitioned_key(content, 1);
807
808        mgr.check_and_register(&k1).await.expect("check failed");
809        let is_dup = mgr.check_and_register(&k2).await.expect("check failed");
810        assert!(
811            !is_dup,
812            "Same content in different partitions should not collide"
813        );
814    }
815
816    #[tokio::test]
817    async fn test_same_partition_duplicate() {
818        let mgr = default_manager();
819        let k1 = mgr.generate_partitioned_key(b"event-1", 0);
820        let k2 = mgr.generate_partitioned_key(b"event-1", 0);
821
822        mgr.check_and_register(&k1).await.expect("check failed");
823        let is_dup = mgr.check_and_register(&k2).await.expect("check failed");
824        assert!(is_dup, "Same content in same partition should be duplicate");
825    }
826
827    #[tokio::test]
828    async fn test_contains_key() {
829        let mgr = default_manager();
830        let key = mgr.generate_key(b"event-1");
831        assert!(!mgr.contains_key(&key).await);
832
833        mgr.check_and_register(&key).await.expect("check failed");
834        assert!(mgr.contains_key(&key).await);
835    }
836
837    #[tokio::test]
838    async fn test_remove_key() {
839        let mgr = default_manager();
840        let key = mgr.generate_key(b"event-1");
841        mgr.check_and_register(&key).await.expect("check failed");
842        assert!(mgr.contains_key(&key).await);
843
844        let removed = mgr.remove_key(&key).await;
845        assert!(removed);
846        assert!(!mgr.contains_key(&key).await);
847    }
848
849    #[tokio::test]
850    async fn test_clear_all_keys() {
851        let mgr = default_manager();
852        for i in 0..10 {
853            let key = mgr.generate_key(format!("event-{i}").as_bytes());
854            mgr.check_and_register(&key).await.expect("check failed");
855        }
856        assert_eq!(mgr.active_key_count().await, 10);
857        mgr.clear().await;
858        assert_eq!(mgr.active_key_count().await, 0);
859    }
860
861    #[tokio::test]
862    async fn test_batch_check() {
863        let mgr = default_manager();
864        let k1 = mgr.generate_key(b"event-1");
865        let k2 = mgr.generate_key(b"event-2");
866        let k3 = mgr.generate_key(b"event-3");
867
868        mgr.check_and_register(&k1).await.expect("check failed");
869
870        let results = mgr
871            .check_batch(&[k1.clone(), k2.clone(), k3.clone()])
872            .await
873            .expect("batch failed");
874        assert_eq!(results.len(), 3);
875        assert!(results[0].is_duplicate);
876        assert!(!results[1].is_duplicate);
877        assert!(!results[2].is_duplicate);
878    }
879
880    #[tokio::test]
881    async fn test_submission_count() {
882        let mgr = default_manager();
883        let key = mgr.generate_key(b"event-1");
884        mgr.check_and_register(&key).await.expect("check failed");
885        mgr.check_and_register(&key).await.expect("check failed");
886        mgr.check_and_register(&key).await.expect("check failed");
887
888        let count = mgr.submission_count(&key).await;
889        assert_eq!(count, 3);
890    }
891
892    #[tokio::test]
893    async fn test_idempotent_producer_new_event() {
894        let mgr = Arc::new(default_manager());
895        let producer = IdempotentProducer::new(mgr);
896        let key = producer.manager().generate_key(b"event-1");
897
898        let result = producer.try_deliver(&key).await.expect("deliver failed");
899        assert!(result.is_none(), "New event should return None");
900    }
901
902    #[tokio::test]
903    async fn test_idempotent_producer_duplicate_event() {
904        let mgr = Arc::new(default_manager());
905        let producer = IdempotentProducer::new(mgr);
906        let key = producer.manager().generate_key(b"event-1");
907
908        // First delivery
909        producer.try_deliver(&key).await.expect("deliver failed");
910        producer
911            .mark_success(&key, Some("done".into()))
912            .await
913            .expect("mark failed");
914
915        // Second delivery (duplicate)
916        let result = producer.try_deliver(&key).await.expect("deliver failed");
917        assert!(result.is_some(), "Duplicate should return cached outcome");
918    }
919
920    #[tokio::test]
921    async fn test_idempotent_producer_mark_failure() {
922        let mgr = Arc::new(default_manager());
923        let producer = IdempotentProducer::new(mgr);
924        let key = producer.manager().generate_key(b"event-1");
925
926        producer.try_deliver(&key).await.expect("deliver failed");
927        producer
928            .mark_failure(&key, "timeout".into(), true)
929            .await
930            .expect("mark failed");
931
932        let outcome = producer
933            .manager()
934            .get_outcome(&key)
935            .await
936            .expect("get failed");
937        assert!(matches!(
938            outcome,
939            Some(DeliveryOutcome::Failure {
940                retryable: true,
941                ..
942            })
943        ));
944    }
945
946    #[tokio::test]
947    async fn test_fast_hash_algorithm() {
948        let mgr = IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
949            hash_algorithm: HashAlgorithm::FastHash,
950            ..Default::default()
951        });
952        let key = mgr.generate_key(b"event-1");
953        assert!(!key.digest.is_empty());
954        assert_eq!(key.digest.len(), 16); // 64-bit hex
955    }
956
957    #[tokio::test]
958    async fn test_sha256_algorithm() {
959        let mgr = default_manager();
960        let key = mgr.generate_key(b"event-1");
961        assert!(!key.digest.is_empty());
962        assert_eq!(key.digest.len(), 64); // SHA-256 hex
963    }
964
965    #[tokio::test]
966    async fn test_from_string_key() {
967        let mgr = default_manager();
968        let key = IdempotencyKey::from_string("custom-key-12345".into());
969        let is_dup = mgr.check_and_register(&key).await.expect("check failed");
970        assert!(!is_dup);
971
972        let is_dup2 = mgr.check_and_register(&key).await.expect("check failed");
973        assert!(is_dup2);
974    }
975
976    #[tokio::test]
977    async fn test_with_producer_id() {
978        let mgr = default_manager();
979        let key = mgr
980            .generate_key(b"event-1")
981            .with_producer("producer-a".into());
982        assert_eq!(key.producer_id, Some("producer-a".into()));
983    }
984
985    #[tokio::test]
986    async fn test_composite_key_format() {
987        let key = IdempotencyKey::from_string("abc".into()).with_partition(3);
988        assert_eq!(key.composite_key(), "3:abc");
989
990        let key2 = IdempotencyKey::from_string("abc".into());
991        assert_eq!(key2.composite_key(), "abc");
992    }
993
994    #[tokio::test]
995    async fn test_config_defaults() {
996        let config = IdempotentDeliveryConfig::default();
997        assert_eq!(config.key_ttl, Duration::from_secs(3600));
998        assert_eq!(config.max_keys, 1_000_000);
999        assert!(config.cache_outcomes);
1000        assert!(config.partition_aware);
1001        assert_eq!(config.hash_algorithm, HashAlgorithm::Sha256);
1002    }
1003
1004    #[tokio::test]
1005    async fn test_with_defaults_constructor() {
1006        let mgr = IdempotentDeliveryManager::with_defaults();
1007        assert_eq!(mgr.config().key_ttl, Duration::from_secs(3600));
1008    }
1009
1010    #[tokio::test]
1011    async fn test_is_cache_enabled() {
1012        let mgr = default_manager();
1013        assert!(mgr.is_cache_enabled());
1014
1015        let mgr2 = IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
1016            cache_outcomes: false,
1017            ..Default::default()
1018        });
1019        assert!(!mgr2.is_cache_enabled());
1020    }
1021
1022    #[tokio::test]
1023    async fn test_concurrent_duplicate_detection() {
1024        let mgr = Arc::new(default_manager());
1025        let mut handles = Vec::new();
1026
1027        for _ in 0..10 {
1028            let m = Arc::clone(&mgr);
1029            handles.push(tokio::spawn(async move {
1030                let key = m.generate_key(b"shared-event");
1031                m.check_and_register(&key).await.expect("check failed")
1032            }));
1033        }
1034
1035        let mut accepted = 0;
1036        let mut duplicates = 0;
1037        for h in handles {
1038            let is_dup = h.await.expect("join failed");
1039            if is_dup {
1040                duplicates += 1;
1041            } else {
1042                accepted += 1;
1043            }
1044        }
1045
1046        assert_eq!(accepted, 1, "Exactly one should be accepted");
1047        assert_eq!(duplicates, 9, "Nine should be duplicates");
1048    }
1049
1050    #[tokio::test]
1051    async fn test_partition_stats() {
1052        let mgr = default_manager();
1053        let k1 = mgr.generate_partitioned_key(b"a", 0);
1054        let k2 = mgr.generate_partitioned_key(b"b", 0);
1055        let k3 = mgr.generate_partitioned_key(b"c", 1);
1056
1057        mgr.check_and_register(&k1).await.expect("check failed");
1058        mgr.check_and_register(&k2).await.expect("check failed");
1059        mgr.check_and_register(&k3).await.expect("check failed");
1060
1061        let stats = mgr.stats().await;
1062        assert_eq!(stats.partition_key_counts.get(&0), Some(&2));
1063        assert_eq!(stats.partition_key_counts.get(&1), Some(&1));
1064    }
1065
1066    #[tokio::test]
1067    async fn test_record_outcome_unknown_key() {
1068        let mgr = default_manager();
1069        let key = mgr.generate_key(b"unknown");
1070
1071        let result = mgr
1072            .record_outcome(
1073                &key,
1074                DeliveryOutcome::Success {
1075                    result: None,
1076                    processed_at: Utc::now(),
1077                },
1078            )
1079            .await;
1080        assert!(
1081            result.is_err(),
1082            "Recording outcome for unknown key should fail"
1083        );
1084    }
1085
1086    #[tokio::test]
1087    async fn test_remove_nonexistent_key() {
1088        let mgr = default_manager();
1089        let key = mgr.generate_key(b"nonexistent");
1090        let removed = mgr.remove_key(&key).await;
1091        assert!(!removed);
1092    }
1093
1094    #[tokio::test]
1095    async fn test_failure_outcome() {
1096        let mgr = default_manager();
1097        let key = mgr.generate_key(b"event-1");
1098        mgr.check_and_register(&key).await.expect("check failed");
1099
1100        mgr.record_outcome(
1101            &key,
1102            DeliveryOutcome::Failure {
1103                error: "network timeout".into(),
1104                retryable: true,
1105                failed_at: Utc::now(),
1106            },
1107        )
1108        .await
1109        .expect("record failed");
1110
1111        let outcome = mgr.get_outcome(&key).await.expect("get failed");
1112        if let Some(DeliveryOutcome::Failure {
1113            error, retryable, ..
1114        }) = outcome
1115        {
1116            assert_eq!(error, "network timeout");
1117            assert!(retryable);
1118        } else {
1119            panic!("Expected Failure outcome");
1120        }
1121    }
1122
1123    #[tokio::test]
1124    async fn test_in_progress_outcome() {
1125        let mgr = default_manager();
1126        let key = mgr.generate_key(b"event-1");
1127        mgr.check_and_register(&key).await.expect("check failed");
1128
1129        mgr.record_outcome(
1130            &key,
1131            DeliveryOutcome::InProgress {
1132                started_at: Utc::now(),
1133            },
1134        )
1135        .await
1136        .expect("record failed");
1137
1138        let outcome = mgr.get_outcome(&key).await.expect("get failed");
1139        assert!(matches!(outcome, Some(DeliveryOutcome::InProgress { .. })));
1140    }
1141
1142    #[tokio::test]
1143    async fn test_many_unique_events() {
1144        let mgr = default_manager();
1145        for i in 0u64..100 {
1146            let key = mgr.generate_key(&i.to_le_bytes());
1147            let is_dup = mgr.check_and_register(&key).await.expect("check failed");
1148            assert!(!is_dup, "All unique events should be accepted");
1149        }
1150        assert_eq!(mgr.active_key_count().await, 100);
1151        let stats = mgr.stats().await;
1152        assert_eq!(stats.accepted, 100);
1153        assert_eq!(stats.duplicates_rejected, 0);
1154    }
1155
1156    #[tokio::test]
1157    async fn test_fnv1a_deterministic() {
1158        let h1 = fnv1a_hash(b"hello");
1159        let h2 = fnv1a_hash(b"hello");
1160        assert_eq!(h1, h2);
1161        let h3 = fnv1a_hash(b"world");
1162        assert_ne!(h1, h3);
1163    }
1164
1165    #[tokio::test]
1166    async fn test_idempotency_key_serialize() {
1167        let key = IdempotencyKey::from_string("test-key".into()).with_partition(42);
1168        let json = serde_json::to_string(&key).expect("serialize failed");
1169        let deserialized: IdempotencyKey = serde_json::from_str(&json).expect("deserialize failed");
1170        assert_eq!(deserialized.digest, "test-key");
1171        assert_eq!(deserialized.partition, Some(42));
1172    }
1173
1174    #[tokio::test]
1175    async fn test_config_serialize() {
1176        let config = IdempotentDeliveryConfig::default();
1177        let json = serde_json::to_string(&config).expect("serialize failed");
1178        assert!(json.contains("key_ttl"));
1179    }
1180
1181    #[tokio::test]
1182    async fn test_stats_initial_values() {
1183        let mgr = default_manager();
1184        let stats = mgr.stats().await;
1185        assert_eq!(stats.total_submitted, 0);
1186        assert_eq!(stats.accepted, 0);
1187        assert_eq!(stats.duplicates_rejected, 0);
1188        assert_eq!(stats.cache_hits, 0);
1189        assert_eq!(stats.active_keys, 0);
1190        assert_eq!(stats.keys_evicted, 0);
1191    }
1192
1193    #[tokio::test]
1194    async fn test_cache_hit_stats() {
1195        let mgr = default_manager();
1196        let key = mgr.generate_key(b"event-1");
1197        mgr.check_and_register(&key).await.expect("check failed");
1198
1199        mgr.record_outcome(
1200            &key,
1201            DeliveryOutcome::Success {
1202                result: Some("cached".into()),
1203                processed_at: Utc::now(),
1204            },
1205        )
1206        .await
1207        .expect("record failed");
1208
1209        // Retrieve outcome to trigger cache hit
1210        let _ = mgr.get_outcome(&key).await.expect("get failed");
1211        let stats = mgr.stats().await;
1212        assert_eq!(stats.cache_hits, 1);
1213    }
1214}