ringkernel_core/
idempotency.rs1use std::collections::{HashMap, VecDeque};
21use std::time::{Duration, Instant};
22
/// Tunable limits for a [`DeduplicationCache`].
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// Largest number of idempotency keys the cache keeps at once; when the
    /// limit is exceeded the oldest recorded keys are dropped first.
    pub max_entries: usize,
    /// How long a recorded key is remembered before it is purged.
    pub ttl: Duration,
    /// When `true`, a `tracing` debug event is emitted for every detected
    /// duplicate.
    pub log_duplicates: bool,
}
33
34impl Default for DeduplicationConfig {
35 fn default() -> Self {
36 Self {
37 max_entries: 100_000,
38 ttl: Duration::from_secs(300), log_duplicates: false,
40 }
41 }
42}
43
44pub struct DeduplicationCache {
49 seen: HashMap<u64, Instant>,
51 order: VecDeque<(u64, Instant)>,
53 config: DeduplicationConfig,
55 duplicates_detected: u64,
57 total_checked: u64,
58}
59
60impl DeduplicationCache {
61 pub fn new(config: DeduplicationConfig) -> Self {
63 Self {
64 seen: HashMap::with_capacity(config.max_entries),
65 order: VecDeque::with_capacity(config.max_entries),
66 config,
67 duplicates_detected: 0,
68 total_checked: 0,
69 }
70 }
71
72 pub fn is_duplicate(&mut self, idempotency_key: u64) -> bool {
77 self.total_checked += 1;
78 self.expire_old();
79
80 if self.seen.contains_key(&idempotency_key) {
81 self.duplicates_detected += 1;
82 if self.config.log_duplicates {
83 tracing::debug!(key = idempotency_key, "Duplicate message detected");
84 }
85 return true;
86 }
87
88 let now = Instant::now();
90 self.seen.insert(idempotency_key, now);
91 self.order.push_back((idempotency_key, now));
92
93 while self.seen.len() > self.config.max_entries {
95 if let Some((old_key, _)) = self.order.pop_front() {
96 self.seen.remove(&old_key);
97 }
98 }
99
100 false
101 }
102
103 pub fn was_seen(&self, idempotency_key: u64) -> bool {
105 self.seen.contains_key(&idempotency_key)
106 }
107
108 fn expire_old(&mut self) {
110 let cutoff = Instant::now() - self.config.ttl;
111
112 while let Some(&(key, time)) = self.order.front() {
113 if time < cutoff {
114 self.order.pop_front();
115 self.seen.remove(&key);
116 } else {
117 break; }
119 }
120 }
121
122 pub fn len(&self) -> usize {
124 self.seen.len()
125 }
126
127 pub fn is_empty(&self) -> bool {
129 self.seen.is_empty()
130 }
131
132 pub fn clear(&mut self) {
134 self.seen.clear();
135 self.order.clear();
136 }
137
138 pub fn metrics(&self) -> DeduplicationMetrics {
140 DeduplicationMetrics {
141 cache_size: self.seen.len() as u64,
142 total_checked: self.total_checked,
143 duplicates_detected: self.duplicates_detected,
144 dedup_rate: if self.total_checked > 0 {
145 self.duplicates_detected as f64 / self.total_checked as f64
146 } else {
147 0.0
148 },
149 }
150 }
151}
152
/// Point-in-time counters reported by `DeduplicationCache::metrics`.
#[derive(Debug, Clone)]
pub struct DeduplicationMetrics {
    /// Number of keys stored in the cache when the snapshot was taken.
    pub cache_size: u64,
    /// Total number of duplicate checks performed.
    pub total_checked: u64,
    /// Number of checks that found the key already recorded.
    pub duplicates_detected: u64,
    /// `duplicates_detected / total_checked`, or `0.0` when nothing has been
    /// checked.
    pub dedup_rate: f64,
}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// The first sighting of a key is not a duplicate; the second is.
    #[test]
    fn test_basic_dedup() {
        let mut cache = DeduplicationCache::new(DeduplicationConfig::default());

        for key in 1..=2u64 {
            assert!(!cache.is_duplicate(key));
            assert!(cache.is_duplicate(key));
        }
    }

    /// Going over `max_entries` evicts the oldest key and keeps the rest.
    #[test]
    fn test_capacity_eviction() {
        let config = DeduplicationConfig {
            max_entries: 3,
            ..Default::default()
        };
        let mut cache = DeduplicationCache::new(config);

        for key in 1..=4u64 {
            cache.is_duplicate(key);
        }

        assert!(!cache.was_seen(1));
        for key in 2..=4u64 {
            assert!(cache.was_seen(key));
        }
    }

    /// Counters and the derived rate reflect the checks performed.
    #[test]
    fn test_metrics() {
        let mut cache = DeduplicationCache::new(DeduplicationConfig::default());

        // Two fresh keys followed by one repeat.
        for key in [1u64, 2, 1].iter().copied() {
            cache.is_duplicate(key);
        }

        let snapshot = cache.metrics();
        assert_eq!(snapshot.total_checked, 3);
        assert_eq!(snapshot.duplicates_detected, 1);
        assert!((snapshot.dedup_rate - 1.0 / 3.0).abs() < 0.01);
    }

    /// After `clear` the cache is empty and old keys are new again.
    #[test]
    fn test_clear() {
        let mut cache = DeduplicationCache::new(DeduplicationConfig::default());
        for key in 1..=2u64 {
            cache.is_duplicate(key);
        }
        assert_eq!(cache.len(), 2);

        cache.clear();
        assert_eq!(cache.len(), 0);
        assert!(!cache.is_duplicate(1));
    }

    /// A key whose TTL has elapsed is treated as never seen.
    #[test]
    fn test_ttl_expiry() {
        let config = DeduplicationConfig {
            ttl: Duration::from_millis(1),
            ..Default::default()
        };
        let mut cache = DeduplicationCache::new(config);

        cache.is_duplicate(1);
        // Wait comfortably longer than the 1 ms TTL.
        std::thread::sleep(Duration::from_millis(5));

        assert!(!cache.is_duplicate(1));
    }
}