//! Message deduplication patterns (rust_rabbit/patterns/deduplication.rs).

1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::hash_map::DefaultHasher;
5use std::collections::HashMap;
6use std::hash::{Hash, Hasher};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9use tracing::{debug, warn};
10use uuid::Uuid;
11
/// Unique message identifier.
///
/// Newtype over a `String`; `new()` fills it with a UUID v4 (see the
/// inherent impl). Derives `Hash`/`Eq` so it can serve as a cache key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MessageId(String);
15
16impl MessageId {
17    pub fn new() -> Self {
18        Self(Uuid::new_v4().to_string())
19    }
20
21    pub fn from_string(id: String) -> Self {
22        Self(id)
23    }
24
25    pub fn as_str(&self) -> &str {
26        &self.0
27    }
28}
29
impl Default for MessageId {
    /// Defaults to a freshly generated UUID-backed id (same as `new`).
    fn default() -> Self {
        Self::new()
    }
}
35
36impl std::fmt::Display for MessageId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
/// Content-based hash for duplicate detection.
///
/// Wraps the `u64` produced by `DefaultHasher` over the payload bytes.
/// NOTE(review): `DefaultHasher` is not guaranteed stable across Rust
/// releases, so serialized hashes should not be compared between builds —
/// confirm this is acceptable for persisted dedup state.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ContentHash(u64);
45
46impl ContentHash {
47    pub fn from_content(content: &[u8]) -> Self {
48        let mut hasher = DefaultHasher::new();
49        content.hash(&mut hasher);
50        Self(hasher.finish())
51    }
52
53    pub fn value(&self) -> u64 {
54        self.0
55    }
56}
57
/// Deduplication strategies
///
/// Selects which field(s) of a [`DeduplicatedMessage`] form the dedup key
/// (see `DeduplicatedMessage::get_dedup_key`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeduplicationStrategy {
    /// Use message ID for deduplication
    MessageId,
    /// Use content hash for deduplication
    ContentHash,
    /// Use both message ID and content hash
    IdAndContent,
    /// Use custom key provided in message
    ///
    /// NOTE(review): the embedded `String` is currently ignored by
    /// `get_dedup_key`, which reads the message's own `custom_key` field
    /// instead — confirm whether this payload is intentionally unused.
    CustomKey(String),
}
70
/// Message with deduplication metadata
///
/// Carries the payload plus everything needed to derive a dedup key under
/// any [`DeduplicationStrategy`]: a unique id, a content hash, and an
/// optional caller-supplied key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeduplicatedMessage {
    /// Unique id assigned at construction (UUID v4 string by default).
    pub message_id: MessageId,
    /// Hash of `payload`, computed once in `new`.
    pub content_hash: ContentHash,
    /// Optional caller-supplied key, used by the `CustomKey` strategy.
    pub custom_key: Option<String>,
    /// Raw message body.
    pub payload: Vec<u8>,
    /// Arbitrary string headers attached via `with_header`.
    pub headers: HashMap<String, String>,
    /// Creation time (UTC); the reference point for `is_expired`.
    pub timestamp: DateTime<Utc>,
    /// Optional lifetime; `None` means the message never expires.
    pub ttl: Option<Duration>,
}
82
83impl DeduplicatedMessage {
84    pub fn new(payload: Vec<u8>) -> Self {
85        let content_hash = ContentHash::from_content(&payload);
86        Self {
87            message_id: MessageId::new(),
88            content_hash,
89            custom_key: None,
90            payload,
91            headers: HashMap::new(),
92            timestamp: Utc::now(),
93            ttl: None,
94        }
95    }
96
97    pub fn with_message_id(mut self, message_id: MessageId) -> Self {
98        self.message_id = message_id;
99        self
100    }
101
102    pub fn with_custom_key(mut self, key: String) -> Self {
103        self.custom_key = Some(key);
104        self
105    }
106
107    pub fn with_header(mut self, key: String, value: String) -> Self {
108        self.headers.insert(key, value);
109        self
110    }
111
112    pub fn with_ttl(mut self, ttl: Duration) -> Self {
113        self.ttl = Some(ttl);
114        self
115    }
116
117    /// Get deduplication key based on strategy
118    pub fn get_dedup_key(&self, strategy: &DeduplicationStrategy) -> String {
119        match strategy {
120            DeduplicationStrategy::MessageId => self.message_id.as_str().to_string(),
121            DeduplicationStrategy::ContentHash => self.content_hash.value().to_string(),
122            DeduplicationStrategy::IdAndContent => {
123                format!("{}:{}", self.message_id.as_str(), self.content_hash.value())
124            }
125            DeduplicationStrategy::CustomKey(_key) => self
126                .custom_key
127                .as_ref()
128                .unwrap_or(&self.message_id.0)
129                .clone(),
130        }
131    }
132
133    /// Check if message has expired based on TTL
134    pub fn is_expired(&self) -> bool {
135        if let Some(ttl) = self.ttl {
136            let elapsed = Utc::now().signed_duration_since(self.timestamp);
137            elapsed.to_std().unwrap_or(Duration::ZERO) > ttl
138        } else {
139            false
140        }
141    }
142}
143
/// Deduplication result
///
/// Returned by the manager's `check_duplicate` methods.
#[derive(Debug, Clone, PartialEq)]
pub enum DeduplicationResult {
    /// Message is unique, should be processed
    Unique,
    /// Message is a duplicate, should be ignored
    Duplicate(DuplicateInfo),
}
152
/// Information about detected duplicate
#[derive(Debug, Clone, PartialEq)]
pub struct DuplicateInfo {
    /// Id of the first message recorded under this dedup key.
    pub original_message_id: MessageId,
    /// UTC time at which the original was first recorded.
    pub original_timestamp: DateTime<Utc>,
    /// Total sightings of this key, including the original (so 2 on the
    /// first duplicate).
    pub duplicate_count: u32,
}
160
/// Deduplication record stored in cache
#[derive(Debug, Clone)]
struct DeduplicationRecord {
    // Id of the first message seen under this key.
    message_id: MessageId,
    // Wall-clock time of the first sighting (reported to callers).
    timestamp: DateTime<Utc>,
    // Sightings so far, including the first.
    access_count: u32,
    // Monotonic time of the most recent sighting; drives TTL/LRU cleanup.
    last_accessed: Instant,
}
169
170impl DeduplicationRecord {
171    fn new(message_id: MessageId) -> Self {
172        Self {
173            message_id,
174            timestamp: Utc::now(),
175            access_count: 1,
176            last_accessed: Instant::now(),
177        }
178    }
179
180    fn increment_access(&mut self) {
181        self.access_count += 1;
182        self.last_accessed = Instant::now();
183    }
184}
185
/// Configuration for deduplication manager
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// How dedup keys are derived from messages.
    pub strategy: DeduplicationStrategy,
    /// How long a cache entry lives after its last access.
    pub default_ttl: Duration,
    /// Maximum number of cached records before LRU eviction kicks in.
    pub cache_size_limit: usize,
    /// How often the background cleanup task runs.
    pub cleanup_interval: Duration,
}
194
195impl Default for DeduplicationConfig {
196    fn default() -> Self {
197        Self {
198            strategy: DeduplicationStrategy::MessageId,
199            default_ttl: Duration::from_secs(24 * 60 * 60),
200            cache_size_limit: 100_000,
201            cleanup_interval: Duration::from_secs(300), // 5 minutes
202        }
203    }
204}
205
/// Message deduplication manager
///
/// Single-process manager backed by an in-memory map guarded by a `Mutex`;
/// the `Arc` is shared with the background cleanup task.
#[derive(Debug)]
pub struct DeduplicationManager {
    config: DeduplicationConfig,
    // dedup key -> record of first sighting + access stats.
    dedup_cache: Arc<Mutex<HashMap<String, DeduplicationRecord>>>,
}
212
213impl DeduplicationManager {
214    pub fn new(config: DeduplicationConfig) -> Self {
215        let manager = Self {
216            config,
217            dedup_cache: Arc::new(Mutex::new(HashMap::new())),
218        };
219
220        // Start cleanup task
221        manager.start_cleanup_task();
222        manager
223    }
224
225    /// Check if message is duplicate
226    pub fn check_duplicate(&self, message: &DeduplicatedMessage) -> Result<DeduplicationResult> {
227        let dedup_key = message.get_dedup_key(&self.config.strategy);
228
229        debug!(
230            message_id = %message.message_id,
231            dedup_key = %dedup_key,
232            "Checking for duplicate message"
233        );
234
235        let mut cache = self.dedup_cache.lock().unwrap();
236
237        if let Some(record) = cache.get_mut(&dedup_key) {
238            // Found duplicate
239            record.increment_access();
240
241            warn!(
242                message_id = %message.message_id,
243                original_message_id = %record.message_id,
244                duplicate_count = record.access_count,
245                "Duplicate message detected"
246            );
247
248            Ok(DeduplicationResult::Duplicate(DuplicateInfo {
249                original_message_id: record.message_id.clone(),
250                original_timestamp: record.timestamp,
251                duplicate_count: record.access_count,
252            }))
253        } else {
254            // New unique message
255            let record = DeduplicationRecord::new(message.message_id.clone());
256            cache.insert(dedup_key.clone(), record);
257
258            debug!(
259                message_id = %message.message_id,
260                dedup_key = %dedup_key,
261                "Message is unique"
262            );
263
264            Ok(DeduplicationResult::Unique)
265        }
266    }
267
268    /// Manually mark a message as processed (for external dedup stores)
269    pub fn mark_processed(&self, message: &DeduplicatedMessage) -> Result<()> {
270        let dedup_key = message.get_dedup_key(&self.config.strategy);
271        let mut cache = self.dedup_cache.lock().unwrap();
272
273        cache
274            .entry(dedup_key)
275            .or_insert_with(|| DeduplicationRecord::new(message.message_id.clone()));
276
277        Ok(())
278    }
279
280    /// Get cache statistics
281    pub fn cache_stats(&self) -> DeduplicationStats {
282        let cache = self.dedup_cache.lock().unwrap();
283
284        let total_entries = cache.len();
285        let total_access_count: u32 = cache.values().map(|record| record.access_count).sum();
286
287        DeduplicationStats {
288            total_entries,
289            total_access_count,
290            cache_hit_rate: if total_access_count > 0 {
291                ((total_access_count - total_entries as u32) as f64 / total_access_count as f64)
292                    * 100.0
293            } else {
294                0.0
295            },
296        }
297    }
298
299    /// Clear expired entries from cache
300    pub fn cleanup_expired(&self) -> usize {
301        let mut cache = self.dedup_cache.lock().unwrap();
302        let mut expired_keys = Vec::new();
303        let now = Instant::now();
304
305        for (key, record) in cache.iter() {
306            // Remove entries older than TTL
307            let age = now.duration_since(record.last_accessed);
308            if age > self.config.default_ttl {
309                expired_keys.push(key.clone());
310            }
311        }
312
313        let expired_count = expired_keys.len();
314        for key in expired_keys {
315            cache.remove(&key);
316        }
317
318        if expired_count > 0 {
319            debug!(
320                expired_count = expired_count,
321                "Cleaned up expired deduplication entries"
322            );
323        }
324
325        expired_count
326    }
327
328    /// Start background cleanup task
329    fn start_cleanup_task(&self) {
330        let cache = self.dedup_cache.clone();
331        let cleanup_interval = self.config.cleanup_interval;
332        let default_ttl = self.config.default_ttl;
333        let cache_size_limit = self.config.cache_size_limit;
334
335        tokio::spawn(async move {
336            let mut interval = tokio::time::interval(cleanup_interval);
337
338            loop {
339                interval.tick().await;
340
341                // Cleanup expired entries
342                let mut expired_keys = Vec::new();
343                let now = Instant::now();
344
345                {
346                    let cache = cache.lock().unwrap();
347                    for (key, record) in cache.iter() {
348                        let age = now.duration_since(record.last_accessed);
349                        if age > default_ttl {
350                            expired_keys.push(key.clone());
351                        }
352                    }
353                }
354
355                if !expired_keys.is_empty() {
356                    let mut cache = cache.lock().unwrap();
357                    for key in &expired_keys {
358                        cache.remove(key);
359                    }
360
361                    debug!(
362                        expired_count = expired_keys.len(),
363                        "Background cleanup removed expired entries"
364                    );
365                }
366
367                // Enforce cache size limit (LRU eviction)
368                {
369                    let mut cache = cache.lock().unwrap();
370                    if cache.len() > cache_size_limit {
371                        let mut entries: Vec<_> = cache
372                            .iter()
373                            .map(|(k, v)| (k.clone(), v.last_accessed))
374                            .collect();
375
376                        entries.sort_by(|a, b| a.1.cmp(&b.1));
377
378                        let remove_count = cache.len() - cache_size_limit;
379                        for (key, _) in entries.into_iter().take(remove_count) {
380                            cache.remove(&key);
381                        }
382
383                        debug!(
384                            removed_count = remove_count,
385                            "Background cleanup removed LRU entries to enforce size limit"
386                        );
387                    }
388                }
389            }
390        });
391    }
392}
393
/// Deduplication statistics
///
/// Snapshot produced by `DeduplicationManager::cache_stats`.
#[derive(Debug, Clone)]
pub struct DeduplicationStats {
    /// Number of distinct dedup keys currently cached.
    pub total_entries: usize,
    /// Sum of access counts across all entries (originals + duplicates).
    pub total_access_count: u32,
    /// Percentage of accesses that were duplicates.
    pub cache_hit_rate: f64,
}
401
/// Trait for custom deduplication stores
///
/// Abstraction over an external backend (e.g. Redis) so deduplication
/// state can be shared across processes; consumed by
/// [`DistributedDeduplicationManager`].
#[async_trait::async_trait]
pub trait DeduplicationStore {
    /// Return `true` if `key` has already been recorded.
    async fn is_duplicate(&self, key: &str) -> Result<bool>;
    /// Record `key` as processed, associated with `message_id`.
    async fn mark_processed(&self, key: &str, message_id: &MessageId) -> Result<()>;
    /// Remove expired entries, returning how many were removed.
    async fn cleanup_expired(&self) -> Result<usize>;
}
409
/// Redis-based deduplication store (placeholder implementation)
///
/// Currently stores only the connection string; no Redis client is wired
/// up yet (see the TODOs in the trait impl).
#[derive(Debug)]
pub struct RedisDeduplicationStore {
    // This would contain Redis connection details
    _connection_string: String,
}

impl RedisDeduplicationStore {
    /// Build a store pointing at the given Redis endpoint.
    /// No connection is established here.
    pub fn new(connection_string: String) -> Self {
        Self {
            _connection_string: connection_string,
        }
    }
}
424
#[async_trait::async_trait]
impl DeduplicationStore for RedisDeduplicationStore {
    /// Placeholder: always reports "not a duplicate" until Redis is wired up.
    async fn is_duplicate(&self, _key: &str) -> Result<bool> {
        // TODO: Implement Redis SETNX operation
        // This would use Redis SET with NX option to atomically check and set
        Ok(false)
    }

    /// Placeholder: succeeds without recording anything.
    async fn mark_processed(&self, _key: &str, _message_id: &MessageId) -> Result<()> {
        // TODO: Implement Redis SET operation with TTL
        Ok(())
    }

    /// Placeholder: reports zero removals.
    async fn cleanup_expired(&self) -> Result<usize> {
        // Redis handles TTL automatically, so this might be a no-op
        // or could scan for keys with specific patterns
        Ok(0)
    }
}
444
/// Distributed deduplication manager using external store
///
/// Layers a process-local cache in front of a shared
/// [`DeduplicationStore`] to avoid a network round-trip for repeats.
pub struct DistributedDeduplicationManager {
    config: DeduplicationConfig,
    // Shared backend holding cross-process dedup state.
    store: Arc<dyn DeduplicationStore + Send + Sync>,
    // Process-local fast path: dedup key -> record.
    local_cache: Arc<Mutex<HashMap<String, DeduplicationRecord>>>,
}
451
452impl DistributedDeduplicationManager {
453    pub fn new(
454        config: DeduplicationConfig,
455        store: Arc<dyn DeduplicationStore + Send + Sync>,
456    ) -> Self {
457        Self {
458            config,
459            store,
460            local_cache: Arc::new(Mutex::new(HashMap::new())),
461        }
462    }
463
464    pub async fn check_duplicate(
465        &self,
466        message: &DeduplicatedMessage,
467    ) -> Result<DeduplicationResult> {
468        let dedup_key = message.get_dedup_key(&self.config.strategy);
469
470        // Check local cache first
471        {
472            let mut cache = self.local_cache.lock().unwrap();
473            if let Some(record) = cache.get_mut(&dedup_key) {
474                record.increment_access();
475                return Ok(DeduplicationResult::Duplicate(DuplicateInfo {
476                    original_message_id: record.message_id.clone(),
477                    original_timestamp: record.timestamp,
478                    duplicate_count: record.access_count,
479                }));
480            }
481        }
482
483        // Check distributed store
484        if self.store.is_duplicate(&dedup_key).await? {
485            // Add to local cache for faster future lookups
486            {
487                let mut cache = self.local_cache.lock().unwrap();
488                let record = DeduplicationRecord::new(message.message_id.clone());
489                cache.insert(dedup_key, record);
490            }
491
492            Ok(DeduplicationResult::Duplicate(DuplicateInfo {
493                original_message_id: message.message_id.clone(), // We don't have the original ID from store
494                original_timestamp: Utc::now(), // We don't have the original timestamp from store
495                duplicate_count: 1,
496            }))
497        } else {
498            // Mark as processed in both local cache and distributed store
499            self.store
500                .mark_processed(&dedup_key, &message.message_id)
501                .await?;
502
503            {
504                let mut cache = self.local_cache.lock().unwrap();
505                let record = DeduplicationRecord::new(message.message_id.clone());
506                cache.insert(dedup_key, record);
507            }
508
509            Ok(DeduplicationResult::Unique)
510        }
511    }
512}
513
#[cfg(test)]
mod tests {
    use super::*;

    // Two generated ids must differ (UUID v4 collision odds are negligible).
    #[test]
    fn test_message_id_generation() {
        let id1 = MessageId::new();
        let id2 = MessageId::new();
        assert_ne!(id1, id2);
    }

    // Hashing is deterministic for identical content within one process run
    // and discriminates different content.
    #[test]
    fn test_content_hash() {
        let content1 = b"hello world";
        let content2 = b"hello world";
        let content3 = b"different content";

        let hash1 = ContentHash::from_content(content1);
        let hash2 = ContentHash::from_content(content2);
        let hash3 = ContentHash::from_content(content3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }

    // Each strategy derives its key from the expected message field. Note:
    // the `CustomKey` variant's embedded string ("test") is ignored and the
    // message's own `custom_key` ("custom_123") wins.
    #[test]
    fn test_deduplication_keys() {
        let payload = b"test message".to_vec();
        let message = DeduplicatedMessage::new(payload).with_custom_key("custom_123".to_string());

        let key1 = message.get_dedup_key(&DeduplicationStrategy::MessageId);
        let key2 = message.get_dedup_key(&DeduplicationStrategy::ContentHash);
        let key3 = message.get_dedup_key(&DeduplicationStrategy::IdAndContent);
        let key4 = message.get_dedup_key(&DeduplicationStrategy::CustomKey("test".to_string()));

        assert_eq!(key1, message.message_id.as_str());
        assert_eq!(key2, message.content_hash.value().to_string());
        assert!(key3.contains(&message.message_id.as_str()));
        assert!(key3.contains(&message.content_hash.value().to_string()));
        assert_eq!(key4, "custom_123");
    }

    // First sighting is Unique; resubmitting the same message is flagged as
    // a Duplicate carrying the original id and a count of 2 (original +
    // this duplicate). Needs a tokio runtime for the cleanup task spawn.
    #[tokio::test]
    async fn test_deduplication_manager() {
        let config = DeduplicationConfig::default();
        let manager = DeduplicationManager::new(config);

        let payload = b"test message".to_vec();
        let message = DeduplicatedMessage::new(payload);

        // First check should be unique
        let result1 = manager.check_duplicate(&message).unwrap();
        assert_eq!(result1, DeduplicationResult::Unique);

        // Second check should be duplicate
        let result2 = manager.check_duplicate(&message).unwrap();
        assert!(matches!(result2, DeduplicationResult::Duplicate(_)));

        if let DeduplicationResult::Duplicate(info) = result2 {
            assert_eq!(info.original_message_id, message.message_id);
            assert_eq!(info.duplicate_count, 2);
        }
    }

    // Two messages with identical payloads but distinct ids: unique under
    // the id strategy, duplicate under the content-hash strategy.
    #[tokio::test]
    async fn test_different_strategies() {
        let config_id = DeduplicationConfig {
            strategy: DeduplicationStrategy::MessageId,
            ..Default::default()
        };
        let manager_id = DeduplicationManager::new(config_id);

        let config_content = DeduplicationConfig {
            strategy: DeduplicationStrategy::ContentHash,
            ..Default::default()
        };
        let manager_content = DeduplicationManager::new(config_content);

        // Same content, different message IDs
        let payload = b"same content".to_vec();
        let message1 = DeduplicatedMessage::new(payload.clone());
        let message2 = DeduplicatedMessage::new(payload).with_message_id(MessageId::new());

        // For message ID strategy - should be unique (different IDs)
        let result1_id = manager_id.check_duplicate(&message1).unwrap();
        let result2_id = manager_id.check_duplicate(&message2).unwrap();
        assert_eq!(result1_id, DeduplicationResult::Unique);
        assert_eq!(result2_id, DeduplicationResult::Unique);

        // For content hash strategy - should be duplicate (same content)
        let result1_content = manager_content.check_duplicate(&message1).unwrap();
        let result2_content = manager_content.check_duplicate(&message2).unwrap();
        assert_eq!(result1_content, DeduplicationResult::Unique);
        assert!(matches!(result2_content, DeduplicationResult::Duplicate(_)));
    }

    // A message with a 1 ms TTL is not expired immediately after creation,
    // but is once its timestamp is pushed 1 s into the past.
    #[test]
    fn test_message_expiry() {
        let mut message =
            DeduplicatedMessage::new(b"test".to_vec()).with_ttl(Duration::from_millis(1));

        assert!(!message.is_expired());

        // Manually set timestamp to past
        message.timestamp = Utc::now() - chrono::Duration::seconds(1);
        assert!(message.is_expired());
    }

    // Entries older than the configured TTL (100 ms here) are removed by a
    // manual cleanup pass; uses a real 150 ms tokio sleep to age the entry.
    #[tokio::test]
    async fn test_cache_cleanup() {
        let config = DeduplicationConfig {
            default_ttl: Duration::from_millis(100),
            ..Default::default()
        };
        let manager = DeduplicationManager::new(config);

        let payload = b"test message".to_vec();
        let message = DeduplicatedMessage::new(payload);

        // Add message to cache
        manager.check_duplicate(&message).unwrap();

        // Initially should have 1 entry
        let stats = manager.cache_stats();
        assert_eq!(stats.total_entries, 1);

        // Wait for TTL to expire
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Cleanup expired entries
        let cleaned = manager.cleanup_expired();
        assert_eq!(cleaned, 1);

        // Should have 0 entries now
        let stats = manager.cache_stats();
        assert_eq!(stats.total_entries, 0);
    }
}