1use crate::Message;
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::hash::{Hash, Hasher};
9use std::time::{Duration, Instant};
10use uuid::Uuid;
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash)]
14pub enum DedupKey {
15 TaskId(Uuid),
17 ContentHash(u64),
19 Custom(String),
21}
22
23impl DedupKey {
24 pub fn from_task_id(message: &Message) -> Self {
26 Self::TaskId(message.headers.id)
27 }
28
29 pub fn from_content(message: &Message) -> Self {
31 let mut hasher = std::collections::hash_map::DefaultHasher::new();
32 message.headers.task.hash(&mut hasher);
33 message.body.hash(&mut hasher);
34 Self::ContentHash(hasher.finish())
35 }
36
37 pub fn custom(key: impl Into<String>) -> Self {
39 Self::Custom(key.into())
40 }
41}
42
/// Bookkeeping stored alongside each cached key.
#[derive(Clone, Debug)]
struct DedupEntry {
    /// Moment the key was recorded; its age is compared against the cache TTL.
    inserted_at: Instant,
}
48
49#[derive(Debug, Clone)]
51pub struct DedupCache {
52 entries: HashMap<DedupKey, DedupEntry>,
53 max_size: usize,
54 ttl: Duration,
55 insertion_order: VecDeque<DedupKey>,
56}
57
58impl DedupCache {
59 pub fn new(max_size: usize, ttl: Duration) -> Self {
66 Self {
67 entries: HashMap::new(),
68 max_size,
69 ttl,
70 insertion_order: VecDeque::new(),
71 }
72 }
73
74 pub fn with_defaults() -> Self {
76 Self::new(10000, Duration::from_secs(3600))
77 }
78
79 pub fn contains(&mut self, key: &DedupKey) -> bool {
81 self.cleanup_expired();
82 self.entries.contains_key(key)
83 }
84
85 pub fn insert(&mut self, key: DedupKey) -> bool {
89 self.cleanup_expired();
90
91 if self.entries.contains_key(&key) {
92 return false;
93 }
94
95 if self.entries.len() >= self.max_size {
97 if let Some(oldest_key) = self.insertion_order.pop_front() {
98 self.entries.remove(&oldest_key);
99 }
100 }
101
102 let entry = DedupEntry {
103 inserted_at: Instant::now(),
104 };
105
106 self.entries.insert(key.clone(), entry);
107 self.insertion_order.push_back(key);
108 true
109 }
110
111 pub fn is_duplicate(&mut self, message: &Message, use_content_hash: bool) -> bool {
115 let key = if use_content_hash {
116 DedupKey::from_content(message)
117 } else {
118 DedupKey::from_task_id(message)
119 };
120
121 self.contains(&key)
122 }
123
124 pub fn mark_seen(&mut self, message: &Message, use_content_hash: bool) -> bool {
128 let key = if use_content_hash {
129 DedupKey::from_content(message)
130 } else {
131 DedupKey::from_task_id(message)
132 };
133
134 self.insert(key)
135 }
136
137 fn cleanup_expired(&mut self) {
139 let now = Instant::now();
140 let ttl = self.ttl;
141
142 self.entries
144 .retain(|_, entry| now.duration_since(entry.inserted_at) < ttl);
145
146 self.insertion_order
148 .retain(|key| self.entries.contains_key(key));
149 }
150
151 pub fn clear(&mut self) {
153 self.entries.clear();
154 self.insertion_order.clear();
155 }
156
157 #[inline]
159 pub fn len(&self) -> usize {
160 self.entries.len()
161 }
162
163 #[inline]
165 pub fn is_empty(&self) -> bool {
166 self.entries.is_empty()
167 }
168}
169
170#[derive(Debug, Clone)]
172pub struct SimpleDedupSet {
173 seen_ids: HashSet<Uuid>,
174 max_size: usize,
175}
176
177impl SimpleDedupSet {
178 pub fn new(max_size: usize) -> Self {
180 Self {
181 seen_ids: HashSet::new(),
182 max_size,
183 }
184 }
185
186 pub fn contains(&self, message: &Message) -> bool {
188 self.seen_ids.contains(&message.headers.id)
189 }
190
191 pub fn mark_seen(&mut self, message: &Message) -> bool {
195 if self.seen_ids.len() >= self.max_size {
196 let to_remove: Vec<_> = self
198 .seen_ids
199 .iter()
200 .take(self.max_size / 2)
201 .copied()
202 .collect();
203 for id in to_remove {
204 self.seen_ids.remove(&id);
205 }
206 }
207
208 self.seen_ids.insert(message.headers.id)
209 }
210
211 pub fn clear(&mut self) {
213 self.seen_ids.clear();
214 }
215
216 #[inline]
218 pub fn len(&self) -> usize {
219 self.seen_ids.len()
220 }
221
222 #[inline]
224 pub fn is_empty(&self) -> bool {
225 self.seen_ids.is_empty()
226 }
227}
228
229pub fn filter_duplicates(messages: Vec<Message>) -> Vec<Message> {
231 let mut seen = HashSet::new();
232 messages
233 .into_iter()
234 .filter(|msg| seen.insert(msg.headers.id))
235 .collect()
236}
237
238pub fn filter_duplicates_by_content(messages: Vec<Message>) -> Vec<Message> {
240 let mut seen = HashSet::new();
241 messages
242 .into_iter()
243 .filter(|msg| {
244 let key = DedupKey::from_content(msg);
245 seen.insert(key)
246 })
247 .collect()
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::MessageBuilder;

    /// Builds a message for `task` with no arguments.
    fn create_test_message(task: &str) -> Message {
        MessageBuilder::new(task).build().unwrap()
    }

    /// Builds a message for `task` carrying a single integer argument.
    fn create_test_message_with_arg(task: &str, arg: i64) -> Message {
        MessageBuilder::new(task)
            .args(vec![serde_json::json!(arg)])
            .build()
            .unwrap()
    }

    #[test]
    fn test_dedup_key_from_task_id() {
        let msg = create_test_message("task1");

        assert_eq!(
            DedupKey::from_task_id(&msg),
            DedupKey::TaskId(msg.headers.id)
        );
    }

    #[test]
    fn test_dedup_key_from_content() {
        let msg1 = create_test_message_with_arg("task1", 42);
        let msg2 = create_test_message_with_arg("task1", 42);

        // Same task and arguments must yield the same content key.
        assert_eq!(
            DedupKey::from_content(&msg1),
            DedupKey::from_content(&msg2)
        );
    }

    #[test]
    fn test_dedup_key_custom() {
        assert_eq!(
            DedupKey::custom("order-1"),
            DedupKey::Custom(String::from("order-1"))
        );
        assert_ne!(DedupKey::custom("order-1"), DedupKey::custom("order-2"));
    }

    #[test]
    fn test_dedup_cache_insert() {
        let mut cache = DedupCache::new(3, Duration::from_secs(60));
        let msg1 = create_test_message("task1");
        let msg2 = create_test_message("task2");

        assert!(cache.mark_seen(&msg1, false));
        // Second sighting of the same message is a duplicate.
        assert!(!cache.mark_seen(&msg1, false));
        assert!(cache.mark_seen(&msg2, false));
    }

    #[test]
    fn test_dedup_cache_custom_keys() {
        let mut cache = DedupCache::new(3, Duration::from_secs(60));
        let key = DedupKey::custom("order-1");

        assert!(!cache.contains(&key));
        assert!(cache.insert(key.clone()));
        assert!(cache.contains(&key));
        assert!(!cache.insert(key));
    }

    #[test]
    fn test_dedup_cache_is_duplicate() {
        let mut cache = DedupCache::new(3, Duration::from_secs(60));
        let msg = create_test_message("task1");

        assert!(!cache.is_duplicate(&msg, false));
        cache.mark_seen(&msg, false);
        assert!(cache.is_duplicate(&msg, false));
    }

    #[test]
    fn test_dedup_cache_eviction() {
        let mut cache = DedupCache::new(2, Duration::from_secs(60));
        let msg1 = create_test_message("task1");
        let msg2 = create_test_message("task2");
        let msg3 = create_test_message("task3");

        cache.mark_seen(&msg1, false);
        cache.mark_seen(&msg2, false);
        assert_eq!(cache.len(), 2);

        // Inserting a third key into a full cache evicts the oldest one.
        cache.mark_seen(&msg3, false);
        assert_eq!(cache.len(), 2);
        assert!(!cache.is_duplicate(&msg1, false));
        assert!(cache.is_duplicate(&msg2, false));
        assert!(cache.is_duplicate(&msg3, false));
    }

    #[test]
    fn test_dedup_cache_zero_ttl_expires_immediately() {
        let mut cache = DedupCache::new(10, Duration::from_secs(0));
        let msg = create_test_message("task1");

        // With a zero TTL every entry is already expired at the next access.
        assert!(cache.mark_seen(&msg, false));
        assert!(!cache.is_duplicate(&msg, false));
        assert!(cache.mark_seen(&msg, false));
    }

    #[test]
    fn test_dedup_cache_content_hash() {
        let mut cache = DedupCache::new(10, Duration::from_secs(60));
        let msg1 = create_test_message_with_arg("task1", 1);
        let msg2 = create_test_message_with_arg("task1", 1);

        assert!(cache.mark_seen(&msg1, true));
        // Identical content counts as a duplicate even for a distinct message.
        assert!(!cache.mark_seen(&msg2, true));
    }

    #[test]
    fn test_simple_dedup_set() {
        let mut dedup = SimpleDedupSet::new(10);
        let msg1 = create_test_message("task1");
        let msg2 = create_test_message("task2");

        assert!(dedup.mark_seen(&msg1));
        // Second sighting of the same message is a duplicate.
        assert!(!dedup.mark_seen(&msg1));
        assert!(dedup.mark_seen(&msg2));

        assert!(dedup.contains(&msg1));
        assert!(dedup.contains(&msg2));
    }

    #[test]
    fn test_filter_duplicates() {
        let msg1 = create_test_message("task1");
        let msg2 = create_test_message("task2");
        let msg1_dup = msg1.clone();

        let filtered = filter_duplicates(vec![msg1, msg2, msg1_dup]);

        assert_eq!(filtered.len(), 2);
    }

    #[test]
    fn test_filter_duplicates_by_content() {
        let msg1 = create_test_message_with_arg("task1", 1);
        let msg2 = create_test_message_with_arg("task1", 1);
        let msg3 = create_test_message_with_arg("task2", 2);

        let filtered = filter_duplicates_by_content(vec![msg1, msg2, msg3]);

        // msg1 and msg2 share content, so only one of them survives.
        assert_eq!(filtered.len(), 2);
    }

    #[test]
    fn test_dedup_cache_clear() {
        let mut cache = DedupCache::new(10, Duration::from_secs(60));
        let msg = create_test_message("task1");

        cache.mark_seen(&msg, false);
        assert_eq!(cache.len(), 1);

        cache.clear();
        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
    }

    #[test]
    fn test_simple_dedup_set_clear() {
        let mut dedup = SimpleDedupSet::new(10);
        let msg = create_test_message("task1");

        dedup.mark_seen(&msg);
        assert_eq!(dedup.len(), 1);

        dedup.clear();
        assert!(dedup.is_empty());
        assert!(!dedup.contains(&msg));
    }

    #[test]
    fn test_simple_dedup_set_eviction() {
        let mut dedup = SimpleDedupSet::new(4);

        for i in 0..6 {
            let msg = create_test_message(&format!("task{}", i));
            dedup.mark_seen(&msg);
        }

        // Eviction must keep the set within its configured bound.
        assert!(dedup.len() <= 4);
    }
}