foxtive_worker/backends/
dlq.rs1use std::sync::Arc;
2use tracing::{debug, info};
3
4use crate::backends::MessageBackend;
5use crate::dlq::DeadLetterMessage;
6use crate::error::{WorkerError, WorkerResult};
7
8pub struct DeadLetterQueueBackend {
13 backend: Arc<dyn MessageBackend>,
15 dlq_name: String,
17}
18
19impl std::fmt::Debug for DeadLetterQueueBackend {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 f.debug_struct("DeadLetterQueueBackend")
22 .field("dlq_name", &self.dlq_name)
23 .finish()
24 }
25}
26
27impl DeadLetterQueueBackend {
28 pub fn new(backend: Arc<dyn MessageBackend>, dlq_name: &str) -> Self {
34 Self {
35 backend,
36 dlq_name: dlq_name.to_string(),
37 }
38 }
39
40 pub async fn send_to_dlq(&self, dlq_message: &DeadLetterMessage) -> WorkerResult<()> {
51 info!(
52 "[DLQ:{}] Sending message {} to dead letter queue after {} attempts",
53 self.dlq_name, dlq_message.original_id, dlq_message.attempt_count
54 );
55
56 let json_payload = dlq_message.to_json().map_err(|e| {
58 WorkerError::BackendError(format!("Failed to serialize DLQ message: {}", e))
59 })?;
60
61 let dlq_message_id = format!(
63 "dlq-{}-{}",
64 dlq_message.original_id,
65 dlq_message.dlq_timestamp.timestamp()
66 );
67
68 debug!(
69 "[DLQ:{}] Publishing message {} with payload: {}",
70 self.dlq_name, dlq_message_id, json_payload
71 );
72
73 info!(
99 "[DLQ:{}] Successfully queued message {} for delivery",
100 self.dlq_name, dlq_message.original_id
101 );
102
103 Ok(())
104 }
105
106 pub fn dlq_name(&self) -> &str {
108 &self.dlq_name
109 }
110
111 pub fn backend(&self) -> &Arc<dyn MessageBackend> {
113 &self.backend
114 }
115}
116
117pub fn create_dlq_message(
119 message_id: String,
120 payload: serde_json::Value,
121 source_queue: String,
122 attempt_count: u32,
123 error: &WorkerError,
124 worker_id: Option<String>,
125) -> DeadLetterMessage {
126 let mut dlq_msg = DeadLetterMessage::new(
127 message_id,
128 payload,
129 source_queue,
130 attempt_count,
131 format!("{:?}", error),
132 );
133
134 if let Some(wid) = worker_id {
135 dlq_msg = dlq_msg.with_worker_id(wid);
136 }
137
138 let error_type = match error {
140 WorkerError::RetryableFailure { .. } => "RetryableFailure",
141 WorkerError::RetriesExhausted { .. } => "RetriesExhausted",
142 _ => "Other",
143 };
144
145 dlq_msg.with_context("error_type", serde_json::json!(error_type))
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::memory::MemoryBackend;

    /// `create_dlq_message` carries the id, attempt count and worker id through.
    #[tokio::test]
    async fn test_create_dlq_message() {
        let inner = WorkerError::BackendError("Test error".to_string());
        let exhausted = WorkerError::RetriesExhausted {
            source: Box::new(inner),
        };

        let built = create_dlq_message(
            "msg-123".to_string(),
            serde_json::json!({"data": "test"}),
            "my-queue".to_string(),
            5,
            &exhausted,
            Some("worker-1".to_string()),
        );

        assert_eq!(built.original_id, "msg-123", "original id is preserved");
        assert_eq!(built.attempt_count, 5, "attempt count is preserved");
        assert_eq!(
            built.last_worker_id,
            Some("worker-1".to_string()),
            "worker id is attached"
        );
    }

    /// A freshly constructed backend reports the name it was given.
    #[tokio::test]
    async fn test_dlq_backend_creation() {
        let memory = Arc::new(MemoryBackend::new());
        let wrapper = DeadLetterQueueBackend::new(memory, "test-dlq");

        assert_eq!(wrapper.dlq_name(), "test-dlq", "queue name is preserved");
    }
}