// oxirs_stream/reliability.rs

//! # Reliability Module
//!
//! Provides reliability guarantees for message producers, including:
//! - At-least-once delivery guarantee
//! - Exactly-once semantics via idempotent publishing
//! - Message deduplication
//! - Retry mechanisms with exponential backoff
//! - Dead letter queue (DLQ) support
//! - Delivery confirmations and acknowledgments
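//!
//! A minimal usage sketch (illustrative only; `my_publisher` stands in for
//! any backend implementing [`ReliablePublisher`]):
//!
//! ```rust,ignore
//! let manager = ReliabilityManager::new(ReliabilityConfig::default());
//! manager.start().await?;
//!
//! let message = manager.prepare_message(event).await?;
//! match my_publisher.publish_reliable(message.clone()).await {
//!     Ok(_confirmation) => manager.record_delivery(&message.message_id).await?,
//!     Err(e) => {
//!         let _status = manager
//!             .record_failure(&message.message_id, e.to_string())
//!             .await?;
//!     }
//! }
//! ```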

use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{watch, Mutex, RwLock, Semaphore};
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::StreamEvent;

/// Delivery guarantee levels
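///
/// Note that `ExactlyOnce` also depends on the backend: it assumes
/// idempotent publishing support (see
/// [`ReliablePublisher::supports_idempotency`]).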
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeliveryGuarantee {
    /// Messages may be lost but will not be duplicated
    AtMostOnce,
    /// Messages will not be lost but may be duplicated
    AtLeastOnce,
    /// Messages will be delivered exactly once
    ExactlyOnce,
}

/// Reliability configuration
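///
/// A sketch of overriding selected defaults (the values are illustrative,
/// not recommendations):
///
/// ```rust,ignore
/// let config = ReliabilityConfig {
///     delivery_guarantee: DeliveryGuarantee::ExactlyOnce,
///     max_retries: 5,
///     preserve_ordering: true,
///     ..Default::default()
/// };
/// ```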
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReliabilityConfig {
    /// Delivery guarantee level
    pub delivery_guarantee: DeliveryGuarantee,
    /// Enable message deduplication
    pub enable_deduplication: bool,
    /// Deduplication window duration
    pub deduplication_window: Duration,
    /// Maximum message retries
    pub max_retries: u32,
    /// Initial retry backoff
    pub initial_backoff: Duration,
    /// Maximum retry backoff
    pub max_backoff: Duration,
    /// Backoff multiplier
    pub backoff_multiplier: f64,
    /// Enable exponential backoff jitter
    pub backoff_jitter: bool,
    /// Dead letter queue configuration
    pub dlq_config: Option<DlqConfig>,
    /// Message timeout for acknowledgment
    pub ack_timeout: Duration,
    /// Enable persistence for reliability state
    pub enable_persistence: bool,
    /// Maximum in-flight messages
    pub max_in_flight: usize,
    /// Enable message ordering guarantees
    pub preserve_ordering: bool,
}

impl Default for ReliabilityConfig {
    fn default() -> Self {
        Self {
            delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
            enable_deduplication: true,
            deduplication_window: Duration::from_secs(300), // 5 minutes
            max_retries: 3,
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            backoff_jitter: true,
            dlq_config: Some(DlqConfig::default()),
            ack_timeout: Duration::from_secs(30),
            enable_persistence: false,
            max_in_flight: 1000,
            preserve_ordering: false,
        }
    }
}

/// Dead Letter Queue configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqConfig {
    /// Enable DLQ
    pub enabled: bool,
    /// DLQ topic/queue name
    pub topic: String,
    /// Maximum DLQ size
    pub max_size: usize,
    /// DLQ retention duration
    pub retention: Duration,
    /// Include error details in DLQ messages
    pub include_error_details: bool,
    /// Enable message replay from DLQ
    pub enable_replay: bool,
    /// Maximum replay attempts per message
    pub max_replay_attempts: u32,
    /// Replay backoff duration
    pub replay_backoff: Duration,
    /// Replay batch size for bulk operations
    pub replay_batch_size: usize,
}

impl Default for DlqConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            topic: "oxirs-dlq".to_string(),
            max_size: 10000,
            retention: Duration::from_secs(86400 * 7), // 7 days
            include_error_details: true,
            enable_replay: true,
            max_replay_attempts: 3,
            replay_backoff: Duration::from_secs(60), // 1 minute
            replay_batch_size: 100,
        }
    }
}

/// Replay status for DLQ messages
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum ReplayStatus {
    /// Message is available for replay
    #[default]
    Available,
    /// Message is currently being replayed
    InProgress,
    /// Message replay completed successfully
    Succeeded,
    /// Message replay failed and cannot be retried
    Failed,
    /// Message replay is temporarily paused
    Paused,
}

/// Message wrapper with reliability metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReliableMessage {
    /// Unique message ID for deduplication
    pub message_id: String,
    /// Original event
    pub event: StreamEvent,
    /// Number of retry attempts
    pub retry_count: u32,
    /// First attempt timestamp
    pub first_attempt: DateTime<Utc>,
    /// Last attempt timestamp
    pub last_attempt: DateTime<Utc>,
    /// Error history
    pub errors: Vec<String>,
    /// Message checksum for integrity
    pub checksum: Option<String>,
    /// Sequence number for ordering
    pub sequence_number: Option<u64>,
    /// Partition key for ordering within partition
    pub partition_key: Option<String>,
    /// Number of replay attempts from DLQ
    pub replay_count: u32,
    /// Last replay attempt timestamp
    pub last_replay_attempt: Option<DateTime<Utc>>,
    /// Replay status
    pub replay_status: ReplayStatus,
}

impl ReliableMessage {
    /// Create a new reliable message
    pub fn new(event: StreamEvent) -> Self {
        let now = Utc::now();
        Self {
            message_id: Uuid::new_v4().to_string(),
            event,
            retry_count: 0,
            first_attempt: now,
            last_attempt: now,
            errors: Vec::new(),
            checksum: None,
            sequence_number: None,
            partition_key: None,
            replay_count: 0,
            last_replay_attempt: None,
            replay_status: ReplayStatus::default(),
        }
    }

    /// Add error to message history
    pub fn add_error(&mut self, error: String) {
        self.errors.push(error);
        self.retry_count += 1;
        self.last_attempt = Utc::now();
    }

    /// Check if message should be retried
    pub fn should_retry(&self, max_retries: u32) -> bool {
        self.retry_count < max_retries
    }

    /// Calculate next retry delay
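    ///
    /// The delay is `initial_backoff * backoff_multiplier^retry_count`,
    /// capped at `max_backoff`; with `backoff_jitter` enabled the result is
    /// scaled by a random factor in `[0.8, 1.2)`.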
    pub fn next_retry_delay(&self, config: &ReliabilityConfig) -> Duration {
        let base_delay = config.initial_backoff.as_millis() as f64
            * config.backoff_multiplier.powi(self.retry_count as i32);

        let mut delay = Duration::from_millis(base_delay as u64).min(config.max_backoff);

        // Add jitter if enabled
        if config.backoff_jitter {
            #[allow(unused_imports)]
            use scirs2_core::random::{Random, Rng};
            let mut random = Random::default();
            let jitter = random.gen_range(0.8..1.2);
            delay = Duration::from_millis((delay.as_millis() as f64 * jitter) as u64);
        }

        delay
    }
}

/// Delivery confirmation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeliveryConfirmation {
    /// Message ID
    pub message_id: String,
    /// Delivery status
    pub status: DeliveryStatus,
    /// Delivery timestamp
    pub timestamp: DateTime<Utc>,
    /// Backend-specific metadata
    pub metadata: HashMap<String, String>,
}

/// Delivery status
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DeliveryStatus {
    /// Successfully delivered
    Delivered,
    /// Failed to deliver
    Failed(String),
    /// Sent to DLQ
    DeadLettered(String),
    /// Delivery pending
    Pending,
}

/// Reliability manager for message producers
pub struct ReliabilityManager {
    config: ReliabilityConfig,
    /// Deduplication cache: message_id -> expiry time
    dedup_cache: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
    /// In-flight messages: message_id -> ReliableMessage
    in_flight: Arc<RwLock<HashMap<String, ReliableMessage>>>,
    /// Retry queue
    retry_queue: Arc<Mutex<VecDeque<ReliableMessage>>>,
    /// Dead letter queue
    dlq: Arc<Mutex<VecDeque<ReliableMessage>>>,
    /// Sequence counter for ordering
    sequence_counter: Arc<RwLock<u64>>,
    /// Acknowledgment tracking
    ack_tracker: Arc<RwLock<HashMap<String, Instant>>>,
    /// Semaphore for in-flight message limiting
    in_flight_semaphore: Arc<Semaphore>,
    /// Shutdown signal observed by all background tasks; a `watch` channel
    /// is used so that every spawned task sees the signal (a single mpsc
    /// receiver would wake only one of them)
    shutdown_tx: Option<watch::Sender<bool>>,
    shutdown_rx: watch::Receiver<bool>,
}

impl ReliabilityManager {
    /// Create a new reliability manager
    pub fn new(config: ReliabilityConfig) -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let in_flight_semaphore = Arc::new(Semaphore::new(config.max_in_flight));

        Self {
            config,
            dedup_cache: Arc::new(RwLock::new(HashMap::new())),
            in_flight: Arc::new(RwLock::new(HashMap::new())),
            retry_queue: Arc::new(Mutex::new(VecDeque::new())),
            dlq: Arc::new(Mutex::new(VecDeque::new())),
            sequence_counter: Arc::new(RwLock::new(0)),
            ack_tracker: Arc::new(RwLock::new(HashMap::new())),
            in_flight_semaphore,
            shutdown_tx: Some(shutdown_tx),
            shutdown_rx,
        }
    }

    /// Start background tasks for reliability management
    pub async fn start(&self) -> Result<()> {
        // Start deduplication cache cleanup task
        self.start_dedup_cleanup_task().await;

        // Start acknowledgment timeout checker
        self.start_ack_timeout_checker().await;

        // Start retry processor
        self.start_retry_processor().await;

        info!("Reliability manager started");
        Ok(())
    }

    /// Prepare message for reliable delivery
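    ///
    /// Assigns a sequence number when `preserve_ordering` is enabled, records
    /// the message ID in the deduplication cache (rejecting IDs already
    /// present), and consumes one in-flight permit that is handed back when
    /// the message is delivered, dead-lettered, or permanently fails.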
    pub async fn prepare_message(&self, event: StreamEvent) -> Result<ReliableMessage> {
        let mut message = ReliableMessage::new(event);

        // Add sequence number if ordering is enabled
        if self.config.preserve_ordering {
            let mut counter = self.sequence_counter.write().await;
            *counter += 1;
            message.sequence_number = Some(*counter);
        }

        // Check deduplication if enabled
        if self.config.enable_deduplication {
            if self.is_duplicate(&message.message_id).await? {
                return Err(anyhow!(
                    "Duplicate message detected: {}",
                    message.message_id
                ));
            }
            self.record_message_id(&message.message_id).await?;
        }

        // Acquire an in-flight permit. `forget` keeps the permit consumed
        // after this function returns (a bound permit would be released when
        // it drops at the end of this scope); it is handed back via
        // `add_permits` once the message reaches a terminal state (see
        // `record_delivery` and `record_failure`).
        self.in_flight_semaphore
            .acquire()
            .await
            .map_err(|_| anyhow!("Failed to acquire in-flight permit"))?
            .forget();

        // Track in-flight message
        self.in_flight
            .write()
            .await
            .insert(message.message_id.clone(), message.clone());

        // Track for acknowledgment timeout
        self.ack_tracker
            .write()
            .await
            .insert(message.message_id.clone(), Instant::now());

        Ok(message)
    }

    /// Record successful delivery
    pub async fn record_delivery(&self, message_id: &str) -> Result<()> {
        // Remove from in-flight tracking
        self.in_flight.write().await.remove(message_id);

        // Remove from ack tracker
        self.ack_tracker.write().await.remove(message_id);

        // Hand back the in-flight permit consumed in `prepare_message`
        self.in_flight_semaphore.add_permits(1);

        debug!("Recorded successful delivery for message: {}", message_id);
        Ok(())
    }

    /// Record delivery failure
    pub async fn record_failure(&self, message_id: &str, error: String) -> Result<DeliveryStatus> {
        let mut in_flight = self.in_flight.write().await;

        if let Some(mut message) = in_flight.remove(message_id) {
            message.add_error(error.clone());

            // The message is no longer awaiting an acknowledgment
            self.ack_tracker.write().await.remove(message_id);

            if message.should_retry(self.config.max_retries) {
                // Add to retry queue; the in-flight permit stays held until
                // the message reaches a terminal state
                self.retry_queue.lock().await.push_back(message);
                Ok(DeliveryStatus::Pending)
            } else {
                // Terminal state: hand back the in-flight permit
                self.in_flight_semaphore.add_permits(1);

                // Max retries exceeded, send to DLQ if configured
                if let Some(dlq_config) = &self.config.dlq_config {
                    if dlq_config.enabled {
                        self.send_to_dlq(message).await?;
                        return Ok(DeliveryStatus::DeadLettered(error));
                    }
                }
                Ok(DeliveryStatus::Failed(error))
            }
        } else {
            Err(anyhow!(
                "Message not found in in-flight tracking: {}",
                message_id
            ))
        }
    }

    /// Check if message is duplicate
    async fn is_duplicate(&self, message_id: &str) -> Result<bool> {
        let cache = self.dedup_cache.read().await;
        Ok(cache.contains_key(message_id))
    }

    /// Record message ID for deduplication
    async fn record_message_id(&self, message_id: &str) -> Result<()> {
        let expiry = Utc::now()
            + ChronoDuration::from_std(self.config.deduplication_window)
                .map_err(|e| anyhow!("Invalid deduplication window: {}", e))?;

        self.dedup_cache
            .write()
            .await
            .insert(message_id.to_string(), expiry);

        Ok(())
    }

    /// Send message to DLQ
    async fn send_to_dlq(&self, message: ReliableMessage) -> Result<()> {
        let mut dlq = self.dlq.lock().await;

        // Check DLQ size limit
        if let Some(dlq_config) = &self.config.dlq_config {
            if dlq.len() >= dlq_config.max_size {
                warn!("DLQ is full, dropping oldest message");
                dlq.pop_front();
            }
        }

        dlq.push_back(message.clone());
        info!(
            "Message {} sent to DLQ after {} retries",
            message.message_id, message.retry_count
        );

        Ok(())
    }

    /// Get next message from retry queue
    pub async fn get_retry_message(&self) -> Option<ReliableMessage> {
        self.retry_queue.lock().await.pop_front()
    }

    /// Get DLQ messages
    pub async fn get_dlq_messages(&self, limit: usize) -> Vec<ReliableMessage> {
        let dlq = self.dlq.lock().await;
        dlq.iter().take(limit).cloned().collect()
    }

    /// Clear DLQ
    pub async fn clear_dlq(&self) -> Result<()> {
        self.dlq.lock().await.clear();
        info!("Dead letter queue cleared");
        Ok(())
    }

    /// Get reliability statistics
    pub async fn get_stats(&self) -> ReliabilityStats {
        ReliabilityStats {
            in_flight_count: self.in_flight.read().await.len(),
            retry_queue_size: self.retry_queue.lock().await.len(),
            dlq_size: self.dlq.lock().await.len(),
            dedup_cache_size: self.dedup_cache.read().await.len(),
            total_sequences: *self.sequence_counter.read().await,
        }
    }

    /// Start deduplication cache cleanup task
    async fn start_dedup_cleanup_task(&self) {
        let cache = Arc::clone(&self.dedup_cache);
        let interval = Duration::from_secs(60); // Cleanup every minute
        let shutdown_rx = self.shutdown_rx.clone();

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                // Stop once shutdown has been signalled
                if *shutdown_rx.borrow() {
                    break;
                }

                interval_timer.tick().await;

                // Clean expired entries
                let now = Utc::now();
                let mut cache_write = cache.write().await;
                cache_write.retain(|_, expiry| *expiry > now);

                debug!(
                    "Dedup cache cleanup: {} entries remaining",
                    cache_write.len()
                );
            }
        });
    }

    /// Start acknowledgment timeout checker
    async fn start_ack_timeout_checker(&self) {
        let ack_tracker = Arc::clone(&self.ack_tracker);
        let in_flight = Arc::clone(&self.in_flight);
        let retry_queue = Arc::clone(&self.retry_queue);
        let timeout = self.config.ack_timeout;
        let shutdown_rx = self.shutdown_rx.clone();

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(Duration::from_secs(5));

            loop {
                // Stop once shutdown has been signalled
                if *shutdown_rx.borrow() {
                    break;
                }

                interval_timer.tick().await;

                let now = Instant::now();
                let mut expired_messages = Vec::new();

                // Find expired messages
                {
                    let tracker = ack_tracker.read().await;
                    for (message_id, start_time) in tracker.iter() {
                        if now.duration_since(*start_time) > timeout {
                            expired_messages.push(message_id.clone());
                        }
                    }
                }

                // Handle expired messages
                for message_id in expired_messages {
                    warn!("Message {} timed out, adding to retry queue", message_id);

                    // Remove from trackers
                    ack_tracker.write().await.remove(&message_id);

                    // Move to retry queue
                    if let Some(message) = in_flight.write().await.remove(&message_id) {
                        retry_queue.lock().await.push_back(message);
                    }
                }
            }
        });
    }

    /// Start retry processor
    async fn start_retry_processor(&self) {
        let retry_queue = Arc::clone(&self.retry_queue);
        let in_flight = Arc::clone(&self.in_flight);
        let ack_tracker = Arc::clone(&self.ack_tracker);
        let config = self.config.clone();
        let shutdown_rx = self.shutdown_rx.clone();

        tokio::spawn(async move {
            loop {
                // Stop once shutdown has been signalled
                if *shutdown_rx.borrow() {
                    break;
                }

                // Process retry queue
                let message = retry_queue.lock().await.pop_front();

                if let Some(msg) = message {
                    // Calculate retry delay
                    let delay = msg.next_retry_delay(&config);

                    info!(
                        "Retrying message {} after {:?} (attempt {})",
                        msg.message_id,
                        delay,
                        msg.retry_count + 1
                    );

                    // Wait for retry delay
                    tokio::time::sleep(delay).await;

                    // Re-add to in-flight tracking
                    in_flight
                        .write()
                        .await
                        .insert(msg.message_id.clone(), msg.clone());

                    // Update ack tracker
                    ack_tracker
                        .write()
                        .await
                        .insert(msg.message_id.clone(), Instant::now());
                } else {
                    // No messages to retry, wait a bit
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        });
    }

    /// Shutdown reliability manager
    pub async fn shutdown(&mut self) -> Result<()> {
        // Signal every background task to stop
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(true);
        }

        info!("Reliability manager shutdown");
        Ok(())
    }

    /// Replay a single message from DLQ by message ID
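    ///
    /// A sketch of the intended replay round trip (assumes a `publisher`
    /// implementing [`ReliablePublisher`]):
    ///
    /// ```rust,ignore
    /// let message = manager.replay_message(&id).await?;
    /// match publisher.publish_reliable(message).await {
    ///     // On success, drop the message from the DLQ
    ///     Ok(_) => manager.remove_from_dlq(&id).await?,
    ///     // On failure, make it eligible for a later attempt
    ///     Err(_) => manager.update_replay_status(&id, ReplayStatus::Available).await?,
    /// }
    /// ```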
    pub async fn replay_message(&self, message_id: &str) -> Result<ReliableMessage> {
        if !self
            .config
            .dlq_config
            .as_ref()
            .map(|dlq| dlq.enable_replay)
            .unwrap_or(false)
        {
            return Err(anyhow!("Message replay is disabled"));
        }

        let mut dlq = self.dlq.lock().await;
        let position = dlq.iter().position(|msg| msg.message_id == message_id);

        if let Some(pos) = position {
            let mut message = dlq[pos].clone();

            // Check if message can be replayed
            if message.replay_status == ReplayStatus::Failed
                || message.replay_count
                    >= self
                        .config
                        .dlq_config
                        .as_ref()
                        .map(|dlq| dlq.max_replay_attempts)
                        .unwrap_or(0)
            {
                return Err(anyhow!("Message has exceeded maximum replay attempts"));
            }

            // Update replay status and metadata
            message.replay_status = ReplayStatus::InProgress;
            message.replay_count += 1;
            message.last_replay_attempt = Some(Utc::now());
            dlq[pos] = message.clone();

            info!(
                "Replaying message {} (attempt {})",
                message_id, message.replay_count
            );
            Ok(message)
        } else {
            Err(anyhow!("Message not found in DLQ: {}", message_id))
        }
    }

    /// Replay multiple messages from the DLQ that match a filter predicate
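    ///
    /// For example (a sketch; the predicate is arbitrary), replay up to 50
    /// messages whose last error mentions a timeout:
    ///
    /// ```rust,ignore
    /// let replayed = manager
    ///     .replay_messages_with_filter(
    ///         |msg| msg.errors.iter().any(|e| e.contains("timeout")),
    ///         50,
    ///     )
    ///     .await?;
    /// ```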
    pub async fn replay_messages_with_filter<F>(
        &self,
        filter: F,
        limit: usize,
    ) -> Result<Vec<ReliableMessage>>
    where
        F: Fn(&ReliableMessage) -> bool,
    {
        if !self
            .config
            .dlq_config
            .as_ref()
            .map(|dlq| dlq.enable_replay)
            .unwrap_or(false)
        {
            return Err(anyhow!("Message replay is disabled"));
        }

        let mut dlq = self.dlq.lock().await;
        let mut replayed_messages = Vec::new();
        let mut updated_indices = Vec::new();

        for (index, message) in dlq.iter().enumerate() {
            if replayed_messages.len() >= limit {
                break;
            }

            if filter(message)
                && message.replay_status != ReplayStatus::Failed
                && message.replay_count
                    < self
                        .config
                        .dlq_config
                        .as_ref()
                        .map(|dlq| dlq.max_replay_attempts)
                        .unwrap_or(0)
            {
                let mut updated_message = message.clone();
                updated_message.replay_status = ReplayStatus::InProgress;
                updated_message.replay_count += 1;
                updated_message.last_replay_attempt = Some(Utc::now());

                replayed_messages.push(updated_message.clone());
                updated_indices.push((index, updated_message));
            }
        }

        // Update the messages in the DLQ
        for (index, updated_message) in updated_indices {
            dlq[index] = updated_message;
        }

        info!("Replaying {} messages from DLQ", replayed_messages.len());
        Ok(replayed_messages)
    }

    /// Remove successfully replayed message from DLQ
    pub async fn remove_from_dlq(&self, message_id: &str) -> Result<()> {
        let mut dlq = self.dlq.lock().await;
        let initial_len = dlq.len();
        dlq.retain(|msg| msg.message_id != message_id);

        if dlq.len() < initial_len {
            info!(
                "Removed successfully replayed message {} from DLQ",
                message_id
            );
            Ok(())
        } else {
            Err(anyhow!("Message not found in DLQ: {}", message_id))
        }
    }

    /// Update replay status for a message in DLQ
    pub async fn update_replay_status(&self, message_id: &str, status: ReplayStatus) -> Result<()> {
        let mut dlq = self.dlq.lock().await;
        if let Some(message) = dlq.iter_mut().find(|msg| msg.message_id == message_id) {
            message.replay_status = status;
            debug!(
                "Updated replay status for message {} to {:?}",
                message_id, status
            );
            Ok(())
        } else {
            Err(anyhow!("Message not found in DLQ: {}", message_id))
        }
    }

    /// Get DLQ statistics for monitoring
    pub async fn get_dlq_stats(&self) -> DlqStats {
        let dlq = self.dlq.lock().await;

        let mut error_categories = HashMap::new();
        let mut status_counts = HashMap::new();
        let mut oldest_message_age = 0u64;
        let mut total_replay_attempts = 0u32;
        let mut size_bytes = 0u64;

        let now = Utc::now();

        for message in dlq.iter() {
            // Count error categories
            for error in &message.errors {
                *error_categories.entry(error.clone()).or_insert(0) += 1;
            }

            // Count replay statuses
            let status_key = format!("{:?}", message.replay_status);
            *status_counts.entry(status_key).or_insert(0) += 1;

            // Calculate oldest message age
            let age_ms = (now - message.first_attempt).num_milliseconds().max(0) as u64;
            oldest_message_age = oldest_message_age.max(age_ms);

            // Sum replay attempts
            total_replay_attempts += message.replay_count;

            // Estimate size (rough calculation)
            size_bytes += 1024; // Average estimate per message
        }

        let messages_count = dlq.len() as u64;
        let replay_success_rate = if total_replay_attempts > 0 {
            (status_counts.get("Succeeded").unwrap_or(&0) * 100) as f64
                / total_replay_attempts as f64
        } else {
            100.0
        };

        DlqStats {
            messages_count,
            oldest_message_age_ms: oldest_message_age,
            error_categories,
            status_counts,
            total_replay_attempts,
            replay_success_rate,
            size_bytes,
            retention_period_ms: self
                .config
                .dlq_config
                .as_ref()
                .map(|dlq| dlq.retention.as_millis() as u64)
                .unwrap_or(0),
        }
    }

    /// Bulk replay messages with batching
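    ///
    /// Replays the given IDs in chunks of `replay_batch_size`, sleeping for
    /// `replay_backoff` between chunks. A usage sketch:
    ///
    /// ```rust,ignore
    /// let result = manager.bulk_replay_messages(ids).await?;
    /// println!("{} replayed, {} failed", result.successful_count, result.failed_count);
    /// ```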
    pub async fn bulk_replay_messages(&self, message_ids: Vec<String>) -> Result<BulkReplayResult> {
        if !self
            .config
            .dlq_config
            .as_ref()
            .map(|dlq| dlq.enable_replay)
            .unwrap_or(false)
        {
            return Err(anyhow!("Message replay is disabled"));
        }

        let batch_size = self
            .config
            .dlq_config
            .as_ref()
            .map(|dlq| dlq.replay_batch_size)
            .unwrap_or(100);
        let replay_backoff = self
            .config
            .dlq_config
            .as_ref()
            .map(|dlq| dlq.replay_backoff)
            .unwrap_or(std::time::Duration::from_secs(1));
        let mut successful = Vec::new();
        let mut failed = Vec::new();

        for chunk in message_ids.chunks(batch_size) {
            for message_id in chunk {
                match self.replay_message(message_id).await {
                    Ok(message) => successful.push(message),
                    Err(e) => failed.push((message_id.clone(), e.to_string())),
                }
            }

            // Back off between batches so bulk replays do not flood the
            // downstream publisher (a per-message sleep would make the
            // batch size meaningless)
            tokio::time::sleep(replay_backoff).await;
        }

        Ok(BulkReplayResult {
            successful_count: successful.len(),
            failed_count: failed.len(),
            successful_messages: successful,
            failed_messages: failed,
        })
    }
}

/// Dead Letter Queue statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqStats {
    /// Total number of messages in DLQ
    pub messages_count: u64,
    /// Age of oldest message in milliseconds
    pub oldest_message_age_ms: u64,
    /// Error categories and their counts
    pub error_categories: HashMap<String, u64>,
    /// Replay status counts
    pub status_counts: HashMap<String, u64>,
    /// Total replay attempts across all messages
    pub total_replay_attempts: u32,
    /// Success rate of replay operations (percentage)
    pub replay_success_rate: f64,
    /// Estimated DLQ size in bytes
    pub size_bytes: u64,
    /// DLQ retention period in milliseconds
    pub retention_period_ms: u64,
}

/// Result of bulk replay operation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkReplayResult {
    /// Number of successfully replayed messages
    pub successful_count: usize,
    /// Number of failed replay attempts
    pub failed_count: usize,
    /// Successfully replayed messages
    pub successful_messages: Vec<ReliableMessage>,
    /// Failed messages with error details
    pub failed_messages: Vec<(String, String)>, // (message_id, error)
}

/// Reliability statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReliabilityStats {
    pub in_flight_count: usize,
    pub retry_queue_size: usize,
    pub dlq_size: usize,
    pub dedup_cache_size: usize,
    pub total_sequences: u64,
}

/// Message publisher interface for reliable delivery
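///
/// A sketch of a backend implementation (the `NoopPublisher` type and its
/// capability values are hypothetical):
///
/// ```rust,ignore
/// struct NoopPublisher;
///
/// #[async_trait::async_trait]
/// impl ReliablePublisher for NoopPublisher {
///     async fn publish_reliable(&self, message: ReliableMessage) -> Result<DeliveryConfirmation> {
///         Ok(DeliveryConfirmation {
///             message_id: message.message_id,
///             status: DeliveryStatus::Delivered,
///             timestamp: Utc::now(),
///             metadata: HashMap::new(),
///         })
///     }
///
///     fn supports_idempotency(&self) -> bool {
///         false
///     }
///
///     fn reliability_capabilities(&self) -> PublisherCapabilities {
///         PublisherCapabilities {
///             supports_transactions: false,
///             supports_idempotency: false,
///             supports_ordering: false,
///             supports_partitioning: false,
///             max_message_size: 1024 * 1024,
///             max_batch_size: 100,
///         }
///     }
/// }
/// ```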
#[async_trait::async_trait]
pub trait ReliablePublisher: Send + Sync {
    /// Publish message with reliability guarantees
    async fn publish_reliable(&self, message: ReliableMessage) -> Result<DeliveryConfirmation>;

    /// Check if publisher supports idempotency
    fn supports_idempotency(&self) -> bool;

    /// Get publisher reliability capabilities
    fn reliability_capabilities(&self) -> PublisherCapabilities;
}

/// Publisher reliability capabilities
#[derive(Debug, Clone)]
pub struct PublisherCapabilities {
    pub supports_transactions: bool,
    pub supports_idempotency: bool,
    pub supports_ordering: bool,
    pub supports_partitioning: bool,
    pub max_message_size: usize,
    pub max_batch_size: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_reliability_manager_deduplication() {
        let config = ReliabilityConfig {
            enable_deduplication: true,
            deduplication_window: Duration::from_secs(60),
            ..Default::default()
        };

        let manager = ReliabilityManager::new(config);

        // Create test event
        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        // Preparing the first message records its ID in the dedup cache
        let msg1 = manager.prepare_message(event.clone()).await.unwrap();

        // Re-recording the ID is idempotent; the ID now counts as a duplicate
        manager.record_message_id(&msg1.message_id).await.unwrap();
        assert!(manager.is_duplicate(&msg1.message_id).await.unwrap());
    }

    #[tokio::test]
    async fn test_retry_delay_calculation() {
        let config = ReliabilityConfig {
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(10),
            backoff_multiplier: 2.0,
            backoff_jitter: false,
            ..Default::default()
        };

        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let mut message = ReliableMessage::new(event);

        // Test exponential backoff
        assert_eq!(
            message.next_retry_delay(&config),
            Duration::from_millis(100)
        );

        message.retry_count = 1;
        assert_eq!(
            message.next_retry_delay(&config),
            Duration::from_millis(200)
        );

        message.retry_count = 2;
        assert_eq!(
            message.next_retry_delay(&config),
            Duration::from_millis(400)
        );

        // Test max backoff cap
        message.retry_count = 10;
        assert_eq!(message.next_retry_delay(&config), Duration::from_secs(10));
    }

    #[tokio::test]
    async fn test_dlq_management() {
        let config = ReliabilityConfig {
            max_retries: 1,
            dlq_config: Some(DlqConfig {
                enabled: true,
                max_size: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        let manager = ReliabilityManager::new(config);

        // Create test messages
        for i in 0..3 {
            let event = StreamEvent::Heartbeat {
                timestamp: Utc::now(),
                source: format!("test-{i}"),
                metadata: crate::event::EventMetadata::default(),
            };

            let message = ReliableMessage::new(event);
            manager.send_to_dlq(message).await.unwrap();
        }

        // Check DLQ size limit
        let dlq_messages = manager.get_dlq_messages(10).await;
        assert_eq!(dlq_messages.len(), 2);
    }
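
    // An additional check (a sketch added here) of the failure paths: with a
    // retry budget left a failure queues the message for retry; with the
    // budget exhausted it is dead-lettered.
    #[tokio::test]
    async fn test_record_failure_retry_then_dlq() {
        let event = StreamEvent::Heartbeat {
            timestamp: Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        // With retries remaining, a failure yields Pending and a retry entry
        let manager = ReliabilityManager::new(ReliabilityConfig {
            max_retries: 2,
            ..Default::default()
        });
        let message = manager.prepare_message(event.clone()).await.unwrap();
        let status = manager
            .record_failure(&message.message_id, "transient".to_string())
            .await
            .unwrap();
        assert_eq!(status, DeliveryStatus::Pending);
        assert_eq!(manager.get_retry_message().await.unwrap().retry_count, 1);

        // With no retry budget, a failure dead-letters the message
        let manager = ReliabilityManager::new(ReliabilityConfig {
            max_retries: 0,
            ..Default::default()
        });
        let message = manager.prepare_message(event).await.unwrap();
        let status = manager
            .record_failure(&message.message_id, "fatal".to_string())
            .await
            .unwrap();
        assert_eq!(status, DeliveryStatus::DeadLettered("fatal".to_string()));
        assert_eq!(manager.get_dlq_messages(10).await.len(), 1);
    }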
}