1use async_trait::async_trait;
17use celers_core::{Broker, BrokerMessage, CelersError, Result, SerializedTask, TaskId};
18use redis::{AsyncCommands, Client};
19use tracing::{debug, error, info, warn};
20
21#[cfg(feature = "metrics")]
22use celers_metrics::{
23 DLQ_SIZE, PROCESSING_QUEUE_SIZE, QUEUE_SIZE, TASKS_ENQUEUED_BY_TYPE, TASKS_ENQUEUED_TOTAL,
24};
25
26pub mod advanced_queue;
27pub mod authorization;
28pub mod backup_restore;
29pub mod batch_ext;
30pub mod bulkhead;
31pub mod circuit_breaker;
32pub mod cluster;
33pub mod compression;
34pub mod connection;
35pub mod cron_scheduler;
36pub mod dedup;
37pub mod degradation;
38pub mod dlq_analytics;
39pub mod dlq_archival;
40pub mod dlq_replay;
41pub mod encryption;
42pub mod geo;
43pub mod health;
44pub mod hooks;
45pub mod hooks_advanced;
46pub mod integrity;
47pub mod locks;
48pub mod lua_scripts;
49pub mod metrics_ext;
50pub mod monitoring;
51pub mod otel_integration;
52pub mod partitioning;
53pub mod pipeline_advanced;
54pub mod pool;
55pub mod pool_advanced;
56pub mod priority_mgmt;
57pub mod queue_control;
58pub mod quota_mgmt;
59pub mod rate_limit;
60pub mod result_backend;
61pub mod retry;
62pub mod sentinel;
63pub mod streams;
64pub mod structured_logging;
65pub mod task_groups;
66pub mod task_query;
67pub mod telemetry;
68pub mod utilities;
69pub mod visibility;
70
71pub use advanced_queue::{
72 AdvancedQueueManager, PriorityAgingConfig, PriorityAgingConfigBuilder, QueueWeight,
73 StarvationPrevention, StarvationStats, TaskAge, WeightedQueueSelector,
74};
75pub use authorization::{AuthorizationPolicy, UserPermissions};
76pub use backup_restore::{BackupManager, QueueSnapshot, SnapshotComparison};
77pub use batch_ext::{BatchOperations, TaskFilter};
78pub use bulkhead::{Bulkhead, BulkheadConfig, BulkheadManager, BulkheadPermit, BulkheadStats};
79pub use circuit_breaker::{
80 CircuitBreaker, CircuitBreakerConfig, CircuitBreakerStats, CircuitState,
81};
82pub use cluster::{
83 ClusterConfig, ClusterConfigBuilder, ClusterNode, ClusterNodeRole, ClusterTopology, HashSlot,
84 RedisMode,
85};
86#[allow(deprecated)]
87pub use compression::{CompressionAlgorithm, CompressionConfig, CompressionStats, Compressor};
88pub use connection::{ConnectionStats, RedisConfig, TlsConfig};
89pub use cron_scheduler::{CronExpression, CronScheduler, ScheduledTask};
90pub use dedup::{DedupResult, DedupStrategy, Deduplicator};
91pub use degradation::{DegradationManager, DegradationMode, DegradationStats, QueuedOperation};
92pub use dlq_analytics::{
93 DLQAnalyzer, ErrorCategory, ErrorCluster, FailurePattern, FailureTrend, RootCause,
94 TemporalAnalysis,
95};
96pub use dlq_archival::{
97 ArchivalConfig, ArchiveSearchCriteria, ArchiveStats, ArchivedTask, DLQArchivalManager,
98 RetentionPolicy, StorageBackend,
99};
100pub use dlq_replay::{
101 ReplayCondition, ReplayPolicy, ReplayPolicyType, ReplayResult, ReplayScheduler, ReplayStats,
102};
103pub use encryption::{
104 EncryptedData, EncryptionAlgorithm, EncryptionConfig, EncryptionManager, EncryptionStats,
105};
106pub use geo::{
107 ConflictResolution, GeoReplicationManager, Region, RegionId, RegionStats, RegionStatsSnapshot,
108 RegionalReadRouter, ReplicationConfig, ReplicationConfigBuilder, RoutingStrategy, SyncMode,
109};
110pub use health::{HealthChecker, KeyspaceStats, QueueStats, RedisHealthStatus, ReplicationInfo};
111pub use hooks::{
112 CompletionHook, CompletionStatus, DequeueHook, EnqueueHook, HookContext, HookResult, LogLevel,
113 LoggingHook, MetricsHook, PayloadSizeValidator, TaskHookRegistry, TimestampEnrichmentHook,
114};
115pub use hooks_advanced::{
116 ConditionalHook, HookCondition, HookErrorStrategy, ParallelHookExecutor, PrioritizedHook,
117 RetryableHook, SequentialHooks,
118};
119pub use integrity::{ChecksumAlgorithm, IntegrityStats, IntegrityValidator, IntegrityWrappedTask};
120pub use locks::{DistributedLock, LockConfig, LockGuard, LockToken};
121pub use lua_scripts::{ScriptId, ScriptManager, ScriptPerformance, ScriptStats, SCRIPT_VERSION};
122pub use metrics_ext::{
123 HistogramSnapshot, LatencyStats, MetricsSnapshot, MetricsTracker, SlowOperation,
124 SlowOperationLogger, TaskAgeHistogram,
125};
126pub use monitoring::{
127 analyze_performance_trend, analyze_redis_broker_performance, analyze_redis_consumer_lag,
128 analyze_redis_dlq_health, analyze_redis_fragmentation, analyze_redis_memory_efficiency,
129 analyze_redis_slowlog, analyze_task_completion_patterns,
130 calculate_redis_message_age_distribution, calculate_redis_message_velocity,
131 calculate_redis_queue_health_score, detect_performance_regression, detect_queue_burst,
132 detect_redis_queue_anomaly, detect_redis_queue_saturation, estimate_redis_monthly_cost,
133 estimate_redis_processing_capacity, generate_queue_health_report, predict_redis_queue_size,
134 recommend_alert_thresholds, recommend_redis_scaling_strategy, suggest_redis_worker_scaling,
135 AlertThresholds, AnomalyDetection, BurstDetection, ConsumerLagAnalysis, DLQAnalysis,
136 FragmentationAnalysis, MemoryEfficiencyAnalysis, MessageAgeDistribution, MessageVelocity,
137 PerformanceTrend, ProcessingCapacity, QueueHealthReport, QueueSaturationAnalysis,
138 QueueSizePrediction, QueueTrend, RedisCostEstimate, RegressionDetection, SaturationLevel,
139 ScalingRecommendation, ScalingStrategy, SlowlogAnalysis, TaskCompletionAnalysis,
140 WorkerScalingSuggestion,
141};
142pub use otel_integration::{
143 OtelBrokerInstrumentation, OtelConfig, SpanInfo, TracingBackend, W3CTraceContext,
144};
145pub use partitioning::{
146 ConsistentHashRing, HashAlgorithm, PartitionManager, PartitionStats, PartitionStrategy,
147};
148pub use pipeline_advanced::{
149 AdvancedPipeline, PipelineConfig, PipelineExecutionResult, PipelineOperation, PipelineStats,
150};
151pub use pool::{ConnectionPool, PoolConfig, PoolStats};
152pub use pool_advanced::{AdaptiveConnectionPool, AdaptivePoolConfig, AdaptivePoolStats};
153pub use priority_mgmt::{PriorityAdjustment, PriorityManager};
154pub use queue_control::{QueueController, QueueState};
155pub use quota_mgmt::{QuotaConfig, QuotaManager, QuotaPeriod, QuotaUsage};
156pub use rate_limit::{
157 DistributedRateLimiter, QueueRateLimitConfig, QueueRateLimiter, TokenBucketLimiter,
158 TrackedRateLimiter,
159};
160pub use result_backend::{ResultBackend, ResultBackendConfig, TaskResult, TaskStatus};
161pub use retry::{BackoffStrategy, RetryConfig, RetryExecutor, RetryResult};
162pub use sentinel::{
163 MasterAddress, SentinelClient, SentinelConfig, SentinelConfigBuilder, SentinelRole,
164};
165pub use streams::{
166 StreamConfig, StreamConfigBuilder, StreamEntry, StreamMessageId, StreamStats, StreamsClient,
167};
168pub use structured_logging::{
169 CorrelationAnalysis, LogConfig, LogContext, LogEntry, PerformanceAnalysis, StructuredLogLevel,
170 StructuredLogger,
171};
172pub use task_groups::{GroupConfig, GroupMetadata, GroupStatus, TaskGroup};
173pub use task_query::{TaskQuery, TaskSearchCriteria, TaskStats};
174pub use telemetry::{SpanBuilder, SpanEvent, TracingContext};
175pub use utilities::{
176 analyze_redis_command_performance, analyze_redis_queue_balance,
177 calculate_optimal_redis_batch_size, calculate_optimal_redis_pool_size,
178 calculate_redis_capacity_headroom, calculate_redis_key_ttl_by_priority,
179 calculate_redis_migration_batch_size, calculate_redis_optimal_shard_count,
180 calculate_redis_pipeline_size, calculate_redis_queue_efficiency,
181 calculate_redis_sla_compliance, calculate_redis_timeout_values, calculate_worker_distribution,
182 estimate_redis_migration_time, estimate_redis_queue_drain_time, estimate_redis_queue_memory,
183 estimate_redis_scaling_time, optimize_task_priority, recommend_queue_rebalancing,
184 suggest_redis_data_retention, suggest_redis_persistence_strategy,
185 suggest_redis_pipeline_strategy, CapacityHeadroom, MigrationStrategy, PriorityOptimization,
186 QueueEfficiency, RebalancingRecommendation, SLACompliance, ScalingTimeEstimate,
187 WorkerDistribution,
188};
189use visibility::VisibilityManager;
190
191#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
193pub enum QueueMode {
194 Fifo,
196 Priority,
198}
199
200impl QueueMode {
201 pub fn is_fifo(&self) -> bool {
203 matches!(self, QueueMode::Fifo)
204 }
205
206 pub fn is_priority(&self) -> bool {
208 matches!(self, QueueMode::Priority)
209 }
210}
211
212impl std::fmt::Display for QueueMode {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 match self {
215 QueueMode::Fifo => write!(f, "FIFO"),
216 QueueMode::Priority => write!(f, "Priority"),
217 }
218 }
219}
220
221pub struct RedisBroker {
223 client: Client,
224 queue_name: String,
225 processing_queue: String,
226 dlq_name: String,
227 delayed_queue: String,
228 cancel_channel: String,
229 mode: QueueMode,
230 #[allow(dead_code)]
231 visibility_manager: VisibilityManager,
232 visibility_timeout_secs: u64,
233}
234
235impl RedisBroker {
236 pub fn new(redis_url: &str, queue_name: &str) -> Result<Self> {
238 Self::with_mode(redis_url, queue_name, QueueMode::Fifo)
239 }
240
241 pub fn with_mode(redis_url: &str, queue_name: &str, mode: QueueMode) -> Result<Self> {
243 let client = Client::open(redis_url)
244 .map_err(|e| CelersError::Broker(format!("Failed to connect to Redis: {}", e)))?;
245
246 Ok(Self {
247 client,
248 queue_name: queue_name.to_string(),
249 processing_queue: format!("{}:processing", queue_name),
250 dlq_name: format!("{}:dlq", queue_name),
251 delayed_queue: format!("{}:delayed", queue_name),
252 cancel_channel: format!("{}:cancel", queue_name),
253 mode,
254 visibility_manager: VisibilityManager::new(),
255 visibility_timeout_secs: 300, })
257 }
258
259 pub fn from_config(config: &RedisConfig, queue_name: &str, mode: QueueMode) -> Result<Self> {
261 let client = config.build_client()?;
262
263 debug!("Created Redis broker with config: {}", config.describe());
264
265 Ok(Self {
266 client,
267 queue_name: queue_name.to_string(),
268 processing_queue: format!("{}:processing", queue_name),
269 dlq_name: format!("{}:dlq", queue_name),
270 delayed_queue: format!("{}:delayed", queue_name),
271 cancel_channel: format!("{}:cancel", queue_name),
272 mode,
273 visibility_manager: VisibilityManager::new(),
274 visibility_timeout_secs: 300, })
276 }
277
278 pub async fn from_sentinel(
280 sentinel: &SentinelClient,
281 queue_name: &str,
282 mode: QueueMode,
283 ) -> Result<Self> {
284 let client = sentinel.get_client().await?;
285
286 info!("Created Redis broker with Sentinel support");
287
288 Ok(Self {
289 client,
290 queue_name: queue_name.to_string(),
291 processing_queue: format!("{}:processing", queue_name),
292 dlq_name: format!("{}:dlq", queue_name),
293 delayed_queue: format!("{}:delayed", queue_name),
294 cancel_channel: format!("{}:cancel", queue_name),
295 mode,
296 visibility_manager: VisibilityManager::new(),
297 visibility_timeout_secs: 300, })
299 }
300
301 pub fn with_visibility_timeout(mut self, timeout_secs: u64) -> Self {
303 self.visibility_timeout_secs = timeout_secs;
304 self
305 }
306
307 pub fn mode(&self) -> QueueMode {
309 self.mode
310 }
311
312 pub async fn dlq_size(&self) -> Result<usize> {
314 let mut conn = self
315 .client
316 .get_multiplexed_async_connection()
317 .await
318 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
319
320 let size: usize = conn
321 .llen(&self.dlq_name)
322 .await
323 .map_err(|e| CelersError::Broker(format!("Failed to get DLQ size: {}", e)))?;
324
325 Ok(size)
326 }
327
328 pub async fn inspect_dlq(&self, limit: isize) -> Result<Vec<SerializedTask>> {
330 let mut conn = self
331 .client
332 .get_multiplexed_async_connection()
333 .await
334 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
335
336 let items: Vec<String> = conn
337 .lrange(&self.dlq_name, 0, limit - 1)
338 .await
339 .map_err(|e| CelersError::Broker(format!("Failed to inspect DLQ: {}", e)))?;
340
341 let mut tasks = Vec::new();
342 for item in items {
343 let task: SerializedTask = serde_json::from_str(&item)
344 .map_err(|e| CelersError::Deserialization(e.to_string()))?;
345 tasks.push(task);
346 }
347
348 Ok(tasks)
349 }
350
351 pub async fn replay_from_dlq(&self, task_id: &TaskId) -> Result<bool> {
353 let mut conn = self
354 .client
355 .get_multiplexed_async_connection()
356 .await
357 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
358
359 let items: Vec<String> = conn
361 .lrange(&self.dlq_name, 0, -1)
362 .await
363 .map_err(|e| CelersError::Broker(format!("Failed to get DLQ items: {}", e)))?;
364
365 for item in items {
367 let task: SerializedTask = serde_json::from_str(&item)
368 .map_err(|e| CelersError::Deserialization(e.to_string()))?;
369
370 if &task.metadata.id == task_id {
371 conn.lrem::<_, _, ()>(&self.dlq_name, 1, &item)
373 .await
374 .map_err(|e| {
375 CelersError::Broker(format!("Failed to remove from DLQ: {}", e))
376 })?;
377
378 let mut replayed_task = task;
380 replayed_task.metadata.state = celers_core::TaskState::Pending;
381
382 let serialized = serde_json::to_string(&replayed_task)
383 .map_err(|e| CelersError::Serialization(e.to_string()))?;
384
385 match self.mode {
387 QueueMode::Fifo => {
388 conn.rpush::<_, _, ()>(&self.queue_name, &serialized)
389 .await
390 .map_err(|e| {
391 CelersError::Broker(format!("Failed to replay task: {}", e))
392 })?;
393 }
394 QueueMode::Priority => {
395 let score = -replayed_task.metadata.priority as f64;
396 conn.zadd::<_, _, _, ()>(&self.queue_name, &serialized, score)
397 .await
398 .map_err(|e| {
399 CelersError::Broker(format!("Failed to replay task: {}", e))
400 })?;
401 }
402 }
403
404 info!("Replayed task {} from DLQ", task_id);
405 return Ok(true);
406 }
407 }
408
409 Ok(false)
410 }
411
412 pub async fn clear_dlq(&self) -> Result<usize> {
414 let mut conn = self
415 .client
416 .get_multiplexed_async_connection()
417 .await
418 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
419
420 let count: usize = conn
421 .llen(&self.dlq_name)
422 .await
423 .map_err(|e| CelersError::Broker(format!("Failed to get DLQ size: {}", e)))?;
424
425 conn.del::<_, ()>(&self.dlq_name)
426 .await
427 .map_err(|e| CelersError::Broker(format!("Failed to clear DLQ: {}", e)))?;
428
429 info!("Cleared {} tasks from DLQ", count);
430 Ok(count)
431 }
432
433 pub fn cancel_channel(&self) -> &str {
435 &self.cancel_channel
436 }
437
438 pub async fn create_pubsub(&self) -> Result<redis::aio::PubSub> {
440 let pubsub = self.client.get_async_pubsub().await.map_err(|e| {
441 CelersError::Broker(format!("Failed to create PubSub connection: {}", e))
442 })?;
443 Ok(pubsub)
444 }
445
446 pub fn health_checker(&self) -> HealthChecker {
448 HealthChecker::new(self.client.clone())
449 }
450
451 pub fn queue_controller(&self) -> QueueController {
453 QueueController::new(self.client.clone(), &self.queue_name)
454 }
455
456 pub fn deduplicator(&self) -> Deduplicator {
458 Deduplicator::new(self.client.clone(), &self.queue_name)
459 }
460
461 pub fn script_manager(&self) -> ScriptManager {
463 ScriptManager::new(self.client.clone())
464 }
465
466 pub fn partition_manager(
468 &self,
469 num_partitions: usize,
470 strategy: PartitionStrategy,
471 ) -> PartitionManager {
472 PartitionManager::new(num_partitions, strategy, &self.queue_name)
473 }
474
475 pub fn batch_operations(&self) -> BatchOperations {
477 BatchOperations::new(self.client.clone(), self.queue_name.clone(), self.mode)
478 }
479
480 pub fn priority_manager(&self) -> PriorityManager {
482 PriorityManager::new(self.client.clone(), self.queue_name.clone(), self.mode)
483 }
484
485 pub fn task_query(&self) -> TaskQuery {
487 TaskQuery::new(
488 self.client.clone(),
489 self.queue_name.clone(),
490 self.processing_queue.clone(),
491 self.dlq_name.clone(),
492 self.mode,
493 )
494 }
495
496 pub fn backup_manager(&self) -> BackupManager {
498 BackupManager::new(
499 self.client.clone(),
500 self.queue_name.clone(),
501 self.processing_queue.clone(),
502 self.dlq_name.clone(),
503 self.delayed_queue.clone(),
504 self.mode,
505 )
506 }
507
508 pub async fn set_queue_ttl(&self, ttl_secs: u64) -> Result<()> {
513 let mut conn = self
514 .client
515 .get_multiplexed_async_connection()
516 .await
517 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
518
519 let _: () = redis::cmd("EXPIRE")
521 .arg(&self.queue_name)
522 .arg(ttl_secs)
523 .query_async(&mut conn)
524 .await
525 .map_err(|e| CelersError::Broker(format!("Failed to set queue TTL: {}", e)))?;
526
527 let _: () = redis::cmd("EXPIRE")
529 .arg(&self.processing_queue)
530 .arg(ttl_secs)
531 .query_async(&mut conn)
532 .await
533 .map_err(|e| {
534 CelersError::Broker(format!("Failed to set processing queue TTL: {}", e))
535 })?;
536
537 let _: () = redis::cmd("EXPIRE")
539 .arg(&self.delayed_queue)
540 .arg(ttl_secs)
541 .query_async(&mut conn)
542 .await
543 .map_err(|e| CelersError::Broker(format!("Failed to set delayed queue TTL: {}", e)))?;
544
545 debug!("Set TTL of {} seconds on all queues", ttl_secs);
546
547 Ok(())
548 }
549
550 pub async fn cleanup_dlq(&self, _max_age_secs: u64) -> Result<usize> {
552 let mut conn = self
553 .client
554 .get_multiplexed_async_connection()
555 .await
556 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
557
558 let items: Vec<String> = conn
560 .lrange(&self.dlq_name, 0, -1)
561 .await
562 .map_err(|e| CelersError::Broker(format!("Failed to get DLQ items: {}", e)))?;
563
564 let mut removed_count = 0;
565
566 for item in items {
567 if serde_json::from_str::<SerializedTask>(&item).is_ok() {
569 conn.lrem::<_, _, ()>(&self.dlq_name, 1, &item)
572 .await
573 .map_err(|e| {
574 CelersError::Broker(format!("Failed to remove from DLQ: {}", e))
575 })?;
576 removed_count += 1;
577 }
578 }
579
580 if removed_count > 0 {
581 info!("Cleaned up {} old tasks from DLQ", removed_count);
582 }
583
584 Ok(removed_count)
585 }
586
587 pub async fn get_queue_stats(&self) -> Result<QueueStats> {
589 self.health_checker()
590 .get_queue_stats(&self.queue_name, self.mode.is_priority())
591 .await
592 }
593
594 pub async fn check_health(&self) -> RedisHealthStatus {
596 self.health_checker().check_health().await
597 }
598
599 pub async fn ping(&self) -> Result<u64> {
601 self.health_checker().ping().await
602 }
603
604 pub fn visibility_timeout(&self) -> u64 {
606 self.visibility_timeout_secs
607 }
608
609 pub fn client(&self) -> &Client {
611 &self.client
612 }
613
614 pub fn delayed_queue_name(&self) -> &str {
616 &self.delayed_queue
617 }
618
619 pub fn processing_queue_name(&self) -> &str {
621 &self.processing_queue
622 }
623
624 pub fn dlq_name(&self) -> &str {
626 &self.dlq_name
627 }
628
629 pub fn queue_name(&self) -> &str {
631 &self.queue_name
632 }
633
634 pub fn queue_names(&self) -> Vec<String> {
636 vec![
637 self.queue_name.clone(),
638 self.processing_queue.clone(),
639 self.dlq_name.clone(),
640 self.delayed_queue.clone(),
641 ]
642 }
643
644 pub async fn queue_exists(&self, queue_name: &str) -> Result<bool> {
646 let mut conn = self
647 .client
648 .get_multiplexed_async_connection()
649 .await
650 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
651
652 let exists: bool = redis::cmd("EXISTS")
653 .arg(queue_name)
654 .query_async(&mut conn)
655 .await
656 .map_err(|e| CelersError::Broker(format!("Failed to check queue existence: {}", e)))?;
657
658 Ok(exists)
659 }
660
661 pub async fn get_queue_size_by_name(&self, queue_name: &str) -> Result<usize> {
663 let mut conn = self
664 .client
665 .get_multiplexed_async_connection()
666 .await
667 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
668
669 let list_size: usize = conn.llen(queue_name).await.unwrap_or(0);
671 if list_size > 0 {
672 return Ok(list_size);
673 }
674
675 let zset_size: usize = conn.zcard(queue_name).await.unwrap_or(0);
676 Ok(zset_size)
677 }
678
679 pub async fn delete_queue(&self, queue_name: &str) -> Result<()> {
681 let mut conn = self
682 .client
683 .get_multiplexed_async_connection()
684 .await
685 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
686
687 conn.del::<_, ()>(queue_name)
688 .await
689 .map_err(|e| CelersError::Broker(format!("Failed to delete queue: {}", e)))?;
690
691 warn!("Deleted queue: {}", queue_name);
692 Ok(())
693 }
694
695 pub async fn purge_all_queues(&self) -> Result<usize> {
697 let mut total_removed = 0;
698
699 for queue_name in self.queue_names() {
700 if let Ok(size) = self.get_queue_size_by_name(&queue_name).await {
701 total_removed += size;
702 self.delete_queue(&queue_name).await?;
703 }
704 }
705
706 warn!("Purged all queues, removed {} tasks total", total_removed);
707 Ok(total_removed)
708 }
709
710 pub async fn get_all_queue_sizes(&self) -> Result<std::collections::HashMap<String, usize>> {
712 let mut sizes = std::collections::HashMap::new();
713
714 for queue_name in self.queue_names() {
715 if let Ok(size) = self.get_queue_size_by_name(&queue_name).await {
716 sizes.insert(queue_name, size);
717 }
718 }
719
720 Ok(sizes)
721 }
722
723 pub async fn recover_processing_tasks(&self) -> Result<usize> {
728 let mut conn = self
729 .client
730 .get_multiplexed_async_connection()
731 .await
732 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
733
734 let items: Vec<String> = conn
735 .lrange(&self.processing_queue, 0, -1)
736 .await
737 .map_err(|e| {
738 CelersError::Broker(format!("Failed to get processing queue items: {}", e))
739 })?;
740
741 if items.is_empty() {
742 return Ok(0);
743 }
744
745 let count = items.len();
746 let mut pipe = redis::pipe();
747
748 for item in &items {
749 if let Ok(task) = serde_json::from_str::<SerializedTask>(item) {
751 match self.mode {
753 QueueMode::Fifo => {
754 pipe.rpush(&self.queue_name, item);
755 }
756 QueueMode::Priority => {
757 let score = -(task.metadata.priority as f64);
758 pipe.zadd(&self.queue_name, item, score);
759 }
760 }
761 pipe.lrem(&self.processing_queue, 1, item);
763 }
764 }
765
766 pipe.query_async::<redis::Value>(&mut conn)
767 .await
768 .map_err(|e| {
769 CelersError::Broker(format!("Failed to recover processing tasks: {}", e))
770 })?;
771
772 info!("Recovered {} tasks from processing queue", count);
773 Ok(count)
774 }
775
776 pub async fn total_task_count(&self) -> Result<usize> {
778 let sizes = self.get_all_queue_sizes().await?;
779 Ok(sizes.values().sum())
780 }
781
782 pub async fn is_idle(&self) -> Result<bool> {
784 let total = self.total_task_count().await?;
785 Ok(total == 0)
786 }
787
788 pub async fn estimate_memory_usage(&self) -> Result<usize> {
792 let mut conn = self
793 .client
794 .get_multiplexed_async_connection()
795 .await
796 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
797
798 let mut total_bytes = 0usize;
799
800 for queue_name in self.queue_names() {
802 let memory: usize = redis::cmd("MEMORY")
803 .arg("USAGE")
804 .arg(&queue_name)
805 .query_async(&mut conn)
806 .await
807 .unwrap_or(0);
808 total_bytes += memory;
809 }
810
811 Ok(total_bytes)
812 }
813
814 pub async fn bulk_replay_from_dlq(&self, max_count: Option<usize>) -> Result<usize> {
818 let limit = max_count.unwrap_or(isize::MAX as usize) as isize;
819 let tasks = self.inspect_dlq(limit).await?;
820 let count = tasks.len();
821
822 for task in tasks {
823 self.replay_from_dlq(&task.metadata.id).await?;
824 }
825
826 Ok(count)
827 }
828
829 pub async fn peek_next(&self) -> Result<Option<SerializedTask>> {
833 let mut conn = self
834 .client
835 .get_multiplexed_async_connection()
836 .await
837 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
838
839 let data: Option<String> = match self.mode {
840 QueueMode::Fifo => {
841 conn.lindex(&self.queue_name, -1)
843 .await
844 .map_err(|e| CelersError::Broker(format!("Failed to peek: {}", e)))?
845 }
846 QueueMode::Priority => {
847 let items: Vec<String> = conn
849 .zrange(&self.queue_name, 0, 0)
850 .await
851 .map_err(|e| CelersError::Broker(format!("Failed to peek: {}", e)))?;
852 items.into_iter().next()
853 }
854 };
855
856 if let Some(serialized) = data {
857 let task: SerializedTask = serde_json::from_str(&serialized)
858 .map_err(|e| CelersError::Deserialization(e.to_string()))?;
859 Ok(Some(task))
860 } else {
861 Ok(None)
862 }
863 }
864
865 pub async fn queue_depth_percentage(&self, max_recommended_size: Option<usize>) -> Result<f64> {
870 let max_size = max_recommended_size.unwrap_or(10000) as f64;
871 let current_size = self.queue_size().await? as f64;
872 Ok(current_size / max_size)
873 }
874
875 pub async fn is_near_capacity(&self, max_recommended_size: Option<usize>) -> Result<bool> {
877 let percentage = self.queue_depth_percentage(max_recommended_size).await?;
878 Ok(percentage > 0.8)
879 }
880
881 async fn process_delayed_tasks(&self) -> Result<usize> {
886 let mut conn = self
887 .client
888 .get_multiplexed_async_connection()
889 .await
890 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
891
892 let now = std::time::SystemTime::now()
893 .duration_since(std::time::UNIX_EPOCH)
894 .map_err(|e| CelersError::Other(format!("Failed to get current time: {}", e)))?
895 .as_secs() as f64;
896
897 let ready_tasks: Vec<String> =
899 conn.zrangebyscore(&self.delayed_queue, "-inf", now)
900 .await
901 .map_err(|e| CelersError::Broker(format!("Failed to get ready tasks: {}", e)))?;
902
903 if ready_tasks.is_empty() {
904 return Ok(0);
905 }
906
907 let count = ready_tasks.len();
908
909 let mut pipe = redis::pipe();
911
912 for task_data in &ready_tasks {
913 if let Ok(task) = serde_json::from_str::<SerializedTask>(task_data) {
915 match self.mode {
916 QueueMode::Fifo => {
917 pipe.rpush(&self.queue_name, task_data);
918 }
919 QueueMode::Priority => {
920 let score = -(task.metadata.priority as f64);
921 pipe.zadd(&self.queue_name, task_data, score);
922 }
923 }
924 }
925
926 pipe.zrem(&self.delayed_queue, task_data);
928 }
929
930 pipe.query_async::<redis::Value>(&mut conn)
932 .await
933 .map_err(|e| CelersError::Broker(format!("Failed to move delayed tasks: {}", e)))?;
934
935 debug!("Moved {} delayed tasks to main queue", count);
936
937 Ok(count)
938 }
939}
940
941#[async_trait]
942impl Broker for RedisBroker {
943 async fn enqueue(&self, task: SerializedTask) -> Result<TaskId> {
944 let mut conn = self
945 .client
946 .get_multiplexed_async_connection()
947 .await
948 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
949
950 let task_id = task.metadata.id;
951 let priority = task.metadata.priority;
952 let serialized =
953 serde_json::to_string(&task).map_err(|e| CelersError::Serialization(e.to_string()))?;
954
955 match self.mode {
956 QueueMode::Fifo => {
957 conn.rpush::<_, _, ()>(&self.queue_name, &serialized)
959 .await
960 .map_err(|e| CelersError::Broker(format!("Failed to enqueue task: {}", e)))?;
961 }
962 QueueMode::Priority => {
963 let score = -priority as f64;
966 conn.zadd::<_, _, _, ()>(&self.queue_name, &serialized, score)
967 .await
968 .map_err(|e| CelersError::Broker(format!("Failed to enqueue task: {}", e)))?;
969 }
970 }
971
972 debug!(
973 "Enqueued task {} to queue {} (priority: {})",
974 task_id, self.queue_name, priority
975 );
976
977 #[cfg(feature = "metrics")]
978 {
979 TASKS_ENQUEUED_TOTAL.inc();
980
981 let task_name = &task.metadata.name;
983 TASKS_ENQUEUED_BY_TYPE.with_label_values(&[task_name]).inc();
984 }
985
986 Ok(task_id)
987 }
988
989 async fn dequeue(&self) -> Result<Option<BrokerMessage>> {
990 let _ = self.process_delayed_tasks().await;
992
993 let mut conn = self
994 .client
995 .get_multiplexed_async_connection()
996 .await
997 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
998
999 let result = match self.mode {
1000 QueueMode::Fifo => {
1001 conn.brpoplpush(&self.queue_name, &self.processing_queue, 1.0)
1003 .await
1004 .map_err(|e| CelersError::Broker(format!("Failed to dequeue task: {}", e)))?
1005 }
1006 QueueMode::Priority => {
1007 let items: Vec<(String, f64)> = conn
1010 .zpopmin(&self.queue_name, 1)
1011 .await
1012 .map_err(|e| CelersError::Broker(format!("Failed to dequeue task: {}", e)))?;
1013
1014 if let Some((data, _score)) = items.first() {
1015 conn.lpush::<_, _, ()>(&self.processing_queue, data)
1017 .await
1018 .map_err(|e| {
1019 CelersError::Broker(format!("Failed to move task to processing: {}", e))
1020 })?;
1021 Some(data.clone())
1022 } else {
1023 None
1024 }
1025 }
1026 };
1027
1028 match result {
1029 Some(data) => {
1030 let task: SerializedTask = serde_json::from_str(&data)
1031 .map_err(|e| CelersError::Deserialization(e.to_string()))?;
1032
1033 debug!(
1034 "Dequeued task {} (priority: {})",
1035 task.metadata.id, task.metadata.priority
1036 );
1037 Ok(Some(BrokerMessage {
1038 task,
1039 receipt_handle: Some(data),
1040 }))
1041 }
1042 None => Ok(None),
1043 }
1044 }
1045
1046 async fn ack(&self, task_id: &TaskId, receipt_handle: Option<&str>) -> Result<()> {
1047 if let Some(handle) = receipt_handle {
1048 let mut conn = self
1049 .client
1050 .get_multiplexed_async_connection()
1051 .await
1052 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1053
1054 conn.lrem::<_, _, ()>(&self.processing_queue, 1, handle)
1055 .await
1056 .map_err(|e| CelersError::Broker(format!("Failed to ack task: {}", e)))?;
1057
1058 info!("Acknowledged task {}", task_id);
1059 }
1060 Ok(())
1061 }
1062
1063 async fn reject(
1064 &self,
1065 task_id: &TaskId,
1066 receipt_handle: Option<&str>,
1067 requeue: bool,
1068 ) -> Result<()> {
1069 if let Some(handle) = receipt_handle {
1070 let mut conn = self
1071 .client
1072 .get_multiplexed_async_connection()
1073 .await
1074 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1075
1076 conn.lrem::<_, _, ()>(&self.processing_queue, 1, handle)
1078 .await
1079 .map_err(|e| {
1080 CelersError::Broker(format!("Failed to remove from processing: {}", e))
1081 })?;
1082
1083 if requeue {
1084 let mut task: SerializedTask = serde_json::from_str(handle)
1086 .map_err(|e| CelersError::Deserialization(e.to_string()))?;
1087
1088 let retry_count = match task.metadata.state {
1090 celers_core::TaskState::Retrying(count) => count + 1,
1091 _ => 1,
1092 };
1093 task.metadata.state = celers_core::TaskState::Retrying(retry_count);
1094
1095 let serialized = serde_json::to_string(&task)
1096 .map_err(|e| CelersError::Serialization(e.to_string()))?;
1097
1098 match self.mode {
1099 QueueMode::Fifo => {
1100 conn.rpush::<_, _, ()>(&self.queue_name, &serialized)
1101 .await
1102 .map_err(|e| {
1103 CelersError::Broker(format!("Failed to requeue task: {}", e))
1104 })?;
1105 }
1106 QueueMode::Priority => {
1107 let score = -task.metadata.priority as f64;
1108 conn.zadd::<_, _, _, ()>(&self.queue_name, &serialized, score)
1109 .await
1110 .map_err(|e| {
1111 CelersError::Broker(format!("Failed to requeue task: {}", e))
1112 })?;
1113 }
1114 }
1115
1116 info!(
1117 "Rejected and requeued task {} (retry {})",
1118 task_id, retry_count
1119 );
1120 } else {
1121 conn.lpush::<_, _, ()>(&self.dlq_name, handle)
1123 .await
1124 .map_err(|e| {
1125 CelersError::Broker(format!("Failed to move task to DLQ: {}", e))
1126 })?;
1127 error!("Task {} failed permanently, moved to DLQ", task_id);
1128 }
1129 }
1130 Ok(())
1131 }
1132
1133 async fn queue_size(&self) -> Result<usize> {
1134 let mut conn = self
1135 .client
1136 .get_multiplexed_async_connection()
1137 .await
1138 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1139
1140 let size: usize = match self.mode {
1141 QueueMode::Fifo => conn
1142 .llen(&self.queue_name)
1143 .await
1144 .map_err(|e| CelersError::Broker(format!("Failed to get queue size: {}", e)))?,
1145 QueueMode::Priority => conn
1146 .zcard(&self.queue_name)
1147 .await
1148 .map_err(|e| CelersError::Broker(format!("Failed to get queue size: {}", e)))?,
1149 };
1150
1151 #[cfg(feature = "metrics")]
1152 {
1153 QUEUE_SIZE.set(size as f64);
1154
1155 let processing_size: usize = conn.llen(&self.processing_queue).await.unwrap_or(0);
1157 PROCESSING_QUEUE_SIZE.set(processing_size as f64);
1158
1159 let dlq_size: usize = conn.llen(&self.dlq_name).await.unwrap_or(0);
1160 DLQ_SIZE.set(dlq_size as f64);
1161 }
1162
1163 Ok(size)
1164 }
1165
1166 async fn cancel(&self, task_id: &TaskId) -> Result<bool> {
1167 let mut conn = self
1168 .client
1169 .get_multiplexed_async_connection()
1170 .await
1171 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1172
1173 let cancel_msg = task_id.to_string();
1175 let subscribers: i32 = conn
1176 .publish(&self.cancel_channel, &cancel_msg)
1177 .await
1178 .map_err(|e| CelersError::Broker(format!("Failed to publish cancel message: {}", e)))?;
1179
1180 if subscribers > 0 {
1181 info!(
1182 "Published cancellation for task {} to {} subscriber(s)",
1183 task_id, subscribers
1184 );
1185 Ok(true)
1186 } else {
1187 warn!("No subscribers listening for task {} cancellation", task_id);
1188 Ok(false)
1189 }
1190 }
1191
1192 async fn enqueue_batch(&self, tasks: Vec<SerializedTask>) -> Result<Vec<TaskId>> {
1195 if tasks.is_empty() {
1196 return Ok(Vec::new());
1197 }
1198
1199 let mut conn = self
1200 .client
1201 .get_multiplexed_async_connection()
1202 .await
1203 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1204
1205 let mut task_ids = Vec::with_capacity(tasks.len());
1206 let mut pipe = redis::pipe();
1207
1208 for task in &tasks {
1210 let task_id = task.metadata.id;
1211 task_ids.push(task_id);
1212
1213 let serialized = serde_json::to_string(&task)
1214 .map_err(|e| CelersError::Serialization(e.to_string()))?;
1215
1216 match self.mode {
1217 QueueMode::Fifo => {
1218 pipe.rpush(&self.queue_name, &serialized);
1219 }
1220 QueueMode::Priority => {
1221 let score = -(task.metadata.priority as f64);
1222 pipe.zadd(&self.queue_name, &serialized, score);
1223 }
1224 }
1225 }
1226
1227 pipe.query_async::<redis::Value>(&mut conn)
1229 .await
1230 .map_err(|e| CelersError::Broker(format!("Failed to enqueue batch: {}", e)))?;
1231
1232 debug!(
1233 "Enqueued batch of {} tasks to queue {}",
1234 tasks.len(),
1235 self.queue_name
1236 );
1237
1238 #[cfg(feature = "metrics")]
1239 {
1240 use celers_metrics::{
1241 BATCH_ENQUEUE_TOTAL, BATCH_SIZE, TASKS_ENQUEUED_BY_TYPE, TASKS_ENQUEUED_TOTAL,
1242 };
1243 TASKS_ENQUEUED_TOTAL.inc_by(tasks.len() as f64);
1244 BATCH_ENQUEUE_TOTAL.inc();
1245 BATCH_SIZE.observe(tasks.len() as f64);
1246
1247 for task in &tasks {
1249 let task_name = &task.metadata.name;
1250 TASKS_ENQUEUED_BY_TYPE.with_label_values(&[task_name]).inc();
1251 }
1252 }
1253
1254 Ok(task_ids)
1255 }
1256
1257 async fn dequeue_batch(&self, count: usize) -> Result<Vec<BrokerMessage>> {
1258 if count == 0 {
1259 return Ok(Vec::new());
1260 }
1261
1262 let mut conn = self
1263 .client
1264 .get_multiplexed_async_connection()
1265 .await
1266 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1267
1268 let mut messages = Vec::with_capacity(count);
1269
1270 match self.mode {
1271 QueueMode::Fifo => {
1272 let mut pipe = redis::pipe();
1274 for _ in 0..count {
1275 pipe.rpoplpush(&self.queue_name, &self.processing_queue);
1276 }
1277
1278 let results: Vec<Option<String>> = pipe
1279 .query_async(&mut conn)
1280 .await
1281 .map_err(|e| CelersError::Broker(format!("Failed to dequeue batch: {}", e)))?;
1282
1283 for data_opt in results {
1284 if let Some(data) = data_opt {
1285 let task: SerializedTask = serde_json::from_str(&data)
1286 .map_err(|e| CelersError::Deserialization(e.to_string()))?;
1287
1288 messages.push(BrokerMessage {
1289 task,
1290 receipt_handle: Some(data),
1291 });
1292 } else {
1293 break; }
1295 }
1296 }
1297 QueueMode::Priority => {
1298 let items: Vec<(String, f64)> = conn
1300 .zpopmin(&self.queue_name, count as isize)
1301 .await
1302 .map_err(|e| CelersError::Broker(format!("Failed to dequeue batch: {}", e)))?;
1303
1304 if !items.is_empty() {
1305 let mut pipe = redis::pipe();
1307 for (data, _score) in &items {
1308 pipe.lpush(&self.processing_queue, data);
1309 }
1310
1311 pipe.query_async::<redis::Value>(&mut conn)
1312 .await
1313 .map_err(|e| {
1314 CelersError::Broker(format!(
1315 "Failed to move batch to processing: {}",
1316 e
1317 ))
1318 })?;
1319
1320 for (data, _score) in items {
1321 let task: SerializedTask = serde_json::from_str(&data)
1322 .map_err(|e| CelersError::Deserialization(e.to_string()))?;
1323
1324 messages.push(BrokerMessage {
1325 task,
1326 receipt_handle: Some(data),
1327 });
1328 }
1329 }
1330 }
1331 }
1332
1333 debug!(
1334 "Dequeued batch of {} tasks from queue {}",
1335 messages.len(),
1336 self.queue_name
1337 );
1338
1339 #[cfg(feature = "metrics")]
1340 if !messages.is_empty() {
1341 use celers_metrics::{BATCH_DEQUEUE_TOTAL, BATCH_SIZE};
1342 BATCH_DEQUEUE_TOTAL.inc();
1343 BATCH_SIZE.observe(messages.len() as f64);
1344 }
1345
1346 Ok(messages)
1347 }
1348
1349 async fn ack_batch(&self, tasks: &[(TaskId, Option<String>)]) -> Result<()> {
1350 if tasks.is_empty() {
1351 return Ok(());
1352 }
1353
1354 let mut conn = self
1355 .client
1356 .get_multiplexed_async_connection()
1357 .await
1358 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1359
1360 let mut pipe = redis::pipe();
1361 let mut ack_count = 0;
1362
1363 for (task_id, receipt_handle) in tasks {
1364 if let Some(handle) = receipt_handle {
1365 pipe.lrem(&self.processing_queue, 1, handle);
1366 ack_count += 1;
1367 } else {
1368 warn!("No receipt handle for task {}, skipping ack", task_id);
1369 }
1370 }
1371
1372 if ack_count > 0 {
1373 pipe.query_async::<redis::Value>(&mut conn)
1374 .await
1375 .map_err(|e| CelersError::Broker(format!("Failed to ack batch: {}", e)))?;
1376
1377 debug!("Acknowledged batch of {} tasks", ack_count);
1378 }
1379
1380 Ok(())
1381 }
1382
1383 async fn enqueue_at(&self, task: SerializedTask, execute_at: i64) -> Result<TaskId> {
1386 let mut conn = self
1387 .client
1388 .get_multiplexed_async_connection()
1389 .await
1390 .map_err(|e| CelersError::Broker(format!("Failed to get connection: {}", e)))?;
1391
1392 let task_id = task.metadata.id;
1393 let serialized =
1394 serde_json::to_string(&task).map_err(|e| CelersError::Serialization(e.to_string()))?;
1395
1396 conn.zadd::<_, _, _, ()>(&self.delayed_queue, &serialized, execute_at as f64)
1398 .await
1399 .map_err(|e| CelersError::Broker(format!("Failed to schedule delayed task: {}", e)))?;
1400
1401 debug!(
1402 "Scheduled task {} for execution at Unix timestamp {} (queue: {})",
1403 task_id, execute_at, self.delayed_queue
1404 );
1405
1406 #[cfg(feature = "metrics")]
1407 {
1408 TASKS_ENQUEUED_TOTAL.inc();
1409
1410 let task_name = &task.metadata.name;
1412 TASKS_ENQUEUED_BY_TYPE.with_label_values(&[task_name]).inc();
1413 }
1414
1415 Ok(task_id)
1416 }
1417
1418 async fn enqueue_after(&self, task: SerializedTask, delay_secs: u64) -> Result<TaskId> {
1419 let now = std::time::SystemTime::now()
1420 .duration_since(std::time::UNIX_EPOCH)
1421 .map_err(|e| CelersError::Other(format!("Failed to get current time: {}", e)))?
1422 .as_secs() as i64;
1423
1424 let execute_at = now + delay_secs as i64;
1425 self.enqueue_at(task, execute_at).await
1426 }
1427}
1428
#[cfg(test)]
mod tests {
    use super::*;

    /// Connection URL shared by the constructor tests.
    const REDIS_URL: &str = "redis://localhost:6379";

    /// `is_fifo` holds for the FIFO variant only.
    #[test]
    fn test_queue_mode_is_fifo() {
        let fifo = QueueMode::Fifo;
        let priority = QueueMode::Priority;

        assert!(fifo.is_fifo());
        assert!(!priority.is_fifo());
    }

    /// `is_priority` holds for the priority variant only.
    #[test]
    fn test_queue_mode_is_priority() {
        let fifo = QueueMode::Fifo;
        let priority = QueueMode::Priority;

        assert!(!fifo.is_priority());
        assert!(priority.is_priority());
    }

    /// Each mode renders to its expected display name.
    #[test]
    fn test_queue_mode_display() {
        let fifo_label = QueueMode::Fifo.to_string();
        let priority_label = QueueMode::Priority.to_string();

        assert_eq!(fifo_label, "FIFO");
        assert_eq!(priority_label, "Priority");
    }

    /// `new` succeeds for a valid URL and defaults to FIFO mode.
    #[test]
    fn test_redis_broker_new() {
        let result = RedisBroker::new(REDIS_URL, "test_queue");
        assert!(result.is_ok());

        let broker = result.unwrap();
        assert_eq!(broker.queue_name(), "test_queue");
        assert_eq!(broker.mode(), QueueMode::Fifo);
    }

    /// `with_mode` keeps the requested queue mode.
    #[test]
    fn test_redis_broker_with_mode() {
        let result = RedisBroker::with_mode(REDIS_URL, "test_queue", QueueMode::Priority);
        assert!(result.is_ok());

        let broker = result.unwrap();
        assert_eq!(broker.mode(), QueueMode::Priority);
    }

    /// The builder-style setter overrides the visibility timeout.
    #[test]
    fn test_redis_broker_with_visibility_timeout() {
        let base = RedisBroker::new(REDIS_URL, "test_queue").unwrap();
        let broker = base.with_visibility_timeout(600);

        assert_eq!(broker.visibility_timeout(), 600);
    }

    /// Auxiliary key names are the base queue name plus a fixed suffix.
    #[test]
    fn test_queue_names() {
        let broker = RedisBroker::new(REDIS_URL, "my_queue").unwrap();

        assert_eq!(broker.queue_name(), "my_queue");
        assert_eq!(broker.processing_queue_name(), "my_queue:processing");
        assert_eq!(broker.dlq_name(), "my_queue:dlq");
        assert_eq!(broker.delayed_queue_name(), "my_queue:delayed");
        assert_eq!(broker.cancel_channel(), "my_queue:cancel");
    }

    /// Smoke test: every component accessor can be called on a fresh broker.
    #[test]
    fn test_broker_accessors() {
        let broker = RedisBroker::new(REDIS_URL, "test_queue").unwrap();

        // Return values are discarded on purpose; only callability is checked.
        let _ = broker.health_checker();
        let _ = broker.queue_controller();
        let _ = broker.deduplicator();
        let _ = broker.client();
    }

    /// A URL with an unsupported scheme is rejected at construction time.
    #[test]
    fn test_invalid_redis_url() {
        let outcome = RedisBroker::new("invalid://not-a-redis-url", "test_queue");
        assert!(outcome.is_err());
    }
}