actr_runtime_mailbox/dlq.rs
1//! Dead Letter Queue (DLQ)
2//!
//! The DLQ stores poison messages that cannot be processed because of data corruption or
//! decoding failures. Unlike messages in the regular mailbox, DLQ records are kept until they
//! are explicitly deleted, so they remain available for manual intervention and debugging.
6//!
7//! ## Design Principles
8//!
//! - **Poison message classification**: only decode failures (`DecodeFailure`) and severe data
//!   corruption are routed to the DLQ
//! - **Forensic data preservation**: store the raw bytes, error context, and trace_id
//! - **Manual intervention workflow**: redrive messages through the API once the root cause is fixed
//! - **No auto-retry**: poison messages are never retried automatically
13//!
14//! ## Workflow
15//!
16//! 1. **Framework detects poison message** (Protobuf decode failure)
17//! 2. **Write to DLQ** with full context (raw_bytes, error_message, trace_id, etc.)
18//! 3. **Alert operators** via metrics/logging (severity = 9)
19//! 4. **Manual investigation** using DLQ query APIs
20//! 5. **Fix and redrive** after resolving root cause (schema update, etc.)
21
22use crate::error::StorageResult;
23use async_trait::async_trait;
24use chrono::{DateTime, Utc};
25use serde::{Deserialize, Serialize};
26use uuid::Uuid;
27
/// Dead Letter Queue record.
///
/// One persisted poison message together with the forensic information needed to
/// investigate it. The derived `Serialize`/`Deserialize` impls follow field declaration
/// order, so reordering fields can affect positional serialization formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqRecord {
    /// Unique DLQ entry ID.
    ///
    /// NOTE(review): presumably the `id` accepted by [`DeadLetterQueue::get`],
    /// [`DeadLetterQueue::delete`] and [`DeadLetterQueue::record_redrive_attempt`] — confirm.
    pub id: Uuid,

    /// Original message ID, if it could be read from the envelope.
    pub original_message_id: Option<String>,

    /// Sender `ActrId` as Protobuf-encoded bytes, if it could be decoded from the envelope.
    ///
    /// `None` when even the envelope is too corrupted to yield a sender.
    pub from: Option<Vec<u8>>,

    /// Target `ActrId` as Protobuf-encoded bytes, if the message was addressed to a specific actor.
    pub to: Option<Vec<u8>>,

    /// Complete original message bytes, kept verbatim for forensic analysis.
    pub raw_bytes: Vec<u8>,

    /// Human-readable description of why the message was classified as poison.
    pub error_message: String,

    /// Error category, e.g. `"protobuf_decode"`, `"invalid_envelope"`, or `"corrupted_data"`.
    ///
    /// Statistics are grouped by this value in [`DlqStats::messages_by_category`].
    pub error_category: String,

    /// Distributed trace ID, for correlating this entry with logs.
    pub trace_id: String,

    /// Request ID, if one was available.
    pub request_id: Option<String>,

    /// Time at which the message was added to the DLQ.
    pub created_at: DateTime<Utc>,

    /// Number of redrive attempts recorded for this entry.
    ///
    /// [`DeadLetterQueue::record_redrive_attempt`] increments this counter and is documented
    /// as being called on each reprocessing attempt. NOTE(review): this field was previously
    /// described as counting redrive *failures*; confirm which semantics implementations follow.
    pub redrive_attempts: u32,

    /// Timestamp of the most recent redrive attempt.
    ///
    /// NOTE(review): presumably `None` until the first redrive attempt — confirm.
    pub last_redrive_at: Option<DateTime<Utc>>,

    /// Extra context as a JSON-encoded string (e.g. transport type, connection ID).
    pub context: Option<String>,
}
74
75/// DLQ statistics
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77pub struct DlqStats {
78 /// Total messages in DLQ
79 pub total_messages: u64,
80
81 /// Messages by error category
82 pub messages_by_category: std::collections::HashMap<String, u64>,
83
84 /// Messages with redrive attempts > 0
85 pub messages_with_redrive_attempts: u64,
86
87 /// Oldest message timestamp
88 pub oldest_message_at: Option<DateTime<Utc>>,
89}
90
/// Query filter for DLQ records.
///
/// Every criterion is optional; `DlqQuery::default()` leaves all of them unset.
/// NOTE(review): presumably the criteria that are set are combined with AND — confirm in
/// implementations.
#[derive(Debug, Clone, Default)]
pub struct DlqQuery {
    /// Filter by error category (presumably an exact match on `DlqRecord::error_category`).
    pub error_category: Option<String>,

    /// Filter by trace ID (presumably an exact match on `DlqRecord::trace_id`).
    pub trace_id: Option<String>,

    /// Filter by sender `ActrId`, compared as an exact match on the encoded bytes.
    pub from: Option<Vec<u8>>,

    /// Maximum number of records to return.
    ///
    /// NOTE(review): presumably `None` means "no limit"; result ordering is not specified here.
    pub limit: Option<u32>,

    /// Return only messages created after this timestamp.
    ///
    /// NOTE(review): whether this bound is exclusive or inclusive is not specified here — confirm.
    pub created_after: Option<DateTime<Utc>>,
}
109
110/// Dead Letter Queue interface
111///
112/// Provides persistence and query capabilities for poison messages.
113#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
114#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
115pub trait DeadLetterQueue: Send + Sync {
116 /// Add a poison message to DLQ
117 ///
118 /// # Parameters
119 ///
120 /// - `record`: Complete DLQ record with forensic data
121 async fn enqueue(&self, record: DlqRecord) -> StorageResult<Uuid>;
122
123 /// Query DLQ records with filtering
124 ///
125 /// # Parameters
126 ///
127 /// - `query`: Filter criteria for records
128 async fn query(&self, query: DlqQuery) -> StorageResult<Vec<DlqRecord>>;
129
130 /// Get a single DLQ record by ID
131 async fn get(&self, id: Uuid) -> StorageResult<Option<DlqRecord>>;
132
133 /// Delete a DLQ record (after manual resolution)
134 ///
135 /// # Parameters
136 ///
137 /// - `id`: DLQ record ID to delete
138 async fn delete(&self, id: Uuid) -> StorageResult<()>;
139
140 /// Increment redrive attempt counter
141 ///
142 /// Called when attempting to reprocess a DLQ message.
143 async fn record_redrive_attempt(&self, id: Uuid) -> StorageResult<()>;
144
145 /// Get DLQ statistics
146 async fn stats(&self) -> StorageResult<DlqStats>;
147}