1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::collections::hash_map::DefaultHasher;
5use std::collections::HashMap;
6use std::hash::{Hash, Hasher};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9use tracing::{debug, warn};
10use uuid::Uuid;
11
/// Unique identifier for a message, backed by a UUID string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MessageId(String);
15
16impl MessageId {
17 pub fn new() -> Self {
18 Self(Uuid::new_v4().to_string())
19 }
20
21 pub fn from_string(id: String) -> Self {
22 Self(id)
23 }
24
25 pub fn as_str(&self) -> &str {
26 &self.0
27 }
28}
29
30impl Default for MessageId {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl std::fmt::Display for MessageId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "{}", self.0)
39 }
40}
41
/// 64-bit hash of a message payload, used for content-based deduplication.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ContentHash(u64);
45
46impl ContentHash {
47 pub fn from_content(content: &[u8]) -> Self {
48 let mut hasher = DefaultHasher::new();
49 content.hash(&mut hasher);
50 Self(hasher.finish())
51 }
52
53 pub fn value(&self) -> u64 {
54 self.0
55 }
56}
57
/// Strategy for deriving the deduplication key of a message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeduplicationStrategy {
    /// Deduplicate by the message's unique id.
    MessageId,
    /// Deduplicate by a hash of the payload bytes.
    ContentHash,
    /// Deduplicate by id and content hash combined ("id:hash").
    IdAndContent,
    /// Deduplicate by a caller-supplied key.
    ///
    /// NOTE(review): the string carried here is currently ignored by
    /// `DeduplicatedMessage::get_dedup_key`, which uses the message's own
    /// `custom_key` (falling back to its id) — confirm this is intended.
    CustomKey(String),
}
70
/// A message envelope carrying the payload plus the identifiers used for
/// deduplication.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeduplicatedMessage {
    /// Unique id assigned at construction (or overridden via `with_message_id`).
    pub message_id: MessageId,
    /// Hash of `payload`, computed at construction.
    pub content_hash: ContentHash,
    /// Optional caller-supplied deduplication key.
    pub custom_key: Option<String>,
    /// Raw message bytes.
    pub payload: Vec<u8>,
    /// Free-form string headers.
    pub headers: HashMap<String, String>,
    /// UTC creation time; used by `is_expired`.
    pub timestamp: DateTime<Utc>,
    /// Optional per-message lifetime; `None` means the message never expires.
    pub ttl: Option<Duration>,
}
82
83impl DeduplicatedMessage {
84 pub fn new(payload: Vec<u8>) -> Self {
85 let content_hash = ContentHash::from_content(&payload);
86 Self {
87 message_id: MessageId::new(),
88 content_hash,
89 custom_key: None,
90 payload,
91 headers: HashMap::new(),
92 timestamp: Utc::now(),
93 ttl: None,
94 }
95 }
96
97 pub fn with_message_id(mut self, message_id: MessageId) -> Self {
98 self.message_id = message_id;
99 self
100 }
101
102 pub fn with_custom_key(mut self, key: String) -> Self {
103 self.custom_key = Some(key);
104 self
105 }
106
107 pub fn with_header(mut self, key: String, value: String) -> Self {
108 self.headers.insert(key, value);
109 self
110 }
111
112 pub fn with_ttl(mut self, ttl: Duration) -> Self {
113 self.ttl = Some(ttl);
114 self
115 }
116
117 pub fn get_dedup_key(&self, strategy: &DeduplicationStrategy) -> String {
119 match strategy {
120 DeduplicationStrategy::MessageId => self.message_id.as_str().to_string(),
121 DeduplicationStrategy::ContentHash => self.content_hash.value().to_string(),
122 DeduplicationStrategy::IdAndContent => {
123 format!("{}:{}", self.message_id.as_str(), self.content_hash.value())
124 }
125 DeduplicationStrategy::CustomKey(_key) => self
126 .custom_key
127 .as_ref()
128 .unwrap_or(&self.message_id.0)
129 .clone(),
130 }
131 }
132
133 pub fn is_expired(&self) -> bool {
135 if let Some(ttl) = self.ttl {
136 let elapsed = Utc::now().signed_duration_since(self.timestamp);
137 elapsed.to_std().unwrap_or(Duration::ZERO) > ttl
138 } else {
139 false
140 }
141 }
142}
143
/// Outcome of a deduplication check.
#[derive(Debug, Clone, PartialEq)]
pub enum DeduplicationResult {
    /// First time this key has been seen.
    Unique,
    /// Key was seen before; carries details about the original sighting.
    Duplicate(DuplicateInfo),
}
152
/// Details about the previously seen message that caused a duplicate hit.
#[derive(Debug, Clone, PartialEq)]
pub struct DuplicateInfo {
    /// Id of the message first recorded under this key.
    pub original_message_id: MessageId,
    /// When the key was first recorded.
    pub original_timestamp: DateTime<Utc>,
    /// Total observations of the key, including the original — the first
    /// duplicate therefore reports 2.
    pub duplicate_count: u32,
}
160
/// Cache entry tracking the first-seen message for a deduplication key.
#[derive(Debug, Clone)]
struct DeduplicationRecord {
    /// Id of the first message observed under this key.
    message_id: MessageId,
    /// UTC time the key was first recorded.
    timestamp: DateTime<Utc>,
    /// Total observations of this key, including the first.
    access_count: u32,
    /// Monotonic time of the most recent observation; drives TTL/LRU eviction.
    last_accessed: Instant,
}
169
170impl DeduplicationRecord {
171 fn new(message_id: MessageId) -> Self {
172 Self {
173 message_id,
174 timestamp: Utc::now(),
175 access_count: 1,
176 last_accessed: Instant::now(),
177 }
178 }
179
180 fn increment_access(&mut self) {
181 self.access_count += 1;
182 self.last_accessed = Instant::now();
183 }
184}
185
/// Tuning knobs for a deduplication manager.
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// How deduplication keys are derived from messages.
    pub strategy: DeduplicationStrategy,
    /// Idle time after which a cache entry is considered expired.
    pub default_ttl: Duration,
    /// Maximum number of cached keys before LRU eviction kicks in.
    pub cache_size_limit: usize,
    /// How often the background cleanup task runs.
    pub cleanup_interval: Duration,
}
194
195impl Default for DeduplicationConfig {
196 fn default() -> Self {
197 Self {
198 strategy: DeduplicationStrategy::MessageId,
199 default_ttl: Duration::from_secs(24 * 60 * 60),
200 cache_size_limit: 100_000,
201 cleanup_interval: Duration::from_secs(300), }
203 }
204}
205
/// In-process deduplication manager backed by a mutex-guarded hash map.
#[derive(Debug)]
pub struct DeduplicationManager {
    config: DeduplicationConfig,
    // Keyed by the dedup key per the configured strategy; shared with the
    // background cleanup task through the `Arc`.
    dedup_cache: Arc<Mutex<HashMap<String, DeduplicationRecord>>>,
}
212
213impl DeduplicationManager {
214 pub fn new(config: DeduplicationConfig) -> Self {
215 let manager = Self {
216 config,
217 dedup_cache: Arc::new(Mutex::new(HashMap::new())),
218 };
219
220 manager.start_cleanup_task();
222 manager
223 }
224
225 pub fn check_duplicate(&self, message: &DeduplicatedMessage) -> Result<DeduplicationResult> {
227 let dedup_key = message.get_dedup_key(&self.config.strategy);
228
229 debug!(
230 message_id = %message.message_id,
231 dedup_key = %dedup_key,
232 "Checking for duplicate message"
233 );
234
235 let mut cache = self.dedup_cache.lock().unwrap();
236
237 if let Some(record) = cache.get_mut(&dedup_key) {
238 record.increment_access();
240
241 warn!(
242 message_id = %message.message_id,
243 original_message_id = %record.message_id,
244 duplicate_count = record.access_count,
245 "Duplicate message detected"
246 );
247
248 Ok(DeduplicationResult::Duplicate(DuplicateInfo {
249 original_message_id: record.message_id.clone(),
250 original_timestamp: record.timestamp,
251 duplicate_count: record.access_count,
252 }))
253 } else {
254 let record = DeduplicationRecord::new(message.message_id.clone());
256 cache.insert(dedup_key.clone(), record);
257
258 debug!(
259 message_id = %message.message_id,
260 dedup_key = %dedup_key,
261 "Message is unique"
262 );
263
264 Ok(DeduplicationResult::Unique)
265 }
266 }
267
268 pub fn mark_processed(&self, message: &DeduplicatedMessage) -> Result<()> {
270 let dedup_key = message.get_dedup_key(&self.config.strategy);
271 let mut cache = self.dedup_cache.lock().unwrap();
272
273 cache
274 .entry(dedup_key)
275 .or_insert_with(|| DeduplicationRecord::new(message.message_id.clone()));
276
277 Ok(())
278 }
279
280 pub fn cache_stats(&self) -> DeduplicationStats {
282 let cache = self.dedup_cache.lock().unwrap();
283
284 let total_entries = cache.len();
285 let total_access_count: u32 = cache.values().map(|record| record.access_count).sum();
286
287 DeduplicationStats {
288 total_entries,
289 total_access_count,
290 cache_hit_rate: if total_access_count > 0 {
291 ((total_access_count - total_entries as u32) as f64 / total_access_count as f64)
292 * 100.0
293 } else {
294 0.0
295 },
296 }
297 }
298
299 pub fn cleanup_expired(&self) -> usize {
301 let mut cache = self.dedup_cache.lock().unwrap();
302 let mut expired_keys = Vec::new();
303 let now = Instant::now();
304
305 for (key, record) in cache.iter() {
306 let age = now.duration_since(record.last_accessed);
308 if age > self.config.default_ttl {
309 expired_keys.push(key.clone());
310 }
311 }
312
313 let expired_count = expired_keys.len();
314 for key in expired_keys {
315 cache.remove(&key);
316 }
317
318 if expired_count > 0 {
319 debug!(
320 expired_count = expired_count,
321 "Cleaned up expired deduplication entries"
322 );
323 }
324
325 expired_count
326 }
327
328 fn start_cleanup_task(&self) {
330 let cache = self.dedup_cache.clone();
331 let cleanup_interval = self.config.cleanup_interval;
332 let default_ttl = self.config.default_ttl;
333 let cache_size_limit = self.config.cache_size_limit;
334
335 tokio::spawn(async move {
336 let mut interval = tokio::time::interval(cleanup_interval);
337
338 loop {
339 interval.tick().await;
340
341 let mut expired_keys = Vec::new();
343 let now = Instant::now();
344
345 {
346 let cache = cache.lock().unwrap();
347 for (key, record) in cache.iter() {
348 let age = now.duration_since(record.last_accessed);
349 if age > default_ttl {
350 expired_keys.push(key.clone());
351 }
352 }
353 }
354
355 if !expired_keys.is_empty() {
356 let mut cache = cache.lock().unwrap();
357 for key in &expired_keys {
358 cache.remove(key);
359 }
360
361 debug!(
362 expired_count = expired_keys.len(),
363 "Background cleanup removed expired entries"
364 );
365 }
366
367 {
369 let mut cache = cache.lock().unwrap();
370 if cache.len() > cache_size_limit {
371 let mut entries: Vec<_> = cache
372 .iter()
373 .map(|(k, v)| (k.clone(), v.last_accessed))
374 .collect();
375
376 entries.sort_by(|a, b| a.1.cmp(&b.1));
377
378 let remove_count = cache.len() - cache_size_limit;
379 for (key, _) in entries.into_iter().take(remove_count) {
380 cache.remove(&key);
381 }
382
383 debug!(
384 removed_count = remove_count,
385 "Background cleanup removed LRU entries to enforce size limit"
386 );
387 }
388 }
389 }
390 });
391 }
392}
393
/// Point-in-time statistics for a deduplication cache.
#[derive(Debug, Clone)]
pub struct DeduplicationStats {
    /// Number of distinct keys currently cached.
    pub total_entries: usize,
    /// Sum of access counts across all entries (first sightings included).
    pub total_access_count: u32,
    /// Percentage of accesses that were duplicate hits (0.0–100.0).
    pub cache_hit_rate: f64,
}
401
/// Backend abstraction for distributed (cross-process) deduplication state.
#[async_trait::async_trait]
pub trait DeduplicationStore {
    /// Returns whether `key` has already been marked processed.
    async fn is_duplicate(&self, key: &str) -> Result<bool>;
    /// Records `key` as processed by the message with `message_id`.
    async fn mark_processed(&self, key: &str, message_id: &MessageId) -> Result<()>;
    /// Purges expired entries, returning how many were removed.
    async fn cleanup_expired(&self) -> Result<usize>;
}
409
/// Redis-backed `DeduplicationStore`.
///
/// NOTE(review): currently a stub — the connection string is stored but never
/// used; no Redis client is wired up yet.
#[derive(Debug)]
pub struct RedisDeduplicationStore {
    _connection_string: String,
}
416
417impl RedisDeduplicationStore {
418 pub fn new(connection_string: String) -> Self {
419 Self {
420 _connection_string: connection_string,
421 }
422 }
423}
424
#[async_trait::async_trait]
impl DeduplicationStore for RedisDeduplicationStore {
    // NOTE(review): placeholder implementation — no Redis I/O happens yet.
    // `is_duplicate` always answers `false`, so any manager backed by this
    // store deduplicates only via its local in-process cache.
    async fn is_duplicate(&self, _key: &str) -> Result<bool> {
        Ok(false)
    }

    // Placeholder: accepts the key/id and discards them.
    async fn mark_processed(&self, _key: &str, _message_id: &MessageId) -> Result<()> {
        Ok(())
    }

    // Placeholder: reports zero entries cleaned.
    async fn cleanup_expired(&self) -> Result<usize> {
        Ok(0)
    }
}
444
/// Deduplication manager that combines a shared `DeduplicationStore` with a
/// per-process cache of recently seen keys.
pub struct DistributedDeduplicationManager {
    config: DeduplicationConfig,
    store: Arc<dyn DeduplicationStore + Send + Sync>,
    // Local fast path: avoids a store round-trip for keys this process has
    // already observed.
    local_cache: Arc<Mutex<HashMap<String, DeduplicationRecord>>>,
}
451
impl DistributedDeduplicationManager {
    /// Builds a manager over `store` with an empty local cache.
    pub fn new(
        config: DeduplicationConfig,
        store: Arc<dyn DeduplicationStore + Send + Sync>,
    ) -> Self {
        Self {
            config,
            store,
            local_cache: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Checks the local cache first, then falls back to the distributed store.
    ///
    /// Lock discipline: the (non-async) local-cache mutex guard is confined to
    /// inner blocks and is never held across an `.await`.
    pub async fn check_duplicate(
        &self,
        message: &DeduplicatedMessage,
    ) -> Result<DeduplicationResult> {
        let dedup_key = message.get_dedup_key(&self.config.strategy);

        // Fast path: this process has already seen the key.
        {
            let mut cache = self.local_cache.lock().unwrap();
            if let Some(record) = cache.get_mut(&dedup_key) {
                record.increment_access();
                return Ok(DeduplicationResult::Duplicate(DuplicateInfo {
                    original_message_id: record.message_id.clone(),
                    original_timestamp: record.timestamp,
                    duplicate_count: record.access_count,
                }));
            }
        }

        if self.store.is_duplicate(&dedup_key).await? {
            // Remember the hit locally so later checks skip the store.
            {
                let mut cache = self.local_cache.lock().unwrap();
                let record = DeduplicationRecord::new(message.message_id.clone());
                cache.insert(dedup_key, record);
            }

            // NOTE(review): the store reports only a boolean, so this info is
            // fabricated from the *current* message (its own id, `Utc::now()`,
            // count 1) — callers should not trust these fields on this path.
            Ok(DeduplicationResult::Duplicate(DuplicateInfo {
                original_message_id: message.message_id.clone(),
                original_timestamp: Utc::now(),
                duplicate_count: 1,
            }))
        } else {
            // First sighting anywhere: record in the store, then locally.
            self.store
                .mark_processed(&dedup_key, &message.message_id)
                .await?;

            {
                let mut cache = self.local_cache.lock().unwrap();
                let record = DeduplicationRecord::new(message.message_id.clone());
                cache.insert(dedup_key, record);
            }

            Ok(DeduplicationResult::Unique)
        }
    }
}
513
#[cfg(test)]
mod tests {
    use super::*;

    // Two fresh ids must differ (UUID v4 collision probability is negligible).
    #[test]
    fn test_message_id_generation() {
        let id1 = MessageId::new();
        let id2 = MessageId::new();
        assert_ne!(id1, id2);
    }

    // Equal payloads hash equal; different payloads hash different.
    #[test]
    fn test_content_hash() {
        let content1 = b"hello world";
        let content2 = b"hello world";
        let content3 = b"different content";

        let hash1 = ContentHash::from_content(content1);
        let hash2 = ContentHash::from_content(content2);
        let hash3 = ContentHash::from_content(content3);

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }

    // Each strategy derives its documented key shape; note CustomKey uses the
    // message's own custom_key, not the string inside the strategy.
    #[test]
    fn test_deduplication_keys() {
        let payload = b"test message".to_vec();
        let message = DeduplicatedMessage::new(payload).with_custom_key("custom_123".to_string());

        let key1 = message.get_dedup_key(&DeduplicationStrategy::MessageId);
        let key2 = message.get_dedup_key(&DeduplicationStrategy::ContentHash);
        let key3 = message.get_dedup_key(&DeduplicationStrategy::IdAndContent);
        let key4 = message.get_dedup_key(&DeduplicationStrategy::CustomKey("test".to_string()));

        assert_eq!(key1, message.message_id.as_str());
        assert_eq!(key2, message.content_hash.value().to_string());
        assert!(key3.contains(&message.message_id.as_str()));
        assert!(key3.contains(&message.content_hash.value().to_string()));
        assert_eq!(key4, "custom_123");
    }

    // First check is Unique; the repeat is a Duplicate reporting count 2
    // (the original sighting counts toward access_count).
    #[tokio::test]
    async fn test_deduplication_manager() {
        let config = DeduplicationConfig::default();
        let manager = DeduplicationManager::new(config);

        let payload = b"test message".to_vec();
        let message = DeduplicatedMessage::new(payload);

        let result1 = manager.check_duplicate(&message).unwrap();
        assert_eq!(result1, DeduplicationResult::Unique);

        let result2 = manager.check_duplicate(&message).unwrap();
        assert!(matches!(result2, DeduplicationResult::Duplicate(_)));

        if let DeduplicationResult::Duplicate(info) = result2 {
            assert_eq!(info.original_message_id, message.message_id);
            assert_eq!(info.duplicate_count, 2);
        }
    }

    // Same payload + different ids: unique under MessageId strategy,
    // duplicate under ContentHash strategy.
    #[tokio::test]
    async fn test_different_strategies() {
        let config_id = DeduplicationConfig {
            strategy: DeduplicationStrategy::MessageId,
            ..Default::default()
        };
        let manager_id = DeduplicationManager::new(config_id);

        let config_content = DeduplicationConfig {
            strategy: DeduplicationStrategy::ContentHash,
            ..Default::default()
        };
        let manager_content = DeduplicationManager::new(config_content);

        let payload = b"same content".to_vec();
        let message1 = DeduplicatedMessage::new(payload.clone());
        let message2 = DeduplicatedMessage::new(payload).with_message_id(MessageId::new());

        let result1_id = manager_id.check_duplicate(&message1).unwrap();
        let result2_id = manager_id.check_duplicate(&message2).unwrap();
        assert_eq!(result1_id, DeduplicationResult::Unique);
        assert_eq!(result2_id, DeduplicationResult::Unique);

        let result1_content = manager_content.check_duplicate(&message1).unwrap();
        let result2_content = manager_content.check_duplicate(&message2).unwrap();
        assert_eq!(result1_content, DeduplicationResult::Unique);
        assert!(matches!(result2_content, DeduplicationResult::Duplicate(_)));
    }

    // A message expires once its timestamp is older than its TTL; the
    // timestamp is backdated manually to avoid sleeping in the test.
    #[test]
    fn test_message_expiry() {
        let mut message =
            DeduplicatedMessage::new(b"test".to_vec()).with_ttl(Duration::from_millis(1));

        assert!(!message.is_expired());

        message.timestamp = Utc::now() - chrono::Duration::seconds(1);
        assert!(message.is_expired());
    }

    // cleanup_expired drops entries idle past default_ttl; timing-sensitive
    // (sleeps 150ms past a 100ms TTL).
    #[tokio::test]
    async fn test_cache_cleanup() {
        let config = DeduplicationConfig {
            default_ttl: Duration::from_millis(100),
            ..Default::default()
        };
        let manager = DeduplicationManager::new(config);

        let payload = b"test message".to_vec();
        let message = DeduplicatedMessage::new(payload);

        manager.check_duplicate(&message).unwrap();

        let stats = manager.cache_stats();
        assert_eq!(stats.total_entries, 1);

        tokio::time::sleep(Duration::from_millis(150)).await;

        let cleaned = manager.cleanup_expired();
        assert_eq!(cleaned, 1);

        let stats = manager.cache_stats();
        assert_eq!(stats.total_entries, 0);
    }
}