Skip to main content

ringkernel_core/
idempotency.rs

1//! Message Idempotency & Deduplication — FR-006
2//!
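//! Deduplication layer for K2K messages, suppressing repeat processing of a message
//! within a bounded window (at-most-once processing while the key is cached; combined
//! with at-least-once delivery this yields effectively-once semantics):
//! - Each message carries an idempotency key (derived from MessageId)
//! - Receiver maintains a bounded dedup cache with configurable TTL
//! - Duplicate messages are dropped, not processed; they are logged at debug level only
//!   when `DeduplicationConfig::log_duplicates` is enabled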
7//!
8//! # Usage
9//!
10//! ```ignore
11//! let mut dedup = DeduplicationCache::new(DeduplicationConfig::default());
12//!
13//! // First time: not a duplicate
14//! assert!(!dedup.is_duplicate(msg_id_1));
15//!
16//! // Second time: duplicate detected
17//! assert!(dedup.is_duplicate(msg_id_1));
18//! ```
19
20use std::collections::{HashMap, VecDeque};
21use std::time::{Duration, Instant};
22
/// Tuning knobs for a [`DeduplicationCache`].
///
/// The [`Default`] configuration keeps up to 100 000 keys for five minutes and does not
/// log duplicates.
#[derive(Clone, Debug)]
pub struct DeduplicationConfig {
    /// Upper bound on the number of keys retained; once exceeded, the oldest insertion is
    /// evicted first. A value of `0` retains nothing, so no duplicate is ever detected.
    pub max_entries: usize,
    /// How long a key is remembered after it was first recorded.
    pub ttl: Duration,
    /// When `true`, every detected duplicate emits a debug-level log event.
    pub log_duplicates: bool,
}

impl Default for DeduplicationConfig {
    fn default() -> Self {
        const DEFAULT_MAX_ENTRIES: usize = 100_000;
        const DEFAULT_TTL_SECS: u64 = 5 * 60;

        let ttl = Duration::from_secs(DEFAULT_TTL_SECS);
        Self {
            log_duplicates: false,
            max_entries: DEFAULT_MAX_ENTRIES,
            ttl,
        }
    }
}
43
44/// Deduplication cache for K2K message processing.
45///
46/// Tracks recently-seen message IDs to prevent duplicate processing.
47/// Uses a bounded LRU-style eviction with TTL expiry.
48pub struct DeduplicationCache {
49    /// Map from idempotency key to insertion time.
50    seen: HashMap<u64, Instant>,
51    /// Insertion-order queue for TTL eviction.
52    order: VecDeque<(u64, Instant)>,
53    /// Configuration.
54    config: DeduplicationConfig,
55    /// Metrics.
56    duplicates_detected: u64,
57    total_checked: u64,
58}
59
60impl DeduplicationCache {
61    /// Create a new deduplication cache.
62    pub fn new(config: DeduplicationConfig) -> Self {
63        Self {
64            seen: HashMap::with_capacity(config.max_entries),
65            order: VecDeque::with_capacity(config.max_entries),
66            config,
67            duplicates_detected: 0,
68            total_checked: 0,
69        }
70    }
71
72    /// Check if a message ID is a duplicate. If not, records it.
73    ///
74    /// Returns `true` if this message was already seen (duplicate).
75    /// Returns `false` if this is the first time (not duplicate, now recorded).
76    pub fn is_duplicate(&mut self, idempotency_key: u64) -> bool {
77        self.total_checked += 1;
78        self.expire_old();
79
80        if self.seen.contains_key(&idempotency_key) {
81            self.duplicates_detected += 1;
82            if self.config.log_duplicates {
83                tracing::debug!(key = idempotency_key, "Duplicate message detected");
84            }
85            return true;
86        }
87
88        // Record the new key
89        let now = Instant::now();
90        self.seen.insert(idempotency_key, now);
91        self.order.push_back((idempotency_key, now));
92
93        // Evict if over capacity
94        while self.seen.len() > self.config.max_entries {
95            if let Some((old_key, _)) = self.order.pop_front() {
96                self.seen.remove(&old_key);
97            }
98        }
99
100        false
101    }
102
103    /// Check if a key was seen without recording it.
104    pub fn was_seen(&self, idempotency_key: u64) -> bool {
105        self.seen.contains_key(&idempotency_key)
106    }
107
108    /// Expire entries older than TTL.
109    fn expire_old(&mut self) {
110        let cutoff = Instant::now() - self.config.ttl;
111
112        while let Some(&(key, time)) = self.order.front() {
113            if time < cutoff {
114                self.order.pop_front();
115                self.seen.remove(&key);
116            } else {
117                break; // Remaining entries are newer
118            }
119        }
120    }
121
122    /// Number of entries in the cache.
123    pub fn len(&self) -> usize {
124        self.seen.len()
125    }
126
127    /// Check if empty.
128    pub fn is_empty(&self) -> bool {
129        self.seen.is_empty()
130    }
131
132    /// Clear the cache.
133    pub fn clear(&mut self) {
134        self.seen.clear();
135        self.order.clear();
136    }
137
138    /// Get deduplication metrics.
139    pub fn metrics(&self) -> DeduplicationMetrics {
140        DeduplicationMetrics {
141            cache_size: self.seen.len() as u64,
142            total_checked: self.total_checked,
143            duplicates_detected: self.duplicates_detected,
144            dedup_rate: if self.total_checked > 0 {
145                self.duplicates_detected as f64 / self.total_checked as f64
146            } else {
147                0.0
148            },
149        }
150    }
151}
152
/// Deduplication metrics.
///
/// A point-in-time snapshot produced by `DeduplicationCache::metrics`.
#[derive(Debug, Clone)]
pub struct DeduplicationMetrics {
    /// Number of entries held in the cache when the snapshot was taken. This may include
    /// entries past their TTL that have not been purged yet.
    pub cache_size: u64,
    /// Total messages checked.
    pub total_checked: u64,
    /// Duplicates detected.
    pub duplicates_detected: u64,
    /// Deduplication rate (`duplicates_detected / total_checked`), or `0.0` when nothing
    /// has been checked yet.
    pub dedup_rate: f64,
}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a cache with the default configuration.
    fn default_cache() -> DeduplicationCache {
        DeduplicationCache::new(DeduplicationConfig::default())
    }

    #[test]
    fn test_basic_dedup() {
        let mut cache = default_cache();

        // Each key is new on its first sighting and a duplicate on its second.
        for &key in &[1u64, 2] {
            assert!(!cache.is_duplicate(key));
            assert!(cache.is_duplicate(key));
        }
    }

    #[test]
    fn test_capacity_eviction() {
        let config = DeduplicationConfig {
            max_entries: 3,
            ..Default::default()
        };
        let mut cache = DeduplicationCache::new(config);

        // The fourth insertion pushes the oldest key (1) out.
        for key in 1..=4u64 {
            cache.is_duplicate(key);
        }

        assert!(!cache.was_seen(1));
        assert!((2..=4u64).all(|key| cache.was_seen(key)));
    }

    #[test]
    fn test_metrics() {
        let mut cache = default_cache();
        // The third check repeats key 1 and counts as a duplicate.
        for &key in &[1u64, 2, 1] {
            cache.is_duplicate(key);
        }

        let snapshot = cache.metrics();
        assert_eq!(snapshot.total_checked, 3);
        assert_eq!(snapshot.duplicates_detected, 1);
        let expected_rate = 1.0 / 3.0;
        assert!((snapshot.dedup_rate - expected_rate).abs() < 0.01);
    }

    #[test]
    fn test_clear() {
        let mut cache = default_cache();
        for &key in &[1u64, 2] {
            cache.is_duplicate(key);
        }
        assert_eq!(cache.len(), 2);

        cache.clear();
        assert!(cache.is_empty());
        // A cleared key is accepted as new again.
        assert!(!cache.is_duplicate(1));
    }

    #[test]
    fn test_ttl_expiry() {
        // A very short TTL so the entry expires during the sleep below.
        let config = DeduplicationConfig {
            ttl: Duration::from_millis(1),
            ..Default::default()
        };
        let mut cache = DeduplicationCache::new(config);

        cache.is_duplicate(1);
        std::thread::sleep(Duration::from_millis(5));

        // Once the TTL has elapsed the key is no longer a duplicate.
        assert!(!cache.is_duplicate(1));
    }
}