foxtive-worker 0.4.0

Foxtive Worker - Background worker framework for message processing
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
use async_trait::async_trait;
use lapin::options::{
    BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
    BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use lapin::types::{AMQPValue, FieldTable};
use lapin::BasicProperties;
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use tracing::error;
use crate::MessageProperties;
use crate::backends::ReceiveResult;
use crate::backends::contract::MessageBackend;
use crate::error::{WorkerError, WorkerResult};
use crate::message::{Message, MessageMetadata, ReceivedMessage};

/// RabbitMQ acknowledgment handle.
///
/// Carries what is needed to settle one delivery: its delivery tag, the
/// channel to settle it on, and an optional publisher used for delayed
/// retries and dead-lettering.
pub struct RabbitMqAckHandle {
    /// Delivery tag passed to `basic_ack` / `basic_nack` for this message.
    delivery_tag: u64,
    /// Shared channel reference with mutex for thread-safe ack operations
    ///
    /// Every ack/nack locks this mutex and settles exactly one delivery
    /// (`multiple: false`). Batch acknowledgment (`multiple: true`) would
    /// reduce lock contention under high throughput, but it is not used by
    /// this handle.
    ///
    /// Delivery tags are channel-specific in AMQP, so we must use the same channel.
    ack_channel: Arc<Mutex<lapin::Channel>>,
    /// Optional retry publisher for delayed retries.
    ///
    /// Also consulted by `send_to_dlq`, which downcasts it to
    /// `RabbitMqBackend`. When `None`, retries and dead-lettering fall back
    /// to a plain `nack`.
    retry_publisher: Option<Arc<dyn RetryPublisher + Send + Sync>>,
}

impl std::fmt::Debug for RabbitMqAckHandle {
    /// Formats the handle for diagnostics.
    ///
    /// The channel is left out, and the retry publisher (a trait object) is
    /// shown only as a presence marker.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let publisher_marker = self.retry_publisher.as_ref().map(|_| "<RetryPublisher>");

        let mut out = f.debug_struct("RabbitMqAckHandle");
        out.field("delivery_tag", &self.delivery_tag);
        out.field("retry_publisher", &publisher_marker);
        out.finish()
    }
}

#[async_trait]
impl crate::message::AckHandle for RabbitMqAckHandle {
    async fn ack(&self) -> WorkerResult<()> {
        // Lock the channel to ensure sequential ack operations
        tracing::debug!("Attempting to ack delivery tag {}", self.delivery_tag);
        let channel = self.ack_channel.lock().await;

        match channel
            .basic_ack(
                self.delivery_tag,
                BasicAckOptions {
                    multiple: false, // Individual ack - change to true for batch mode
                },
            )
            .await
        {
            Ok(_) => {
                tracing::debug!("Successfully acked delivery tag {}", self.delivery_tag);
                Ok(())
            }
            Err(e) => {
                error!("Failed to ack delivery tag {}: {}", self.delivery_tag, e);
                Err(WorkerError::BackendError(format!(
                    "Failed to ack message: {}",
                    e
                )))
            }
        }
    }

    async fn nack(&self, requeue: bool) -> WorkerResult<()> {
        // Lock the channel to ensure sequential nack operations
        tracing::debug!(
            "Attempting to nack delivery tag {} (requeue={})",
            self.delivery_tag,
            requeue
        );
        
        // If requeue is false and we have DLQ configured, message will be discarded
        // The pool layer should have already published to DLQ before calling nack(false)
        let channel = self.ack_channel.lock().await;

        channel
            .basic_nack(
                self.delivery_tag,
                lapin::options::BasicNackOptions {
                    multiple: false, // Individual nack
                    requeue,
                },
            )
            .await
            .map_err(|e| {
                error!("Failed to nack delivery tag {}: {}", self.delivery_tag, e);
                WorkerError::BackendError(format!("Failed to nack message: {}", e))
            })?;

        Ok(())
    }

    async fn retry_with_delay(
        &self,
        message: &crate::message::Message<serde_json::Value>,
        delay_ms: u64,
    ) -> WorkerResult<()> {
        tracing::info!(
            "[RabbitMqAckHandle] retry_with_delay called for message {} with delay {}ms",
            message.id,
            delay_ms
        );
        
        if let Some(ref publisher) = self.retry_publisher {
            tracing::info!(
                "[RabbitMqAckHandle] Retry publisher available for message {}, attempting delayed retry",
                message.id
            );
            // Use delayed retry via DLX+TTL
            match publisher.publish_retry(message, delay_ms).await {
                Ok(()) => {
                    tracing::info!(
                        "Successfully published message {} to retry queue with {}ms delay",
                        message.id,
                        delay_ms
                    );
                    // Acknowledge the original message since we've republished it
                    self.ack().await
                }
                Err(e) => {
                    error!(
                        "Failed to publish message {} to retry queue: {}. Falling back to immediate nack.",
                        message.id,
                        e
                    );
                    // Fallback to immediate requeue
                    self.nack(true).await
                }
            }
        } else {
            tracing::warn!(
                "[RabbitMqAckHandle] Retry publisher NOT available for message {}. Using immediate nack.",
                message.id
            );
            // Fallback to immediate requeue if retry publisher not available
            self.nack(true).await
        }
    }

    async fn send_to_dlq(
        &self,
        message: &crate::message::Message<serde_json::Value>,
        error_message: &str,
    ) -> WorkerResult<()> {
        tracing::info!(
            "[RabbitMqAckHandle] send_to_dlq called for message {}",
            message.id
        );
        
        if let Some(ref backend) = self.retry_publisher {
            // Cast the Arc<dyn RetryPublisher> back to RabbitMqBackend to call publish_to_dlq
            // This is safe because we only create RabbitMqBackend instances as RetryPublisher
            if let Some(rabbit_backend) = backend.as_any().downcast_ref::<RabbitMqBackend>() {
                match rabbit_backend.publish_to_dlq(message, error_message).await {
                    Ok(()) => {
                        tracing::info!(
                            "Successfully published message {} to DLQ after retries exhausted",
                            message.id
                        );
                        // Acknowledge the original message since it's now in DLQ
                        self.ack().await
                    }
                    Err(e) => {
                        error!(
                            "Failed to publish message {} to DLQ: {}. Falling back to nack(false).",
                            message.id,
                            e
                        );
                        // Fallback: nack without requeue (message will be discarded)
                        self.nack(false).await
                    }
                }
            } else {
                tracing::warn!("[RabbitMqAckHandle] retry_publisher is not RabbitMqBackend, using nack(false)");
                self.nack(false).await
            }
        } else {
            tracing::warn!(
                "[RabbitMqAckHandle] Retry publisher NOT available for message {}. Using nack(false).",
                message.id
            );
            // Fallback: nack without requeue
            self.nack(false).await
        }
    }
}

/// Trait for publishing messages to retry queue
///
/// `RabbitMqAckHandle` holds an implementation behind an `Arc` and calls it
/// when a message should be retried after a delay.
#[async_trait]
pub trait RetryPublisher {
    /// Publish `message` so that it is retried after `delay_ms` milliseconds.
    ///
    /// An `Err` tells the caller that the retry copy was not published;
    /// `RabbitMqAckHandle::retry_with_delay` then falls back to an immediate
    /// requeue.
    async fn publish_retry(&self, message: &Message<serde_json::Value>, delay_ms: u64) -> WorkerResult<()>;

    /// Cast self to Any for downcasting
    ///
    /// Lets callers recover the concrete publisher type via `downcast_ref`.
    fn as_any(&self) -> &dyn std::any::Any;
}

/// Settings that control how the RabbitMQ consumer is created.
#[derive(Debug, Clone)]
pub struct RabbitMqConsumerConfig {
    /// Name of the queue that is declared and consumed from.
    pub queue_name: String,
    /// Tag identifying this consumer to the broker.
    pub consumer_tag: String,
    /// Consume in auto-ack (`no_ack`) mode. Not recommended.
    pub auto_ack: bool,
    /// QoS prefetch count, i.e. the maximum number of unacked messages.
    pub prefetch_count: u16,
    /// Default requeue behavior when a message is nacked.
    pub requeue_on_nack: bool,
    /// Turn on delayed retries built on a Dead Letter Exchange (DLX) plus TTL.
    /// Failed messages then go to a retry queue with a TTL rather than being
    /// requeued immediately.
    pub enable_delayed_retry: bool,
    /// Retry queue name; generated from the queue name when `None`.
    pub retry_queue_name: Option<String>,
    /// Retry exchange name; generated from the queue name when `None`.
    pub retry_exchange_name: Option<String>,
    /// Upper bound for the retry delay, in milliseconds (default: one hour).
    /// RabbitMQ TTL has no strict maximum, but practical limits apply.
    pub max_retry_delay_ms: u64,
    /// Lower bound for the retry delay, in milliseconds (default: one second).
    pub min_retry_delay_ms: u64,
}

impl Default for RabbitMqConsumerConfig {
    /// Defaults: queue `worker_queue`, consumer tag `foxtive-worker`, manual
    /// acks, prefetch of 10, requeue on nack, delayed retry disabled, and
    /// retry delays bounded to one second .. one hour.
    fn default() -> Self {
        const ONE_SECOND_MS: u64 = 1_000;
        const ONE_HOUR_MS: u64 = 3_600_000;

        RabbitMqConsumerConfig {
            queue_name: String::from("worker_queue"),
            consumer_tag: String::from("foxtive-worker"),
            auto_ack: false,
            prefetch_count: 10,
            requeue_on_nack: true,
            enable_delayed_retry: false,
            retry_queue_name: None,
            retry_exchange_name: None,
            max_retry_delay_ms: ONE_HOUR_MS,
            min_retry_delay_ms: ONE_SECOND_MS,
        }
    }
}

/// Internal message envelope for passing messages through the channel
///
/// Pairs a parsed message with the AMQP delivery tag it arrived under, so the
/// receiving side still has the tag after the message leaves the consumer task.
struct MessageEnvelope {
    /// Delivery tag copied from the broker delivery.
    delivery_tag: u64,
    /// The message with its JSON payload and metadata.
    message: Message<serde_json::Value>,
}

/// RabbitMQ message backend built on `lapin` with a `deadpool_lapin`
/// connection pool.
///
/// This backend uses a **persistent consumer** with a shared message channel
/// for high-throughput message processing. It creates a single background task
/// that forwards messages from RabbitMQ to an internal mpsc channel, eliminating
/// the overhead of creating consumers per receive() call.
///
/// # Architecture
/// ```text
/// RabbitMQ → [Persistent Consumer] → mpsc::channel → receive() calls
/// ```
///
/// # Example
/// ```rust,no_run
/// use foxtive_worker::backends::RabbitMqBackend;
/// use foxtive_worker::backends::rabbitmq::RabbitMqConsumerConfig;
///
/// #[tokio::main]
/// async fn main() {
///     let config = RabbitMqConsumerConfig {
///         queue_name: "my-queue".to_string(),
///         ..Default::default()
///     };
///
///     let backend = RabbitMqBackend::new("amqp://localhost", config).await.unwrap();
/// }
/// ```
pub struct RabbitMqBackend {
    /// Shared message channel receiver (wrapped in Mutex for concurrent access).
    /// Fed by the background consumer task spawned in `new`.
    message_rx: Arc<Mutex<tokio::sync::mpsc::Receiver<MessageEnvelope>>>,
    /// Connection pool for health checks; the consume channel is also created
    /// from a connection taken out of this pool.
    pub pool: deadpool_lapin::Pool,
    /// The consume channel - wrapped in Mutex for thread-safe ack/nack operations
    /// All delivery tags are only valid on this specific channel
    consume_channel: Arc<Mutex<lapin::Channel>>,
    /// Consumer configuration this backend was created with.
    config: RabbitMqConsumerConfig,
    /// Shutdown signal observed by the background consumer task.
    shutdown_notify: Arc<Notify>,
    /// Handle to the background consumer task
    _consumer_handle: tokio::task::JoinHandle<()>,
    /// Retry queue name (if delayed retry is enabled)
    pub retry_queue_name: Option<String>,
    /// Retry exchange name (if delayed retry is enabled)
    pub retry_exchange_name: Option<String>,
    /// Dead Letter Queue name for exhausted retries
    /// (`None` when delayed retry is disabled).
    pub dlq_name: Option<String>,
}

impl std::fmt::Debug for RabbitMqBackend {
    /// Formats the backend for diagnostics, showing only the queue name and
    /// consumer tag taken from its configuration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let cfg = &self.config;

        let mut out = f.debug_struct("RabbitMqBackend");
        out.field("queue", &cfg.queue_name);
        out.field("consumer_tag", &cfg.consumer_tag);
        out.finish()
    }
}

impl RabbitMqBackend {
    /// Create a new RabbitMQ backend with a persistent consumer.
    ///
    /// This creates a **single long-lived consumer** that runs in a background task
    /// and forwards messages to an internal channel. Multiple `receive()` calls
    /// will pull from this shared channel, avoiding the overhead of creating
    /// new consumers for each message.
    ///
    /// Setup steps, in order:
    /// 1. build the connection pool and take one connection from it;
    /// 2. create the consume channel and apply the prefetch (QoS) limit;
    /// 3. declare the main queue as durable;
    /// 4. if `config.enable_delayed_retry` is set, declare the retry infrastructure;
    /// 5. start the consumer and spawn the forwarding task.
    ///
    /// The forwarding task parses each delivery body as JSON. A body that is not
    /// valid JSON is logged and rejected without requeue (nothing is rejected in
    /// auto-ack mode, where the broker has already settled the delivery). Two
    /// headers are honoured on incoming messages: `x-original-routing-key`
    /// replaces the delivery routing key, and a non-negative `x-retry-attempt`
    /// restores the attempt counter.
    ///
    /// # Arguments
    /// * `amqp_url` - RabbitMQ connection URL (e.g., "amqp://localhost:5672")
    /// * `config` - Consumer configuration
    ///
    /// # Errors
    /// Returns `WorkerError::BackendError` if the pool cannot be built, no
    /// connection or channel can be obtained, or setting QoS, declaring the
    /// queue, or starting the consumer fails. Errors from setting up the retry
    /// infrastructure are propagated unchanged.
    pub async fn new(
        amqp_url: impl Into<String>,
        config: RabbitMqConsumerConfig,
    ) -> WorkerResult<Self> {
        // Create connection pool
        let manager =
            deadpool_lapin::Manager::new(amqp_url.into(), lapin::ConnectionProperties::default());

        let pool = deadpool_lapin::Pool::builder(manager)
            .build()
            .map_err(|e| {
                WorkerError::BackendError(format!("Failed to create connection pool: {}", e))
            })?;

        // Get a connection and create consume channel
        let conn = pool
            .get()
            .await
            .map_err(|e| WorkerError::BackendError(format!("Failed to get connection: {}", e)))?;

        let consume_channel = conn
            .create_channel()
            .await
            .map_err(|e| WorkerError::BackendError(format!("Failed to create channel: {}", e)))?;

        // Set QoS prefetch
        consume_channel
            .basic_qos(config.prefetch_count, BasicQosOptions { global: false })
            .await
            .map_err(|e| WorkerError::BackendError(format!("Failed to set QoS: {}", e)))?;

        // Declare queue (idempotent)
        consume_channel
            .queue_declare(
                &config.queue_name,
                QueueDeclareOptions {
                    durable: true,
                    ..Default::default()
                },
                lapin::types::FieldTable::default(),
            )
            .await
            .map_err(|e| WorkerError::BackendError(format!("Failed to declare queue: {}", e)))?;

        // Setup retry infrastructure if delayed retry is enabled
        let (retry_queue_name, retry_exchange_name, dlq_name) = if config.enable_delayed_retry {
            Self::setup_retry_infrastructure(&consume_channel, &config).await?
        } else {
            (None, None, None)
        };

        // Start persistent consumer
        // Buffer size increased to 500 for high-volume queues (12K+ messages)
        // This provides better backpressure handling when workers are slower than delivery rate
        let (tx, rx) = tokio::sync::mpsc::channel(500);
        let shutdown_notify = Arc::new(Notify::new());

        let consumer_tag = config.consumer_tag.clone();
        let queue_name = config.queue_name.clone();
        // Copied out so the spawned task does not need to capture `config`.
        let auto_ack = config.auto_ack;

        // Create the lapin consumer
        let mut lapin_consumer = consume_channel
            .basic_consume(
                &queue_name,
                &consumer_tag,
                BasicConsumeOptions {
                    no_ack: auto_ack,
                    ..Default::default()
                },
                lapin::types::FieldTable::default(),
            )
            .await
            .map_err(|e| WorkerError::BackendError(format!("Failed to start consumer: {}", e)))?;

        // Spawn background task to forward messages
        let notify_clone = shutdown_notify.clone();
        let consumer_handle = tokio::spawn(async move {
            use futures_util::StreamExt;

            loop {
                // NOTE(review): `notified()` is re-created on every iteration and is
                // not polled while a delivery is being processed or while
                // `tx.send` waits for buffer space. If shutdown is signalled with
                // `notify_waiters()` during that window the signal would be
                // missed; `notify_one()` stores a permit and is not affected.
                // Confirm against the shutdown path.
                tokio::select! {
                    _ = notify_clone.notified() => {
                        tracing::debug!("[{}] Consumer shutting down", consumer_tag);
                        break;
                    }
                    delivery = lapin_consumer.next() => {
                        match delivery {
                            Some(Ok(delivery)) => {
                                // Extract delivery tag
                                let delivery_tag = delivery.delivery_tag;

                                // Parse payload - fail on invalid JSON
                                let payload: serde_json::Value = match serde_json::from_slice(&delivery.data) {
                                    Ok(p) => p,
                                    Err(e) => {
                                        tracing::error!(
                                            "Failed to deserialize message payload: {} (message_id: {:?}, data length: {})",
                                            e,
                                            delivery.properties.message_id(),
                                            delivery.data.len()
                                        );
                                        if auto_ack {
                                            // In no-ack mode the broker settled this delivery when it
                                            // sent it, so there is nothing to reject. Sending a nack
                                            // for such a tag is a protocol error that closes the
                                            // channel and would stop the consumer.
                                            tracing::debug!(
                                                "Skipping nack for malformed message: consumer is in auto-ack mode"
                                            );
                                        } else if let Err(nack_err) = delivery
                                            .nack(BasicNackOptions {
                                                multiple: false,
                                                // No requeue: a payload that cannot be parsed now will
                                                // not parse on redelivery either (poison pill).
                                                requeue: false,
                                            })
                                            .await
                                        {
                                            tracing::error!("Failed to nack malformed message: {:?}", nack_err);
                                        }
                                        continue; // Skip this message
                                    }
                                };

                                // Extract message ID, generating one when the publisher set none
                                let message_id = delivery.properties.message_id()
                                    .as_ref()
                                    .map(|v| v.to_string())
                                    .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

                                // Extract routing key from delivery info
                                let mut routing_key = delivery.routing_key.clone();

                                // Track if this is a redelivery and extract attempt count
                                let mut retry_attempt: Option<u32> = None;

                                // Check if we have stored metadata in headers (from retry)
                                if let Some(headers) = delivery.properties.headers() {
                                    // Restore original routing key if present
                                    if let Some(AMQPValue::LongString(original_rk)) = headers.inner().get("x-original-routing-key") {
                                        tracing::debug!(
                                            "Restoring original routing key '{}' from x-original-routing-key header (delivery routing_key was '{}')",
                                            original_rk,
                                            routing_key
                                        );
                                        // Convert LongString to ShortString for routing_key
                                        routing_key = lapin::types::ShortString::from(original_rk.to_string());
                                    }

                                    // Restore retry attempt count if present
                                    if let Some(AMQPValue::LongInt(attempt_val)) = headers.inner().get("x-retry-attempt") {
                                        // The header is a signed 32-bit value. A plain `as u32`
                                        // cast would turn a negative value into a huge attempt
                                        // count, so convert with a range check instead.
                                        match u32::try_from(*attempt_val) {
                                            Ok(attempt) => {
                                                retry_attempt = Some(attempt);
                                                tracing::debug!(
                                                    "Restored retry attempt {} from x-retry-attempt header",
                                                    attempt_val
                                                );
                                            }
                                            Err(_) => {
                                                tracing::warn!(
                                                    "Ignoring negative x-retry-attempt header value {}",
                                                    attempt_val
                                                );
                                            }
                                        }
                                    }
                                }

                                // Extract message properties from AMQP BasicProperties
                                let mut properties = MessageProperties {
                                    content_type: delivery.properties.content_type()
                                        .as_ref()
                                        .map(|v| v.to_string()),
                                    content_encoding: delivery.properties.content_encoding()
                                        .as_ref()
                                        .map(|v| v.to_string()),
                                    priority: *delivery.properties.priority(),
                                    // Kept only when the expiration string parses as an integer
                                    expiration: delivery.properties.expiration()
                                        .as_ref()
                                        .and_then(|v| v.to_string().parse::<u64>().ok()),
                                    message_type: None, // Not available in lapin (use headers instead)
                                    user_id: delivery.properties.user_id()
                                        .as_ref()
                                        .map(|v| v.to_string()),
                                    app_id: delivery.properties.app_id()
                                        .as_ref()
                                        .map(|v| v.to_string()),
                                    cluster_id: None, // Not available in lapin
                                    reply_to: delivery.properties.reply_to()
                                        .as_ref()
                                        .map(|v| v.to_string()),
                                    headers: None,
                                };

                                // Extract custom headers from FieldTable
                                if let Some(field_table) = delivery.properties.headers() {
                                    let mut headers_map = std::collections::HashMap::new();
                                    // FieldTable has an inner() method that returns the underlying map
                                    for (key, value) in field_table.inner().iter() {
                                        // Convert AMQP values to strings
                                        let value_str = match value {
                                            AMQPValue::ShortString(s) => Some(s.to_string()),
                                            AMQPValue::LongString(s) => Some(s.to_string()),
                                            AMQPValue::LongInt(i) => Some(i.to_string()),
                                            AMQPValue::Timestamp(t) => Some(t.to_string()),
                                            _ => None, // Skip unsupported types
                                        };
                                        if let Some(v) = value_str {
                                            headers_map.insert(key.to_string(), v);
                                        }
                                    }
                                    // Leave `headers` as None when nothing convertible was found
                                    if !headers_map.is_empty() {
                                        properties.headers = Some(headers_map);
                                    }
                                }

                                // Build metadata with routing key, properties, and restored attempt count
                                tracing::debug!(
                                    "Creating message metadata with routing_key='{}', queue_name='{}'",
                                    routing_key,
                                    queue_name
                                );
                                let mut metadata = MessageMetadata::new(&queue_name)
                                    .with_routing_key(routing_key)
                                    .with_properties(properties);

                                // Restore attempt count from retry headers if present
                                if let Some(attempt) = retry_attempt {
                                    metadata.attempt = attempt;
                                    tracing::info!(
                                        "Restored attempt count {} for redelivered message {}",
                                        attempt,
                                        message_id
                                    );
                                }

                                // Create worker message
                                tracing::info!(
                                    "Created message {} with metadata.routing_key={:?}",
                                    message_id,
                                    metadata.routing_key
                                );
                                let message = Message {
                                    id: message_id,
                                    payload,
                                    metadata,
                                };

                                // Send to channel; stop consuming once the receiver is gone
                                let envelope = MessageEnvelope {
                                    delivery_tag,
                                    message,
                                };

                                if tx.send(envelope).await.is_err() {
                                    tracing::debug!("[{}] Receiver dropped, stopping consumer", consumer_tag);
                                    break;
                                }
                            }
                            Some(Err(e)) => {
                                tracing::error!("[{}] Consumer error: {:?}", consumer_tag, e);
                                // Continue on error
                            }
                            None => {
                                tracing::warn!("[{}] Consumer stream ended", consumer_tag);
                                break;
                            }
                        }
                    }
                }
            }
        });

        Ok(Self {
            message_rx: Arc::new(Mutex::new(rx)),
            pool,
            consume_channel: Arc::new(Mutex::new(consume_channel)),
            config,
            shutdown_notify,
            _consumer_handle: consumer_handle,
            retry_queue_name,
            retry_exchange_name,
            dlq_name,
        })
    }

    /// Connects to `amqp_url` using `RabbitMqConsumerConfig::default()`.
    ///
    /// # Errors
    /// Returns whatever error `new` reports for the same URL and default settings.
    pub async fn with_defaults(amqp_url: &str) -> WorkerResult<Self> {
        let default_config = RabbitMqConsumerConfig::default();
        Self::new(amqp_url, default_config).await
    }

    /// Declare the broker-side objects used for delayed retries and dead-lettering.
    ///
    /// In order, this declares:
    /// 1. a durable topic exchange that retried messages are published to,
    /// 2. a durable retry queue whose dead-letter target is the default exchange with
    ///    the main queue name as routing key,
    /// 3. a `#` binding from the retry queue to the retry exchange,
    /// 4. a durable dead letter queue named `<queue_name>-dlq`.
    ///
    /// The retry queue and exchange names are taken from `config` when set; otherwise
    /// they default to `<queue_name>_retry` and `<queue_name>_retry_exchange`.
    ///
    /// # Returns
    /// `(retry_queue, retry_exchange, dlq)` names; all three are `Some` on success.
    ///
    /// # Errors
    /// `WorkerError::BackendError` if any declaration or the binding fails.
    async fn setup_retry_infrastructure(
        channel: &lapin::Channel,
        config: &RabbitMqConsumerConfig,
    ) -> WorkerResult<(Option<String>, Option<String>, Option<String>)> {
        // Fall back to names derived from the main queue when none are configured.
        let retry_queue = match &config.retry_queue_name {
            Some(name) => name.clone(),
            None => format!("{}_retry", config.queue_name),
        };
        let retry_exchange = match &config.retry_exchange_name {
            Some(name) => name.clone(),
            None => format!("{}_retry_exchange", config.queue_name),
        };

        tracing::info!(
            "Setting up retry infrastructure: queue={}, exchange={}, dlx={}",
            retry_queue,
            retry_exchange,
            config.queue_name
        );

        // Every queue declared here is durable; build the options on demand.
        let durable_queue = || QueueDeclareOptions {
            durable: true,
            ..Default::default()
        };

        // Retry exchange: a topic exchange so a single '#' binding can catch every routing key.
        let exchange_options = ExchangeDeclareOptions {
            durable: true,
            ..Default::default()
        };
        channel
            .exchange_declare(
                &retry_exchange,
                lapin::ExchangeKind::Topic,
                exchange_options,
                FieldTable::default(),
            )
            .await
            .map_err(|err| {
                WorkerError::BackendError(format!("Failed to declare retry exchange: {}", err))
            })?;

        // Retry queue: expired messages are dead-lettered to the default exchange ("")
        // with the main queue name as routing key, which routes them back to the main queue.
        let mut dead_letter_args = FieldTable::default();
        dead_letter_args.insert(
            "x-dead-letter-exchange".into(),
            AMQPValue::LongString("".into()),
        );
        dead_letter_args.insert(
            "x-dead-letter-routing-key".into(),
            AMQPValue::LongString(config.queue_name.clone().into()),
        );

        channel
            .queue_declare(&retry_queue, durable_queue(), dead_letter_args)
            .await
            .map_err(|err| {
                WorkerError::BackendError(format!("Failed to declare retry queue: {}", err))
            })?;

        // Bind with '#' because retried messages keep their own (varying) routing keys.
        channel
            .queue_bind(
                &retry_queue,
                &retry_exchange,
                "#",
                QueueBindOptions::default(),
                FieldTable::default(),
            )
            .await
            .map_err(|err| {
                WorkerError::BackendError(format!("Failed to bind retry queue: {}", err))
            })?;

        // Dead letter queue: holds messages whose retries are exhausted.
        let dlq_name = format!("{}-dlq", config.queue_name);
        channel
            .queue_declare(&dlq_name, durable_queue(), FieldTable::default())
            .await
            .map_err(|err| {
                WorkerError::BackendError(format!("Failed to declare DLQ: {}", err))
            })?;

        tracing::info!(
            "Retry infrastructure setup complete: retry_queue={}, dlq={}",
            retry_queue,
            dlq_name
        );

        Ok((Some(retry_queue), Some(retry_exchange), Some(dlq_name)))
    }

    /// Publish a message to the retry queue so it is redelivered after a delay.
    ///
    /// The delay is implemented with the per-message `expiration` property (not a
    /// queue-level `x-message-ttl` argument). When the expiration elapses, the retry
    /// queue's dead-letter settings route the message back to the main queue.
    ///
    /// The message is published to the retry exchange with its original routing key
    /// (or the main queue name when none is recorded) and carries two headers:
    /// * `x-original-routing-key` - the routing key from the message metadata, if any
    /// * `x-retry-attempt` - the current attempt count plus one
    ///
    /// NOTE(review): RabbitMQ only expires per-message-TTL messages once they reach the
    /// head of the queue, so a short delay queued behind a longer one may be redelivered
    /// late - confirm this is acceptable for the retry policies in use.
    ///
    /// # Arguments
    /// * `message` - The message to retry
    /// * `delay_ms` - Delay in milliseconds before the message should be redelivered;
    ///   raised to at least `min_retry_delay_ms` and then capped at `max_retry_delay_ms`
    ///
    /// # Errors
    /// * `WorkerError::BackendError` if the retry queue or exchange is not configured,
    ///   or if obtaining a connection, creating a channel, or publishing fails
    /// * `WorkerError::SerializationError` if the payload cannot be serialized to JSON
    pub async fn publish_to_retry_queue(
        &self,
        message: &Message<serde_json::Value>,
        delay_ms: u64,
    ) -> WorkerResult<()> {
        tracing::info!(
            "[RabbitMqBackend] publish_to_retry_queue called for message {} (requested delay: {}ms)",
            message.id,
            delay_ms
        );

        let retry_queue = self
            .retry_queue_name
            .as_ref()
            .ok_or_else(|| {
                error!("[RabbitMqBackend] Retry queue not configured for message {}", message.id);
                WorkerError::BackendError(
                    "Retry queue not configured. Enable delayed retry in config.".to_string(),
                )
            })?;

        // Borrowed, not cloned: the name is only read for publishing and logging.
        let retry_exchange = self
            .retry_exchange_name
            .as_ref()
            .ok_or_else(|| {
                error!("[RabbitMqBackend] Retry exchange not configured for message {}", message.id);
                WorkerError::BackendError(
                    "Retry exchange not configured. Enable delayed retry in config.".to_string(),
                )
            })?;

        // max-then-min instead of `clamp`: `clamp` panics when min > max, whereas this
        // tolerates a misconfigured range (the max bound wins).
        let clamped_delay = delay_ms
            .max(self.config.min_retry_delay_ms)
            .min(self.config.max_retry_delay_ms);

        if clamped_delay != delay_ms {
            tracing::info!(
                "Clamping retry delay from {}ms to {}ms (range: {}-{}ms)",
                delay_ms,
                clamped_delay,
                self.config.min_retry_delay_ms,
                self.config.max_retry_delay_ms
            );
        }

        // Serialize message payload
        let payload =
            serde_json::to_vec(&message.payload).map_err(WorkerError::SerializationError)?;

        // Preserve the original routing key from message metadata, or fall back to queue name
        tracing::info!(
            "[publish_to_retry_queue] Message {} metadata.routing_key = {:?}, metadata.source = '{}'",
            message.id,
            message.metadata.routing_key,
            message.metadata.source
        );

        let routing_key = message
            .metadata
            .routing_key
            .as_deref()
            .unwrap_or(&self.config.queue_name);

        tracing::debug!(
            "Using routing key '{}' for retry (original: {:?})",
            routing_key,
            message.metadata.routing_key
        );

        // Dead-lettering rewrites the routing key to the main queue name, so the original
        // key and the attempt counter travel in headers to survive the round-trip.
        let mut headers = FieldTable::default();
        if let Some(original_rk) = &message.metadata.routing_key {
            headers.insert(
                "x-original-routing-key".into(),
                AMQPValue::LongString(original_rk.clone().into()),
            );
            tracing::debug!(
                "Stored original routing key '{}' in x-original-routing-key header",
                original_rk
            );
        }

        // Saturate rather than overflow/truncate: the header is a signed 32-bit integer,
        // and a wrapped counter would make an exhausted message look fresh again.
        let next_attempt = message.metadata.attempt.saturating_add(1);
        let header_attempt = i32::try_from(next_attempt).unwrap_or(i32::MAX);
        headers.insert(
            "x-retry-attempt".into(),
            AMQPValue::LongInt(header_attempt),
        );
        tracing::debug!(
            "Stored retry attempt {} in x-retry-attempt header (current: {})",
            next_attempt,
            message.metadata.attempt
        );

        // `expiration` carries the delay in milliseconds as a string. Delivery mode 2
        // (persistent) keeps pending retries across a broker restart; the retry queue is
        // declared durable, but that alone does not preserve transient messages.
        let properties = BasicProperties::default()
            .with_message_id(message.id.clone().into())
            .with_content_type("application/json".into())
            .with_delivery_mode(2)
            .with_expiration(clamped_delay.to_string().into())
            .with_headers(headers);

        // Publishing uses a fresh channel on a pooled connection, separate from the consume channel.
        let conn = self.pool.get().await.map_err(|e| {
            error!("[RabbitMqBackend] Failed to get connection for retry: {}", e);
            WorkerError::BackendError(format!("Failed to get connection for retry: {}", e))
        })?;

        let channel = conn.create_channel().await.map_err(|e| {
            error!("[RabbitMqBackend] Failed to create channel for retry: {}", e);
            WorkerError::BackendError(format!("Failed to create channel for retry: {}", e))
        })?;

        // Publish to the retry exchange with the ORIGINAL routing key; the '#' binding
        // delivers it to the retry queue regardless of the key.
        channel
            .basic_publish(
                retry_exchange,
                routing_key,
                BasicPublishOptions::default(),
                &payload,
                properties,
            )
            .await
            .map_err(|e| {
                error!("[RabbitMqBackend] Failed to publish to retry queue: {}", e);
                WorkerError::BackendError(format!("Failed to publish to retry queue: {}", e))
            })?;

        tracing::info!(
            "Published message {} to retry queue '{}' via exchange '{}' with {}ms delay",
            message.id,
            retry_queue,
            retry_exchange,
            clamped_delay
        );

        Ok(())
    }

    /// Publish a failed message to the Dead Letter Queue (DLQ) after retries are exhausted
    ///
    /// This method publishes the message to a dedicated DLQ with failure metadata in headers.
    /// The DLQ serves as permanent storage for messages that have failed all retry attempts,
    /// allowing for manual inspection, debugging, or reprocessing.
    ///
    /// # Arguments
    /// * `message` - The message that has exhausted all retries
    /// * `error_message` - Description of why the message failed
    ///
    /// # Returns
    /// Ok if successfully published to DLQ, Err if publishing failed
    pub async fn publish_to_dlq(
        &self,
        message: &Message<serde_json::Value>,
        error_message: &str,
    ) -> WorkerResult<()> {
        let dlq_name = self
            .dlq_name
            .as_ref()
            .ok_or_else(|| {
                error!("[RabbitMqBackend] DLQ not configured for message {}", message.id);
                WorkerError::BackendError(
                    "DLQ not configured. Enable delayed retry in config.".to_string(),
                )
            })?;

        // Serialize message payload
        let payload = serde_json::to_vec(&message.payload).map_err(|e| {
            WorkerError::SerializationError(e)
        })?;

        // Create headers with failure metadata
        let mut headers = FieldTable::default();
        
        // Store original routing key
        if let Some(original_rk) = &message.metadata.routing_key {
            headers.insert(
                "x-original-routing-key".into(),
                AMQPValue::LongString(original_rk.clone().into()),
            );
        }
        
        // Store failure information
        headers.insert(
            "x-failure-reason".into(),
            AMQPValue::LongString(error_message.into()),
        );
        
        // Store attempt count
        headers.insert(
            "x-final-attempt".into(),
            AMQPValue::LongInt(message.metadata.attempt as i32),
        );
        
        // Store timestamp
        use chrono::Utc;
        headers.insert(
            "x-failed-at".into(),
            AMQPValue::LongString(Utc::now().to_rfc3339().into()),
        );

        let properties = BasicProperties::default()
            .with_message_id(message.id.clone().into())
            .with_content_type("application/json".into())
            .with_headers(headers);

        // Get a connection from pool for publishing
        let conn = self.pool.get().await.map_err(|e| {
            error!("[RabbitMqBackend] Failed to get connection for DLQ: {}", e);
            WorkerError::BackendError(format!("Failed to get connection for DLQ: {}", e))
        })?;

        let channel = conn.create_channel().await.map_err(|e| {
            error!("[RabbitMqBackend] Failed to create channel for DLQ: {}", e);
            WorkerError::BackendError(format!("Failed to create channel for DLQ: {}", e))
        })?;

        // Publish to DLQ using default exchange with DLQ name as routing key
        channel
            .basic_publish(
                "", // Default exchange
                dlq_name, // Routing key = DLQ queue name
                BasicPublishOptions::default(),
                &payload,
                properties,
            )
            .await
            .map_err(|e| {
                error!("[RabbitMqBackend] Failed to publish to DLQ: {}", e);
                WorkerError::BackendError(format!("Failed to publish to DLQ: {}", e))
            })?;

        tracing::info!(
            "Published message {} to DLQ '{}' after {} failed attempts: {}",
            message.id,
            dlq_name,
            message.metadata.attempt,
            error_message
        );

        Ok(())
    }

    /// Name of the main queue this backend consumes from, as given in its config.
    pub fn queue_name(&self) -> &str {
        self.config.queue_name.as_str()
    }

    /// Acknowledge every message up to and including `delivery_tag` in one call.
    ///
    /// Issues a single `basic_ack` with `multiple: true` on the consume channel, so the
    /// channel mutex is taken once rather than once per message.
    ///
    /// # Arguments
    /// * `delivery_tag` - The highest delivery tag to acknowledge (all lower tags are also acked)
    ///
    /// # Errors
    /// `WorkerError::BackendError` if the broker call fails; the failure is also logged.
    ///
    /// # Example
    /// ```rust,no_run
    /// # use foxtive_worker::backends::RabbitMqBackend;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let backend = RabbitMqBackend::with_defaults("amqp://localhost").await?;
    /// // Acknowledge all messages up to tag 1000
    /// backend.batch_ack(1000).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn batch_ack(&self, delivery_tag: u64) -> WorkerResult<()> {
        // `multiple: true` turns the ack into "this tag and everything before it".
        let ack_options = BasicAckOptions { multiple: true };

        let consume_channel = self.consume_channel.lock().await;
        match consume_channel.basic_ack(delivery_tag, ack_options).await {
            Ok(_) => Ok(()),
            Err(err) => {
                error!(
                    "Failed to batch ack up to delivery tag {}: {}",
                    delivery_tag,
                    err
                );
                Err(WorkerError::BackendError(format!(
                    "Failed to batch ack messages: {}",
                    err
                )))
            }
        }
    }

    /// Adjust the prefetch count dynamically based on processing performance.
    ///
    /// This allows tuning the number of unacknowledged messages the broker will deliver,
    /// optimizing for throughput vs. memory usage.
    ///
    /// # Arguments
    /// * `prefetch_count` - New prefetch count (recommended: 10-100 depending on message size)
    ///
    /// # Guidelines
    /// - Increase prefetch when workers process messages quickly (<10ms avg)
    /// - Decrease prefetch when workers are slow (>100ms avg) or messages are large
    /// - Monitor memory usage - higher prefetch = more messages in flight
    ///
    /// # Example
    /// ```rust,no_run
    /// # use foxtive_worker::backends::RabbitMqBackend;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let backend = RabbitMqBackend::with_defaults("amqp://localhost").await?;
    /// // Increase prefetch for fast workers
    /// backend.adjust_prefetch(50).await?;
    ///
    /// // Decrease prefetch for slow/large messages
    /// backend.adjust_prefetch(5).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn adjust_prefetch(&self, prefetch_count: u16) -> WorkerResult<()> {
        let channel = self.consume_channel.lock().await;

        channel
            .basic_qos(prefetch_count, BasicQosOptions { global: false })
            .await
            .map_err(|e| {
                error!("Failed to adjust prefetch to {}: {}", prefetch_count, e);
                WorkerError::BackendError(format!("Failed to adjust prefetch: {}", e))
            })?;

        tracing::info!("Adjusted prefetch count to {}", prefetch_count);
        Ok(())
    }
}

#[async_trait]
impl RetryPublisher for RabbitMqBackend {
    async fn publish_retry(
        &self,
        message: &Message<serde_json::Value>,
        delay_ms: u64,
    ) -> WorkerResult<()> {
        self.publish_to_retry_queue(message, delay_ms).await
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

#[async_trait]
impl MessageBackend for RabbitMqBackend {
    /// Wait for the next message forwarded by the background consumer task.
    ///
    /// Returns `ReceiveResult::Message` with an ack handle bound to the delivery tag,
    /// or `ReceiveResult::ConnectionLost` once the internal channel is closed.
    async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
        // Shutdown cannot be detected here directly (Notify exposes no state to poll),
        // so the only termination signal is the internal channel closing.
        let mut receiver = self.message_rx.lock().await;

        let envelope = match receiver.recv().await {
            Some(envelope) => envelope,
            None => {
                // The channel closed. A graceful shutdown and a lost connection are not
                // told apart here, so this is always reported as a lost connection.
                return Ok(ReceiveResult::ConnectionLost {
                    reason: "Consumer stream ended unexpectedly".to_string(),
                });
            }
        };

        // The ack handle acks/nacks on the shared consume channel. When delayed retry is
        // configured it also carries a retry publisher: a second backend value sharing
        // the same pool, channel and receiver, with a no-op task standing in for the
        // consumer handle.
        let ack_handle = Arc::new(RabbitMqAckHandle {
            delivery_tag: envelope.delivery_tag,
            ack_channel: self.consume_channel.clone(),
            retry_publisher: if self.retry_queue_name.is_some() {
                Some(Arc::new(Self {
                    message_rx: self.message_rx.clone(),
                    pool: self.pool.clone(),
                    consume_channel: self.consume_channel.clone(),
                    config: self.config.clone(),
                    shutdown_notify: self.shutdown_notify.clone(),
                    _consumer_handle: tokio::spawn(async {}), // placeholder, does no work
                    retry_queue_name: self.retry_queue_name.clone(),
                    retry_exchange_name: self.retry_exchange_name.clone(),
                    dlq_name: self.dlq_name.clone(),
                }))
            } else {
                None
            },
        });

        let received = ReceivedMessage::new(envelope.message, ack_handle);
        Ok(ReceiveResult::Message(Box::new(received)))
    }

    /// Always fails: acknowledgement by message ID is not available on this backend;
    /// use the ack handle attached to the received message.
    async fn ack(&self, _message_id: &str) -> WorkerResult<()> {
        let reason = String::from(
            "Direct ack by ID not supported for RabbitMQ. Use AckHandle from receive().",
        );
        Err(WorkerError::BackendError(reason))
    }

    /// Always fails: negative acknowledgement by message ID is not available on this
    /// backend; use the ack handle attached to the received message.
    async fn nack(&self, _message_id: &str, _requeue: bool) -> WorkerResult<()> {
        let reason = String::from(
            "Direct nack by ID not supported for RabbitMQ. Use AckHandle from receive().",
        );
        Err(WorkerError::BackendError(reason))
    }

    /// Report healthy when a connection can be obtained from the pool.
    async fn health_check(&self) -> WorkerResult<()> {
        match self.pool.get().await {
            // The connection is only a probe; it is released right away.
            Ok(_probe) => Ok(()),
            Err(err) => Err(WorkerError::BackendError(format!(
                "RabbitMQ health check failed: {}",
                err
            ))),
        }
    }

    /// Signal the background consumer task to stop.
    ///
    /// Only sends the notification; it does not wait for the task to finish.
    async fn shutdown(&self) -> WorkerResult<()> {
        self.shutdown_notify.notify_one();
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // These tests talk to a live RabbitMQ broker at amqp://localhost, so they are
    // #[ignore]d by default and only run when ignored tests are requested.

    /// Connecting with defaults yields a backend whose health check passes.
    #[tokio::test]
    #[ignore]
    async fn test_connect_and_health() {
        let connected = RabbitMqBackend::with_defaults("amqp://localhost").await;
        let backend = connected.unwrap();

        let health = backend.health_check().await;
        assert!(health.is_ok());
    }

    /// `receive()` on an empty queue keeps waiting, so a short timeout must elapse first.
    #[tokio::test]
    #[ignore]
    async fn test_receive_timeout() {
        let connected = RabbitMqBackend::with_defaults("amqp://localhost").await;
        let backend = connected.unwrap();

        let wait = std::time::Duration::from_millis(100);
        let outcome = tokio::time::timeout(wait, backend.receive()).await;

        // `Err` here means the timeout fired, i.e. no message arrived in time.
        assert!(outcome.is_err());
    }
}