rust_rabbit/
consumer.rs

1use crate::{
2    connection::Connection,
3    error::RustRabbitError,
4    message::{ErrorType, MessageEnvelope, WireMessage},
5    retry::RetryConfig,
6};
7use futures_lite::stream::StreamExt;
8use lapin::{
9    options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions},
10    types::{AMQPValue, FieldTable},
11    BasicProperties, Channel,
12};
13use serde::de::DeserializeOwned;
14use std::future::Future;
15use std::sync::Arc;
16use tokio::sync::Semaphore;
17use tracing::{debug, error, warn};
18
19/// Message wrapper with retry tracking
20#[derive(Debug)]
21pub struct Message<T>
22where
23    T: Clone,
24{
25    pub data: T,
26    pub retry_attempt: u32,
27    tag: u64,
28    channel: Arc<Channel>,
29}
30
31impl<T> Clone for Message<T>
32where
33    T: Clone,
34{
35    fn clone(&self) -> Self {
36        Self {
37            data: self.data.clone(),
38            retry_attempt: self.retry_attempt,
39            tag: self.tag,
40            channel: Arc::clone(&self.channel),
41        }
42    }
43}
44
45impl<T> Message<T>
46where
47    T: Clone,
48{
49    /// Acknowledge the message
50    pub async fn ack(&self) -> Result<(), RustRabbitError> {
51        self.channel
52            .basic_ack(self.tag, BasicAckOptions::default())
53            .await
54            .map_err(RustRabbitError::from)
55    }
56
57    /// Reject and requeue the message
58    pub async fn nack(&self, requeue: bool) -> Result<(), RustRabbitError> {
59        self.channel
60            .basic_nack(
61                self.tag,
62                lapin::options::BasicNackOptions {
63                    multiple: false,
64                    requeue,
65                },
66            )
67            .await
68            .map_err(RustRabbitError::from)
69    }
70}
71
72/// Consumer configuration builder
73pub struct ConsumerBuilder {
74    connection: Arc<Connection>,
75    queue_name: String,
76    exchange_name: Option<String>,
77    routing_key: Option<String>,
78    retry_config: Option<RetryConfig>,
79    prefetch_count: Option<u16>,
80    auto_ack: bool,
81}
82
83impl ConsumerBuilder {
84    pub fn new(connection: Arc<Connection>, queue_name: impl Into<String>) -> Self {
85        Self {
86            connection,
87            queue_name: queue_name.into(),
88            exchange_name: None,
89            routing_key: None,
90            retry_config: None,
91            prefetch_count: Some(10),
92            auto_ack: true,
93        }
94    }
95
96    /// Bind to an exchange with routing key
97    pub fn bind_to_exchange(
98        mut self,
99        exchange: impl Into<String>,
100        routing_key: impl Into<String>,
101    ) -> Self {
102        self.exchange_name = Some(exchange.into());
103        self.routing_key = Some(routing_key.into());
104        self
105    }
106
107    /// Set routing key (for use with bind_to_exchange)
108    pub fn routing_key(mut self, routing_key: impl Into<String>) -> Self {
109        self.routing_key = Some(routing_key.into());
110        self
111    }
112
113    /// Set concurrency level (same as prefetch count)
114    pub fn concurrency(mut self, count: u16) -> Self {
115        self.prefetch_count = Some(count);
116        self
117    }
118
119    /// Configure retry behavior
120    pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
121        self.retry_config = Some(retry_config);
122        self
123    }
124
125    /// Set prefetch count
126    pub fn with_prefetch(mut self, count: u16) -> Self {
127        self.prefetch_count = Some(count);
128        self
129    }
130
131    /// Disable auto-acknowledge (manual ack required)
132    pub fn manual_ack(mut self) -> Self {
133        self.auto_ack = false;
134        self
135    }
136
137    /// Build the consumer
138    pub fn build(self) -> Consumer {
139        Consumer {
140            connection: self.connection,
141            queue_name: self.queue_name,
142            exchange_name: self.exchange_name,
143            routing_key: self.routing_key,
144            retry_config: self.retry_config,
145            prefetch_count: self.prefetch_count.unwrap_or(10),
146            auto_ack: self.auto_ack,
147        }
148    }
149}
150
151/// Simplified Consumer for message consumption
152pub struct Consumer {
153    connection: Arc<Connection>,
154    queue_name: String,
155    exchange_name: Option<String>,
156    routing_key: Option<String>,
157    retry_config: Option<RetryConfig>,
158    prefetch_count: u16,
159    auto_ack: bool,
160}
161
162impl Consumer {
163    /// Create a new consumer builder
164    pub fn builder(connection: Arc<Connection>, queue_name: impl Into<String>) -> ConsumerBuilder {
165        ConsumerBuilder::new(connection, queue_name)
166    }
167
168    /// Create retry queue with TTL
169    async fn create_retry_queue(
170        &self,
171        channel: &Channel,
172        retry_attempt: u32,
173        delay: std::time::Duration,
174    ) -> Result<String, RustRabbitError> {
175        let retry_queue_name = format!("{}.retry.{}", self.queue_name, retry_attempt);
176        let delay_ms = delay.as_millis() as i64;
177
178        // Create retry queue with TTL that routes back to original queue
179        let mut args = FieldTable::default();
180        args.insert("x-message-ttl".into(), AMQPValue::LongLongInt(delay_ms));
181        args.insert(
182            "x-dead-letter-exchange".into(),
183            AMQPValue::LongString("".into()),
184        ); // Default exchange
185        args.insert(
186            "x-dead-letter-routing-key".into(),
187            AMQPValue::LongString(self.queue_name.clone().into()),
188        );
189
190        channel
191            .queue_declare(
192                &retry_queue_name,
193                QueueDeclareOptions {
194                    durable: true,
195                    ..Default::default()
196                },
197                args,
198            )
199            .await?;
200
201        debug!(
202            "Created retry queue: {} with TTL: {}ms",
203            retry_queue_name, delay_ms
204        );
205        Ok(retry_queue_name)
206    }
207
208    /// Create DLQ (Dead Letter Queue)
209    async fn create_dlq(&self, channel: &Channel) -> Result<String, RustRabbitError> {
210        let dlq_name = format!("{}.dlq", self.queue_name);
211
212        channel
213            .queue_declare(
214                &dlq_name,
215                QueueDeclareOptions {
216                    durable: true,
217                    ..Default::default()
218                },
219                FieldTable::default(),
220            )
221            .await?;
222
223        debug!("Created DLQ: {}", dlq_name);
224        Ok(dlq_name)
225    }
226
227    /// Send message to retry queue with delay
228    async fn send_to_retry_queue(
229        &self,
230        channel: &Channel,
231        message_data: &[u8],
232        retry_attempt: u32,
233        delay: std::time::Duration,
234    ) -> Result<(), RustRabbitError> {
235        let retry_queue_name = self
236            .create_retry_queue(channel, retry_attempt, delay)
237            .await?;
238
239        // Publish to retry queue
240        channel
241            .basic_publish(
242                "", // Default exchange
243                &retry_queue_name,
244                BasicPublishOptions::default(),
245                message_data,
246                BasicProperties::default()
247                    .with_content_type("application/json".into())
248                    .with_delivery_mode(2), // Persistent
249            )
250            .await?
251            .await?;
252
253        debug!("Sent message to retry queue: {}", retry_queue_name);
254        Ok(())
255    }
256
257    /// Send message to DLQ
258    async fn send_to_dlq_simple(
259        &self,
260        channel: &Channel,
261        message_data: &[u8],
262    ) -> Result<(), RustRabbitError> {
263        let dlq_name = self.create_dlq(channel).await?;
264
265        // Publish to DLQ
266        channel
267            .basic_publish(
268                "", // Default exchange
269                &dlq_name,
270                BasicPublishOptions::default(),
271                message_data,
272                BasicProperties::default()
273                    .with_content_type("application/json".into())
274                    .with_delivery_mode(2), // Persistent
275            )
276            .await?
277            .await?;
278
279        debug!("Sent message to DLQ: {}", dlq_name);
280        Ok(())
281    }
282
283    /// Start consuming messages
284    pub async fn consume<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
285    where
286        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
287        H: Fn(Message<T>) -> Fut + Send + Sync + Clone + 'static,
288        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
289    {
290        let channel = self.connection.create_channel().await?;
291
292        // Set prefetch count
293        channel
294            .basic_qos(
295                self.prefetch_count,
296                lapin::options::BasicQosOptions::default(),
297            )
298            .await?;
299
300        // Setup infrastructure (queues, exchanges)
301        self.setup_infrastructure(&channel).await?;
302
303        // Start consuming
304        let mut consumer = channel
305            .basic_consume(
306                &self.queue_name,
307                "",
308                BasicConsumeOptions::default(),
309                FieldTable::default(),
310            )
311            .await?;
312
313        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
314
315        debug!("Started consuming from queue: {}", self.queue_name);
316
317        // Process messages
318        while let Some(delivery_result) = consumer.next().await {
319            let delivery = delivery_result?;
320            let permit = semaphore.clone().acquire_owned().await.unwrap();
321            let handler_clone = handler.clone();
322            let auto_ack = self.auto_ack;
323            let channel_clone = Arc::new(channel.clone());
324            let retry_config = self.retry_config.clone();
325            let consumer_self = Consumer {
326                connection: self.connection.clone(),
327                queue_name: self.queue_name.clone(),
328                exchange_name: self.exchange_name.clone(),
329                routing_key: self.routing_key.clone(),
330                retry_config: self.retry_config.clone(),
331                prefetch_count: self.prefetch_count,
332                auto_ack: self.auto_ack,
333            };
334
335            tokio::spawn(async move {
336                let _permit = permit;
337
338                // Deserialize as WireMessage format
339                match serde_json::from_slice::<crate::message::WireMessage<T>>(&delivery.data) {
340                    Ok(wire_msg) => {
341                        let message = Message {
342                            data: wire_msg.data,
343                            retry_attempt: wire_msg.retry_attempt,
344                            tag: delivery.delivery_tag,
345                            channel: channel_clone.clone(),
346                        };
347
348                        // Process message
349                        match handler_clone(message.clone()).await {
350                            Ok(()) => {
351                                if auto_ack {
352                                    if let Err(e) = message.ack().await {
353                                        error!("Failed to ack message: {}", e);
354                                    }
355                                }
356                                debug!("Message processed successfully");
357                            }
358                            Err(e) => {
359                                error!("Handler error: {}", e);
360                                if auto_ack {
361                                    // Check if retry is configured
362                                    if let Some(retry_cfg) = &retry_config {
363                                        if message.retry_attempt < retry_cfg.max_retries {
364                                            // Calculate delay for next retry
365                                            if let Some(delay) =
366                                                retry_cfg.calculate_delay(message.retry_attempt)
367                                            {
368                                                warn!(
369                                                    "Scheduling retry {} with delay {:?} for message",
370                                                    message.retry_attempt + 1,
371                                                    delay
372                                                );
373
374                                                // Update retry attempt in wire message
375                                                let wire_msg = WireMessage {
376                                                    data: message.data.clone(),
377                                                    retry_attempt: message.retry_attempt + 1,
378                                                };
379
380                                                let retry_payload =
381                                                    match serde_json::to_vec(&wire_msg) {
382                                                        Ok(payload) => payload,
383                                                        Err(e) => {
384                                                            error!(
385                                                            "Failed to serialize retry message: {}",
386                                                            e
387                                                        );
388                                                            if let Err(e) =
389                                                                message.nack(false).await
390                                                            {
391                                                                error!(
392                                                                    "Failed to nack message: {}",
393                                                                    e
394                                                                );
395                                                            }
396                                                            return;
397                                                        }
398                                                    };
399
400                                                // Send to retry queue with delay
401                                                if let Err(e) = consumer_self
402                                                    .send_to_retry_queue(
403                                                        &channel_clone,
404                                                        &retry_payload,
405                                                        message.retry_attempt + 1,
406                                                        delay,
407                                                    )
408                                                    .await
409                                                {
410                                                    error!("Failed to send to retry queue: {}", e);
411                                                    if let Err(e) = message.nack(false).await {
412                                                        error!("Failed to nack message: {}", e);
413                                                    }
414                                                    return;
415                                                }
416
417                                                // ACK original message (it's now in retry queue)
418                                                if let Err(e) = message.ack().await {
419                                                    error!(
420                                                        "Failed to ack message after retry: {}",
421                                                        e
422                                                    );
423                                                }
424                                            } else {
425                                                // No more retries, send to DLQ
426                                                warn!("Retry exhausted, sending to DLQ");
427                                                if let Err(e) = consumer_self
428                                                    .send_to_dlq_simple(
429                                                        &channel_clone,
430                                                        &delivery.data,
431                                                    )
432                                                    .await
433                                                {
434                                                    error!("Failed to send to DLQ: {}", e);
435                                                }
436                                                if let Err(e) = message.ack().await {
437                                                    error!(
438                                                        "Failed to ack message after DLQ: {}",
439                                                        e
440                                                    );
441                                                }
442                                            }
443                                        } else {
444                                            // Retry exhausted, send to DLQ
445                                            warn!("Max retries reached, sending to DLQ");
446                                            if let Err(e) = consumer_self
447                                                .send_to_dlq_simple(&channel_clone, &delivery.data)
448                                                .await
449                                            {
450                                                error!("Failed to send to DLQ: {}", e);
451                                            }
452                                            if let Err(e) = message.ack().await {
453                                                error!("Failed to ack message after DLQ: {}", e);
454                                            }
455                                        }
456                                    } else {
457                                        // No retry config, just nack
458                                        if let Err(e) = message.nack(false).await {
459                                            error!("Failed to nack message: {}", e);
460                                        }
461                                    }
462                                }
463                            }
464                        }
465                    }
466                    Err(e) => {
467                        error!("Failed to deserialize message: {}", e);
468                        if auto_ack {
469                            // Reject malformed messages
470                            if let Err(e) = channel_clone
471                                .basic_nack(
472                                    delivery.delivery_tag,
473                                    lapin::options::BasicNackOptions {
474                                        multiple: false,
475                                        requeue: false,
476                                    },
477                                )
478                                .await
479                            {
480                                error!("Failed to nack malformed message: {}", e);
481                            }
482                        }
483                    }
484                }
485            });
486        }
487
488        Ok(())
489    }
490
491    /// Setup queue and exchange infrastructure
492    async fn setup_infrastructure(&self, channel: &Channel) -> Result<(), RustRabbitError> {
493        // Declare queue
494        channel
495            .queue_declare(
496                &self.queue_name,
497                QueueDeclareOptions {
498                    durable: true,
499                    ..Default::default()
500                },
501                FieldTable::default(),
502            )
503            .await?;
504
505        // Bind to exchange if specified
506        if let (Some(exchange), Some(routing_key)) = (&self.exchange_name, &self.routing_key) {
507            channel
508                .queue_bind(
509                    &self.queue_name,
510                    exchange,
511                    routing_key,
512                    lapin::options::QueueBindOptions::default(),
513                    FieldTable::default(),
514                )
515                .await?;
516        }
517
518        Ok(())
519    }
520
521    /// Start consuming message envelopes with full retry support
522    pub async fn consume_envelopes<T, H, Fut>(&self, handler: H) -> Result<(), RustRabbitError>
523    where
524        T: DeserializeOwned + Send + Clone + Sync + 'static + serde::Serialize,
525        H: Fn(MessageEnvelope<T>) -> Fut + Send + Sync + Clone + 'static,
526        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send,
527    {
528        let channel = self.connection.create_channel().await?;
529        let retry_config = self.retry_config.clone();
530
531        // Set prefetch count
532        channel
533            .basic_qos(
534                self.prefetch_count,
535                lapin::options::BasicQosOptions::default(),
536            )
537            .await?;
538
539        // Setup queue and exchange
540        self.setup_infrastructure(&channel).await?;
541
542        // Create consumer
543        let mut consumer = channel
544            .basic_consume(
545                &self.queue_name,
546                "rust-rabbit-envelope-consumer",
547                BasicConsumeOptions::default(),
548                FieldTable::default(),
549            )
550            .await?;
551
552        let semaphore = Arc::new(Semaphore::new(self.prefetch_count as usize));
553
554        debug!(
555            "Started consuming envelopes from queue: {}",
556            self.queue_name
557        );
558
559        // Process message envelopes with retry support
560        while let Some(delivery_result) = consumer.next().await {
561            let delivery = delivery_result?;
562            let permit = semaphore.clone().acquire_owned().await.unwrap();
563            let handler_clone = handler.clone();
564            let auto_ack = self.auto_ack;
565            let channel_clone = Arc::new(channel.clone());
566            let retry_config_clone = retry_config.clone();
567            let queue_name = self.queue_name.clone();
568            let connection = self.connection.clone();
569
570            tokio::spawn(async move {
571                let _permit = permit;
572
573                // Try to deserialize as MessageEnvelope
574                match serde_json::from_slice::<MessageEnvelope<T>>(&delivery.data) {
575                    Ok(mut envelope) => {
576                        debug!(
577                            "Processing envelope {} (attempt {}/{})",
578                            envelope.metadata.message_id,
579                            envelope.metadata.retry_attempt + 1,
580                            envelope.metadata.max_retries + 1
581                        );
582
583                        // Process message
584                        match handler_clone(envelope.clone()).await {
585                            Ok(()) => {
586                                if auto_ack {
587                                    if let Err(e) = channel_clone
588                                        .basic_ack(
589                                            delivery.delivery_tag,
590                                            BasicAckOptions::default(),
591                                        )
592                                        .await
593                                    {
594                                        error!("Failed to ack message: {}", e);
595                                    }
596                                }
597                                debug!(
598                                    "Envelope {} processed successfully",
599                                    envelope.metadata.message_id
600                                );
601                            }
602                            Err(e) => {
603                                error!(
604                                    "Handler error for envelope {}: {}",
605                                    envelope.metadata.message_id, e
606                                );
607
608                                // Determine error type (simplified classification)
609                                let error_type = classify_error(e.as_ref());
610
611                                // Add error to envelope
612                                envelope = envelope.with_error(
613                                    &e.to_string(),
614                                    error_type,
615                                    Some(&format!("Queue: {}", queue_name)),
616                                );
617
618                                if auto_ack {
619                                    // Check if we should retry
620                                    if let Some(retry_cfg) = &retry_config_clone {
621                                        if !envelope.is_retry_exhausted() {
622                                            // Calculate delay and schedule retry
623                                            if let Some(delay) = retry_cfg
624                                                .calculate_delay(envelope.metadata.retry_attempt)
625                                            {
626                                                warn!(
627                                                    "Scheduling retry {} for envelope {} with delay {:?}",
628                                                    envelope.metadata.retry_attempt + 1,
629                                                    envelope.metadata.message_id,
630                                                    delay
631                                                );
632
633                                                // Increment retry attempt in envelope
634                                                envelope.metadata.retry_attempt += 1;
635
636                                                // Serialize updated envelope
637                                                match serde_json::to_vec(&envelope) {
638                                                    Ok(retry_payload) => {
639                                                        // Create consumer instance for access to methods
640                                                        let consumer_self = Consumer {
641                                                            connection: connection.clone(),
642                                                            queue_name: queue_name.clone(),
643                                                            exchange_name: None,
644                                                            routing_key: None,
645                                                            retry_config: retry_config_clone
646                                                                .clone(),
647                                                            prefetch_count: 10,
648                                                            auto_ack: true,
649                                                        };
650
651                                                        // Send to retry queue with delay
652                                                        if let Err(e) = consumer_self
653                                                            .send_to_retry_queue(
654                                                                &channel_clone,
655                                                                &retry_payload,
656                                                                envelope.metadata.retry_attempt,
657                                                                delay,
658                                                            )
659                                                            .await
660                                                        {
661                                                            error!("Failed to send envelope to retry queue: {}", e);
662                                                            // Fallback to simple nack
663                                                            if let Err(e) = channel_clone
664                                                                .basic_nack(
665                                                                    delivery.delivery_tag,
666                                                                    lapin::options::BasicNackOptions {
667                                                                        multiple: false,
668                                                                        requeue: false,
669                                                                    },
670                                                                )
671                                                                .await
672                                                            {
673                                                                error!("Failed to nack message: {}", e);
674                                                            }
675                                                            return;
676                                                        }
677
678                                                        // ACK original message (it's now in retry queue)
679                                                        if let Err(e) = channel_clone
680                                                            .basic_ack(
681                                                                delivery.delivery_tag,
682                                                                BasicAckOptions::default(),
683                                                            )
684                                                            .await
685                                                        {
686                                                            error!("Failed to ack message after retry: {}", e);
687                                                        }
688                                                    }
689                                                    Err(e) => {
690                                                        error!("Failed to serialize envelope for retry: {}", e);
691                                                        // Fallback to simple nack
692                                                        if let Err(e) = channel_clone
693                                                            .basic_nack(
694                                                                delivery.delivery_tag,
695                                                                lapin::options::BasicNackOptions {
696                                                                    multiple: false,
697                                                                    requeue: false,
698                                                                },
699                                                            )
700                                                            .await
701                                                        {
702                                                            error!("Failed to nack message: {}", e);
703                                                        }
704                                                    }
705                                                }
706                                            } else {
707                                                // No more retries, send to DLQ
708                                                Self::send_to_dlq(
709                                                    &envelope,
710                                                    retry_cfg,
711                                                    &connection,
712                                                    &queue_name,
713                                                )
714                                                .await;
715
716                                                // ACK original message
717                                                if let Err(e) = channel_clone
718                                                    .basic_ack(
719                                                        delivery.delivery_tag,
720                                                        BasicAckOptions::default(),
721                                                    )
722                                                    .await
723                                                {
724                                                    error!(
725                                                        "Failed to ack message after DLQ: {}",
726                                                        e
727                                                    );
728                                                }
729                                            }
730                                        } else {
731                                            // Retry exhausted, send to DLQ
732                                            warn!(
733                                                "Retry exhausted for envelope {}",
734                                                envelope.metadata.message_id
735                                            );
736                                            Self::send_to_dlq(
737                                                &envelope,
738                                                retry_cfg,
739                                                &connection,
740                                                &queue_name,
741                                            )
742                                            .await;
743
744                                            // ACK original message
745                                            if let Err(e) = channel_clone
746                                                .basic_ack(
747                                                    delivery.delivery_tag,
748                                                    BasicAckOptions::default(),
749                                                )
750                                                .await
751                                            {
752                                                error!("Failed to ack message after DLQ: {}", e);
753                                            }
754                                        }
755                                    } else {
756                                        // No retry config, just nack
757                                        if let Err(e) = channel_clone
758                                            .basic_nack(
759                                                delivery.delivery_tag,
760                                                lapin::options::BasicNackOptions {
761                                                    multiple: false,
762                                                    requeue: false,
763                                                },
764                                            )
765                                            .await
766                                        {
767                                            error!("Failed to nack message: {}", e);
768                                        }
769                                    }
770                                }
771                            }
772                        }
773                    }
774                    Err(e) => {
775                        error!("Failed to deserialize message envelope: {}", e);
776                        if auto_ack {
777                            // Reject malformed messages
778                            if let Err(e) = channel_clone
779                                .basic_nack(
780                                    delivery.delivery_tag,
781                                    lapin::options::BasicNackOptions {
782                                        multiple: false,
783                                        requeue: false,
784                                    },
785                                )
786                                .await
787                            {
788                                error!("Failed to nack malformed envelope: {}", e);
789                            }
790                        }
791                    }
792                }
793            });
794        }
795
796        Ok(())
797    }
798
799    /// Send failed message to Dead Letter Queue
800    async fn send_to_dlq<T>(
801        envelope: &MessageEnvelope<T>,
802        retry_config: &RetryConfig,
803        connection: &Arc<Connection>,
804        queue_name: &str,
805    ) where
806        T: serde::Serialize,
807    {
808        match connection.create_channel().await {
809            Ok(dlq_channel) => {
810                let dlq_name = retry_config.get_dead_letter_queue(queue_name);
811
812                // Declare DLQ
813                if let Err(e) = dlq_channel
814                    .queue_declare(
815                        &dlq_name,
816                        QueueDeclareOptions {
817                            durable: true,
818                            ..Default::default()
819                        },
820                        FieldTable::default(),
821                    )
822                    .await
823                {
824                    error!("Failed to declare DLQ {}: {}", dlq_name, e);
825                    return;
826                }
827
828                // Publish to DLQ with failure summary
829                let failure_summary = envelope.get_failure_summary();
830                let dlq_payload = serde_json::json!({
831                    "envelope": envelope,
832                    "failure_summary": failure_summary,
833                    "sent_to_dlq_at": chrono::Utc::now(),
834                });
835
836                if let Ok(payload_bytes) = serde_json::to_vec(&dlq_payload) {
837                    if let Err(e) = dlq_channel
838                        .basic_publish(
839                            "",
840                            &dlq_name,
841                            lapin::options::BasicPublishOptions::default(),
842                            &payload_bytes,
843                            lapin::BasicProperties::default(),
844                        )
845                        .await
846                    {
847                        error!("Failed to publish to DLQ {}: {}", dlq_name, e);
848                    } else {
849                        warn!(
850                            "Sent envelope {} to DLQ: {}",
851                            envelope.metadata.message_id, failure_summary
852                        );
853                    }
854                }
855            }
856            Err(e) => {
857                error!("Failed to create DLQ channel: {}", e);
858            }
859        }
860    }
861}
862
863/// Classify error type based on error message (simplified heuristics)
864fn classify_error(error: &(dyn std::error::Error + Send + Sync)) -> ErrorType {
865    let error_msg = error.to_string().to_lowercase();
866
867    if error_msg.contains("timeout")
868        || error_msg.contains("connection")
869        || error_msg.contains("network")
870        || error_msg.contains("temporary")
871    {
872        ErrorType::Transient
873    } else if error_msg.contains("rate limit")
874        || error_msg.contains("quota")
875        || error_msg.contains("resource")
876    {
877        ErrorType::Resource
878    } else if error_msg.contains("validation")
879        || error_msg.contains("authentication")
880        || error_msg.contains("authorization")
881        || error_msg.contains("invalid")
882        || error_msg.contains("bad request")
883    {
884        ErrorType::Permanent
885    } else {
886        ErrorType::Unknown
887    }
888}