1use crate::error::StreamError;
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use sha2::{Digest, Sha256};
21use std::collections::{BTreeMap, HashMap};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::RwLock;
25use tracing::{debug, info};
26
/// Tunable settings for [`IdempotentDeliveryManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdempotentDeliveryConfig {
    /// How long a key counts as "already seen", measured from its first registration.
    pub key_ttl: Duration,
    /// Number of tracked keys at which registering a new key first evicts the key
    /// with the earliest expiry.
    pub max_keys: usize,
    /// Minimum time between the opportunistic expiry sweeps that run at the start of
    /// `check_and_register` and `check_batch`.
    pub eviction_interval: Duration,
    /// When `false`, `record_outcome` returns `Ok(())` without storing anything.
    pub cache_outcomes: bool,
    /// NOTE(review): not read by any code visible in this file; presumably a cap on
    /// the number of stored outcomes — confirm before relying on it.
    pub max_cached_outcomes: usize,
    /// NOTE(review): not read by any code visible in this file; partitioning is
    /// driven solely by whether a key carries a partition — confirm intent.
    pub partition_aware: bool,
    /// Digest used by `generate_key` and `generate_partitioned_key`.
    pub hash_algorithm: HashAlgorithm,
}
49
50impl Default for IdempotentDeliveryConfig {
51 fn default() -> Self {
52 Self {
53 key_ttl: Duration::from_secs(3600),
54 max_keys: 1_000_000,
55 eviction_interval: Duration::from_secs(60),
56 cache_outcomes: true,
57 max_cached_outcomes: 100_000,
58 partition_aware: true,
59 hash_algorithm: HashAlgorithm::Sha256,
60 }
61 }
62}
63
/// Digest function used to turn message content into an idempotency key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HashAlgorithm {
    /// SHA-256, rendered as a hex string.
    Sha256,
    /// 64-bit FNV-1a, rendered as 16 zero-padded hex characters. Not cryptographic.
    FastHash,
}
72
/// Identity of a message for duplicate detection.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct IdempotencyKey {
    /// Content digest, or the caller-supplied string when built via `from_string`.
    pub digest: String,
    /// Optional partition; when set it is prefixed to the digest by `composite_key`.
    pub partition: Option<u32>,
    /// Optional producer identifier. It is carried on the key but is not part of
    /// the string returned by `composite_key`.
    pub producer_id: Option<String>,
}
87
88impl IdempotencyKey {
89 pub fn from_content(content: &[u8], algorithm: HashAlgorithm) -> Self {
91 let digest = match algorithm {
92 HashAlgorithm::Sha256 => {
93 let mut hasher = Sha256::new();
94 hasher.update(content);
95 hex::encode(hasher.finalize())
96 }
97 HashAlgorithm::FastHash => {
98 let hash = fnv1a_hash(content);
99 format!("{hash:016x}")
100 }
101 };
102 Self {
103 digest,
104 partition: None,
105 producer_id: None,
106 }
107 }
108
109 pub fn with_partition(mut self, partition: u32) -> Self {
111 self.partition = Some(partition);
112 self
113 }
114
115 pub fn with_producer(mut self, producer: String) -> Self {
117 self.producer_id = Some(producer);
118 self
119 }
120
121 pub fn from_string(key: String) -> Self {
123 Self {
124 digest: key,
125 partition: None,
126 producer_id: None,
127 }
128 }
129
130 pub fn composite_key(&self) -> String {
132 match self.partition {
133 Some(p) => format!("{}:{}", p, self.digest),
134 None => self.digest.clone(),
135 }
136 }
137}
138
/// Computes the 64-bit FNV-1a hash of `data`.
///
/// Starts from the offset basis and, for each byte, XORs it in and then
/// multiplies by the FNV prime with wrapping arithmetic. Empty input returns the
/// offset basis unchanged.
fn fnv1a_hash(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    data.iter().fold(OFFSET_BASIS, |state, &octet| {
        (state ^ u64::from(octet)).wrapping_mul(PRIME)
    })
}
148
/// Result of handling a message, stored alongside its idempotency key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DeliveryOutcome {
    /// Handling completed successfully.
    Success {
        /// Optional payload describing the result.
        result: Option<String>,
        /// UTC timestamp attached to the success.
        processed_at: DateTime<Utc>,
    },
    /// Handling failed.
    Failure {
        /// Human-readable error description.
        error: String,
        /// Whether the caller marked the failure as retryable.
        retryable: bool,
        /// UTC timestamp attached to the failure.
        failed_at: DateTime<Utc>,
    },
    /// Handling has started but no final outcome is recorded yet.
    InProgress {
        /// UTC timestamp attached to the start.
        started_at: DateTime<Utc>,
    },
}
178
/// Per-key bookkeeping held in the manager's key map.
#[derive(Debug, Clone)]
struct TrackedKey {
    /// When the key was registered; TTL checks are measured from this instant.
    first_seen: Instant,
    /// Refreshed each time a duplicate submission of the key is detected.
    last_accessed: Instant,
    /// Starts at 1 on registration and grows by one per detected duplicate.
    submission_count: u64,
    /// Outcome stored by `record_outcome`, if any.
    outcome: Option<DeliveryOutcome>,
}
195
/// Counters describing the manager's activity; a snapshot is returned by `stats()`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IdempotentDeliveryStats {
    /// Number of `check_and_register` calls.
    pub total_submitted: u64,
    /// Submissions registered as new keys.
    pub accepted: u64,
    /// Submissions reported as duplicates.
    pub duplicates_rejected: u64,
    /// `get_outcome` calls that found a stored outcome.
    pub cache_hits: u64,
    /// Size of the key map as of the last operation that refreshed this field
    /// (registration, sweep, removal, or clear).
    pub active_keys: usize,
    /// Keys dropped by the manager's eviction paths.
    pub keys_evicted: u64,
    /// Number of `evict_expired` runs.
    pub eviction_sweeps: u64,
    /// `duplicates_rejected / total_submitted`, or `0.0` before any submission.
    pub duplicate_rate: f64,
    /// NOTE(review): never written by code visible in this file, so it keeps its
    /// default of `0.0` — confirm whether it is populated elsewhere.
    pub avg_key_lifetime_secs: f64,
    /// Per-partition count of accepted keys. It is only ever incremented in this
    /// file, so it is cumulative rather than a live count.
    pub partition_key_counts: HashMap<u32, usize>,
}
224
/// Per-key answer produced by `check_batch`.
#[derive(Debug, Clone)]
pub struct KeyCheckResult {
    /// The key that was checked (a clone of the caller's key).
    pub key: IdempotencyKey,
    /// `true` when the key is tracked and still within its TTL.
    pub is_duplicate: bool,
    /// Stored outcome for a duplicate, if one was recorded; `None` otherwise.
    pub cached_outcome: Option<DeliveryOutcome>,
    /// Submission count of a duplicate; `0` when the key is not a duplicate.
    pub submission_count: u64,
}
241
/// Tracks idempotency keys with a TTL so repeated submissions can be detected.
pub struct IdempotentDeliveryManager {
    /// Immutable settings supplied at construction.
    config: IdempotentDeliveryConfig,
    /// Composite key string -> tracking record.
    keys: Arc<RwLock<HashMap<String, TrackedKey>>>,
    /// Ordered set of `(expiry instant, composite key)`; the unit value is unused.
    /// The ordering lets the earliest-expiring key be found first.
    expiry_queue: Arc<RwLock<BTreeMap<(Instant, String), ()>>>,
    /// Activity counters.
    stats: Arc<RwLock<IdempotentDeliveryStats>>,
    /// When the last opportunistic sweep was started (initially construction time).
    last_eviction: Arc<RwLock<Instant>>,
}
261
262impl IdempotentDeliveryManager {
263 pub fn new(config: IdempotentDeliveryConfig) -> Self {
265 Self {
266 config,
267 keys: Arc::new(RwLock::new(HashMap::new())),
268 expiry_queue: Arc::new(RwLock::new(BTreeMap::new())),
269 stats: Arc::new(RwLock::new(IdempotentDeliveryStats::default())),
270 last_eviction: Arc::new(RwLock::new(Instant::now())),
271 }
272 }
273
274 pub fn with_defaults() -> Self {
276 Self::new(IdempotentDeliveryConfig::default())
277 }
278
279 pub fn generate_key(&self, content: &[u8]) -> IdempotencyKey {
281 IdempotencyKey::from_content(content, self.config.hash_algorithm)
282 }
283
284 pub fn generate_partitioned_key(&self, content: &[u8], partition: u32) -> IdempotencyKey {
286 IdempotencyKey::from_content(content, self.config.hash_algorithm).with_partition(partition)
287 }
288
289 pub async fn check_and_register(&self, key: &IdempotencyKey) -> Result<bool, StreamError> {
294 self.maybe_evict().await;
295
296 let composite = key.composite_key();
297 let now = Instant::now();
298
299 let mut keys = self.keys.write().await;
300 let mut stats = self.stats.write().await;
301 stats.total_submitted += 1;
302
303 if let Some(entry) = keys.get_mut(&composite) {
304 if now.duration_since(entry.first_seen) > self.config.key_ttl {
306 keys.remove(&composite);
308 let mut expiry = self.expiry_queue.write().await;
309 expiry.retain(|(_t, k), _| k != &composite);
311 } else {
313 entry.submission_count += 1;
314 entry.last_accessed = now;
315 stats.duplicates_rejected += 1;
316 stats.duplicate_rate = if stats.total_submitted > 0 {
317 stats.duplicates_rejected as f64 / stats.total_submitted as f64
318 } else {
319 0.0
320 };
321 debug!(key = %composite, count = entry.submission_count, "Duplicate key detected");
322 return Ok(true);
323 }
324 }
325
326 if keys.len() >= self.config.max_keys {
328 let mut expiry = self.expiry_queue.write().await;
330 if let Some(((_, oldest_key), _)) = expiry.pop_first() {
331 keys.remove(&oldest_key);
332 stats.keys_evicted += 1;
333 }
334 }
335
336 let entry = TrackedKey {
338 first_seen: now,
339 last_accessed: now,
340 submission_count: 1,
341 outcome: None,
342 };
343 keys.insert(composite.clone(), entry);
344
345 let expiry_time = now + self.config.key_ttl;
347 let mut expiry = self.expiry_queue.write().await;
348 expiry.insert((expiry_time, composite), ());
349
350 stats.accepted += 1;
351 stats.active_keys = keys.len();
352
353 if let Some(p) = key.partition {
355 *stats.partition_key_counts.entry(p).or_insert(0) += 1;
356 }
357
358 stats.duplicate_rate = if stats.total_submitted > 0 {
359 stats.duplicates_rejected as f64 / stats.total_submitted as f64
360 } else {
361 0.0
362 };
363
364 Ok(false)
365 }
366
367 pub async fn record_outcome(
369 &self,
370 key: &IdempotencyKey,
371 outcome: DeliveryOutcome,
372 ) -> Result<(), StreamError> {
373 if !self.config.cache_outcomes {
374 return Ok(());
375 }
376
377 let composite = key.composite_key();
378 let mut keys = self.keys.write().await;
379
380 if let Some(entry) = keys.get_mut(&composite) {
381 entry.outcome = Some(outcome);
382 Ok(())
383 } else {
384 Err(StreamError::NotFound(format!("Key not found: {composite}")))
385 }
386 }
387
388 pub async fn get_outcome(
390 &self,
391 key: &IdempotencyKey,
392 ) -> Result<Option<DeliveryOutcome>, StreamError> {
393 let composite = key.composite_key();
394 let keys = self.keys.read().await;
395 let mut stats = self.stats.write().await;
396
397 if let Some(entry) = keys.get(&composite) {
398 if entry.outcome.is_some() {
399 stats.cache_hits += 1;
400 }
401 Ok(entry.outcome.clone())
402 } else {
403 Ok(None)
404 }
405 }
406
407 pub async fn check_batch(
409 &self,
410 keys_to_check: &[IdempotencyKey],
411 ) -> Result<Vec<KeyCheckResult>, StreamError> {
412 self.maybe_evict().await;
413
414 let stored_keys = self.keys.read().await;
415 let mut results = Vec::with_capacity(keys_to_check.len());
416
417 for key in keys_to_check {
418 let composite = key.composite_key();
419 let (is_duplicate, cached_outcome, submission_count) =
420 if let Some(entry) = stored_keys.get(&composite) {
421 let now = Instant::now();
422 if now.duration_since(entry.first_seen) > self.config.key_ttl {
423 (false, None, 0)
424 } else {
425 (true, entry.outcome.clone(), entry.submission_count)
426 }
427 } else {
428 (false, None, 0)
429 };
430
431 results.push(KeyCheckResult {
432 key: key.clone(),
433 is_duplicate,
434 cached_outcome,
435 submission_count,
436 });
437 }
438
439 Ok(results)
440 }
441
442 pub async fn evict_expired(&self) -> usize {
444 let now = Instant::now();
445 let mut keys = self.keys.write().await;
446 let mut expiry = self.expiry_queue.write().await;
447 let mut stats = self.stats.write().await;
448 let mut evicted = 0;
449
450 let cutoff = (now, String::new());
452 let expired: Vec<(Instant, String)> = expiry
453 .range(..=cutoff)
454 .map(|((t, k), _)| (*t, k.clone()))
455 .collect();
456
457 for (t, k) in &expired {
458 expiry.remove(&(*t, k.clone()));
459 if keys.remove(k).is_some() {
460 evicted += 1;
461 }
462 }
463
464 stats.keys_evicted += evicted as u64;
465 stats.eviction_sweeps += 1;
466 stats.active_keys = keys.len();
467
468 if evicted > 0 {
469 info!(
470 evicted,
471 remaining = keys.len(),
472 "Evicted expired idempotency keys"
473 );
474 }
475
476 evicted
477 }
478
479 pub async fn contains_key(&self, key: &IdempotencyKey) -> bool {
481 let composite = key.composite_key();
482 let keys = self.keys.read().await;
483 if let Some(entry) = keys.get(&composite) {
484 Instant::now().duration_since(entry.first_seen) <= self.config.key_ttl
485 } else {
486 false
487 }
488 }
489
490 pub async fn remove_key(&self, key: &IdempotencyKey) -> bool {
492 let composite = key.composite_key();
493 let mut keys = self.keys.write().await;
494 let removed = keys.remove(&composite).is_some();
495 if removed {
496 let mut stats = self.stats.write().await;
497 stats.active_keys = keys.len();
498 }
499 removed
500 }
501
502 pub async fn clear(&self) {
504 let mut keys = self.keys.write().await;
505 let mut expiry = self.expiry_queue.write().await;
506 let mut stats = self.stats.write().await;
507 keys.clear();
508 expiry.clear();
509 stats.active_keys = 0;
510 info!("Cleared all idempotency keys");
511 }
512
513 pub async fn stats(&self) -> IdempotentDeliveryStats {
515 let stats = self.stats.read().await;
516 stats.clone()
517 }
518
519 pub fn config(&self) -> &IdempotentDeliveryConfig {
521 &self.config
522 }
523
524 async fn maybe_evict(&self) {
526 let should_evict = {
527 let last = self.last_eviction.read().await;
528 last.elapsed() >= self.config.eviction_interval
529 };
530
531 if should_evict {
532 let mut last = self.last_eviction.write().await;
533 if last.elapsed() >= self.config.eviction_interval {
534 *last = Instant::now();
535 drop(last);
537 self.evict_expired().await;
538 }
539 }
540 }
541
542 pub async fn active_key_count(&self) -> usize {
544 let keys = self.keys.read().await;
545 keys.len()
546 }
547
548 pub async fn submission_count(&self, key: &IdempotencyKey) -> u64 {
550 let composite = key.composite_key();
551 let keys = self.keys.read().await;
552 keys.get(&composite).map_or(0, |e| e.submission_count)
553 }
554
555 pub fn is_cache_enabled(&self) -> bool {
557 self.config.cache_outcomes
558 }
559}
560
/// Thin convenience wrapper that drives an [`IdempotentDeliveryManager`] for a
/// producer: check-and-register on delivery, then record the outcome.
pub struct IdempotentProducer {
    /// Shared manager that holds the tracked keys.
    manager: Arc<IdempotentDeliveryManager>,
}
575
576impl IdempotentProducer {
577 pub fn new(manager: Arc<IdempotentDeliveryManager>) -> Self {
579 Self { manager }
580 }
581
582 pub async fn try_deliver(
587 &self,
588 key: &IdempotencyKey,
589 ) -> Result<Option<DeliveryOutcome>, StreamError> {
590 let is_dup = self.manager.check_and_register(key).await?;
591 if is_dup {
592 let outcome = self.manager.get_outcome(key).await?;
594 Ok(outcome.or(Some(DeliveryOutcome::Success {
595 result: None,
596 processed_at: Utc::now(),
597 })))
598 } else {
599 Ok(None)
600 }
601 }
602
603 pub async fn mark_success(
605 &self,
606 key: &IdempotencyKey,
607 result: Option<String>,
608 ) -> Result<(), StreamError> {
609 self.manager
610 .record_outcome(
611 key,
612 DeliveryOutcome::Success {
613 result,
614 processed_at: Utc::now(),
615 },
616 )
617 .await
618 }
619
620 pub async fn mark_failure(
622 &self,
623 key: &IdempotencyKey,
624 error: String,
625 retryable: bool,
626 ) -> Result<(), StreamError> {
627 self.manager
628 .record_outcome(
629 key,
630 DeliveryOutcome::Failure {
631 error,
632 retryable,
633 failed_at: Utc::now(),
634 },
635 )
636 .await
637 }
638
639 pub fn manager(&self) -> &IdempotentDeliveryManager {
641 &self.manager
642 }
643}
644
// Unit tests for key generation, duplicate detection, TTL/capacity eviction,
// outcome caching, statistics, and the producer wrapper.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // Manager with the default configuration.
    fn default_manager() -> IdempotentDeliveryManager {
        IdempotentDeliveryManager::new(IdempotentDeliveryConfig::default())
    }

    // Manager with a TTL of `ttl_ms` milliseconds and a 10 ms sweep interval.
    fn fast_ttl_manager(ttl_ms: u64) -> IdempotentDeliveryManager {
        IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
            key_ttl: Duration::from_millis(ttl_ms),
            eviction_interval: Duration::from_millis(10),
            ..Default::default()
        })
    }

    // A first submission is accepted.
    #[tokio::test]
    async fn test_new_key_is_not_duplicate() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        let is_dup = mgr.check_and_register(&key).await.expect("check failed");
        assert!(!is_dup, "First submission should not be duplicate");
    }

    // A second submission of the same content is a duplicate.
    #[tokio::test]
    async fn test_same_key_is_duplicate() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");
        let is_dup = mgr.check_and_register(&key).await.expect("check failed");
        assert!(is_dup, "Second submission should be duplicate");
    }

    // Different content yields independent keys.
    #[tokio::test]
    async fn test_different_keys_not_duplicate() {
        let mgr = default_manager();
        let k1 = mgr.generate_key(b"event-1");
        let k2 = mgr.generate_key(b"event-2");
        mgr.check_and_register(&k1).await.expect("check failed");
        let is_dup = mgr.check_and_register(&k2).await.expect("check failed");
        assert!(!is_dup, "Different events should not be duplicates");
    }

    // Three submissions of one key: 1 accepted, 2 duplicates, rate 2/3.
    #[tokio::test]
    async fn test_stats_tracking() {
        let mgr = default_manager();
        let k1 = mgr.generate_key(b"event-1");
        mgr.check_and_register(&k1).await.expect("check failed");
        mgr.check_and_register(&k1).await.expect("check failed");
        mgr.check_and_register(&k1).await.expect("check failed");

        let stats = mgr.stats().await;
        assert_eq!(stats.total_submitted, 3);
        assert_eq!(stats.accepted, 1);
        assert_eq!(stats.duplicates_rejected, 2);
        assert!((stats.duplicate_rate - 2.0 / 3.0).abs() < 1e-10);
    }

    // A recorded Success outcome can be read back.
    #[tokio::test]
    async fn test_outcome_caching() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");

        mgr.record_outcome(
            &key,
            DeliveryOutcome::Success {
                result: Some("ok".into()),
                processed_at: Utc::now(),
            },
        )
        .await
        .expect("record failed");

        let outcome = mgr.get_outcome(&key).await.expect("get failed");
        assert!(outcome.is_some());
        if let Some(DeliveryOutcome::Success { result, .. }) = outcome {
            assert_eq!(result, Some("ok".into()));
        } else {
            panic!("Expected Success outcome");
        }
    }

    // With caching disabled, recording succeeds but nothing is stored.
    #[tokio::test]
    async fn test_outcome_cache_disabled() {
        let mgr = IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
            cache_outcomes: false,
            ..Default::default()
        });
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");

        mgr.record_outcome(
            &key,
            DeliveryOutcome::Success {
                result: None,
                processed_at: Utc::now(),
            },
        )
        .await
        .expect("record failed");

        let outcome = mgr.get_outcome(&key).await.expect("get failed");
        assert!(outcome.is_none());
    }

    // After sleeping past a 50 ms TTL, the same key is accepted again.
    #[tokio::test]
    async fn test_ttl_expiration() {
        let mgr = fast_ttl_manager(50);
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");

        tokio::time::sleep(Duration::from_millis(100)).await;

        let is_dup = mgr.check_and_register(&key).await.expect("check failed");
        assert!(!is_dup, "Expired key should be accepted as new");
    }

    // An explicit sweep after the TTL elapses evicts at least one of two keys.
    #[tokio::test]
    async fn test_evict_expired() {
        let mgr = fast_ttl_manager(50);
        let k1 = mgr.generate_key(b"event-1");
        let k2 = mgr.generate_key(b"event-2");
        mgr.check_and_register(&k1).await.expect("check failed");
        mgr.check_and_register(&k2).await.expect("check failed");

        assert_eq!(mgr.active_key_count().await, 2);

        tokio::time::sleep(Duration::from_millis(100)).await;
        let evicted = mgr.evict_expired().await;
        assert!(evicted >= 1, "Should have evicted at least one key");
    }

    // With max_keys = 5, ten registrations leave at most 6 keys tracked.
    #[tokio::test]
    async fn test_max_keys_eviction() {
        let mgr = IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
            max_keys: 5,
            ..Default::default()
        });

        for i in 0..10 {
            let key = mgr.generate_key(format!("event-{i}").as_bytes());
            mgr.check_and_register(&key).await.expect("check failed");
        }

        assert!(mgr.active_key_count().await <= 6);
    }

    // Identical content in different partitions does not collide.
    #[tokio::test]
    async fn test_partitioned_keys() {
        let mgr = default_manager();
        let content = b"event-1";
        let k1 = mgr.generate_partitioned_key(content, 0);
        let k2 = mgr.generate_partitioned_key(content, 1);

        mgr.check_and_register(&k1).await.expect("check failed");
        let is_dup = mgr.check_and_register(&k2).await.expect("check failed");
        assert!(
            !is_dup,
            "Same content in different partitions should not collide"
        );
    }

    // Identical content in the same partition is a duplicate.
    #[tokio::test]
    async fn test_same_partition_duplicate() {
        let mgr = default_manager();
        let k1 = mgr.generate_partitioned_key(b"event-1", 0);
        let k2 = mgr.generate_partitioned_key(b"event-1", 0);

        mgr.check_and_register(&k1).await.expect("check failed");
        let is_dup = mgr.check_and_register(&k2).await.expect("check failed");
        assert!(is_dup, "Same content in same partition should be duplicate");
    }

    // contains_key is false before registration and true after.
    #[tokio::test]
    async fn test_contains_key() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        assert!(!mgr.contains_key(&key).await);

        mgr.check_and_register(&key).await.expect("check failed");
        assert!(mgr.contains_key(&key).await);
    }

    // remove_key reports removal and the key is no longer contained.
    #[tokio::test]
    async fn test_remove_key() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");
        assert!(mgr.contains_key(&key).await);

        let removed = mgr.remove_key(&key).await;
        assert!(removed);
        assert!(!mgr.contains_key(&key).await);
    }

    // clear drops every tracked key.
    #[tokio::test]
    async fn test_clear_all_keys() {
        let mgr = default_manager();
        for i in 0..10 {
            let key = mgr.generate_key(format!("event-{i}").as_bytes());
            mgr.check_and_register(&key).await.expect("check failed");
        }
        assert_eq!(mgr.active_key_count().await, 10);
        mgr.clear().await;
        assert_eq!(mgr.active_key_count().await, 0);
    }

    // check_batch flags only the previously registered key as a duplicate.
    #[tokio::test]
    async fn test_batch_check() {
        let mgr = default_manager();
        let k1 = mgr.generate_key(b"event-1");
        let k2 = mgr.generate_key(b"event-2");
        let k3 = mgr.generate_key(b"event-3");

        mgr.check_and_register(&k1).await.expect("check failed");

        let results = mgr
            .check_batch(&[k1.clone(), k2.clone(), k3.clone()])
            .await
            .expect("batch failed");
        assert_eq!(results.len(), 3);
        assert!(results[0].is_duplicate);
        assert!(!results[1].is_duplicate);
        assert!(!results[2].is_duplicate);
    }

    // submission_count counts the first registration plus each duplicate.
    #[tokio::test]
    async fn test_submission_count() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");
        mgr.check_and_register(&key).await.expect("check failed");
        mgr.check_and_register(&key).await.expect("check failed");

        let count = mgr.submission_count(&key).await;
        assert_eq!(count, 3);
    }

    // try_deliver returns None for a new event.
    #[tokio::test]
    async fn test_idempotent_producer_new_event() {
        let mgr = Arc::new(default_manager());
        let producer = IdempotentProducer::new(mgr);
        let key = producer.manager().generate_key(b"event-1");

        let result = producer.try_deliver(&key).await.expect("deliver failed");
        assert!(result.is_none(), "New event should return None");
    }

    // try_deliver returns an outcome for a duplicate after mark_success.
    #[tokio::test]
    async fn test_idempotent_producer_duplicate_event() {
        let mgr = Arc::new(default_manager());
        let producer = IdempotentProducer::new(mgr);
        let key = producer.manager().generate_key(b"event-1");

        producer.try_deliver(&key).await.expect("deliver failed");
        producer
            .mark_success(&key, Some("done".into()))
            .await
            .expect("mark failed");

        let result = producer.try_deliver(&key).await.expect("deliver failed");
        assert!(result.is_some(), "Duplicate should return cached outcome");
    }

    // mark_failure stores a retryable Failure outcome.
    #[tokio::test]
    async fn test_idempotent_producer_mark_failure() {
        let mgr = Arc::new(default_manager());
        let producer = IdempotentProducer::new(mgr);
        let key = producer.manager().generate_key(b"event-1");

        producer.try_deliver(&key).await.expect("deliver failed");
        producer
            .mark_failure(&key, "timeout".into(), true)
            .await
            .expect("mark failed");

        let outcome = producer
            .manager()
            .get_outcome(&key)
            .await
            .expect("get failed");
        assert!(matches!(
            outcome,
            Some(DeliveryOutcome::Failure {
                retryable: true,
                ..
            })
        ));
    }

    // FastHash digests are 16 hex characters.
    #[tokio::test]
    async fn test_fast_hash_algorithm() {
        let mgr = IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
            hash_algorithm: HashAlgorithm::FastHash,
            ..Default::default()
        });
        let key = mgr.generate_key(b"event-1");
        assert!(!key.digest.is_empty());
        assert_eq!(key.digest.len(), 16);
    }

    // SHA-256 digests are 64 hex characters.
    #[tokio::test]
    async fn test_sha256_algorithm() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        assert!(!key.digest.is_empty());
        assert_eq!(key.digest.len(), 64);
    }

    // Caller-supplied string keys are deduplicated like generated ones.
    #[tokio::test]
    async fn test_from_string_key() {
        let mgr = default_manager();
        let key = IdempotencyKey::from_string("custom-key-12345".into());
        let is_dup = mgr.check_and_register(&key).await.expect("check failed");
        assert!(!is_dup);

        let is_dup2 = mgr.check_and_register(&key).await.expect("check failed");
        assert!(is_dup2);
    }

    // with_producer sets the producer id on the key.
    #[tokio::test]
    async fn test_with_producer_id() {
        let mgr = default_manager();
        let key = mgr
            .generate_key(b"event-1")
            .with_producer("producer-a".into());
        assert_eq!(key.producer_id, Some("producer-a".into()));
    }

    // composite_key is "<partition>:<digest>" or just the digest.
    #[tokio::test]
    async fn test_composite_key_format() {
        let key = IdempotencyKey::from_string("abc".into()).with_partition(3);
        assert_eq!(key.composite_key(), "3:abc");

        let key2 = IdempotencyKey::from_string("abc".into());
        assert_eq!(key2.composite_key(), "abc");
    }

    // Default configuration values.
    #[tokio::test]
    async fn test_config_defaults() {
        let config = IdempotentDeliveryConfig::default();
        assert_eq!(config.key_ttl, Duration::from_secs(3600));
        assert_eq!(config.max_keys, 1_000_000);
        assert!(config.cache_outcomes);
        assert!(config.partition_aware);
        assert_eq!(config.hash_algorithm, HashAlgorithm::Sha256);
    }

    // with_defaults uses the default configuration.
    #[tokio::test]
    async fn test_with_defaults_constructor() {
        let mgr = IdempotentDeliveryManager::with_defaults();
        assert_eq!(mgr.config().key_ttl, Duration::from_secs(3600));
    }

    // is_cache_enabled mirrors the cache_outcomes setting.
    #[tokio::test]
    async fn test_is_cache_enabled() {
        let mgr = default_manager();
        assert!(mgr.is_cache_enabled());

        let mgr2 = IdempotentDeliveryManager::new(IdempotentDeliveryConfig {
            cache_outcomes: false,
            ..Default::default()
        });
        assert!(!mgr2.is_cache_enabled());
    }

    // Ten concurrent submissions of one event: exactly one is accepted.
    #[tokio::test]
    async fn test_concurrent_duplicate_detection() {
        let mgr = Arc::new(default_manager());
        let mut handles = Vec::new();

        for _ in 0..10 {
            let m = Arc::clone(&mgr);
            handles.push(tokio::spawn(async move {
                let key = m.generate_key(b"shared-event");
                m.check_and_register(&key).await.expect("check failed")
            }));
        }

        let mut accepted = 0;
        let mut duplicates = 0;
        for h in handles {
            let is_dup = h.await.expect("join failed");
            if is_dup {
                duplicates += 1;
            } else {
                accepted += 1;
            }
        }

        assert_eq!(accepted, 1, "Exactly one should be accepted");
        assert_eq!(duplicates, 9, "Nine should be duplicates");
    }

    // Accepted keys are counted per partition.
    #[tokio::test]
    async fn test_partition_stats() {
        let mgr = default_manager();
        let k1 = mgr.generate_partitioned_key(b"a", 0);
        let k2 = mgr.generate_partitioned_key(b"b", 0);
        let k3 = mgr.generate_partitioned_key(b"c", 1);

        mgr.check_and_register(&k1).await.expect("check failed");
        mgr.check_and_register(&k2).await.expect("check failed");
        mgr.check_and_register(&k3).await.expect("check failed");

        let stats = mgr.stats().await;
        assert_eq!(stats.partition_key_counts.get(&0), Some(&2));
        assert_eq!(stats.partition_key_counts.get(&1), Some(&1));
    }

    // Recording an outcome for an unregistered key is an error.
    #[tokio::test]
    async fn test_record_outcome_unknown_key() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"unknown");

        let result = mgr
            .record_outcome(
                &key,
                DeliveryOutcome::Success {
                    result: None,
                    processed_at: Utc::now(),
                },
            )
            .await;
        assert!(
            result.is_err(),
            "Recording outcome for unknown key should fail"
        );
    }

    // Removing an unregistered key returns false.
    #[tokio::test]
    async fn test_remove_nonexistent_key() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"nonexistent");
        let removed = mgr.remove_key(&key).await;
        assert!(!removed);
    }

    // A recorded Failure outcome round-trips its fields.
    #[tokio::test]
    async fn test_failure_outcome() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");

        mgr.record_outcome(
            &key,
            DeliveryOutcome::Failure {
                error: "network timeout".into(),
                retryable: true,
                failed_at: Utc::now(),
            },
        )
        .await
        .expect("record failed");

        let outcome = mgr.get_outcome(&key).await.expect("get failed");
        if let Some(DeliveryOutcome::Failure {
            error, retryable, ..
        }) = outcome
        {
            assert_eq!(error, "network timeout");
            assert!(retryable);
        } else {
            panic!("Expected Failure outcome");
        }
    }

    // A recorded InProgress outcome can be read back.
    #[tokio::test]
    async fn test_in_progress_outcome() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");

        mgr.record_outcome(
            &key,
            DeliveryOutcome::InProgress {
                started_at: Utc::now(),
            },
        )
        .await
        .expect("record failed");

        let outcome = mgr.get_outcome(&key).await.expect("get failed");
        assert!(matches!(outcome, Some(DeliveryOutcome::InProgress { .. })));
    }

    // One hundred distinct events are all accepted and tracked.
    #[tokio::test]
    async fn test_many_unique_events() {
        let mgr = default_manager();
        for i in 0u64..100 {
            let key = mgr.generate_key(&i.to_le_bytes());
            let is_dup = mgr.check_and_register(&key).await.expect("check failed");
            assert!(!is_dup, "All unique events should be accepted");
        }
        assert_eq!(mgr.active_key_count().await, 100);
        let stats = mgr.stats().await;
        assert_eq!(stats.accepted, 100);
        assert_eq!(stats.duplicates_rejected, 0);
    }

    // fnv1a_hash is deterministic and distinguishes two sample inputs.
    #[tokio::test]
    async fn test_fnv1a_deterministic() {
        let h1 = fnv1a_hash(b"hello");
        let h2 = fnv1a_hash(b"hello");
        assert_eq!(h1, h2);
        let h3 = fnv1a_hash(b"world");
        assert_ne!(h1, h3);
    }

    // IdempotencyKey round-trips through JSON.
    #[tokio::test]
    async fn test_idempotency_key_serialize() {
        let key = IdempotencyKey::from_string("test-key".into()).with_partition(42);
        let json = serde_json::to_string(&key).expect("serialize failed");
        let deserialized: IdempotencyKey = serde_json::from_str(&json).expect("deserialize failed");
        assert_eq!(deserialized.digest, "test-key");
        assert_eq!(deserialized.partition, Some(42));
    }

    // The configuration serializes to JSON containing its field names.
    #[tokio::test]
    async fn test_config_serialize() {
        let config = IdempotentDeliveryConfig::default();
        let json = serde_json::to_string(&config).expect("serialize failed");
        assert!(json.contains("key_ttl"));
    }

    // A fresh manager reports all-zero counters.
    #[tokio::test]
    async fn test_stats_initial_values() {
        let mgr = default_manager();
        let stats = mgr.stats().await;
        assert_eq!(stats.total_submitted, 0);
        assert_eq!(stats.accepted, 0);
        assert_eq!(stats.duplicates_rejected, 0);
        assert_eq!(stats.cache_hits, 0);
        assert_eq!(stats.active_keys, 0);
        assert_eq!(stats.keys_evicted, 0);
    }

    // Reading a stored outcome counts as one cache hit.
    #[tokio::test]
    async fn test_cache_hit_stats() {
        let mgr = default_manager();
        let key = mgr.generate_key(b"event-1");
        mgr.check_and_register(&key).await.expect("check failed");

        mgr.record_outcome(
            &key,
            DeliveryOutcome::Success {
                result: Some("cached".into()),
                processed_at: Utc::now(),
            },
        )
        .await
        .expect("record failed");

        let _ = mgr.get_outcome(&key).await.expect("get failed");
        let stats = mgr.stats().await;
        assert_eq!(stats.cache_hits, 1);
    }
}