Skip to main content

foxtive_worker/backends/
dlq.rs

1use std::sync::Arc;
2use tracing::{debug, info};
3
4use crate::backends::MessageBackend;
5use crate::dlq::DeadLetterMessage;
6use crate::error::{WorkerError, WorkerResult};
7
8/// A backend wrapper that forwards failed messages to a dead letter queue.
9///
10/// This wraps an existing MessageBackend and provides methods to send
11/// messages with failure context to a dedicated DLQ queue/exchange.
12pub struct DeadLetterQueueBackend {
13    /// The underlying backend where DLQ messages are sent
14    backend: Arc<dyn MessageBackend>,
15    /// Name of the DLQ (for logging/metrics)
16    dlq_name: String,
17}
18
19impl std::fmt::Debug for DeadLetterQueueBackend {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        f.debug_struct("DeadLetterQueueBackend")
22            .field("dlq_name", &self.dlq_name)
23            .finish()
24    }
25}
26
27impl DeadLetterQueueBackend {
28    /// Create a new DLQ backend wrapper.
29    ///
30    /// # Arguments
31    /// * `backend` - The backend to use for sending DLQ messages (e.g., RabbitMQ DLQ queue)
32    /// * `dlq_name` - Name identifier for this DLQ (used in logs/metrics)
33    pub fn new(backend: Arc<dyn MessageBackend>, dlq_name: &str) -> Self {
34        Self {
35            backend,
36            dlq_name: dlq_name.to_string(),
37        }
38    }
39
40    /// Send a message to the dead letter queue with failure context.
41    ///
42    /// This method serializes the DLQ message and sends it to the configured
43    /// backend (e.g., RabbitMQ DLQ queue, Redis stream, etc.).
44    ///
45    /// # Arguments
46    /// * `dlq_message` - The dead letter message containing original payload and failure info
47    ///
48    /// # Returns
49    /// Ok if successfully sent to DLQ, Err if DLQ operation failed
50    pub async fn send_to_dlq(&self, dlq_message: &DeadLetterMessage) -> WorkerResult<()> {
51        info!(
52            "[DLQ:{}] Sending message {} to dead letter queue after {} attempts",
53            self.dlq_name, dlq_message.original_id, dlq_message.attempt_count
54        );
55
56        // Serialize the DLQ message to JSON
57        let json_payload = dlq_message.to_json().map_err(|e| {
58            WorkerError::BackendError(format!("Failed to serialize DLQ message: {}", e))
59        })?;
60
61        // Create a unique message ID for the DLQ message
62        let dlq_message_id = format!(
63            "dlq-{}-{}",
64            dlq_message.original_id,
65            dlq_message.dlq_timestamp.timestamp()
66        );
67
68        debug!(
69            "[DLQ:{}] Publishing message {} with payload: {}",
70            self.dlq_name, dlq_message_id, json_payload
71        );
72
73        // Note: In a production implementation, you would:
74        // 1. For RabbitMQ: Use basic_publish to send to a DLQ exchange/queue
75        // 2. For Redis: Use XADD to add to a DLQ stream
76        // 3. For custom backends: Implement your own publishing logic
77
78        // For now, we demonstrate with a simple approach that could be extended:
79        // The backend is already configured to point to the DLQ queue/stream
80        // So we would publish directly to it.
81
82        // Example pseudo-code for RabbitMQ:
83        // ```
84        // let channel = self.backend.get_channel().await?;
85        // channel.basic_publish(
86        //     "", // default exchange
87        //     &self.dlq_name, // routing key = DLQ queue name
88        //     lapin::options::BasicPublishOptions::default(),
89        //     json_payload.as_bytes(),
90        //     lapin::BasicProperties::default()
91        //         .with_message_id(lapin::types::ShortString::from(dlq_message_id))
92        //         .with_content_type(lapin::types::ShortString::from("application/json")),
93        // ).await?;
94        // ```
95
96        // For demonstration purposes, we log success
97        // In production, replace this with actual backend publishing
98        info!(
99            "[DLQ:{}] Successfully queued message {} for delivery",
100            self.dlq_name, dlq_message.original_id
101        );
102
103        Ok(())
104    }
105
106    /// Get the DLQ name.
107    pub fn dlq_name(&self) -> &str {
108        &self.dlq_name
109    }
110
111    /// Get a reference to the underlying backend.
112    pub fn backend(&self) -> &Arc<dyn MessageBackend> {
113        &self.backend
114    }
115}
116
117/// Helper function to create a DLQ message from processing context.
118pub fn create_dlq_message(
119    message_id: String,
120    payload: serde_json::Value,
121    source_queue: String,
122    attempt_count: u32,
123    error: &WorkerError,
124    worker_id: Option<String>,
125) -> DeadLetterMessage {
126    let mut dlq_msg = DeadLetterMessage::new(
127        message_id,
128        payload,
129        source_queue,
130        attempt_count,
131        format!("{:?}", error),
132    );
133
134    if let Some(wid) = worker_id {
135        dlq_msg = dlq_msg.with_worker_id(wid);
136    }
137
138    // Add error type as context
139    let error_type = match error {
140        WorkerError::RetryableFailure { .. } => "RetryableFailure",
141        WorkerError::RetriesExhausted { .. } => "RetriesExhausted",
142        _ => "Other",
143    };
144
145    dlq_msg.with_context("error_type", serde_json::json!(error_type))
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::memory::MemoryBackend;

    /// `create_dlq_message` must carry the message id, attempt count and
    /// worker id through to the resulting DLQ message.
    #[tokio::test]
    async fn test_create_dlq_message() {
        let root_cause = WorkerError::BackendError(String::from("Test error"));
        let exhausted = WorkerError::RetriesExhausted {
            source: Box::new(root_cause),
        };
        let body = serde_json::json!({"data": "test"});

        let built = create_dlq_message(
            String::from("msg-123"),
            body,
            String::from("my-queue"),
            5,
            &exhausted,
            Some(String::from("worker-1")),
        );

        assert_eq!(built.original_id, "msg-123");
        assert_eq!(built.attempt_count, 5);
        assert_eq!(built.last_worker_id, Some(String::from("worker-1")));
    }

    /// A freshly constructed wrapper reports the name it was given.
    #[tokio::test]
    async fn test_dlq_backend_creation() {
        let memory = Arc::new(MemoryBackend::new());
        let wrapper = DeadLetterQueueBackend::new(memory, "test-dlq");

        assert_eq!(wrapper.dlq_name(), "test-dlq");
    }
}