1use anyhow::{anyhow, Result};
12use chrono::{DateTime, Duration as ChronoDuration, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::{HashMap, VecDeque};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
18use tracing::{debug, info, warn};
19use uuid::Uuid;
20
21use crate::StreamEvent;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25pub enum DeliveryGuarantee {
26 AtMostOnce,
28 AtLeastOnce,
30 ExactlyOnce,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct ReliabilityConfig {
37 pub delivery_guarantee: DeliveryGuarantee,
39 pub enable_deduplication: bool,
41 pub deduplication_window: Duration,
43 pub max_retries: u32,
45 pub initial_backoff: Duration,
47 pub max_backoff: Duration,
49 pub backoff_multiplier: f64,
51 pub backoff_jitter: bool,
53 pub dlq_config: Option<DlqConfig>,
55 pub ack_timeout: Duration,
57 pub enable_persistence: bool,
59 pub max_in_flight: usize,
61 pub preserve_ordering: bool,
63}
64
65impl Default for ReliabilityConfig {
66 fn default() -> Self {
67 Self {
68 delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
69 enable_deduplication: true,
70 deduplication_window: Duration::from_secs(300), max_retries: 3,
72 initial_backoff: Duration::from_millis(100),
73 max_backoff: Duration::from_secs(30),
74 backoff_multiplier: 2.0,
75 backoff_jitter: true,
76 dlq_config: Some(DlqConfig::default()),
77 ack_timeout: Duration::from_secs(30),
78 enable_persistence: false,
79 max_in_flight: 1000,
80 preserve_ordering: false,
81 }
82 }
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct DlqConfig {
88 pub enabled: bool,
90 pub topic: String,
92 pub max_size: usize,
94 pub retention: Duration,
96 pub include_error_details: bool,
98 pub enable_replay: bool,
100 pub max_replay_attempts: u32,
102 pub replay_backoff: Duration,
104 pub replay_batch_size: usize,
106}
107
108impl Default for DlqConfig {
109 fn default() -> Self {
110 Self {
111 enabled: true,
112 topic: "oxirs-dlq".to_string(),
113 max_size: 10000,
114 retention: Duration::from_secs(86400 * 7), include_error_details: true,
116 enable_replay: true,
117 max_replay_attempts: 3,
118 replay_backoff: Duration::from_secs(60), replay_batch_size: 100,
120 }
121 }
122}
123
124#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
126pub enum ReplayStatus {
127 #[default]
129 Available,
130 InProgress,
132 Succeeded,
134 Failed,
136 Paused,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct ReliableMessage {
143 pub message_id: String,
145 pub event: StreamEvent,
147 pub retry_count: u32,
149 pub first_attempt: DateTime<Utc>,
151 pub last_attempt: DateTime<Utc>,
153 pub errors: Vec<String>,
155 pub checksum: Option<String>,
157 pub sequence_number: Option<u64>,
159 pub partition_key: Option<String>,
161 pub replay_count: u32,
163 pub last_replay_attempt: Option<DateTime<Utc>>,
165 pub replay_status: ReplayStatus,
167}
168
169impl ReliableMessage {
170 pub fn new(event: StreamEvent) -> Self {
172 let now = Utc::now();
173 Self {
174 message_id: Uuid::new_v4().to_string(),
175 event,
176 retry_count: 0,
177 first_attempt: now,
178 last_attempt: now,
179 errors: Vec::new(),
180 checksum: None,
181 sequence_number: None,
182 partition_key: None,
183 replay_count: 0,
184 last_replay_attempt: None,
185 replay_status: ReplayStatus::default(),
186 }
187 }
188
189 pub fn add_error(&mut self, error: String) {
191 self.errors.push(error);
192 self.retry_count += 1;
193 self.last_attempt = Utc::now();
194 }
195
196 pub fn should_retry(&self, max_retries: u32) -> bool {
198 self.retry_count < max_retries
199 }
200
201 pub fn next_retry_delay(&self, config: &ReliabilityConfig) -> Duration {
203 let base_delay = config.initial_backoff.as_millis() as f64
204 * config.backoff_multiplier.powi(self.retry_count as i32);
205
206 let mut delay = Duration::from_millis(base_delay as u64).min(config.max_backoff);
207
208 if config.backoff_jitter {
210 #[allow(unused_imports)]
211 use scirs2_core::random::{Random, Rng};
212 let mut random = Random::default();
213 let jitter = random.gen_range(0.8..1.2);
214 delay = Duration::from_millis((delay.as_millis() as f64 * jitter) as u64);
215 }
216
217 delay
218 }
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct DeliveryConfirmation {
224 pub message_id: String,
226 pub status: DeliveryStatus,
228 pub timestamp: DateTime<Utc>,
230 pub metadata: HashMap<String, String>,
232}
233
234#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
236pub enum DeliveryStatus {
237 Delivered,
239 Failed(String),
241 DeadLettered(String),
243 Pending,
245}
246
247pub struct ReliabilityManager {
249 config: ReliabilityConfig,
250 dedup_cache: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
252 in_flight: Arc<RwLock<HashMap<String, ReliableMessage>>>,
254 retry_queue: Arc<Mutex<VecDeque<ReliableMessage>>>,
256 dlq: Arc<Mutex<VecDeque<ReliableMessage>>>,
258 sequence_counter: Arc<RwLock<u64>>,
260 ack_tracker: Arc<RwLock<HashMap<String, Instant>>>,
262 in_flight_semaphore: Arc<Semaphore>,
264 shutdown_tx: Option<mpsc::Sender<()>>,
266 shutdown_rx: Arc<Mutex<Option<mpsc::Receiver<()>>>>,
267}
268
269impl ReliabilityManager {
270 pub fn new(config: ReliabilityConfig) -> Self {
272 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
273 let in_flight_semaphore = Arc::new(Semaphore::new(config.max_in_flight));
274
275 Self {
276 config,
277 dedup_cache: Arc::new(RwLock::new(HashMap::new())),
278 in_flight: Arc::new(RwLock::new(HashMap::new())),
279 retry_queue: Arc::new(Mutex::new(VecDeque::new())),
280 dlq: Arc::new(Mutex::new(VecDeque::new())),
281 sequence_counter: Arc::new(RwLock::new(0)),
282 ack_tracker: Arc::new(RwLock::new(HashMap::new())),
283 in_flight_semaphore,
284 shutdown_tx: Some(shutdown_tx),
285 shutdown_rx: Arc::new(Mutex::new(Some(shutdown_rx))),
286 }
287 }
288
289 pub async fn start(&self) -> Result<()> {
291 self.start_dedup_cleanup_task().await;
293
294 self.start_ack_timeout_checker().await;
296
297 self.start_retry_processor().await;
299
300 info!("Reliability manager started");
301 Ok(())
302 }
303
304 pub async fn prepare_message(&self, event: StreamEvent) -> Result<ReliableMessage> {
306 let mut message = ReliableMessage::new(event);
307
308 if self.config.preserve_ordering {
310 let mut counter = self.sequence_counter.write().await;
311 *counter += 1;
312 message.sequence_number = Some(*counter);
313 }
314
315 if self.config.enable_deduplication {
317 if self.is_duplicate(&message.message_id).await? {
318 return Err(anyhow!(
319 "Duplicate message detected: {}",
320 message.message_id
321 ));
322 }
323 self.record_message_id(&message.message_id).await?;
324 }
325
326 let _permit = self
328 .in_flight_semaphore
329 .acquire()
330 .await
331 .map_err(|_| anyhow!("Failed to acquire in-flight permit"))?;
332
333 self.in_flight
335 .write()
336 .await
337 .insert(message.message_id.clone(), message.clone());
338
339 self.ack_tracker
341 .write()
342 .await
343 .insert(message.message_id.clone(), Instant::now());
344
345 Ok(message)
346 }
347
348 pub async fn record_delivery(&self, message_id: &str) -> Result<()> {
350 self.in_flight.write().await.remove(message_id);
352
353 self.ack_tracker.write().await.remove(message_id);
355
356 debug!("Recorded successful delivery for message: {}", message_id);
359 Ok(())
360 }
361
362 pub async fn record_failure(&self, message_id: &str, error: String) -> Result<DeliveryStatus> {
364 let mut in_flight = self.in_flight.write().await;
365
366 if let Some(mut message) = in_flight.remove(message_id) {
367 message.add_error(error.clone());
368
369 if message.should_retry(self.config.max_retries) {
370 self.retry_queue.lock().await.push_back(message);
372 Ok(DeliveryStatus::Pending)
373 } else {
374 if let Some(dlq_config) = &self.config.dlq_config {
376 if dlq_config.enabled {
377 self.send_to_dlq(message).await?;
378 Ok(DeliveryStatus::DeadLettered(error))
379 } else {
380 Ok(DeliveryStatus::Failed(error))
381 }
382 } else {
383 Ok(DeliveryStatus::Failed(error))
384 }
385 }
386 } else {
387 Err(anyhow!(
388 "Message not found in in-flight tracking: {}",
389 message_id
390 ))
391 }
392 }
393
394 async fn is_duplicate(&self, message_id: &str) -> Result<bool> {
396 let cache = self.dedup_cache.read().await;
397 Ok(cache.contains_key(message_id))
398 }
399
400 async fn record_message_id(&self, message_id: &str) -> Result<()> {
402 let expiry = Utc::now()
403 + ChronoDuration::from_std(self.config.deduplication_window)
404 .map_err(|e| anyhow!("Invalid deduplication window: {}", e))?;
405
406 self.dedup_cache
407 .write()
408 .await
409 .insert(message_id.to_string(), expiry);
410
411 Ok(())
412 }
413
414 async fn send_to_dlq(&self, message: ReliableMessage) -> Result<()> {
416 let mut dlq = self.dlq.lock().await;
417
418 if let Some(dlq_config) = &self.config.dlq_config {
420 if dlq.len() >= dlq_config.max_size {
421 warn!("DLQ is full, dropping oldest message");
422 dlq.pop_front();
423 }
424 }
425
426 dlq.push_back(message.clone());
427 info!(
428 "Message {} sent to DLQ after {} retries",
429 message.message_id, message.retry_count
430 );
431
432 Ok(())
433 }
434
435 pub async fn get_retry_message(&self) -> Option<ReliableMessage> {
437 self.retry_queue.lock().await.pop_front()
438 }
439
440 pub async fn get_dlq_messages(&self, limit: usize) -> Vec<ReliableMessage> {
442 let dlq = self.dlq.lock().await;
443 dlq.iter().take(limit).cloned().collect()
444 }
445
446 pub async fn clear_dlq(&self) -> Result<()> {
448 self.dlq.lock().await.clear();
449 info!("Dead letter queue cleared");
450 Ok(())
451 }
452
453 pub async fn get_stats(&self) -> ReliabilityStats {
455 ReliabilityStats {
456 in_flight_count: self.in_flight.read().await.len(),
457 retry_queue_size: self.retry_queue.lock().await.len(),
458 dlq_size: self.dlq.lock().await.len(),
459 dedup_cache_size: self.dedup_cache.read().await.len(),
460 total_sequences: *self.sequence_counter.read().await,
461 }
462 }
463
464 async fn start_dedup_cleanup_task(&self) {
466 let cache = Arc::clone(&self.dedup_cache);
467 let interval = Duration::from_secs(60); let shutdown_rx = Arc::clone(&self.shutdown_rx);
469
470 tokio::spawn(async move {
471 let mut interval_timer = tokio::time::interval(interval);
472
473 loop {
474 if let Ok(mut rx) = shutdown_rx.try_lock() {
476 if let Some(rx) = rx.as_mut() {
477 if rx.try_recv().is_ok() {
478 break;
479 }
480 }
481 }
482
483 interval_timer.tick().await;
484
485 let now = Utc::now();
487 let mut cache_write = cache.write().await;
488 cache_write.retain(|_, expiry| *expiry > now);
489
490 debug!(
491 "Dedup cache cleanup: {} entries remaining",
492 cache_write.len()
493 );
494 }
495 });
496 }
497
498 async fn start_ack_timeout_checker(&self) {
500 let ack_tracker = Arc::clone(&self.ack_tracker);
501 let in_flight = Arc::clone(&self.in_flight);
502 let retry_queue = Arc::clone(&self.retry_queue);
503 let timeout = self.config.ack_timeout;
504 let shutdown_rx = Arc::clone(&self.shutdown_rx);
505
506 tokio::spawn(async move {
507 let mut interval_timer = tokio::time::interval(Duration::from_secs(5));
508
509 loop {
510 if let Ok(mut rx) = shutdown_rx.try_lock() {
512 if let Some(rx) = rx.as_mut() {
513 if rx.try_recv().is_ok() {
514 break;
515 }
516 }
517 }
518
519 interval_timer.tick().await;
520
521 let now = Instant::now();
522 let mut expired_messages = Vec::new();
523
524 {
526 let tracker = ack_tracker.read().await;
527 for (message_id, start_time) in tracker.iter() {
528 if now.duration_since(*start_time) > timeout {
529 expired_messages.push(message_id.clone());
530 }
531 }
532 }
533
534 for message_id in expired_messages {
536 warn!("Message {} timed out, adding to retry queue", message_id);
537
538 ack_tracker.write().await.remove(&message_id);
540
541 if let Some(message) = in_flight.write().await.remove(&message_id) {
543 retry_queue.lock().await.push_back(message);
544 }
545 }
546 }
547 });
548 }
549
550 async fn start_retry_processor(&self) {
552 let retry_queue = Arc::clone(&self.retry_queue);
553 let in_flight = Arc::clone(&self.in_flight);
554 let ack_tracker = Arc::clone(&self.ack_tracker);
555 let config = self.config.clone();
556 let shutdown_rx = Arc::clone(&self.shutdown_rx);
557
558 tokio::spawn(async move {
559 loop {
560 if let Ok(mut rx) = shutdown_rx.try_lock() {
562 if let Some(rx) = rx.as_mut() {
563 if rx.try_recv().is_ok() {
564 break;
565 }
566 }
567 }
568
569 let message = retry_queue.lock().await.pop_front();
571
572 if let Some(msg) = message {
573 let delay = msg.next_retry_delay(&config);
575
576 info!(
577 "Retrying message {} after {:?} (attempt {})",
578 msg.message_id,
579 delay,
580 msg.retry_count + 1
581 );
582
583 tokio::time::sleep(delay).await;
585
586 in_flight
588 .write()
589 .await
590 .insert(msg.message_id.clone(), msg.clone());
591
592 ack_tracker
594 .write()
595 .await
596 .insert(msg.message_id.clone(), Instant::now());
597 } else {
598 tokio::time::sleep(Duration::from_millis(100)).await;
600 }
601 }
602 });
603 }
604
605 pub async fn shutdown(&mut self) -> Result<()> {
607 if let Some(tx) = self.shutdown_tx.take() {
608 let _ = tx.send(()).await;
609 }
610
611 info!("Reliability manager shutdown");
612 Ok(())
613 }
614
615 pub async fn replay_message(&self, message_id: &str) -> Result<ReliableMessage> {
617 if !self
618 .config
619 .dlq_config
620 .as_ref()
621 .map(|dlq| dlq.enable_replay)
622 .unwrap_or(false)
623 {
624 return Err(anyhow!("Message replay is disabled"));
625 }
626
627 let mut dlq = self.dlq.lock().await;
628 let position = dlq.iter().position(|msg| msg.message_id == message_id);
629
630 if let Some(pos) = position {
631 let mut message = dlq[pos].clone();
632
633 if message.replay_status == ReplayStatus::Failed
635 || message.replay_count
636 >= self
637 .config
638 .dlq_config
639 .as_ref()
640 .map(|dlq| dlq.max_replay_attempts)
641 .unwrap_or(0)
642 {
643 return Err(anyhow!("Message has exceeded maximum replay attempts"));
644 }
645
646 message.replay_status = ReplayStatus::InProgress;
648 message.replay_count += 1;
649 message.last_replay_attempt = Some(Utc::now());
650 dlq[pos] = message.clone();
651
652 info!(
653 "Replaying message {} (attempt {})",
654 message_id, message.replay_count
655 );
656 Ok(message)
657 } else {
658 Err(anyhow!("Message not found in DLQ: {}", message_id))
659 }
660 }
661
662 pub async fn replay_messages_with_filter<F>(
664 &self,
665 filter: F,
666 limit: usize,
667 ) -> Result<Vec<ReliableMessage>>
668 where
669 F: Fn(&ReliableMessage) -> bool,
670 {
671 if !self
672 .config
673 .dlq_config
674 .as_ref()
675 .map(|dlq| dlq.enable_replay)
676 .unwrap_or(false)
677 {
678 return Err(anyhow!("Message replay is disabled"));
679 }
680
681 let mut dlq = self.dlq.lock().await;
682 let mut replayed_messages = Vec::new();
683 let mut updated_indices = Vec::new();
684
685 for (index, message) in dlq.iter().enumerate() {
686 if replayed_messages.len() >= limit {
687 break;
688 }
689
690 if filter(message)
691 && message.replay_status != ReplayStatus::Failed
692 && message.replay_count
693 < self
694 .config
695 .dlq_config
696 .as_ref()
697 .map(|dlq| dlq.max_replay_attempts)
698 .unwrap_or(0)
699 {
700 let mut updated_message = message.clone();
701 updated_message.replay_status = ReplayStatus::InProgress;
702 updated_message.replay_count += 1;
703 updated_message.last_replay_attempt = Some(Utc::now());
704
705 replayed_messages.push(updated_message.clone());
706 updated_indices.push((index, updated_message));
707 }
708 }
709
710 for (index, updated_message) in updated_indices {
712 dlq[index] = updated_message;
713 }
714
715 info!("Replaying {} messages from DLQ", replayed_messages.len());
716 Ok(replayed_messages)
717 }
718
719 pub async fn remove_from_dlq(&self, message_id: &str) -> Result<()> {
721 let mut dlq = self.dlq.lock().await;
722 let initial_len = dlq.len();
723 dlq.retain(|msg| msg.message_id != message_id);
724
725 if dlq.len() < initial_len {
726 info!(
727 "Removed successfully replayed message {} from DLQ",
728 message_id
729 );
730 Ok(())
731 } else {
732 Err(anyhow!("Message not found in DLQ: {}", message_id))
733 }
734 }
735
736 pub async fn update_replay_status(&self, message_id: &str, status: ReplayStatus) -> Result<()> {
738 let mut dlq = self.dlq.lock().await;
739 if let Some(message) = dlq.iter_mut().find(|msg| msg.message_id == message_id) {
740 message.replay_status = status;
741 debug!(
742 "Updated replay status for message {} to {:?}",
743 message_id, status
744 );
745 Ok(())
746 } else {
747 Err(anyhow!("Message not found in DLQ: {}", message_id))
748 }
749 }
750
751 pub async fn get_dlq_stats(&self) -> DlqStats {
753 let dlq = self.dlq.lock().await;
754
755 let mut error_categories = HashMap::new();
756 let mut status_counts = HashMap::new();
757 let mut oldest_message_age = 0u64;
758 let mut total_replay_attempts = 0u32;
759 let mut size_bytes = 0u64;
760
761 let now = Utc::now();
762
763 for message in dlq.iter() {
764 for error in &message.errors {
766 *error_categories.entry(error.clone()).or_insert(0) += 1;
767 }
768
769 let status_key = format!("{:?}", message.replay_status);
771 *status_counts.entry(status_key).or_insert(0) += 1;
772
773 let age_ms = (now - message.first_attempt).num_milliseconds().max(0) as u64;
775 oldest_message_age = oldest_message_age.max(age_ms);
776
777 total_replay_attempts += message.replay_count;
779
780 size_bytes += 1024; }
783
784 let messages_count = dlq.len() as u64;
785 let replay_success_rate = if total_replay_attempts > 0 {
786 (status_counts.get("Succeeded").unwrap_or(&0) * 100) as f64
787 / total_replay_attempts as f64
788 } else {
789 100.0
790 };
791
792 DlqStats {
793 messages_count,
794 oldest_message_age_ms: oldest_message_age,
795 error_categories,
796 status_counts,
797 total_replay_attempts,
798 replay_success_rate,
799 size_bytes,
800 retention_period_ms: self
801 .config
802 .dlq_config
803 .as_ref()
804 .map(|dlq| dlq.retention.as_millis() as u64)
805 .unwrap_or(0),
806 }
807 }
808
809 pub async fn bulk_replay_messages(&self, message_ids: Vec<String>) -> Result<BulkReplayResult> {
811 if !self
812 .config
813 .dlq_config
814 .as_ref()
815 .map(|dlq| dlq.enable_replay)
816 .unwrap_or(false)
817 {
818 return Err(anyhow!("Message replay is disabled"));
819 }
820
821 let batch_size = self
822 .config
823 .dlq_config
824 .as_ref()
825 .map(|dlq| dlq.replay_batch_size)
826 .unwrap_or(100);
827 let mut successful = Vec::new();
828 let mut failed = Vec::new();
829
830 for chunk in message_ids.chunks(batch_size) {
831 for message_id in chunk {
832 match self.replay_message(message_id).await {
833 Ok(message) => successful.push(message),
834 Err(e) => failed.push((message_id.clone(), e.to_string())),
835 }
836
837 tokio::time::sleep(
839 self.config
840 .dlq_config
841 .as_ref()
842 .map(|dlq| dlq.replay_backoff)
843 .unwrap_or(std::time::Duration::from_secs(1)),
844 )
845 .await;
846 }
847 }
848
849 Ok(BulkReplayResult {
850 successful_count: successful.len(),
851 failed_count: failed.len(),
852 successful_messages: successful,
853 failed_messages: failed,
854 })
855 }
856}
857
858#[derive(Debug, Clone, Serialize, Deserialize)]
860pub struct DlqStats {
861 pub messages_count: u64,
863 pub oldest_message_age_ms: u64,
865 pub error_categories: HashMap<String, u64>,
867 pub status_counts: HashMap<String, u64>,
869 pub total_replay_attempts: u32,
871 pub replay_success_rate: f64,
873 pub size_bytes: u64,
875 pub retention_period_ms: u64,
877}
878
879#[derive(Debug, Clone, Serialize, Deserialize)]
881pub struct BulkReplayResult {
882 pub successful_count: usize,
884 pub failed_count: usize,
886 pub successful_messages: Vec<ReliableMessage>,
888 pub failed_messages: Vec<(String, String)>, }
891
892#[derive(Debug, Clone, Serialize, Deserialize)]
894pub struct ReliabilityStats {
895 pub in_flight_count: usize,
896 pub retry_queue_size: usize,
897 pub dlq_size: usize,
898 pub dedup_cache_size: usize,
899 pub total_sequences: u64,
900}
901
902#[async_trait::async_trait]
904pub trait ReliablePublisher: Send + Sync {
905 async fn publish_reliable(&self, message: ReliableMessage) -> Result<DeliveryConfirmation>;
907
908 fn supports_idempotency(&self) -> bool;
910
911 fn reliability_capabilities(&self) -> PublisherCapabilities;
913}
914
/// Feature flags and limits advertised by a [`ReliablePublisher`].
#[derive(Clone)]
#[derive(Debug)]
pub struct PublisherCapabilities {
    /// Transactional publishing is available.
    pub supports_transactions: bool,
    /// Idempotent publishing is available.
    pub supports_idempotency: bool,
    /// Message ordering is preserved.
    pub supports_ordering: bool,
    /// Messages can be routed by partition.
    pub supports_partitioning: bool,
    /// Largest accepted message (presumably in bytes — confirm with implementors).
    pub max_message_size: usize,
    /// Largest accepted batch (presumably a message count — confirm with implementors).
    pub max_batch_size: usize,
}
925
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the heartbeat event every test here uses, tagged with `source`.
    fn heartbeat(source: impl Into<String>) -> StreamEvent {
        StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: source.into(),
            metadata: crate::event::EventMetadata::default(),
        }
    }

    #[tokio::test]
    async fn test_reliability_manager_deduplication() {
        let manager = ReliabilityManager::new(ReliabilityConfig {
            enable_deduplication: true,
            deduplication_window: Duration::from_secs(60),
            ..Default::default()
        });

        let prepared = manager.prepare_message(heartbeat("test")).await.unwrap();

        // Recording the id again must leave it flagged as a duplicate.
        manager
            .record_message_id(&prepared.message_id)
            .await
            .unwrap();
        assert!(manager.is_duplicate(&prepared.message_id).await.unwrap());
    }

    #[tokio::test]
    async fn test_retry_delay_calculation() {
        let config = ReliabilityConfig {
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(10),
            backoff_multiplier: 2.0,
            backoff_jitter: false,
            ..Default::default()
        };

        let mut message = ReliableMessage::new(heartbeat("test"));

        // (retry_count, expected delay): doubles from 100 ms until the 10 s cap kicks in.
        let cases = [
            (0, Duration::from_millis(100)),
            (1, Duration::from_millis(200)),
            (2, Duration::from_millis(400)),
            (10, Duration::from_secs(10)),
        ];

        for (retry_count, expected) in cases {
            message.retry_count = retry_count;
            assert_eq!(
                message.next_retry_delay(&config),
                expected,
                "retry_count = {retry_count}"
            );
        }
    }

    #[tokio::test]
    async fn test_dlq_management() {
        let manager = ReliabilityManager::new(ReliabilityConfig {
            max_retries: 1,
            dlq_config: Some(DlqConfig {
                enabled: true,
                max_size: 2,
                ..Default::default()
            }),
            ..Default::default()
        });

        // Three sends into a DLQ capped at two must evict the oldest entry.
        for i in 0..3 {
            let message = ReliableMessage::new(heartbeat(format!("test-{i}")));
            manager.send_to_dlq(message).await.unwrap();
        }

        assert_eq!(manager.get_dlq_messages(10).await.len(), 2);
    }
}