celers_protocol/
dedup.rs

1//! Message deduplication utilities
2//!
3//! This module provides utilities for preventing duplicate message processing
4//! through various deduplication strategies.
5
6use crate::Message;
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::hash::{Hash, Hasher};
9use std::time::{Duration, Instant};
10use uuid::Uuid;
11
12/// Deduplication key for a message
13#[derive(Debug, Clone, PartialEq, Eq, Hash)]
14pub enum DedupKey {
15    /// Use the message task ID
16    TaskId(Uuid),
17    /// Use the task name and arguments hash
18    ContentHash(u64),
19    /// Custom key
20    Custom(String),
21}
22
23impl DedupKey {
24    /// Create a dedup key from a message's task ID
25    pub fn from_task_id(message: &Message) -> Self {
26        Self::TaskId(message.headers.id)
27    }
28
29    /// Create a dedup key from message content hash
30    pub fn from_content(message: &Message) -> Self {
31        let mut hasher = std::collections::hash_map::DefaultHasher::new();
32        message.headers.task.hash(&mut hasher);
33        message.body.hash(&mut hasher);
34        Self::ContentHash(hasher.finish())
35    }
36
37    /// Create a custom dedup key
38    pub fn custom(key: impl Into<String>) -> Self {
39        Self::Custom(key.into())
40    }
41}
42
/// Bookkeeping stored alongside each key held by [`DedupCache`].
#[derive(Clone, Debug)]
struct DedupEntry {
    /// Moment the key was added; compared against the cache TTL to decide
    /// whether the key has expired.
    inserted_at: Instant,
}
48
49/// Message deduplication cache
50#[derive(Debug, Clone)]
51pub struct DedupCache {
52    entries: HashMap<DedupKey, DedupEntry>,
53    max_size: usize,
54    ttl: Duration,
55    insertion_order: VecDeque<DedupKey>,
56}
57
58impl DedupCache {
59    /// Create a new deduplication cache
60    ///
61    /// # Arguments
62    ///
63    /// * `max_size` - Maximum number of entries to cache
64    /// * `ttl` - Time-to-live for each entry
65    pub fn new(max_size: usize, ttl: Duration) -> Self {
66        Self {
67            entries: HashMap::new(),
68            max_size,
69            ttl,
70            insertion_order: VecDeque::new(),
71        }
72    }
73
74    /// Create a cache with default settings (10000 entries, 1 hour TTL)
75    pub fn with_defaults() -> Self {
76        Self::new(10000, Duration::from_secs(3600))
77    }
78
79    /// Check if a key has been seen recently
80    pub fn contains(&mut self, key: &DedupKey) -> bool {
81        self.cleanup_expired();
82        self.entries.contains_key(key)
83    }
84
85    /// Insert a key into the cache
86    ///
87    /// Returns `true` if the key was newly inserted, `false` if it already existed
88    pub fn insert(&mut self, key: DedupKey) -> bool {
89        self.cleanup_expired();
90
91        if self.entries.contains_key(&key) {
92            return false;
93        }
94
95        // Evict oldest entry if at capacity
96        if self.entries.len() >= self.max_size {
97            if let Some(oldest_key) = self.insertion_order.pop_front() {
98                self.entries.remove(&oldest_key);
99            }
100        }
101
102        let entry = DedupEntry {
103            inserted_at: Instant::now(),
104        };
105
106        self.entries.insert(key.clone(), entry);
107        self.insertion_order.push_back(key);
108        true
109    }
110
111    /// Check if a message is a duplicate
112    ///
113    /// Returns `true` if the message has been seen before, `false` otherwise
114    pub fn is_duplicate(&mut self, message: &Message, use_content_hash: bool) -> bool {
115        let key = if use_content_hash {
116            DedupKey::from_content(message)
117        } else {
118            DedupKey::from_task_id(message)
119        };
120
121        self.contains(&key)
122    }
123
124    /// Mark a message as seen
125    ///
126    /// Returns `true` if this is the first time seeing this message, `false` if duplicate
127    pub fn mark_seen(&mut self, message: &Message, use_content_hash: bool) -> bool {
128        let key = if use_content_hash {
129            DedupKey::from_content(message)
130        } else {
131            DedupKey::from_task_id(message)
132        };
133
134        self.insert(key)
135    }
136
137    /// Remove expired entries from the cache
138    fn cleanup_expired(&mut self) {
139        let now = Instant::now();
140        let ttl = self.ttl;
141
142        // Remove expired entries
143        self.entries
144            .retain(|_, entry| now.duration_since(entry.inserted_at) < ttl);
145
146        // Clean up insertion order
147        self.insertion_order
148            .retain(|key| self.entries.contains_key(key));
149    }
150
151    /// Clear all entries from the cache
152    pub fn clear(&mut self) {
153        self.entries.clear();
154        self.insertion_order.clear();
155    }
156
157    /// Get the number of entries in the cache
158    #[inline]
159    pub fn len(&self) -> usize {
160        self.entries.len()
161    }
162
163    /// Check if the cache is empty
164    #[inline]
165    pub fn is_empty(&self) -> bool {
166        self.entries.is_empty()
167    }
168}
169
170/// Simple deduplication using a HashSet of task IDs
171#[derive(Debug, Clone)]
172pub struct SimpleDedupSet {
173    seen_ids: HashSet<Uuid>,
174    max_size: usize,
175}
176
177impl SimpleDedupSet {
178    /// Create a new simple deduplication set
179    pub fn new(max_size: usize) -> Self {
180        Self {
181            seen_ids: HashSet::new(),
182            max_size,
183        }
184    }
185
186    /// Check if a message ID has been seen
187    pub fn contains(&self, message: &Message) -> bool {
188        self.seen_ids.contains(&message.headers.id)
189    }
190
191    /// Mark a message ID as seen
192    ///
193    /// Returns `true` if newly inserted, `false` if already seen
194    pub fn mark_seen(&mut self, message: &Message) -> bool {
195        if self.seen_ids.len() >= self.max_size {
196            // Simple eviction: clear half the set
197            let to_remove: Vec<_> = self
198                .seen_ids
199                .iter()
200                .take(self.max_size / 2)
201                .copied()
202                .collect();
203            for id in to_remove {
204                self.seen_ids.remove(&id);
205            }
206        }
207
208        self.seen_ids.insert(message.headers.id)
209    }
210
211    /// Clear all seen IDs
212    pub fn clear(&mut self) {
213        self.seen_ids.clear();
214    }
215
216    /// Get the number of seen IDs
217    #[inline]
218    pub fn len(&self) -> usize {
219        self.seen_ids.len()
220    }
221
222    /// Check if the set is empty
223    #[inline]
224    pub fn is_empty(&self) -> bool {
225        self.seen_ids.is_empty()
226    }
227}
228
229/// Filter out duplicate messages from a collection
230pub fn filter_duplicates(messages: Vec<Message>) -> Vec<Message> {
231    let mut seen = HashSet::new();
232    messages
233        .into_iter()
234        .filter(|msg| seen.insert(msg.headers.id))
235        .collect()
236}
237
238/// Filter duplicates based on content hash
239pub fn filter_duplicates_by_content(messages: Vec<Message>) -> Vec<Message> {
240    let mut seen = HashSet::new();
241    messages
242        .into_iter()
243        .filter(|msg| {
244            let key = DedupKey::from_content(msg);
245            seen.insert(key)
246        })
247        .collect()
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::MessageBuilder;

    /// Build a message for `task` with no arguments.
    ///
    /// The tests below rely on separately built messages having different
    /// task IDs (presumably assigned by the builder).
    fn create_test_message(task: &str) -> Message {
        MessageBuilder::new(task).build().unwrap()
    }

    /// Build a message for `task` carrying a single integer argument.
    fn message_with_arg(task: &str, arg: i64) -> Message {
        MessageBuilder::new(task)
            .args(vec![serde_json::json!(arg)])
            .build()
            .unwrap()
    }

    #[test]
    fn test_dedup_key_from_task_id() {
        let msg = create_test_message("task1");

        assert_eq!(
            DedupKey::from_task_id(&msg),
            DedupKey::TaskId(msg.headers.id)
        );
    }

    #[test]
    fn test_dedup_key_from_content() {
        let msg1 = message_with_arg("task1", 42);
        let msg2 = message_with_arg("task1", 42);
        let other = message_with_arg("task2", 7);

        let key1 = DedupKey::from_content(&msg1);
        let key2 = DedupKey::from_content(&msg2);

        // Same content should produce same hash
        assert_eq!(key1, key2);
        // Different task and arguments should produce a different hash
        assert_ne!(key1, DedupKey::from_content(&other));
    }

    #[test]
    fn test_dedup_cache_insert() {
        let mut cache = DedupCache::new(3, Duration::from_secs(60));
        let msg1 = create_test_message("task1");
        let msg2 = create_test_message("task2");

        assert!(cache.mark_seen(&msg1, false));
        assert!(!cache.mark_seen(&msg1, false)); // Duplicate
        assert!(cache.mark_seen(&msg2, false));
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn test_dedup_cache_is_duplicate() {
        let mut cache = DedupCache::new(3, Duration::from_secs(60));
        let msg = create_test_message("task1");

        assert!(!cache.is_duplicate(&msg, false));
        // Checking must not record the message
        assert!(cache.is_empty());

        cache.mark_seen(&msg, false);
        assert!(cache.is_duplicate(&msg, false));
    }

    #[test]
    fn test_dedup_cache_eviction() {
        let mut cache = DedupCache::new(2, Duration::from_secs(60));
        let msg1 = create_test_message("task1");
        let msg2 = create_test_message("task2");
        let msg3 = create_test_message("task3");

        cache.mark_seen(&msg1, false);
        cache.mark_seen(&msg2, false);
        assert_eq!(cache.len(), 2);

        // Should evict oldest (msg1)
        cache.mark_seen(&msg3, false);
        assert_eq!(cache.len(), 2);
        assert!(!cache.is_duplicate(&msg1, false));
        assert!(cache.is_duplicate(&msg2, false));
        assert!(cache.is_duplicate(&msg3, false));
    }

    #[test]
    fn test_dedup_cache_ttl_expiry() {
        // With a zero TTL every entry is already expired at the next access.
        let mut cache = DedupCache::new(10, Duration::from_secs(0));
        let msg = create_test_message("task1");

        assert!(cache.mark_seen(&msg, false));
        assert!(!cache.is_duplicate(&msg, false));
        assert!(cache.is_empty());
        assert!(cache.mark_seen(&msg, false)); // Expired, so new again
    }

    #[test]
    fn test_dedup_cache_content_hash() {
        let mut cache = DedupCache::new(10, Duration::from_secs(60));

        let msg1 = message_with_arg("task1", 1);
        let msg2 = message_with_arg("task1", 1);

        assert!(cache.mark_seen(&msg1, true));
        assert!(!cache.mark_seen(&msg2, true)); // Same content, different ID
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn test_simple_dedup_set() {
        let mut dedup = SimpleDedupSet::new(10);
        let msg1 = create_test_message("task1");
        let msg2 = create_test_message("task2");

        assert!(dedup.is_empty());
        assert!(dedup.mark_seen(&msg1));
        assert!(!dedup.mark_seen(&msg1)); // Duplicate
        assert!(dedup.mark_seen(&msg2));

        assert!(dedup.contains(&msg1));
        assert!(dedup.contains(&msg2));
        assert_eq!(dedup.len(), 2);

        dedup.clear();
        assert!(dedup.is_empty());
        assert!(!dedup.contains(&msg1));
    }

    #[test]
    fn test_filter_duplicates() {
        let msg1 = create_test_message("task1");
        let msg2 = create_test_message("task2");
        let msg1_dup = msg1.clone();
        let (id1, id2) = (msg1.headers.id, msg2.headers.id);

        let messages = vec![msg1, msg2, msg1_dup];
        let filtered = filter_duplicates(messages);

        // First occurrences survive, in their original order
        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0].headers.id, id1);
        assert_eq!(filtered[1].headers.id, id2);
    }

    #[test]
    fn test_filter_duplicates_by_content() {
        let msg1 = message_with_arg("task1", 1);
        let msg2 = message_with_arg("task1", 1);
        let msg3 = message_with_arg("task2", 2);
        let (id1, id3) = (msg1.headers.id, msg3.headers.id);

        let messages = vec![msg1, msg2, msg3];
        let filtered = filter_duplicates_by_content(messages);

        // msg1 and msg2 have same content, should be deduplicated
        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0].headers.id, id1);
        assert_eq!(filtered[1].headers.id, id3);
    }

    #[test]
    fn test_dedup_cache_clear() {
        let mut cache = DedupCache::new(10, Duration::from_secs(60));
        let msg = create_test_message("task1");

        cache.mark_seen(&msg, false);
        assert_eq!(cache.len(), 1);

        cache.clear();
        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
        assert!(!cache.is_duplicate(&msg, false));
    }

    #[test]
    fn test_simple_dedup_set_eviction() {
        let mut dedup = SimpleDedupSet::new(4);

        for i in 0..6 {
            let msg = create_test_message(&format!("task{}", i));
            assert!(dedup.mark_seen(&msg));

            // The newest ID is always retained and the bound always holds
            assert!(dedup.contains(&msg));
            assert!(dedup.len() <= 4);
        }

        // Should have evicted some entries
        assert!(dedup.len() <= 4);
    }
}