1use crate::error::Result;
8#[cfg(feature = "routing")]
9use crate::sink::EventSink;
10use crate::types::ReceivedEvent;
11use crate::types::now_millis;
12use async_trait::async_trait;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16#[cfg(any(feature = "routing", test))]
18use crate::types::Event;
19
20#[derive(Debug, Clone)]
22pub struct DeadLetterEvent {
23 pub event: ReceivedEvent,
25
26 pub reason: String,
28
29 pub dead_lettered_at: u64,
31
32 pub original_subject: Option<String>,
34
35 pub delivery_attempts: Option<u64>,
37
38 pub first_failure_at: Option<u64>,
40}
41
42#[async_trait]
47pub trait DlqHandler: Send + Sync {
48 async fn handle(&self, event: DeadLetterEvent) -> Result<()>;
54
55 async fn count(&self) -> Result<usize>;
57
58 async fn list(&self, limit: usize) -> Result<Vec<DeadLetterEvent>>;
60}
61
62pub struct MemoryDlqHandler {
66 events: Arc<RwLock<Vec<DeadLetterEvent>>>,
67 max_events: usize,
68}
69
70impl MemoryDlqHandler {
71 pub fn new(max_events: usize) -> Self {
73 Self {
74 events: Arc::new(RwLock::new(Vec::new())),
75 max_events,
76 }
77 }
78}
79
80impl Default for MemoryDlqHandler {
81 fn default() -> Self {
82 Self::new(10_000)
83 }
84}
85
86#[async_trait]
87impl DlqHandler for MemoryDlqHandler {
88 async fn handle(&self, event: DeadLetterEvent) -> Result<()> {
89 tracing::warn!(
90 event_id = %event.event.event.id,
91 subject = %event.event.event.subject,
92 num_delivered = event.event.num_delivered,
93 reason = %event.reason,
94 "Event dead-lettered"
95 );
96
97 let mut events = self.events.write().await;
98 events.push(event);
99
100 if self.max_events > 0 && events.len() > self.max_events {
102 let drain_count = events.len() - self.max_events;
103 events.drain(..drain_count);
104 }
105
106 Ok(())
107 }
108
109 async fn count(&self) -> Result<usize> {
110 let events = self.events.read().await;
111 Ok(events.len())
112 }
113
114 async fn list(&self, limit: usize) -> Result<Vec<DeadLetterEvent>> {
115 let events = self.events.read().await;
116 let result: Vec<DeadLetterEvent> = events.iter().rev().take(limit).cloned().collect();
117 Ok(result)
118 }
119}
120
121pub fn should_dead_letter(event: &ReceivedEvent, max_deliver: u64) -> bool {
123 max_deliver > 0 && event.num_delivered >= max_deliver
124}
125
126
127impl DeadLetterEvent {
128 pub fn new(event: ReceivedEvent, reason: impl Into<String>) -> Self {
130 Self {
131 event,
132 reason: reason.into(),
133 dead_lettered_at: now_millis(),
134 original_subject: None,
135 delivery_attempts: None,
136 first_failure_at: None,
137 }
138 }
139
140 pub fn with_original_subject(mut self, subject: impl Into<String>) -> Self {
142 self.original_subject = Some(subject.into());
143 self
144 }
145
146 pub fn with_delivery_attempts(mut self, attempts: u64) -> Self {
148 self.delivery_attempts = Some(attempts);
149 self
150 }
151
152 pub fn with_first_failure_at(mut self, timestamp: u64) -> Self {
154 self.first_failure_at = Some(timestamp);
155 self
156 }
157}
158
/// [`DlqHandler`] that forwards each dead-lettered event to an [`EventSink`]
/// and also keeps a bounded in-memory copy for `count` / `list`.
///
/// Only compiled when the `routing` feature is enabled.
#[cfg(feature = "routing")]
pub struct SinkDlqHandler {
    /// Sink that receives the converted dead-letter event.
    sink: Arc<dyn EventSink>,
    /// Locally retained records, oldest at index 0.
    events: Arc<RwLock<Vec<DeadLetterEvent>>>,
    /// Upper bound on locally retained records; `0` disables the bound.
    max_events: usize,
}
170
#[cfg(feature = "routing")]
impl SinkDlqHandler {
    /// Creates a handler that delivers to `sink` and retains at most
    /// `max_events` records locally (`0` means no limit).
    pub fn new(sink: Arc<dyn EventSink>, max_events: usize) -> Self {
        let events = Arc::new(RwLock::new(Vec::new()));
        Self {
            sink,
            events,
            max_events,
        }
    }

    /// Converts a dead-letter record into the `Event` handed to the sink.
    ///
    /// The new event is published under `events.dlq.<original subject>`,
    /// with category `dlq` and type `a3s.dlq.dead_letter`. It carries a clone
    /// of the original payload, and the dead-letter context is attached as
    /// `dlq_*` metadata entries. Optional fields are only added when they
    /// are `Some`.
    fn to_dlq_event(dle: &DeadLetterEvent) -> Event {
        let original = &dle.event.event;

        let dlq_subject = format!("events.dlq.{}", original.subject);
        let description = format!("Dead letter: {}", dle.reason);
        let payload = original.payload.clone();

        let event = Event::typed(
            dlq_subject,
            "dlq",
            "a3s.dlq.dead_letter",
            1,
            description,
            "dlq-handler",
            payload,
        )
        .with_metadata("dlq_reason", &dle.reason)
        .with_metadata("dlq_original_id", &original.id)
        .with_metadata("dlq_dead_lettered_at", dle.dead_lettered_at.to_string());

        // Each optional field contributes its metadata entry only when set;
        // the order of the entries matches the order of the fields.
        let event = match &dle.original_subject {
            Some(subj) => event.with_metadata("dlq_original_subject", subj),
            None => event,
        };
        let event = match dle.delivery_attempts {
            Some(attempts) => event.with_metadata("dlq_delivery_attempts", attempts.to_string()),
            None => event,
        };
        match dle.first_failure_at {
            Some(first_fail) => {
                event.with_metadata("dlq_first_failure_at", first_fail.to_string())
            }
            None => event,
        }
    }
}
213
#[cfg(feature = "routing")]
#[async_trait]
impl DlqHandler for SinkDlqHandler {
    /// Delivers the converted event to the sink, then records it locally.
    ///
    /// # Errors
    /// Propagates the sink's delivery error; in that case the record is
    /// not added to the local store.
    async fn handle(&self, event: DeadLetterEvent) -> Result<()> {
        // Delivery happens first: a failure returns early via `?`, before
        // the local store is touched.
        let outgoing = Self::to_dlq_event(&event);
        self.sink.deliver(&outgoing).await?;

        let mut store = self.events.write().await;
        store.push(event);

        // A cap of zero means "unbounded", so trimming is skipped entirely.
        if self.max_events > 0 {
            let overflow = store.len().saturating_sub(self.max_events);
            if overflow > 0 {
                // Oldest entries sit at the front of the vector.
                store.drain(..overflow);
            }
        }

        Ok(())
    }

    /// Returns the number of records currently retained locally.
    async fn count(&self) -> Result<usize> {
        Ok(self.events.read().await.len())
    }

    /// Returns clones of up to `limit` locally retained records, newest first.
    async fn list(&self, limit: usize) -> Result<Vec<DeadLetterEvent>> {
        let store = self.events.read().await;
        // The newest `limit` entries are the tail of the vector; reverse the
        // tail so the most recent record comes out first.
        let tail_start = store.len().saturating_sub(limit);
        Ok(store[tail_start..].iter().rev().cloned().collect())
    }
}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ReceivedEvent` on subject `events.test.a` whose delivery
    /// counter is `num_delivered`.
    fn test_received_event(num_delivered: u64) -> ReceivedEvent {
        let event = Event::new(
            "events.test.a",
            "test",
            "Test event",
            "test",
            serde_json::json!({}),
        );
        ReceivedEvent {
            event,
            sequence: 1,
            num_delivered,
            stream: "test".to_string(),
        }
    }

    /// With a budget of 5, delivery counts below 5 are kept and counts of 5
    /// or more are dead-lettered.
    #[test]
    fn test_should_dead_letter() {
        let max_deliver = 5;

        let below_budget = [1, 4];
        for delivered in below_budget {
            assert!(!should_dead_letter(
                &test_received_event(delivered),
                max_deliver
            ));
        }

        let at_or_over_budget = [5, 10];
        for delivered in at_or_over_budget {
            assert!(should_dead_letter(
                &test_received_event(delivered),
                max_deliver
            ));
        }
    }

    /// A budget of zero never dead-letters.
    #[test]
    fn test_should_dead_letter_zero_max() {
        let heavily_redelivered = test_received_event(100);
        assert!(!should_dead_letter(&heavily_redelivered, 0));
    }

    /// `new` keeps the wrapped event and reason and stamps a non-zero time.
    #[test]
    fn test_dead_letter_event_creation() {
        let received = test_received_event(5);
        let record = DeadLetterEvent::new(received.clone(), "Max retries exceeded");

        assert_eq!(record.reason, "Max retries exceeded");
        assert_eq!(record.event.event.id, received.event.id);
        assert!(record.dead_lettered_at > 0);
    }

    /// Handling one event moves the count from 0 to 1.
    #[tokio::test]
    async fn test_memory_dlq_handle_and_count() {
        let handler = MemoryDlqHandler::default();
        assert_eq!(handler.count().await.unwrap(), 0);

        handler
            .handle(DeadLetterEvent::new(test_received_event(5), "failed"))
            .await
            .unwrap();

        assert_eq!(handler.count().await.unwrap(), 1);
    }

    /// `list` honours the limit and returns newest entries first.
    #[tokio::test]
    async fn test_memory_dlq_list() {
        let handler = MemoryDlqHandler::default();

        for i in 0..5 {
            let received = ReceivedEvent {
                sequence: i,
                ..test_received_event(3)
            };
            let record = DeadLetterEvent::new(received, format!("reason {}", i));
            handler.handle(record).await.unwrap();
        }

        let newest = handler.list(3).await.unwrap();
        assert_eq!(newest.len(), 3);
        assert_eq!(newest[0].reason, "reason 4");
        assert_eq!(newest[2].reason, "reason 2");
    }

    /// Once the cap is exceeded only the newest `max_events` entries remain.
    #[tokio::test]
    async fn test_memory_dlq_max_capacity() {
        let handler = MemoryDlqHandler::new(3);

        for i in 0..5 {
            let record = DeadLetterEvent::new(test_received_event(1), format!("reason {}", i));
            handler.handle(record).await.unwrap();
        }

        assert_eq!(handler.count().await.unwrap(), 3);

        let retained = handler.list(10).await.unwrap();
        assert_eq!(retained[0].reason, "reason 4");
        assert_eq!(retained[2].reason, "reason 2");
    }

    /// Each `with_*` builder sets exactly its own optional field.
    #[test]
    fn test_dead_letter_event_builder_methods() {
        let record = DeadLetterEvent::new(test_received_event(5), "timeout")
            .with_original_subject("events.payment.process")
            .with_delivery_attempts(5)
            .with_first_failure_at(1700000000000);

        assert_eq!(
            record.original_subject.as_deref(),
            Some("events.payment.process")
        );
        assert_eq!(record.delivery_attempts, Some(5));
        assert_eq!(record.first_failure_at, Some(1700000000000));
    }

    /// Without builder calls every optional field is `None`.
    #[test]
    fn test_dead_letter_event_optional_fields_default_none() {
        let record = DeadLetterEvent::new(test_received_event(3), "failed");

        assert!(record.original_subject.is_none());
        assert!(record.delivery_attempts.is_none());
        assert!(record.first_failure_at.is_none());
    }

    /// The sink receives one converted event carrying the `dlq_*` metadata,
    /// and the record is also counted locally.
    #[cfg(feature = "routing")]
    #[tokio::test]
    async fn test_sink_dlq_handler() {
        use crate::sink::CollectorSink;

        let collector = Arc::new(CollectorSink::new("dlq-collector"));
        let handler = SinkDlqHandler::new(collector.clone(), 100);

        let record = DeadLetterEvent::new(test_received_event(5), "processing error")
            .with_original_subject("events.order.process")
            .with_delivery_attempts(5);
        handler.handle(record).await.unwrap();

        assert_eq!(handler.count().await.unwrap(), 1);

        let delivered = collector.events().await;
        assert_eq!(delivered.len(), 1);

        let forwarded = &delivered[0];
        assert_eq!(forwarded.event_type, "a3s.dlq.dead_letter");
        assert_eq!(forwarded.category, "dlq");
        assert_eq!(forwarded.metadata["dlq_reason"], "processing error");
        assert_eq!(
            forwarded.metadata["dlq_original_subject"],
            "events.order.process"
        );
        assert_eq!(forwarded.metadata["dlq_delivery_attempts"], "5");
    }

    /// `list` on the sink handler honours the limit, newest first.
    #[cfg(feature = "routing")]
    #[tokio::test]
    async fn test_sink_dlq_handler_list() {
        use crate::sink::CollectorSink;

        let collector = Arc::new(CollectorSink::new("dlq-collector"));
        let handler = SinkDlqHandler::new(collector, 100);

        for i in 0..3 {
            let record = DeadLetterEvent::new(test_received_event(1), format!("error {}", i));
            handler.handle(record).await.unwrap();
        }

        assert_eq!(handler.count().await.unwrap(), 3);

        let newest = handler.list(2).await.unwrap();
        assert_eq!(newest.len(), 2);
        assert_eq!(newest[0].reason, "error 2");
    }

    /// The sink handler's local store is capped at `max_events`.
    #[cfg(feature = "routing")]
    #[tokio::test]
    async fn test_sink_dlq_handler_max_capacity() {
        use crate::sink::CollectorSink;

        let collector = Arc::new(CollectorSink::new("dlq-collector"));
        let handler = SinkDlqHandler::new(collector, 2);

        for i in 0..5 {
            let record = DeadLetterEvent::new(test_received_event(1), format!("error {}", i));
            handler.handle(record).await.unwrap();
        }

        assert_eq!(handler.count().await.unwrap(), 2);
    }
}