rust_task_queue/
lib.rs

//! # Rust Task Queue Framework
//!
//! A high-performance, Redis-backed task queue system with auto-scaling capabilities,
//! designed for async Rust applications.
//!
//! ## Features
//!
//! - **Redis-backed broker** for reliable message delivery
//! - **Auto-scaling workers** based on queue load
//! - **Task scheduling** with delay support
//! - **Multiple queue priorities**
//! - **Retry logic** with configurable attempts
//! - **Task timeouts** and failure handling
//! - **Metrics and monitoring**
//! - **Actix Web integration** (optional)
//! - **Automatic task registration** (optional)
//!
//! ## Quick Start
//!
//! ### Basic Usage
//!
//! ```rust,no_run
//! use rust_task_queue::prelude::*;
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Debug, Serialize, Deserialize, AutoRegisterTask)]
//! struct MyTask {
//!     data: String,
//! }
//!
//! #[async_trait::async_trait]
//! impl Task for MyTask {
//!     async fn execute(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
//!         println!("Processing: {}", self.data);
//!         #[derive(Serialize)]
//!         struct Response { status: String }
//!         let response = Response { status: "completed".to_string() };
//!         Ok(rmp_serde::to_vec(&response)?)
//!     }
//!
//!     fn name(&self) -> &str {
//!         "my_task"
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let task_queue = TaskQueue::new("redis://localhost:6379").await?;
//!
//!     // Start workers
//!     task_queue.start_workers(2).await?;
//!
//!     // Enqueue a task
//!     let task = MyTask { data: "Hello, World!".to_string() };
//!     let task_id = task_queue.enqueue(task, "default").await?;
//!
//!     println!("Enqueued task: {}", task_id);
//!     Ok(())
//! }
//! ```
//!
//! ### Automatic Task Registration
//!
//! ```rust,no_run
//! use rust_task_queue::prelude::*;
//! use serde::{Deserialize, Serialize};
//!
//! #[derive(Debug, Serialize, Deserialize, Default, AutoRegisterTask)]
//! struct MyTask {
//!     data: String,
//! }
//!
//! #[async_trait::async_trait]
//! impl Task for MyTask {
//!     async fn execute(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
//!         println!("Processing: {}", self.data);
//!         #[derive(Serialize)]
//!         struct Response { status: String }
//!         let response = Response { status: "completed".to_string() };
//!         Ok(rmp_serde::to_vec(&response)?)
//!     }
//!
//!     fn name(&self) -> &str {
//!         "my_task"
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Task is automatically registered!
//!     let task_queue = TaskQueueBuilder::new("redis://localhost:6379")
//!         .auto_register_tasks()
//!         .initial_workers(2)
//!         .build()
//!         .await?;
//!
//!     let task = MyTask { data: "Hello, World!".to_string() };
//!     let task_id = task_queue.enqueue(task, "default").await?;
//!
//!     println!("Enqueued task: {}", task_id);
//!     Ok(())
//! }
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod autoscaler;
pub mod broker;
pub mod config;
pub mod error;
pub mod metrics;
pub mod queue;
pub mod scheduler;
pub mod task;
pub mod worker;

#[cfg(feature = "tracing")]
pub mod tracing_utils;

#[cfg(feature = "actix-integration")]
#[cfg_attr(docsrs, doc(cfg(feature = "actix-integration")))]
pub mod actix;

#[cfg(feature = "axum-integration")]
#[cfg_attr(docsrs, doc(cfg(feature = "axum-integration")))]
pub mod axum;

#[cfg(feature = "cli")]
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
pub mod cli;

pub use autoscaler::*;
pub use broker::*;
#[cfg(feature = "cli")]
pub use cli::*;
pub use config::*;
pub use error::*;
pub use metrics::*;
pub use queue::*;
pub use scheduler::*;
pub use task::*;
pub use worker::*;

// Re-export macros when the auto-register feature is enabled
#[cfg(feature = "auto-register")]
pub use rust_task_queue_macro::{register_task as proc_register_task, AutoRegisterTask};

// Provide placeholders when auto-register is disabled
#[cfg(not(feature = "auto-register"))]
pub use std::marker::PhantomData as AutoRegisterTask;
#[cfg(not(feature = "auto-register"))]
pub use std::marker::PhantomData as proc_register_task;

// Re-export inventory for users who want to use it directly
#[cfg(feature = "auto-register")]
#[cfg_attr(docsrs, doc(cfg(feature = "auto-register")))]
pub use inventory;

pub mod prelude;

use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

/// Main task queue framework
#[derive(Clone)]
pub struct TaskQueue {
    pub broker: Arc<RedisBroker>,
    pub scheduler: Arc<TaskScheduler>,
    pub autoscaler: Arc<AutoScaler>,
    pub metrics: Arc<MetricsCollector>,
    workers: Arc<RwLock<Vec<Worker>>>,
    scheduler_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    shutdown_signal: Arc<RwLock<Option<tokio::sync::broadcast::Sender<()>>>>,
}

impl TaskQueue {
    /// Create a new task queue instance
    pub async fn new(redis_url: &str) -> Result<Self, TaskQueueError> {
        let broker = Arc::new(RedisBroker::new(redis_url).await?);
        let scheduler = Arc::new(TaskScheduler::new(broker.clone()));
        let autoscaler = Arc::new(AutoScaler::new(broker.clone()));
        let metrics = Arc::new(MetricsCollector::new());

        Ok(Self {
            broker,
            scheduler,
            autoscaler,
            metrics,
            workers: Arc::new(RwLock::new(Vec::new())),
            scheduler_handle: Arc::new(RwLock::new(None)),
            shutdown_signal: Arc::new(RwLock::new(None)),
        })
    }

    /// Create a new task queue with a custom auto-scaler configuration
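    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the `min_workers`/`max_workers` fields of
    /// `AutoScalerConfig` are publicly writable (all other fields keep their
    /// defaults):
    ///
    /// ```rust,ignore
    /// use rust_task_queue::{AutoScalerConfig, TaskQueue};
    ///
    /// let mut config = AutoScalerConfig::default();
    /// config.min_workers = 2;
    /// config.max_workers = 10;
    ///
    /// let task_queue =
    ///     TaskQueue::with_autoscaler_config("redis://localhost:6379", config).await?;
    /// ```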
    pub async fn with_autoscaler_config(
        redis_url: &str,
        autoscaler_config: AutoScalerConfig,
    ) -> Result<Self, TaskQueueError> {
        let broker = Arc::new(RedisBroker::new(redis_url).await?);
        let scheduler = Arc::new(TaskScheduler::new(broker.clone()));
        let autoscaler = Arc::new(AutoScaler::with_config(broker.clone(), autoscaler_config));
        let metrics = Arc::new(MetricsCollector::new());

        Ok(Self {
            broker,
            scheduler,
            autoscaler,
            metrics,
            workers: Arc::new(RwLock::new(Vec::new())),
            scheduler_handle: Arc::new(RwLock::new(None)),
            shutdown_signal: Arc::new(RwLock::new(None)),
        })
    }

    /// Start the specified number of workers with auto-registered tasks (if available)
    pub async fn start_workers(&self, initial_count: usize) -> Result<(), TaskQueueError> {
        // Use auto-registered tasks if the feature is enabled; otherwise fall back to an empty registry
        let registry = {
            #[cfg(feature = "auto-register")]
            {
                // Check whether a global config enables auto-registration
                if let Ok(config) = TaskQueueConfig::get_or_init() {
                    if config.auto_register.enabled {
                        match TaskRegistry::with_auto_registered_and_config(Some(
                            &config.auto_register,
                        )) {
                            Ok(reg) => Arc::new(reg),
                            Err(e) => {
                                #[cfg(feature = "tracing")]
                                tracing::warn!("Failed to create auto-registered task registry: {}, using empty registry", e);
                                Arc::new(TaskRegistry::new())
                            }
                        }
                    } else {
                        Arc::new(TaskRegistry::new())
                    }
                } else {
                    // Try to create an auto-registered registry anyway (fallback for backward compatibility)
                    match TaskRegistry::with_auto_registered() {
                        Ok(reg) => Arc::new(reg),
                        Err(_) => Arc::new(TaskRegistry::new()),
                    }
                }
            }
            #[cfg(not(feature = "auto-register"))]
            {
                Arc::new(TaskRegistry::new())
            }
        };

        self.start_workers_with_registry(initial_count, registry)
            .await
    }

    /// Start workers with a custom task registry
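    ///
    /// # Example
    ///
    /// A minimal sketch: build a registry up front (here via auto-registration,
    /// which assumes the `auto-register` feature) and share it across workers.
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    ///
    /// let registry = Arc::new(TaskRegistry::with_auto_registered()?);
    /// task_queue.start_workers_with_registry(4, registry).await?;
    /// ```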
    pub async fn start_workers_with_registry(
        &self,
        initial_count: usize,
        task_registry: Arc<TaskRegistry>,
    ) -> Result<(), TaskQueueError> {
        let mut workers = self.workers.write().await;

        for i in 0..initial_count {
            let worker = Worker::new(
                format!("worker-{}", i),
                self.broker.clone(),
                self.scheduler.clone(),
            )
            .with_task_registry(task_registry.clone());

            let worker_handle = worker.start().await?;
            workers.push(worker_handle);
        }

        #[cfg(feature = "tracing")]
        tracing::info!(
            "Started {} workers with task registry containing {} task types",
            initial_count,
            task_registry.registered_tasks().len()
        );

        Ok(())
    }

    /// Start the auto-scaler background process
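    ///
    /// The background loop polls queue metrics every 30 seconds and scales the
    /// worker pool up or down according to the auto-scaler's decision.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// task_queue.start_autoscaler()?;
    /// ```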
    pub fn start_autoscaler(&self) -> Result<(), TaskQueueError> {
        let workers = self.workers.clone();
        let autoscaler = self.autoscaler.clone();
        let broker = self.broker.clone();
        let scheduler = self.scheduler.clone();

        tokio::spawn(async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;

                if let Ok(metrics) = autoscaler.collect_metrics().await {
                    // Clone autoscaler for scaling decision since it requires mutable access
                    if let Ok(action) = {
                        let mut autoscaler_clone = (*autoscaler).clone();
                        autoscaler_clone.decide_scaling_action(&metrics)
                    } {
                        match action {
                            ScalingAction::ScaleUp(count) => {
                                let mut workers_lock = workers.write().await;
                                let current_count = workers_lock.len();

                                for i in current_count..current_count + count {
                                    let worker = Worker::new(
                                        format!("worker-{}", i),
                                        broker.clone(),
                                        scheduler.clone(),
                                    );

                                    if let Ok(worker_handle) = worker.start().await {
                                        workers_lock.push(worker_handle);
                                        #[cfg(feature = "tracing")]
                                        tracing::info!("Scaled up: added worker-{}", i);
                                    }
                                }
                            }
                            ScalingAction::ScaleDown(count) => {
                                let mut workers_lock = workers.write().await;
                                for _ in 0..count {
                                    if let Some(worker) = workers_lock.pop() {
                                        worker.stop().await;
                                        #[cfg(feature = "tracing")]
                                        tracing::info!("Scaled down: removed worker");
                                    }
                                }
                            }
                            ScalingAction::NoAction => {}
                        }
                    }
                }
            }
        });

        #[cfg(feature = "tracing")]
        tracing::info!("Started auto-scaler");

        Ok(())
    }

    /// Start the task scheduler background process
    pub async fn start_scheduler(&self) -> Result<(), TaskQueueError> {
        let mut handle_guard = self.scheduler_handle.write().await;

        // Don't start if already running
        if handle_guard.is_some() {
            return Ok(());
        }

        let handle = TaskScheduler::start(self.broker.clone())?;
        *handle_guard = Some(handle);

        Ok(())
    }

    /// Stop the task scheduler
    pub async fn stop_scheduler(&self) {
        let mut handle_guard = self.scheduler_handle.write().await;
        if let Some(handle) = handle_guard.take() {
            handle.abort();

            #[cfg(feature = "tracing")]
            tracing::info!("Task scheduler stopped");
        }
    }

    /// Enqueue a task for immediate execution
    #[inline]
    pub async fn enqueue<T: Task>(&self, task: T, queue: &str) -> Result<TaskId, TaskQueueError> {
        self.broker.enqueue_task(task, queue).await
    }

    /// Schedule a task for delayed execution
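    ///
    /// # Example
    ///
    /// A minimal sketch; `MyTask` stands in for any type implementing `Task`
    /// (see the crate-level examples).
    ///
    /// ```rust,ignore
    /// let task = MyTask { data: "later".to_string() };
    /// let task_id = task_queue
    ///     .schedule(task, "default", chrono::Duration::seconds(60))
    ///     .await?;
    /// ```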
    #[inline]
    pub async fn schedule<T: Task>(
        &self,
        task: T,
        queue: &str,
        delay: chrono::Duration,
    ) -> Result<TaskId, TaskQueueError> {
        self.scheduler.schedule_task(task, queue, delay).await
    }

    /// Get the current number of active workers
    #[inline]
    pub async fn worker_count(&self) -> usize {
        self.workers.read().await.len()
    }

    /// Stop all workers
    pub async fn stop_workers(&self) {
        let mut workers = self.workers.write().await;
        while let Some(worker) = workers.pop() {
            worker.stop().await;
        }

        #[cfg(feature = "tracing")]
        tracing::info!("All workers stopped");
    }

    /// Initiate graceful shutdown of the entire task queue
    pub async fn shutdown(&self) -> Result<(), TaskQueueError> {
        self.shutdown_with_timeout(std::time::Duration::from_secs(30))
            .await
    }

    /// Shutdown with a custom timeout
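    ///
    /// # Example
    ///
    /// A minimal sketch: allow up to 10 seconds for in-flight work to drain
    /// before giving up with a timeout error.
    ///
    /// ```rust,ignore
    /// task_queue
    ///     .shutdown_with_timeout(std::time::Duration::from_secs(10))
    ///     .await?;
    /// ```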
    pub async fn shutdown_with_timeout(
        &self,
        timeout: std::time::Duration,
    ) -> Result<(), TaskQueueError> {
        #[cfg(feature = "tracing")]
        tracing::info!("Initiating graceful shutdown of TaskQueue...");

        // Create shutdown signal
        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
        {
            let mut signal = self.shutdown_signal.write().await;
            *signal = Some(shutdown_tx.clone());
        }

        // Shutdown components in order
        let shutdown_future = async {
            // 1. Stop accepting new tasks (stop scheduler)
            self.stop_scheduler().await;
            #[cfg(feature = "tracing")]
            tracing::info!("Scheduler stopped");

            // 2. Stop autoscaler
            // (the autoscaler has no stop method in the current implementation, so this is a no-op)
            #[cfg(feature = "tracing")]
            tracing::info!("Autoscaler stop skipped (no stop mechanism in current implementation)");

            // 3. Wait for running tasks to complete and stop workers
            self.stop_workers().await;
            #[cfg(feature = "tracing")]
            tracing::info!("All workers stopped");

            // 4. Broadcast shutdown signal
            if let Err(e) = shutdown_tx.send(()) {
                #[cfg(feature = "tracing")]
                tracing::warn!("Failed to send shutdown signal: {}", e);
            }

            #[cfg(feature = "tracing")]
            tracing::info!("TaskQueue shutdown completed successfully");

            Ok(())
        };

        // Apply timeout
        match tokio::time::timeout(timeout, shutdown_future).await {
            Ok(result) => result,
            Err(_) => {
                #[cfg(feature = "tracing")]
                tracing::error!("TaskQueue shutdown timed out after {:?}", timeout);
                Err(TaskQueueError::Worker("Shutdown timeout".to_string()))
            }
        }
    }

    /// Check if shutdown has been initiated
    pub async fn is_shutting_down(&self) -> bool {
        self.shutdown_signal.read().await.is_some()
    }

    /// Get a shutdown signal receiver for external components
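    ///
    /// Returns `None` until shutdown has been initiated, because the broadcast
    /// channel is only created inside `shutdown_with_timeout`.
    ///
    /// # Example
    ///
    /// A minimal sketch: forward the signal to a background task.
    ///
    /// ```rust,ignore
    /// if let Some(mut rx) = task_queue.shutdown_receiver().await {
    ///     tokio::spawn(async move {
    ///         let _ = rx.recv().await;
    ///         println!("shutdown signal received");
    ///     });
    /// }
    /// ```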
    pub async fn shutdown_receiver(&self) -> Option<tokio::sync::broadcast::Receiver<()>> {
        self.shutdown_signal
            .read()
            .await
            .as_ref()
            .map(|tx| tx.subscribe())
    }

    /// Get comprehensive health status
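    ///
    /// # Example
    ///
    /// A minimal sketch that prints the overall status plus each component's state.
    ///
    /// ```rust,ignore
    /// let health = task_queue.health_check().await?;
    /// println!("overall: {}", health.status);
    /// for (name, component) in &health.components {
    ///     println!("{}: {} ({:?})", name, component.status, component.message);
    /// }
    /// ```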
    pub async fn health_check(&self) -> Result<HealthStatus, TaskQueueError> {
        let mut health = HealthStatus {
            status: "healthy".to_string(),
            timestamp: chrono::Utc::now(),
            components: std::collections::HashMap::new(),
        };

        // Check Redis connection
        match self.broker.pool.get().await {
            Ok(mut conn) => {
                match redis::cmd("PING")
                    .query_async::<_, String>(&mut *conn)
                    .await
                {
                    Ok(_) => {
                        health.components.insert(
                            "redis".to_string(),
                            ComponentHealth {
                                status: "healthy".to_string(),
                                message: Some("Connection successful".to_string()),
                            },
                        );
                    }
                    Err(e) => {
                        health.status = "unhealthy".to_string();
                        health.components.insert(
                            "redis".to_string(),
                            ComponentHealth {
                                status: "unhealthy".to_string(),
                                message: Some(format!("Ping failed: {}", e)),
                            },
                        );
                    }
                }
            }
            Err(e) => {
                health.status = "unhealthy".to_string();
                health.components.insert(
                    "redis".to_string(),
                    ComponentHealth {
                        status: "unhealthy".to_string(),
                        message: Some(format!("Connection failed: {}", e)),
                    },
                );
            }
        }

        // Check workers
        let worker_count = self.worker_count().await;
        health.components.insert(
            "workers".to_string(),
            ComponentHealth {
                status: if worker_count > 0 {
                    "healthy"
                } else {
                    "warning"
                }
                .to_string(),
                message: Some(format!("{} active workers", worker_count)),
            },
        );

        // Check scheduler
        let scheduler_running = self.scheduler_handle.read().await.is_some();
        health.components.insert(
            "scheduler".to_string(),
            ComponentHealth {
                status: if scheduler_running {
                    "healthy"
                } else {
                    "stopped"
                }
                .to_string(),
                message: Some(
                    if scheduler_running {
                        "Running"
                    } else {
                        "Stopped"
                    }
                    .to_string(),
                ),
            },
        );

        Ok(health)
    }

    /// Get comprehensive metrics
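    ///
    /// Aggregates per-queue metrics across the built-in queues
    /// (`default`, `high_priority`, `low_priority`).
    ///
    /// # Example
    ///
    /// A minimal sketch that prints the aggregate counters.
    ///
    /// ```rust,ignore
    /// let metrics = task_queue.get_metrics().await?;
    /// println!(
    ///     "pending: {}, processed: {}, failed: {}, workers: {}",
    ///     metrics.total_pending_tasks,
    ///     metrics.total_processed_tasks,
    ///     metrics.total_failed_tasks,
    ///     metrics.active_workers,
    /// );
    /// ```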
    pub async fn get_metrics(&self) -> Result<TaskQueueMetrics, TaskQueueError> {
        let queues = ["default", "high_priority", "low_priority"];
        let mut queue_metrics = Vec::new();
        let mut total_pending = 0;
        let mut total_processed = 0;
        let mut total_failed = 0;

        for queue in &queues {
            let metrics = self.broker.get_queue_metrics(queue).await?;
            total_pending += metrics.pending_tasks;
            total_processed += metrics.processed_tasks;
            total_failed += metrics.failed_tasks;
            queue_metrics.push(metrics);
        }

        let autoscaler_metrics = self.autoscaler.collect_metrics().await?;

        Ok(TaskQueueMetrics {
            timestamp: chrono::Utc::now(),
            total_pending_tasks: total_pending,
            total_processed_tasks: total_processed,
            total_failed_tasks: total_failed,
            active_workers: autoscaler_metrics.active_workers,
            tasks_per_worker: autoscaler_metrics.queue_pressure_score,
            queue_metrics,
        })
    }

    /// Get enhanced system metrics with memory and performance data
    pub async fn get_system_metrics(&self) -> SystemMetrics {
        self.metrics.get_system_metrics().await
    }

    /// Get a quick metrics summary for debugging
    pub async fn get_metrics_summary(&self) -> String {
        self.metrics.get_metrics_summary().await
    }
}

/// Builder for configuring and creating TaskQueue instances
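///
/// # Example
///
/// A minimal sketch of the builder chain; `auto_register_tasks` additionally
/// requires the `auto-register` feature.
///
/// ```rust,no_run
/// use rust_task_queue::TaskQueueBuilder;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let task_queue = TaskQueueBuilder::new("redis://localhost:6379")
///     .initial_workers(2)
///     .with_scheduler()
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```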
pub struct TaskQueueBuilder {
    config: TaskQueueConfig,
    task_registry: Option<Arc<TaskRegistry>>,
    override_redis_url: Option<String>,
}

impl TaskQueueBuilder {
    /// Create a new TaskQueue builder with default configuration
    #[inline]
    pub fn new(redis_url: impl Into<String>) -> Self {
        let mut config = TaskQueueConfig::default();
        config.redis.url = redis_url.into();

        Self {
            config,
            task_registry: None,
            override_redis_url: None,
        }
    }

    /// Create a TaskQueue builder from global configuration
    #[inline]
    pub fn from_global_config() -> Result<Self, TaskQueueError> {
        let config = TaskQueueConfig::get_or_init()?.clone();
        Ok(Self {
            config,
            task_registry: None,
            override_redis_url: None,
        })
    }

    /// Create a TaskQueue builder from a specific configuration
    #[must_use]
    #[inline]
    pub fn from_config(config: TaskQueueConfig) -> Self {
        Self {
            config,
            task_registry: None,
            override_redis_url: None,
        }
    }

    /// Create a TaskQueue builder that auto-loads configuration
    #[inline]
    pub fn auto() -> Result<Self, TaskQueueError> {
        let config = TaskQueueConfig::load()?;
        Ok(Self {
            config,
            task_registry: None,
            override_redis_url: None,
        })
    }

    /// Override the Redis URL (useful for testing or special cases)
    #[must_use]
    #[inline]
    pub fn redis_url(mut self, url: impl Into<String>) -> Self {
        self.override_redis_url = Some(url.into());
        self
    }

    /// Set the auto-scaler configuration
    #[must_use]
    #[inline]
    pub fn autoscaler_config(mut self, config: AutoScalerConfig) -> Self {
        self.config.autoscaler = config;
        self
    }

    /// Set the initial number of workers to start
    #[must_use]
    #[inline]
    pub fn initial_workers(mut self, count: usize) -> Self {
        self.config.workers.initial_count = count;
        self
    }

    /// Set a custom task registry
    #[must_use]
    #[inline]
    pub fn task_registry(mut self, registry: Arc<TaskRegistry>) -> Self {
        self.task_registry = Some(registry);
        self
    }

    /// Enable automatic task registration using the inventory pattern
    #[cfg(feature = "auto-register")]
    #[must_use]
    #[inline]
    pub fn auto_register_tasks(mut self) -> Self {
        self.config.auto_register.enabled = true;
        self
    }

    /// Disable automatic task registration
    #[cfg(feature = "auto-register")]
    #[must_use]
    #[inline]
    pub fn disable_auto_register(mut self) -> Self {
        self.config.auto_register.enabled = false;
        self
    }

    /// Start the task scheduler automatically
    #[must_use]
    #[inline]
    pub fn with_scheduler(mut self) -> Self {
        self.config.scheduler.enabled = true;
        self
    }

    /// Start the auto-scaler automatically
    #[must_use]
    #[inline]
    pub fn with_autoscaler(self) -> Self {
        // The auto-scaler is always available; this is currently a no-op because
        // whether to start it is determined from the configuration during build()
        self
    }

    /// Update the full configuration
    #[must_use]
    #[inline]
    pub fn config(mut self, config: TaskQueueConfig) -> Self {
        self.config = config;
        self
    }

    /// Build the `TaskQueue` instance
    ///
    /// # Errors
    /// Returns `TaskQueueError` if the Redis connection fails or the configuration is invalid
    #[inline]
    pub async fn build(self) -> Result<TaskQueue, TaskQueueError> {
        // Use the override Redis URL if provided, otherwise use the config
        let redis_url = self
            .override_redis_url
            .as_ref()
            .unwrap_or(&self.config.redis.url);

        // Create the TaskQueue with the autoscaler config
        let task_queue =
            TaskQueue::with_autoscaler_config(redis_url, self.config.autoscaler.clone()).await?;

        // Handle task registry setup
        let registry = if self.config.auto_register.enabled {
            #[cfg(feature = "auto-register")]
            {
                if let Some(custom_registry) = self.task_registry {
                    // Auto-register tasks into the provided registry using the configuration
                    custom_registry
                        .auto_register_tasks_with_config(Some(&self.config.auto_register))
                        .map_err(|e| {
                            TaskQueueError::Configuration(format!("Auto-registration failed: {e}"))
                        })?;
                    custom_registry
                } else {
                    // Create a new registry with auto-registered tasks using the configuration
                    Arc::new(
                        TaskRegistry::with_auto_registered_and_config(Some(
                            &self.config.auto_register,
                        ))
                        .map_err(|e| {
                            TaskQueueError::Configuration(format!("Auto-registration failed: {e}"))
                        })?,
                    )
                }
            }
            #[cfg(not(feature = "auto-register"))]
            {
                return Err(TaskQueueError::Configuration(
                    "Auto-registration requested but 'auto-register' feature is not enabled"
                        .to_string(),
                ));
            }
        } else {
            self.task_registry
                .unwrap_or_else(|| Arc::new(TaskRegistry::new()))
        };

        // Start workers
        let worker_count = self.config.workers.initial_count;
        if worker_count > 0 {
            task_queue
                .start_workers_with_registry(worker_count, registry)
                .await?;
        }

        // Start the scheduler if enabled
        if self.config.scheduler.enabled {
            task_queue.start_scheduler().await?;
        }

        // Start the auto-scaler only when the configuration leaves room to scale
        if self.config.autoscaler.max_workers > self.config.autoscaler.min_workers {
            task_queue.start_autoscaler()?;
        }

        Ok(task_queue)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_autoscaler_config() {
        let config = AutoScalerConfig::default();
        assert_eq!(config.min_workers, 1);
        assert_eq!(config.max_workers, 20);
        assert_eq!(config.scaling_triggers.queue_pressure_threshold, 0.75);
    }

    #[test]
    fn test_queue_manager() {
        let manager = QueueManager::new();
        let queues = manager.get_queue_names();

        assert!(queues.contains(&"default".to_owned()));
        assert!(queues.contains(&"high_priority".to_owned()));
        assert!(queues.contains(&"low_priority".to_owned()));
    }

    #[test]
    fn test_builder_pattern() {
        let builder = TaskQueueBuilder::new("redis://localhost:6379")
            .initial_workers(4)
            .with_scheduler()
            .with_autoscaler();

        assert_eq!(builder.config.redis.url, "redis://localhost:6379");
        assert_eq!(builder.config.workers.initial_count, 4);
        assert!(builder.config.scheduler.enabled);
    }
}

/// Health check status for monitoring
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
    pub status: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub components: std::collections::HashMap<String, ComponentHealth>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    pub status: String,
    pub message: Option<String>,
}

/// Comprehensive metrics for monitoring
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskQueueMetrics {
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub total_pending_tasks: i64,
    pub total_processed_tasks: i64,
    pub total_failed_tasks: i64,
    pub active_workers: i64,
    pub tasks_per_worker: f64,
    pub queue_metrics: Vec<QueueMetrics>,
}