Skip to main content

actr_runtime_mailbox/
dlq.rs

//! Dead Letter Queue (DLQ)
//!
//! The DLQ stores poison messages: messages that cannot be processed because they are
//! corrupted or fail to decode. Unlike messages in the regular mailbox, DLQ entries are
//! preserved indefinitely to allow manual intervention and debugging.
//!
//! ## Design Principles
//!
//! - **Poison message classification**: only decode failures (`DecodeFailure`) and severe
//!   corruption are classified as poison
//! - **Forensic data preservation**: raw bytes, error context, and `trace_id` are stored
//! - **Manual intervention workflow**: a redrive API reprocesses messages after fixes
//! - **No auto-retry**: poison messages are never retried automatically
//!
//! ## Workflow
//!
//! 1. **Framework detects a poison message** (Protobuf decode failure)
//! 2. **Write to the DLQ** with full context (`raw_bytes`, `error_message`, `trace_id`, etc.)
//! 3. **Alert operators** via metrics/logging (severity = 9)
//! 4. **Manual investigation** using the DLQ query APIs
//! 5. **Fix and redrive** after resolving the root cause (schema update, etc.)

22use crate::error::StorageResult;
23use async_trait::async_trait;
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use uuid::Uuid;
27
28/// Dead Letter Queue record
29///
30/// Stores complete forensic information for poison messages.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DlqRecord {
33    /// Unique DLQ entry ID
34    pub id: Uuid,
35
36    /// Original message ID (if available from envelope)
37    pub original_message_id: Option<String>,
38
39    /// Sender ActrId (Protobuf bytes, if decodable from envelope)
40    ///
41    /// May be None if even the envelope is corrupted.
42    pub from: Option<Vec<u8>>,
43
44    /// Target ActrId (Protobuf bytes, if this was intended for specific Actor)
45    pub to: Option<Vec<u8>>,
46
47    /// Raw message bytes (complete original data for forensic analysis)
48    pub raw_bytes: Vec<u8>,
49
50    /// Error message describing why this is poison
51    pub error_message: String,
52
53    /// Error category (e.g., "protobuf_decode", "invalid_envelope", "corrupted_data")
54    pub error_category: String,
55
56    /// Distributed trace ID (for correlating with logs)
57    pub trace_id: String,
58
59    /// Request ID (if available)
60    pub request_id: Option<String>,
61
62    /// Timestamp when message was added to DLQ
63    pub created_at: DateTime<Utc>,
64
65    /// Number of redrive attempts (incremented on each redrive failure)
66    pub redrive_attempts: u32,
67
68    /// Last redrive attempt timestamp
69    pub last_redrive_at: Option<DateTime<Utc>>,
70
71    /// Additional context (JSON-encoded metadata like transport type, connection ID, etc.)
72    pub context: Option<String>,
73}
74
75/// DLQ statistics
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77pub struct DlqStats {
78    /// Total messages in DLQ
79    pub total_messages: u64,
80
81    /// Messages by error category
82    pub messages_by_category: std::collections::HashMap<String, u64>,
83
84    /// Messages with redrive attempts > 0
85    pub messages_with_redrive_attempts: u64,
86
87    /// Oldest message timestamp
88    pub oldest_message_at: Option<DateTime<Utc>>,
89}
90
91/// Query filter for DLQ records
92#[derive(Debug, Clone, Default)]
93pub struct DlqQuery {
94    /// Filter by error category
95    pub error_category: Option<String>,
96
97    /// Filter by trace_id
98    pub trace_id: Option<String>,
99
100    /// Filter by sender ActrId (exact match on bytes)
101    pub from: Option<Vec<u8>>,
102
103    /// Maximum number of records to return
104    pub limit: Option<u32>,
105
106    /// Return only messages created after this timestamp
107    pub created_after: Option<DateTime<Utc>>,
108}
109
110/// Dead Letter Queue interface
111///
112/// Provides persistence and query capabilities for poison messages.
113#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
114#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
115pub trait DeadLetterQueue: Send + Sync {
116    /// Add a poison message to DLQ
117    ///
118    /// # Parameters
119    ///
120    /// - `record`: Complete DLQ record with forensic data
121    async fn enqueue(&self, record: DlqRecord) -> StorageResult<Uuid>;
122
123    /// Query DLQ records with filtering
124    ///
125    /// # Parameters
126    ///
127    /// - `query`: Filter criteria for records
128    async fn query(&self, query: DlqQuery) -> StorageResult<Vec<DlqRecord>>;
129
130    /// Get a single DLQ record by ID
131    async fn get(&self, id: Uuid) -> StorageResult<Option<DlqRecord>>;
132
133    /// Delete a DLQ record (after manual resolution)
134    ///
135    /// # Parameters
136    ///
137    /// - `id`: DLQ record ID to delete
138    async fn delete(&self, id: Uuid) -> StorageResult<()>;
139
140    /// Increment redrive attempt counter
141    ///
142    /// Called when attempting to reprocess a DLQ message.
143    async fn record_redrive_attempt(&self, id: Uuid) -> StorageResult<()>;
144
145    /// Get DLQ statistics
146    async fn stats(&self) -> StorageResult<DlqStats>;
147}