Skip to main content

celers_broker_redis/
lib.rs

1//! Redis broker implementation for CeleRS
2//!
3//! Provides Kombu-compatible Redis broker with:
4//! - Visibility timeout using Lua scripts
5//! - Priority queue support
6//! - Dead Letter Queue (DLQ)
7//! - Task cancellation via Pub/Sub
8//! - Health checks and monitoring
9//! - Queue pause/resume functionality
10//! - Task deduplication
11//! - Circuit breaker for resilience
12//! - Rate limiting (local and distributed)
13//! - Automatic retry with configurable backoff
14//! - Geo-distribution with multi-region replication
15
16use async_trait::async_trait;
17use celers_core::{Broker, BrokerMessage, CelersError, Result, SerializedTask, TaskId};
18use redis::{AsyncCommands, Client};
19use tracing::{debug, error, info, warn};
20
21#[cfg(feature = "metrics")]
22use celers_metrics::{
23    DLQ_SIZE, PROCESSING_QUEUE_SIZE, QUEUE_SIZE, TASKS_ENQUEUED_BY_TYPE, TASKS_ENQUEUED_TOTAL,
24};
25
26pub mod advanced_queue;
27pub mod authorization;
28pub mod backup_restore;
29pub mod batch_ext;
30pub mod bulkhead;
31pub mod circuit_breaker;
32pub mod cluster;
33pub mod compression;
34pub mod connection;
35pub mod cron_scheduler;
36pub mod dedup;
37pub mod degradation;
38pub mod dlq_analytics;
39pub mod dlq_archival;
40pub mod dlq_replay;
41pub mod encryption;
42pub mod geo;
43pub mod health;
44pub mod hooks;
45pub mod hooks_advanced;
46pub mod integrity;
47pub mod locks;
48pub mod lua_scripts;
49pub mod metrics_ext;
50pub mod monitoring;
51pub mod otel_integration;
52pub mod partitioning;
53pub mod pipeline_advanced;
54pub mod pool;
55pub mod pool_advanced;
56pub mod priority_mgmt;
57pub mod queue_control;
58pub mod quota_mgmt;
59pub mod rate_limit;
60pub mod result_backend;
61pub mod retry;
62pub mod sentinel;
63pub mod streams;
64pub mod structured_logging;
65pub mod task_groups;
66pub mod task_query;
67pub mod telemetry;
68pub mod utilities;
69pub mod visibility;
70
71pub use advanced_queue::{
72    AdvancedQueueManager, PriorityAgingConfig, PriorityAgingConfigBuilder, QueueWeight,
73    StarvationPrevention, StarvationStats, TaskAge, WeightedQueueSelector,
74};
75pub use authorization::{AuthorizationPolicy, UserPermissions};
76pub use backup_restore::{BackupManager, QueueSnapshot, SnapshotComparison};
77pub use batch_ext::{BatchOperations, TaskFilter};
78pub use bulkhead::{Bulkhead, BulkheadConfig, BulkheadManager, BulkheadPermit, BulkheadStats};
79pub use circuit_breaker::{
80    CircuitBreaker, CircuitBreakerConfig, CircuitBreakerStats, CircuitState,
81};
82pub use cluster::{
83    ClusterConfig, ClusterConfigBuilder, ClusterNode, ClusterNodeRole, ClusterTopology, HashSlot,
84    RedisMode,
85};
86#[allow(deprecated)]
87pub use compression::{CompressionAlgorithm, CompressionConfig, CompressionStats, Compressor};
88pub use connection::{ConnectionStats, RedisConfig, TlsConfig};
89pub use cron_scheduler::{CronExpression, CronScheduler, ScheduledTask};
90pub use dedup::{DedupResult, DedupStrategy, Deduplicator};
91pub use degradation::{DegradationManager, DegradationMode, DegradationStats, QueuedOperation};
92pub use dlq_analytics::{
93    DLQAnalyzer, ErrorCategory, ErrorCluster, FailurePattern, FailureTrend, RootCause,
94    TemporalAnalysis,
95};
96pub use dlq_archival::{
97    ArchivalConfig, ArchiveSearchCriteria, ArchiveStats, ArchivedTask, DLQArchivalManager,
98    RetentionPolicy, StorageBackend,
99};
100pub use dlq_replay::{
101    ReplayCondition, ReplayPolicy, ReplayPolicyType, ReplayResult, ReplayScheduler, ReplayStats,
102};
103pub use encryption::{
104    EncryptedData, EncryptionAlgorithm, EncryptionConfig, EncryptionManager, EncryptionStats,
105};
106pub use geo::{
107    ConflictResolution, GeoReplicationManager, Region, RegionId, RegionStats, RegionStatsSnapshot,
108    RegionalReadRouter, ReplicationConfig, ReplicationConfigBuilder, RoutingStrategy, SyncMode,
109};
110pub use health::{HealthChecker, KeyspaceStats, QueueStats, RedisHealthStatus, ReplicationInfo};
111pub use hooks::{
112    CompletionHook, CompletionStatus, DequeueHook, EnqueueHook, HookContext, HookResult, LogLevel,
113    LoggingHook, MetricsHook, PayloadSizeValidator, TaskHookRegistry, TimestampEnrichmentHook,
114};
115pub use hooks_advanced::{
116    ConditionalHook, HookCondition, HookErrorStrategy, ParallelHookExecutor, PrioritizedHook,
117    RetryableHook, SequentialHooks,
118};
119pub use integrity::{ChecksumAlgorithm, IntegrityStats, IntegrityValidator, IntegrityWrappedTask};
120pub use locks::{DistributedLock, LockConfig, LockGuard, LockToken};
121pub use lua_scripts::{ScriptId, ScriptManager, ScriptPerformance, ScriptStats, SCRIPT_VERSION};
122pub use metrics_ext::{
123    HistogramSnapshot, LatencyStats, MetricsSnapshot, MetricsTracker, SlowOperation,
124    SlowOperationLogger, TaskAgeHistogram,
125};
126pub use monitoring::{
127    analyze_performance_trend, analyze_redis_broker_performance, analyze_redis_consumer_lag,
128    analyze_redis_dlq_health, analyze_redis_fragmentation, analyze_redis_memory_efficiency,
129    analyze_redis_slowlog, analyze_task_completion_patterns,
130    calculate_redis_message_age_distribution, calculate_redis_message_velocity,
131    calculate_redis_queue_health_score, detect_performance_regression, detect_queue_burst,
132    detect_redis_queue_anomaly, detect_redis_queue_saturation, estimate_redis_monthly_cost,
133    estimate_redis_processing_capacity, generate_queue_health_report, predict_redis_queue_size,
134    recommend_alert_thresholds, recommend_redis_scaling_strategy, suggest_redis_worker_scaling,
135    AlertThresholds, AnomalyDetection, BurstDetection, ConsumerLagAnalysis, DLQAnalysis,
136    FragmentationAnalysis, MemoryEfficiencyAnalysis, MessageAgeDistribution, MessageVelocity,
137    PerformanceTrend, ProcessingCapacity, QueueHealthReport, QueueSaturationAnalysis,
138    QueueSizePrediction, QueueTrend, RedisCostEstimate, RegressionDetection, SaturationLevel,
139    ScalingRecommendation, ScalingStrategy, SlowlogAnalysis, TaskCompletionAnalysis,
140    WorkerScalingSuggestion,
141};
142pub use otel_integration::{
143    OtelBrokerInstrumentation, OtelConfig, SpanInfo, TracingBackend, W3CTraceContext,
144};
145pub use partitioning::{
146    ConsistentHashRing, HashAlgorithm, PartitionManager, PartitionStats, PartitionStrategy,
147};
148pub use pipeline_advanced::{
149    AdvancedPipeline, PipelineConfig, PipelineExecutionResult, PipelineOperation, PipelineStats,
150};
151pub use pool::{ConnectionPool, PoolConfig, PoolStats};
152pub use pool_advanced::{AdaptiveConnectionPool, AdaptivePoolConfig, AdaptivePoolStats};
153pub use priority_mgmt::{PriorityAdjustment, PriorityManager};
154pub use queue_control::{QueueController, QueueState};
155pub use quota_mgmt::{QuotaConfig, QuotaManager, QuotaPeriod, QuotaUsage};
156pub use rate_limit::{
157    DistributedRateLimiter, QueueRateLimitConfig, QueueRateLimiter, TokenBucketLimiter,
158    TrackedRateLimiter,
159};
160pub use result_backend::{ResultBackend, ResultBackendConfig, TaskResult, TaskStatus};
161pub use retry::{BackoffStrategy, RetryConfig, RetryExecutor, RetryResult};
162pub use sentinel::{
163    MasterAddress, SentinelClient, SentinelConfig, SentinelConfigBuilder, SentinelRole,
164};
165pub use streams::{
166    StreamConfig, StreamConfigBuilder, StreamEntry, StreamMessageId, StreamStats, StreamsClient,
167};
168pub use structured_logging::{
169    CorrelationAnalysis, LogConfig, LogContext, LogEntry, PerformanceAnalysis, StructuredLogLevel,
170    StructuredLogger,
171};
172pub use task_groups::{GroupConfig, GroupMetadata, GroupStatus, TaskGroup};
173pub use task_query::{TaskQuery, TaskSearchCriteria, TaskStats};
174pub use telemetry::{SpanBuilder, SpanEvent, TracingContext};
175pub use utilities::{
176    analyze_redis_command_performance, analyze_redis_queue_balance,
177    calculate_optimal_redis_batch_size, calculate_optimal_redis_pool_size,
178    calculate_redis_capacity_headroom, calculate_redis_key_ttl_by_priority,
179    calculate_redis_migration_batch_size, calculate_redis_optimal_shard_count,
180    calculate_redis_pipeline_size, calculate_redis_queue_efficiency,
181    calculate_redis_sla_compliance, calculate_redis_timeout_values, calculate_worker_distribution,
182    estimate_redis_migration_time, estimate_redis_queue_drain_time, estimate_redis_queue_memory,
183    estimate_redis_scaling_time, optimize_task_priority, recommend_queue_rebalancing,
184    suggest_redis_data_retention, suggest_redis_persistence_strategy,
185    suggest_redis_pipeline_strategy, CapacityHeadroom, MigrationStrategy, PriorityOptimization,
186    QueueEfficiency, RebalancingRecommendation, SLACompliance, ScalingTimeEstimate,
187    WorkerDistribution,
188};
189use visibility::VisibilityManager;
190
/// Queue mode for Redis broker
///
/// Selects which Redis data structure backs the main queue key. The mode is
/// fixed when a `RedisBroker` is constructed and decides which commands the
/// broker issues against that key (list commands vs. sorted-set commands).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum QueueMode {
    /// Standard FIFO queue using Redis lists
    Fifo,
    /// Priority queue using Redis sorted sets (higher priority = processed first)
    ///
    /// Members are scored with the negated task priority, so the lowest score
    /// in the set belongs to the highest-priority task.
    Priority,
}
199
200impl QueueMode {
201    /// Check if this is FIFO mode
202    pub fn is_fifo(&self) -> bool {
203        matches!(self, QueueMode::Fifo)
204    }
205
206    /// Check if this is Priority mode
207    pub fn is_priority(&self) -> bool {
208        matches!(self, QueueMode::Priority)
209    }
210}
211
212impl std::fmt::Display for QueueMode {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        match self {
215            QueueMode::Fifo => write!(f, "FIFO"),
216            QueueMode::Priority => write!(f, "Priority"),
217        }
218    }
219}
220
/// Redis-based broker implementation
///
/// Bundles a Redis client with the key names derived from one base queue
/// name. All constructors derive the auxiliary keys by appending a suffix to
/// the base name (`:processing`, `:dlq`, `:delayed`, `:cancel`).
pub struct RedisBroker {
    /// Redis client from which a connection is opened for each operation.
    client: Client,
    /// Key of the main queue (a list in FIFO mode, a sorted set in Priority mode).
    queue_name: String,
    /// Key of the processing list, `{queue_name}:processing`.
    processing_queue: String,
    /// Key of the Dead Letter Queue list, `{queue_name}:dlq`.
    dlq_name: String,
    /// Key of the delayed-task queue, `{queue_name}:delayed`.
    delayed_queue: String,
    /// Pub/Sub channel name used for cancellation messages, `{queue_name}:cancel`.
    cancel_channel: String,
    /// Data structure backing the main queue; fixed at construction.
    mode: QueueMode,
    // Constructed by every constructor; the lint allowance suggests it is not
    // read yet — NOTE(review): confirm against the rest of the file.
    #[allow(dead_code)]
    visibility_manager: VisibilityManager,
    /// Visibility timeout in seconds; constructors default it to 300.
    visibility_timeout_secs: u64,
}
234
235impl RedisBroker {
236    /// Create a new Redis broker with FIFO mode
237    pub fn new(redis_url: &str, queue_name: &str) -> Result<Self> {
238        Self::with_mode(redis_url, queue_name, QueueMode::Fifo)
239    }
240
241    /// Create a new Redis broker with specified queue mode
242    pub fn with_mode(redis_url: &str, queue_name: &str, mode: QueueMode) -> Result<Self> {
243        let client = Client::open(redis_url)
244            .map_err(|e| CelersError::Broker(format!("Failed to connect to Redis: {}", e)))?;
245
246        Ok(Self {
247            client,
248            queue_name: queue_name.to_string(),
249            processing_queue: format!("{}:processing", queue_name),
250            dlq_name: format!("{}:dlq", queue_name),
251            delayed_queue: format!("{}:delayed", queue_name),
252            cancel_channel: format!("{}:cancel", queue_name),
253            mode,
254            visibility_manager: VisibilityManager::new(),
255            visibility_timeout_secs: 300, // 5 minutes default
256        })
257    }
258
259    /// Create a new Redis broker from a RedisConfig
260    pub fn from_config(config: &RedisConfig, queue_name: &str, mode: QueueMode) -> Result<Self> {
261        let client = config.build_client()?;
262
263        debug!("Created Redis broker with config: {}", config.describe());
264
265        Ok(Self {
266            client,
267            queue_name: queue_name.to_string(),
268            processing_queue: format!("{}:processing", queue_name),
269            dlq_name: format!("{}:dlq", queue_name),
270            delayed_queue: format!("{}:delayed", queue_name),
271            cancel_channel: format!("{}:cancel", queue_name),
272            mode,
273            visibility_manager: VisibilityManager::new(),
274            visibility_timeout_secs: 300, // 5 minutes default
275        })
276    }
277
278    /// Create a new Redis broker from a SentinelClient (for high availability)
279    pub async fn from_sentinel(
280        sentinel: &SentinelClient,
281        queue_name: &str,
282        mode: QueueMode,
283    ) -> Result<Self> {
284        let client = sentinel.get_client().await?;
285
286        info!("Created Redis broker with Sentinel support");
287
288        Ok(Self {
289            client,
290            queue_name: queue_name.to_string(),
291            processing_queue: format!("{}:processing", queue_name),
292            dlq_name: format!("{}:dlq", queue_name),
293            delayed_queue: format!("{}:delayed", queue_name),
294            cancel_channel: format!("{}:cancel", queue_name),
295            mode,
296            visibility_manager: VisibilityManager::new(),
297            visibility_timeout_secs: 300, // 5 minutes default
298        })
299    }
300
301    /// Set visibility timeout (default: 300 seconds)
302    pub fn with_visibility_timeout(mut self, timeout_secs: u64) -> Self {
303        self.visibility_timeout_secs = timeout_secs;
304        self
305    }
306
307    /// Get the queue mode
308    pub fn mode(&self) -> QueueMode {
309        self.mode
310    }
311
312    /// Get the number of tasks in the Dead Letter Queue
313    pub async fn dlq_size(&self) -> Result<usize> {
314        let mut conn = self
315            .client
316            .get_multiplexed_async_connection()
317            .await
318            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
319
320        let size: usize = conn
321            .llen(&self.dlq_name)
322            .await
323            .map_err(|e| CelersError::Broker(format!("Failed to get DLQ size: {}", e)))?;
324
325        Ok(size)
326    }
327
328    /// Inspect tasks in the Dead Letter Queue
329    pub async fn inspect_dlq(&self, limit: isize) -> Result<Vec<SerializedTask>> {
330        let mut conn = self
331            .client
332            .get_multiplexed_async_connection()
333            .await
334            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
335
336        let items: Vec<String> = conn
337            .lrange(&self.dlq_name, 0, limit - 1)
338            .await
339            .map_err(|e| CelersError::Broker(format!("Failed to inspect DLQ: {}", e)))?;
340
341        let mut tasks = Vec::new();
342        for item in items {
343            let task: SerializedTask = serde_json::from_str(&item)
344                .map_err(|e| CelersError::Deserialization(e.to_string()))?;
345            tasks.push(task);
346        }
347
348        Ok(tasks)
349    }
350
351    /// Replay a task from the Dead Letter Queue back to the main queue
352    pub async fn replay_from_dlq(&self, task_id: &TaskId) -> Result<bool> {
353        let mut conn = self
354            .client
355            .get_multiplexed_async_connection()
356            .await
357            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
358
359        // Get all DLQ items
360        let items: Vec<String> = conn
361            .lrange(&self.dlq_name, 0, -1)
362            .await
363            .map_err(|e| CelersError::Broker(format!("Failed to get DLQ items: {}", e)))?;
364
365        // Find the task by ID
366        for item in items {
367            let task: SerializedTask = serde_json::from_str(&item)
368                .map_err(|e| CelersError::Deserialization(e.to_string()))?;
369
370            if &task.metadata.id == task_id {
371                // Remove from DLQ
372                conn.lrem::<_, _, ()>(&self.dlq_name, 1, &item)
373                    .await
374                    .map_err(|e| {
375                        CelersError::Broker(format!("Failed to remove from DLQ: {}", e))
376                    })?;
377
378                // Reset retry count in task metadata
379                let mut replayed_task = task;
380                replayed_task.metadata.state = celers_core::TaskState::Pending;
381
382                let serialized = serde_json::to_string(&replayed_task)
383                    .map_err(|e| CelersError::Serialization(e.to_string()))?;
384
385                // Re-enqueue based on mode
386                match self.mode {
387                    QueueMode::Fifo => {
388                        conn.rpush::<_, _, ()>(&self.queue_name, &serialized)
389                            .await
390                            .map_err(|e| {
391                                CelersError::Broker(format!("Failed to replay task: {}", e))
392                            })?;
393                    }
394                    QueueMode::Priority => {
395                        let score = -replayed_task.metadata.priority as f64;
396                        conn.zadd::<_, _, _, ()>(&self.queue_name, &serialized, score)
397                            .await
398                            .map_err(|e| {
399                                CelersError::Broker(format!("Failed to replay task: {}", e))
400                            })?;
401                    }
402                }
403
404                info!("Replayed task {} from DLQ", task_id);
405                return Ok(true);
406            }
407        }
408
409        Ok(false)
410    }
411
412    /// Clear all tasks from the Dead Letter Queue
413    pub async fn clear_dlq(&self) -> Result<usize> {
414        let mut conn = self
415            .client
416            .get_multiplexed_async_connection()
417            .await
418            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
419
420        let count: usize = conn
421            .llen(&self.dlq_name)
422            .await
423            .map_err(|e| CelersError::Broker(format!("Failed to get DLQ size: {}", e)))?;
424
425        conn.del::<_, ()>(&self.dlq_name)
426            .await
427            .map_err(|e| CelersError::Broker(format!("Failed to clear DLQ: {}", e)))?;
428
429        info!("Cleared {} tasks from DLQ", count);
430        Ok(count)
431    }
432
433    /// Get the cancellation channel name (for workers to subscribe)
434    pub fn cancel_channel(&self) -> &str {
435        &self.cancel_channel
436    }
437
438    /// Create a PubSub connection for listening to cancellation messages
439    pub async fn create_pubsub(&self) -> Result<redis::aio::PubSub> {
440        let pubsub = self.client.get_async_pubsub().await.map_err(|e| {
441            CelersError::Broker(format!("Failed to create PubSub connection: {}", e))
442        })?;
443        Ok(pubsub)
444    }
445
446    /// Create a health checker for monitoring Redis status
447    pub fn health_checker(&self) -> HealthChecker {
448        HealthChecker::new(self.client.clone())
449    }
450
451    /// Create a queue controller for pause/resume operations
452    pub fn queue_controller(&self) -> QueueController {
453        QueueController::new(self.client.clone(), &self.queue_name)
454    }
455
456    /// Create a task deduplicator
457    pub fn deduplicator(&self) -> Deduplicator {
458        Deduplicator::new(self.client.clone(), &self.queue_name)
459    }
460
461    /// Create a script manager for Lua script optimization
462    pub fn script_manager(&self) -> ScriptManager {
463        ScriptManager::new(self.client.clone())
464    }
465
466    /// Create a partition manager for distributed queues
467    pub fn partition_manager(
468        &self,
469        num_partitions: usize,
470        strategy: PartitionStrategy,
471    ) -> PartitionManager {
472        PartitionManager::new(num_partitions, strategy, &self.queue_name)
473    }
474
475    /// Create a batch operations handler for advanced batch processing
476    pub fn batch_operations(&self) -> BatchOperations {
477        BatchOperations::new(self.client.clone(), self.queue_name.clone(), self.mode)
478    }
479
480    /// Create a priority manager for dynamic priority adjustments
481    pub fn priority_manager(&self) -> PriorityManager {
482        PriorityManager::new(self.client.clone(), self.queue_name.clone(), self.mode)
483    }
484
485    /// Create a task query interface for inspecting tasks
486    pub fn task_query(&self) -> TaskQuery {
487        TaskQuery::new(
488            self.client.clone(),
489            self.queue_name.clone(),
490            self.processing_queue.clone(),
491            self.dlq_name.clone(),
492            self.mode,
493        )
494    }
495
496    /// Create a backup manager for queue backup and restore
497    pub fn backup_manager(&self) -> BackupManager {
498        BackupManager::new(
499            self.client.clone(),
500            self.queue_name.clone(),
501            self.processing_queue.clone(),
502            self.dlq_name.clone(),
503            self.delayed_queue.clone(),
504            self.mode,
505        )
506    }
507
508    /// Set TTL (time-to-live) for tasks in all queues
509    ///
510    /// This helps prevent unbounded queue growth by automatically expiring old tasks.
511    /// TTL is set in seconds.
512    pub async fn set_queue_ttl(&self, ttl_secs: u64) -> Result<()> {
513        let mut conn = self
514            .client
515            .get_multiplexed_async_connection()
516            .await
517            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
518
519        // Set TTL on main queue
520        let _: () = redis::cmd("EXPIRE")
521            .arg(&self.queue_name)
522            .arg(ttl_secs)
523            .query_async(&mut conn)
524            .await
525            .map_err(|e| CelersError::Broker(format!("Failed to set queue TTL: {}", e)))?;
526
527        // Set TTL on processing queue
528        let _: () = redis::cmd("EXPIRE")
529            .arg(&self.processing_queue)
530            .arg(ttl_secs)
531            .query_async(&mut conn)
532            .await
533            .map_err(|e| {
534                CelersError::Broker(format!("Failed to set processing queue TTL: {}", e))
535            })?;
536
537        // Set TTL on delayed queue
538        let _: () = redis::cmd("EXPIRE")
539            .arg(&self.delayed_queue)
540            .arg(ttl_secs)
541            .query_async(&mut conn)
542            .await
543            .map_err(|e| CelersError::Broker(format!("Failed to set delayed queue TTL: {}", e)))?;
544
545        debug!("Set TTL of {} seconds on all queues", ttl_secs);
546
547        Ok(())
548    }
549
550    /// Clean up old tasks from DLQ (older than specified age in seconds)
551    pub async fn cleanup_dlq(&self, _max_age_secs: u64) -> Result<usize> {
552        let mut conn = self
553            .client
554            .get_multiplexed_async_connection()
555            .await
556            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
557
558        // Get all DLQ items
559        let items: Vec<String> = conn
560            .lrange(&self.dlq_name, 0, -1)
561            .await
562            .map_err(|e| CelersError::Broker(format!("Failed to get DLQ items: {}", e)))?;
563
564        let mut removed_count = 0;
565
566        for item in items {
567            // Try to parse task to validate it's a proper task
568            if serde_json::from_str::<SerializedTask>(&item).is_ok() {
569                // This is a simplified version - in real use, you'd track DLQ entry time
570                // and only remove tasks older than max_age_secs
571                conn.lrem::<_, _, ()>(&self.dlq_name, 1, &item)
572                    .await
573                    .map_err(|e| {
574                        CelersError::Broker(format!("Failed to remove from DLQ: {}", e))
575                    })?;
576                removed_count += 1;
577            }
578        }
579
580        if removed_count > 0 {
581            info!("Cleaned up {} old tasks from DLQ", removed_count);
582        }
583
584        Ok(removed_count)
585    }
586
587    /// Get queue statistics (pending, processing, DLQ, delayed counts)
588    pub async fn get_queue_stats(&self) -> Result<QueueStats> {
589        self.health_checker()
590            .get_queue_stats(&self.queue_name, self.mode.is_priority())
591            .await
592    }
593
594    /// Check Redis health status
595    pub async fn check_health(&self) -> RedisHealthStatus {
596        self.health_checker().check_health().await
597    }
598
599    /// Ping Redis and return latency in milliseconds
600    pub async fn ping(&self) -> Result<u64> {
601        self.health_checker().ping().await
602    }
603
604    /// Get the visibility timeout in seconds
605    pub fn visibility_timeout(&self) -> u64 {
606        self.visibility_timeout_secs
607    }
608
609    /// Get the Redis client (for advanced operations)
610    pub fn client(&self) -> &Client {
611        &self.client
612    }
613
614    /// Get the delayed queue name
615    pub fn delayed_queue_name(&self) -> &str {
616        &self.delayed_queue
617    }
618
619    /// Get the processing queue name
620    pub fn processing_queue_name(&self) -> &str {
621        &self.processing_queue
622    }
623
624    /// Get the DLQ name
625    pub fn dlq_name(&self) -> &str {
626        &self.dlq_name
627    }
628
629    /// Get the main queue name
630    pub fn queue_name(&self) -> &str {
631        &self.queue_name
632    }
633
634    /// Get all queue names managed by this broker
635    pub fn queue_names(&self) -> Vec<String> {
636        vec![
637            self.queue_name.clone(),
638            self.processing_queue.clone(),
639            self.dlq_name.clone(),
640            self.delayed_queue.clone(),
641        ]
642    }
643
644    /// Check if a specific queue exists and has tasks
645    pub async fn queue_exists(&self, queue_name: &str) -> Result<bool> {
646        let mut conn = self
647            .client
648            .get_multiplexed_async_connection()
649            .await
650            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
651
652        let exists: bool = redis::cmd("EXISTS")
653            .arg(queue_name)
654            .query_async(&mut conn)
655            .await
656            .map_err(|e| CelersError::Broker(format!("Failed to check queue existence: {}", e)))?;
657
658        Ok(exists)
659    }
660
661    /// Get the size of a specific queue by name
662    pub async fn get_queue_size_by_name(&self, queue_name: &str) -> Result<usize> {
663        let mut conn = self
664            .client
665            .get_multiplexed_async_connection()
666            .await
667            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
668
669        // Try both list and sorted set
670        let list_size: usize = conn.llen(queue_name).await.unwrap_or(0);
671        if list_size > 0 {
672            return Ok(list_size);
673        }
674
675        let zset_size: usize = conn.zcard(queue_name).await.unwrap_or(0);
676        Ok(zset_size)
677    }
678
679    /// Delete a specific queue (use with caution!)
680    pub async fn delete_queue(&self, queue_name: &str) -> Result<()> {
681        let mut conn = self
682            .client
683            .get_multiplexed_async_connection()
684            .await
685            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
686
687        conn.del::<_, ()>(queue_name)
688            .await
689            .map_err(|e| CelersError::Broker(format!("Failed to delete queue: {}", e)))?;
690
691        warn!("Deleted queue: {}", queue_name);
692        Ok(())
693    }
694
695    /// Purge all queues (main, processing, DLQ, delayed)
696    pub async fn purge_all_queues(&self) -> Result<usize> {
697        let mut total_removed = 0;
698
699        for queue_name in self.queue_names() {
700            if let Ok(size) = self.get_queue_size_by_name(&queue_name).await {
701                total_removed += size;
702                self.delete_queue(&queue_name).await?;
703            }
704        }
705
706        warn!("Purged all queues, removed {} tasks total", total_removed);
707        Ok(total_removed)
708    }
709
710    /// Get a summary of all queue sizes
711    pub async fn get_all_queue_sizes(&self) -> Result<std::collections::HashMap<String, usize>> {
712        let mut sizes = std::collections::HashMap::new();
713
714        for queue_name in self.queue_names() {
715            if let Ok(size) = self.get_queue_size_by_name(&queue_name).await {
716                sizes.insert(queue_name, size);
717            }
718        }
719
720        Ok(sizes)
721    }
722
723    /// Move all tasks from processing queue back to main queue
724    ///
725    /// Useful for recovering tasks that were being processed when a worker crashed.
726    /// Returns the number of tasks moved.
727    pub async fn recover_processing_tasks(&self) -> Result<usize> {
728        let mut conn = self
729            .client
730            .get_multiplexed_async_connection()
731            .await
732            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
733
734        let items: Vec<String> = conn
735            .lrange(&self.processing_queue, 0, -1)
736            .await
737            .map_err(|e| {
738                CelersError::Broker(format!("Failed to get processing queue items: {}", e))
739            })?;
740
741        if items.is_empty() {
742            return Ok(0);
743        }
744
745        let count = items.len();
746        let mut pipe = redis::pipe();
747
748        for item in &items {
749            // Parse to get task
750            if let Ok(task) = serde_json::from_str::<SerializedTask>(item) {
751                // Add back to main queue based on mode
752                match self.mode {
753                    QueueMode::Fifo => {
754                        pipe.rpush(&self.queue_name, item);
755                    }
756                    QueueMode::Priority => {
757                        let score = -(task.metadata.priority as f64);
758                        pipe.zadd(&self.queue_name, item, score);
759                    }
760                }
761                // Remove from processing queue
762                pipe.lrem(&self.processing_queue, 1, item);
763            }
764        }
765
766        pipe.query_async::<redis::Value>(&mut conn)
767            .await
768            .map_err(|e| {
769                CelersError::Broker(format!("Failed to recover processing tasks: {}", e))
770            })?;
771
772        info!("Recovered {} tasks from processing queue", count);
773        Ok(count)
774    }
775
776    /// Get the total number of tasks across all queues (main, processing, DLQ, delayed)
777    pub async fn total_task_count(&self) -> Result<usize> {
778        let sizes = self.get_all_queue_sizes().await?;
779        Ok(sizes.values().sum())
780    }
781
782    /// Check if the broker is idle (all queues empty)
783    pub async fn is_idle(&self) -> Result<bool> {
784        let total = self.total_task_count().await?;
785        Ok(total == 0)
786    }
787
788    /// Estimate memory usage of tasks in all queues (in bytes)
789    ///
790    /// This is an approximation based on serialized task sizes.
791    pub async fn estimate_memory_usage(&self) -> Result<usize> {
792        let mut conn = self
793            .client
794            .get_multiplexed_async_connection()
795            .await
796            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
797
798        let mut total_bytes = 0usize;
799
800        // Check each queue
801        for queue_name in self.queue_names() {
802            let memory: usize = redis::cmd("MEMORY")
803                .arg("USAGE")
804                .arg(&queue_name)
805                .query_async(&mut conn)
806                .await
807                .unwrap_or(0);
808            total_bytes += memory;
809        }
810
811        Ok(total_bytes)
812    }
813
814    /// Move tasks from DLQ back to main queue in bulk
815    ///
816    /// Returns the number of tasks moved.
817    pub async fn bulk_replay_from_dlq(&self, max_count: Option<usize>) -> Result<usize> {
818        let limit = max_count.unwrap_or(isize::MAX as usize) as isize;
819        let tasks = self.inspect_dlq(limit).await?;
820        let count = tasks.len();
821
822        for task in tasks {
823            self.replay_from_dlq(&task.metadata.id).await?;
824        }
825
826        Ok(count)
827    }
828
829    /// Peek at the next task without dequeueing it
830    ///
831    /// Useful for inspecting what will be processed next without removing it from the queue.
832    pub async fn peek_next(&self) -> Result<Option<SerializedTask>> {
833        let mut conn = self
834            .client
835            .get_multiplexed_async_connection()
836            .await
837            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
838
839        let data: Option<String> = match self.mode {
840            QueueMode::Fifo => {
841                // Get last item (will be next to dequeue)
842                conn.lindex(&self.queue_name, -1)
843                    .await
844                    .map_err(|e| CelersError::Broker(format!("Failed to peek: {}", e)))?
845            }
846            QueueMode::Priority => {
847                // Get first item in sorted set (lowest score = highest priority)
848                let items: Vec<String> = conn
849                    .zrange(&self.queue_name, 0, 0)
850                    .await
851                    .map_err(|e| CelersError::Broker(format!("Failed to peek: {}", e)))?;
852                items.into_iter().next()
853            }
854        };
855
856        if let Some(serialized) = data {
857            let task: SerializedTask = serde_json::from_str(&serialized)
858                .map_err(|e| CelersError::Deserialization(e.to_string()))?;
859            Ok(Some(task))
860        } else {
861            Ok(None)
862        }
863    }
864
865    /// Get queue depth percentage (current size / max recommended size)
866    ///
867    /// Returns a value between 0.0 and 1.0+ where > 0.8 suggests the queue is getting full.
868    /// max_recommended_size defaults to 10000 if not specified.
869    pub async fn queue_depth_percentage(&self, max_recommended_size: Option<usize>) -> Result<f64> {
870        let max_size = max_recommended_size.unwrap_or(10000) as f64;
871        let current_size = self.queue_size().await? as f64;
872        Ok(current_size / max_size)
873    }
874
875    /// Check if the queue is approaching capacity (> 80% of recommended max)
876    pub async fn is_near_capacity(&self, max_recommended_size: Option<usize>) -> Result<bool> {
877        let percentage = self.queue_depth_percentage(max_recommended_size).await?;
878        Ok(percentage > 0.8)
879    }
880
881    /// Move ready delayed tasks to the main queue
882    ///
883    /// Checks the delayed queue for tasks ready to execute (timestamp <= now)
884    /// and moves them to the main queue atomically.
885    async fn process_delayed_tasks(&self) -> Result<usize> {
886        let mut conn = self
887            .client
888            .get_multiplexed_async_connection()
889            .await
890            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
891
892        let now = std::time::SystemTime::now()
893            .duration_since(std::time::UNIX_EPOCH)
894            .map_err(|e| CelersError::Other(format!("Failed to get current time: {}", e)))?
895            .as_secs() as f64;
896
897        // Get all tasks ready for execution (score <= now)
898        let ready_tasks: Vec<String> =
899            conn.zrangebyscore(&self.delayed_queue, "-inf", now)
900                .await
901                .map_err(|e| CelersError::Broker(format!("Failed to get ready tasks: {}", e)))?;
902
903        if ready_tasks.is_empty() {
904            return Ok(0);
905        }
906
907        let count = ready_tasks.len();
908
909        // Move tasks to main queue based on queue mode
910        let mut pipe = redis::pipe();
911
912        for task_data in &ready_tasks {
913            // Deserialize to get priority for sorted queue
914            if let Ok(task) = serde_json::from_str::<SerializedTask>(task_data) {
915                match self.mode {
916                    QueueMode::Fifo => {
917                        pipe.rpush(&self.queue_name, task_data);
918                    }
919                    QueueMode::Priority => {
920                        let score = -(task.metadata.priority as f64);
921                        pipe.zadd(&self.queue_name, task_data, score);
922                    }
923                }
924            }
925
926            // Remove from delayed queue
927            pipe.zrem(&self.delayed_queue, task_data);
928        }
929
930        // Execute all operations atomically
931        pipe.query_async::<redis::Value>(&mut conn)
932            .await
933            .map_err(|e| CelersError::Broker(format!("Failed to move delayed tasks: {}", e)))?;
934
935        debug!("Moved {} delayed tasks to main queue", count);
936
937        Ok(count)
938    }
939}
940
941#[async_trait]
942impl Broker for RedisBroker {
943    async fn enqueue(&self, task: SerializedTask) -> Result<TaskId> {
944        let mut conn = self
945            .client
946            .get_multiplexed_async_connection()
947            .await
948            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
949
950        let task_id = task.metadata.id;
951        let priority = task.metadata.priority;
952        let serialized =
953            serde_json::to_string(&task).map_err(|e| CelersError::Serialization(e.to_string()))?;
954
955        match self.mode {
956            QueueMode::Fifo => {
957                // Use list for FIFO (priority ignored)
958                conn.rpush::<_, _, ()>(&self.queue_name, &serialized)
959                    .await
960                    .map_err(|e| CelersError::Broker(format!("Failed to enqueue task: {}", e)))?;
961            }
962            QueueMode::Priority => {
963                // Use sorted set with priority as score (negate for descending order)
964                // Higher priority values should be processed first
965                let score = -priority as f64;
966                conn.zadd::<_, _, _, ()>(&self.queue_name, &serialized, score)
967                    .await
968                    .map_err(|e| CelersError::Broker(format!("Failed to enqueue task: {}", e)))?;
969            }
970        }
971
972        debug!(
973            "Enqueued task {} to queue {} (priority: {})",
974            task_id, self.queue_name, priority
975        );
976
977        #[cfg(feature = "metrics")]
978        {
979            TASKS_ENQUEUED_TOTAL.inc();
980
981            // Track per-task-type metrics
982            let task_name = &task.metadata.name;
983            TASKS_ENQUEUED_BY_TYPE.with_label_values(&[task_name]).inc();
984        }
985
986        Ok(task_id)
987    }
988
989    async fn dequeue(&self) -> Result<Option<BrokerMessage>> {
990        // Process delayed tasks first (move ready tasks to main queue)
991        let _ = self.process_delayed_tasks().await;
992
993        let mut conn = self
994            .client
995            .get_multiplexed_async_connection()
996            .await
997            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
998
999        let result = match self.mode {
1000            QueueMode::Fifo => {
1001                // Use BRPOPLPUSH for atomic dequeue and move to processing queue
1002                conn.brpoplpush(&self.queue_name, &self.processing_queue, 1.0)
1003                    .await
1004                    .map_err(|e| CelersError::Broker(format!("Failed to dequeue task: {}", e)))?
1005            }
1006            QueueMode::Priority => {
1007                // Use ZPOPMIN to get highest priority task (lowest score due to negation)
1008                // Note: Redis doesn't have BZPOPMINLPUSH, so we do two operations
1009                let items: Vec<(String, f64)> = conn
1010                    .zpopmin(&self.queue_name, 1)
1011                    .await
1012                    .map_err(|e| CelersError::Broker(format!("Failed to dequeue task: {}", e)))?;
1013
1014                if let Some((data, _score)) = items.first() {
1015                    // Move to processing queue
1016                    conn.lpush::<_, _, ()>(&self.processing_queue, data)
1017                        .await
1018                        .map_err(|e| {
1019                            CelersError::Broker(format!("Failed to move task to processing: {}", e))
1020                        })?;
1021                    Some(data.clone())
1022                } else {
1023                    None
1024                }
1025            }
1026        };
1027
1028        match result {
1029            Some(data) => {
1030                let task: SerializedTask = serde_json::from_str(&data)
1031                    .map_err(|e| CelersError::Deserialization(e.to_string()))?;
1032
1033                debug!(
1034                    "Dequeued task {} (priority: {})",
1035                    task.metadata.id, task.metadata.priority
1036                );
1037                Ok(Some(BrokerMessage {
1038                    task,
1039                    receipt_handle: Some(data),
1040                }))
1041            }
1042            None => Ok(None),
1043        }
1044    }
1045
1046    async fn ack(&self, task_id: &TaskId, receipt_handle: Option<&str>) -> Result<()> {
1047        if let Some(handle) = receipt_handle {
1048            let mut conn = self
1049                .client
1050                .get_multiplexed_async_connection()
1051                .await
1052                .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1053
1054            conn.lrem::<_, _, ()>(&self.processing_queue, 1, handle)
1055                .await
1056                .map_err(|e| CelersError::Broker(format!("Failed to ack task: {}", e)))?;
1057
1058            info!("Acknowledged task {}", task_id);
1059        }
1060        Ok(())
1061    }
1062
1063    async fn reject(
1064        &self,
1065        task_id: &TaskId,
1066        receipt_handle: Option<&str>,
1067        requeue: bool,
1068    ) -> Result<()> {
1069        if let Some(handle) = receipt_handle {
1070            let mut conn = self
1071                .client
1072                .get_multiplexed_async_connection()
1073                .await
1074                .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1075
1076            // Remove from processing queue
1077            conn.lrem::<_, _, ()>(&self.processing_queue, 1, handle)
1078                .await
1079                .map_err(|e| {
1080                    CelersError::Broker(format!("Failed to remove from processing: {}", e))
1081                })?;
1082
1083            if requeue {
1084                // Re-add to main queue (increment retry count)
1085                let mut task: SerializedTask = serde_json::from_str(handle)
1086                    .map_err(|e| CelersError::Deserialization(e.to_string()))?;
1087
1088                // Update task state to Retrying
1089                let retry_count = match task.metadata.state {
1090                    celers_core::TaskState::Retrying(count) => count + 1,
1091                    _ => 1,
1092                };
1093                task.metadata.state = celers_core::TaskState::Retrying(retry_count);
1094
1095                let serialized = serde_json::to_string(&task)
1096                    .map_err(|e| CelersError::Serialization(e.to_string()))?;
1097
1098                match self.mode {
1099                    QueueMode::Fifo => {
1100                        conn.rpush::<_, _, ()>(&self.queue_name, &serialized)
1101                            .await
1102                            .map_err(|e| {
1103                                CelersError::Broker(format!("Failed to requeue task: {}", e))
1104                            })?;
1105                    }
1106                    QueueMode::Priority => {
1107                        let score = -task.metadata.priority as f64;
1108                        conn.zadd::<_, _, _, ()>(&self.queue_name, &serialized, score)
1109                            .await
1110                            .map_err(|e| {
1111                                CelersError::Broker(format!("Failed to requeue task: {}", e))
1112                            })?;
1113                    }
1114                }
1115
1116                info!(
1117                    "Rejected and requeued task {} (retry {})",
1118                    task_id, retry_count
1119                );
1120            } else {
1121                // Move to Dead Letter Queue
1122                conn.lpush::<_, _, ()>(&self.dlq_name, handle)
1123                    .await
1124                    .map_err(|e| {
1125                        CelersError::Broker(format!("Failed to move task to DLQ: {}", e))
1126                    })?;
1127                error!("Task {} failed permanently, moved to DLQ", task_id);
1128            }
1129        }
1130        Ok(())
1131    }
1132
1133    async fn queue_size(&self) -> Result<usize> {
1134        let mut conn = self
1135            .client
1136            .get_multiplexed_async_connection()
1137            .await
1138            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1139
1140        let size: usize = match self.mode {
1141            QueueMode::Fifo => conn
1142                .llen(&self.queue_name)
1143                .await
1144                .map_err(|e| CelersError::Broker(format!("Failed to get queue size: {}", e)))?,
1145            QueueMode::Priority => conn
1146                .zcard(&self.queue_name)
1147                .await
1148                .map_err(|e| CelersError::Broker(format!("Failed to get queue size: {}", e)))?,
1149        };
1150
1151        #[cfg(feature = "metrics")]
1152        {
1153            QUEUE_SIZE.set(size as f64);
1154
1155            // Also update processing and DLQ sizes
1156            let processing_size: usize = conn.llen(&self.processing_queue).await.unwrap_or(0);
1157            PROCESSING_QUEUE_SIZE.set(processing_size as f64);
1158
1159            let dlq_size: usize = conn.llen(&self.dlq_name).await.unwrap_or(0);
1160            DLQ_SIZE.set(dlq_size as f64);
1161        }
1162
1163        Ok(size)
1164    }
1165
1166    async fn cancel(&self, task_id: &TaskId) -> Result<bool> {
1167        let mut conn = self
1168            .client
1169            .get_multiplexed_async_connection()
1170            .await
1171            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1172
1173        // Publish cancellation message
1174        let cancel_msg = task_id.to_string();
1175        let subscribers: i32 = conn
1176            .publish(&self.cancel_channel, &cancel_msg)
1177            .await
1178            .map_err(|e| CelersError::Broker(format!("Failed to publish cancel message: {}", e)))?;
1179
1180        if subscribers > 0 {
1181            info!(
1182                "Published cancellation for task {} to {} subscriber(s)",
1183                task_id, subscribers
1184            );
1185            Ok(true)
1186        } else {
1187            warn!("No subscribers listening for task {} cancellation", task_id);
1188            Ok(false)
1189        }
1190    }
1191
1192    // Optimized batch operations using Redis pipelining
1193
1194    async fn enqueue_batch(&self, tasks: Vec<SerializedTask>) -> Result<Vec<TaskId>> {
1195        if tasks.is_empty() {
1196            return Ok(Vec::new());
1197        }
1198
1199        let mut conn = self
1200            .client
1201            .get_multiplexed_async_connection()
1202            .await
1203            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1204
1205        let mut task_ids = Vec::with_capacity(tasks.len());
1206        let mut pipe = redis::pipe();
1207
1208        // Build pipeline with all enqueue operations
1209        for task in &tasks {
1210            let task_id = task.metadata.id;
1211            task_ids.push(task_id);
1212
1213            let serialized = serde_json::to_string(&task)
1214                .map_err(|e| CelersError::Serialization(e.to_string()))?;
1215
1216            match self.mode {
1217                QueueMode::Fifo => {
1218                    pipe.rpush(&self.queue_name, &serialized);
1219                }
1220                QueueMode::Priority => {
1221                    let score = -(task.metadata.priority as f64);
1222                    pipe.zadd(&self.queue_name, &serialized, score);
1223                }
1224            }
1225        }
1226
1227        // Execute all operations in a single round-trip
1228        pipe.query_async::<redis::Value>(&mut conn)
1229            .await
1230            .map_err(|e| CelersError::Broker(format!("Failed to enqueue batch: {}", e)))?;
1231
1232        debug!(
1233            "Enqueued batch of {} tasks to queue {}",
1234            tasks.len(),
1235            self.queue_name
1236        );
1237
1238        #[cfg(feature = "metrics")]
1239        {
1240            use celers_metrics::{
1241                BATCH_ENQUEUE_TOTAL, BATCH_SIZE, TASKS_ENQUEUED_BY_TYPE, TASKS_ENQUEUED_TOTAL,
1242            };
1243            TASKS_ENQUEUED_TOTAL.inc_by(tasks.len() as f64);
1244            BATCH_ENQUEUE_TOTAL.inc();
1245            BATCH_SIZE.observe(tasks.len() as f64);
1246
1247            // Track per-task-type metrics
1248            for task in &tasks {
1249                let task_name = &task.metadata.name;
1250                TASKS_ENQUEUED_BY_TYPE.with_label_values(&[task_name]).inc();
1251            }
1252        }
1253
1254        Ok(task_ids)
1255    }
1256
1257    async fn dequeue_batch(&self, count: usize) -> Result<Vec<BrokerMessage>> {
1258        if count == 0 {
1259            return Ok(Vec::new());
1260        }
1261
1262        let mut conn = self
1263            .client
1264            .get_multiplexed_async_connection()
1265            .await
1266            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1267
1268        let mut messages = Vec::with_capacity(count);
1269
1270        match self.mode {
1271            QueueMode::Fifo => {
1272                // Use pipeline for FIFO batch dequeue
1273                let mut pipe = redis::pipe();
1274                for _ in 0..count {
1275                    pipe.rpoplpush(&self.queue_name, &self.processing_queue);
1276                }
1277
1278                let results: Vec<Option<String>> = pipe
1279                    .query_async(&mut conn)
1280                    .await
1281                    .map_err(|e| CelersError::Broker(format!("Failed to dequeue batch: {}", e)))?;
1282
1283                for data_opt in results {
1284                    if let Some(data) = data_opt {
1285                        let task: SerializedTask = serde_json::from_str(&data)
1286                            .map_err(|e| CelersError::Deserialization(e.to_string()))?;
1287
1288                        messages.push(BrokerMessage {
1289                            task,
1290                            receipt_handle: Some(data),
1291                        });
1292                    } else {
1293                        break; // Queue is empty
1294                    }
1295                }
1296            }
1297            QueueMode::Priority => {
1298                // For priority queue, pop multiple items at once
1299                let items: Vec<(String, f64)> = conn
1300                    .zpopmin(&self.queue_name, count as isize)
1301                    .await
1302                    .map_err(|e| CelersError::Broker(format!("Failed to dequeue batch: {}", e)))?;
1303
1304                if !items.is_empty() {
1305                    // Move all to processing queue using pipeline
1306                    let mut pipe = redis::pipe();
1307                    for (data, _score) in &items {
1308                        pipe.lpush(&self.processing_queue, data);
1309                    }
1310
1311                    pipe.query_async::<redis::Value>(&mut conn)
1312                        .await
1313                        .map_err(|e| {
1314                            CelersError::Broker(format!(
1315                                "Failed to move batch to processing: {}",
1316                                e
1317                            ))
1318                        })?;
1319
1320                    for (data, _score) in items {
1321                        let task: SerializedTask = serde_json::from_str(&data)
1322                            .map_err(|e| CelersError::Deserialization(e.to_string()))?;
1323
1324                        messages.push(BrokerMessage {
1325                            task,
1326                            receipt_handle: Some(data),
1327                        });
1328                    }
1329                }
1330            }
1331        }
1332
1333        debug!(
1334            "Dequeued batch of {} tasks from queue {}",
1335            messages.len(),
1336            self.queue_name
1337        );
1338
1339        #[cfg(feature = "metrics")]
1340        if !messages.is_empty() {
1341            use celers_metrics::{BATCH_DEQUEUE_TOTAL, BATCH_SIZE};
1342            BATCH_DEQUEUE_TOTAL.inc();
1343            BATCH_SIZE.observe(messages.len() as f64);
1344        }
1345
1346        Ok(messages)
1347    }
1348
1349    async fn ack_batch(&self, tasks: &[(TaskId, Option<String>)]) -> Result<()> {
1350        if tasks.is_empty() {
1351            return Ok(());
1352        }
1353
1354        let mut conn = self
1355            .client
1356            .get_multiplexed_async_connection()
1357            .await
1358            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1359
1360        let mut pipe = redis::pipe();
1361        let mut ack_count = 0;
1362
1363        for (task_id, receipt_handle) in tasks {
1364            if let Some(handle) = receipt_handle {
1365                pipe.lrem(&self.processing_queue, 1, handle);
1366                ack_count += 1;
1367            } else {
1368                warn!("No receipt handle for task {}, skipping ack", task_id);
1369            }
1370        }
1371
1372        if ack_count > 0 {
1373            pipe.query_async::<redis::Value>(&mut conn)
1374                .await
1375                .map_err(|e| CelersError::Broker(format!("Failed to ack batch: {}", e)))?;
1376
1377            debug!("Acknowledged batch of {} tasks", ack_count);
1378        }
1379
1380        Ok(())
1381    }
1382
1383    // Delayed Task Execution
1384
1385    async fn enqueue_at(&self, task: SerializedTask, execute_at: i64) -> Result<TaskId> {
1386        let mut conn = self
1387            .client
1388            .get_multiplexed_async_connection()
1389            .await
1390            .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1391
1392        let task_id = task.metadata.id;
1393        let serialized =
1394            serde_json::to_string(&task).map_err(|e| CelersError::Serialization(e.to_string()))?;
1395
1396        // Add to delayed queue (sorted set with execute_at as score)
1397        conn.zadd::<_, _, _, ()>(&self.delayed_queue, &serialized, execute_at as f64)
1398            .await
1399            .map_err(|e| CelersError::Broker(format!("Failed to schedule delayed task: {}", e)))?;
1400
1401        debug!(
1402            "Scheduled task {} for execution at Unix timestamp {} (queue: {})",
1403            task_id, execute_at, self.delayed_queue
1404        );
1405
1406        #[cfg(feature = "metrics")]
1407        {
1408            TASKS_ENQUEUED_TOTAL.inc();
1409
1410            // Track per-task-type metrics
1411            let task_name = &task.metadata.name;
1412            TASKS_ENQUEUED_BY_TYPE.with_label_values(&[task_name]).inc();
1413        }
1414
1415        Ok(task_id)
1416    }
1417
1418    async fn enqueue_after(&self, task: SerializedTask, delay_secs: u64) -> Result<TaskId> {
1419        let now = std::time::SystemTime::now()
1420            .duration_since(std::time::UNIX_EPOCH)
1421            .map_err(|e| CelersError::Other(format!("Failed to get current time: {}", e)))?
1422            .as_secs() as i64;
1423
1424        let execute_at = now + delay_secs as i64;
1425        self.enqueue_at(task, execute_at).await
1426    }
1427}
1428
#[cfg(test)]
mod tests {
    use super::*;

    /// Redis URL used by every constructor test below.
    const REDIS_URL: &str = "redis://localhost:6379";

    /// `is_fifo` is true only for the FIFO variant.
    #[test]
    fn test_queue_mode_is_fifo() {
        assert!(QueueMode::Fifo.is_fifo());
        assert!(!QueueMode::Priority.is_fifo());
    }

    /// `is_priority` is true only for the Priority variant.
    #[test]
    fn test_queue_mode_is_priority() {
        assert!(!QueueMode::Fifo.is_priority());
        assert!(QueueMode::Priority.is_priority());
    }

    /// The `Display` output of each mode.
    #[test]
    fn test_queue_mode_display() {
        assert_eq!(QueueMode::Fifo.to_string(), "FIFO");
        assert_eq!(QueueMode::Priority.to_string(), "Priority");
    }

    /// `new` succeeds for a Redis URL, keeps the queue name and uses FIFO mode.
    #[test]
    fn test_redis_broker_new() {
        let created = RedisBroker::new(REDIS_URL, "test_queue");
        assert!(created.is_ok());

        let broker = created.unwrap();
        assert_eq!(broker.queue_name(), "test_queue");
        assert_eq!(broker.mode(), QueueMode::Fifo);
    }

    /// `with_mode` stores the requested queue mode.
    #[test]
    fn test_redis_broker_with_mode() {
        let created = RedisBroker::with_mode(REDIS_URL, "test_queue", QueueMode::Priority);
        assert!(created.is_ok());

        let broker = created.unwrap();
        assert_eq!(broker.mode(), QueueMode::Priority);
    }

    /// `with_visibility_timeout` overrides the stored timeout.
    #[test]
    fn test_redis_broker_with_visibility_timeout() {
        let broker = RedisBroker::new(REDIS_URL, "test_queue")
            .unwrap()
            .with_visibility_timeout(600);

        assert_eq!(broker.visibility_timeout(), 600);
    }

    /// Auxiliary key names are derived from the main queue name.
    #[test]
    fn test_queue_names() {
        let broker = RedisBroker::new(REDIS_URL, "my_queue").unwrap();

        assert_eq!(broker.queue_name(), "my_queue");
        assert_eq!(broker.processing_queue_name(), "my_queue:processing");
        assert_eq!(broker.dlq_name(), "my_queue:dlq");
        assert_eq!(broker.delayed_queue_name(), "my_queue:delayed");
        assert_eq!(broker.cancel_channel(), "my_queue:cancel");
    }

    /// Calls each helper accessor once; there are no assertions on the results.
    #[test]
    fn test_broker_accessors() {
        let broker = RedisBroker::new(REDIS_URL, "test_queue").unwrap();

        // health_checker creation
        let _ = broker.health_checker();

        // queue_controller creation
        let _ = broker.queue_controller();

        // deduplicator creation
        let _ = broker.deduplicator();

        // client access
        let _ = broker.client();
    }

    /// A URL with an unsupported scheme is rejected by the constructor.
    #[test]
    fn test_invalid_redis_url() {
        let outcome = RedisBroker::new("invalid://not-a-redis-url", "test_queue");
        assert!(outcome.is_err());
    }
}