rust_task_queue/
lib.rs

1//! # Rust Task Queue Framework
2//!
3//! A high-performance, Redis-backed task queue system with auto-scaling capabilities
4//! designed for use with async Rust applications.
5//!
6//! ## Features
7//!
8//! - **Redis-backed broker** for reliable message delivery
9//! - **Auto-scaling workers** based on queue load
10//! - **Task scheduling** with delay support
11//! - **Multiple queue priorities**
12//! - **Retry logic** with configurable attempts
13//! - **Task timeouts** and failure handling
14//! - **Metrics and monitoring**
15//! - **Actix Web integration** (optional)
16//! - **Automatic task registration** (optional)
17//!
18//! ## Quick Start
19//!
20//! ### Basic Usage
21//!
22//! ```rust,no_run
23//! use rust_task_queue::prelude::*;
24//! use serde::{Deserialize, Serialize};
25//!
26//! #[derive(Debug, Serialize, Deserialize)]
27//! struct MyTask {
28//!     data: String,
29//! }
30//!
31//! #[async_trait::async_trait]
32//! impl Task for MyTask {
33//!     async fn execute(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
34//!         println!("Processing: {}", self.data);
35//!         use serde::Serialize;
36//!         #[derive(Serialize)]
37//!         struct Response { status: String }
38//!         let response = Response { status: "completed".to_string() };
39//!         Ok(rmp_serde::to_vec(&response)?)
40//!     }
41//!     
42//!     fn name(&self) -> &str {
43//!         "my_task"
44//!     }
45//! }
46//!
47//! #[tokio::main]
48//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
49//!     let task_queue = TaskQueue::new("redis://localhost:6379").await?;
50//!     
51//!     // Start workers
52//!     task_queue.start_workers(2).await?;
53//!     
54//!     // Enqueue a task
55//!     let task = MyTask { data: "Hello, World!".to_string() };
56//!     let task_id = task_queue.enqueue(task, "default").await?;
57//!     
58//!     println!("Enqueued task: {}", task_id);
59//!     Ok(())
60//! }
61//! ```
62//!
63//! ### Automatic Task Registration
64//!
65//! ```rust,no_run
66//! use rust_task_queue::prelude::*;
67//! use serde::{Deserialize, Serialize};
68//!
69//! #[derive(Debug, Serialize, Deserialize, Default, AutoRegisterTask)]
70//! struct MyTask {
71//!     data: String,
72//! }
73//!
74//! #[async_trait::async_trait]
75//! impl Task for MyTask {
76//!     async fn execute(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
77//!         println!("Processing: {}", self.data);
78//!         use serde::Serialize;
79//!         #[derive(Serialize)]
80//!         struct Response { status: String }
81//!         let response = Response { status: "completed".to_string() };
82//!         Ok(rmp_serde::to_vec(&response)?)
83//!     }
84//!     
85//!     fn name(&self) -> &str {
86//!         "my_task"
87//!     }
88//! }
89//!
90//! #[tokio::main]
91//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
92//!     // Task is automatically registered!
93//!     let task_queue = TaskQueueBuilder::new("redis://localhost:6379")
94//!         .auto_register_tasks()
95//!         .initial_workers(2)
96//!         .build()
97//!         .await?;
98//!     
99//!     let task = MyTask { data: "Hello, World!".to_string() };
100//!     let task_id = task_queue.enqueue(task, "default").await?;
101//!     
102//!     println!("Enqueued task: {}", task_id);
103//!     Ok(())
104//! }
105//! ```
106
107#![cfg_attr(docsrs, feature(doc_cfg))]
108
109pub mod autoscaler;
110pub mod broker;
111pub mod config;
112pub mod error;
113pub mod metrics;
114pub mod queue;
115pub mod scheduler;
116pub mod task;
117pub mod worker;
118
119#[cfg(feature = "tracing")]
120pub mod tracing_utils;
121
122#[cfg(feature = "actix-integration")]
123#[cfg_attr(docsrs, doc(cfg(feature = "actix-integration")))]
124pub mod actix;
125
126#[cfg(feature = "cli")]
127#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
128pub mod cli;
129
130pub use autoscaler::*;
131pub use broker::*;
132#[cfg(feature = "cli")]
133pub use cli::*;
134pub use config::*;
135pub use error::*;
136pub use metrics::*;
137pub use queue::*;
138pub use scheduler::*;
139pub use task::*;
140pub use worker::*;
141
142// Re-export macros when auto-register feature is enabled
143// #[cfg(feature = "auto-register")]
144// pub use rust_task_queue_macros::{register_task as proc_register_task, AutoRegisterTask};
145
146// Provide placeholder when auto-register is disabled
147// #[cfg(not(feature = "auto-register"))]
148// pub use std::marker::PhantomData as AutoRegisterTask;
149// #[cfg(not(feature = "auto-register"))]
150// pub use std::marker::PhantomData as proc_register_task;
151
152// Re-export inventory for users who want to use it directly
153#[cfg(feature = "auto-register")]
154#[cfg_attr(docsrs, doc(cfg(feature = "auto-register")))]
155pub use inventory;
156
157pub mod prelude;
158
159use serde::{Deserialize, Serialize};
160use std::sync::Arc;
161use tokio::sync::RwLock;
162
163/// Main task queue framework
164#[derive(Clone)]
165pub struct TaskQueue {
166    pub broker: Arc<RedisBroker>,
167    pub scheduler: Arc<TaskScheduler>,
168    pub autoscaler: Arc<AutoScaler>,
169    pub metrics: Arc<MetricsCollector>,
170    workers: Arc<RwLock<Vec<Worker>>>,
171    scheduler_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
172    shutdown_signal: Arc<RwLock<Option<tokio::sync::broadcast::Sender<()>>>>,
173}
174
175impl TaskQueue {
176    /// Create a new task queue instance
177    pub async fn new(redis_url: &str) -> Result<Self, TaskQueueError> {
178        let broker = Arc::new(RedisBroker::new(redis_url).await?);
179        let scheduler = Arc::new(TaskScheduler::new(broker.clone()));
180        let autoscaler = Arc::new(AutoScaler::new(broker.clone()));
181        let metrics = Arc::new(MetricsCollector::new());
182
183        Ok(Self {
184            broker,
185            scheduler,
186            autoscaler,
187            metrics,
188            workers: Arc::new(RwLock::new(Vec::new())),
189            scheduler_handle: Arc::new(RwLock::new(None)),
190            shutdown_signal: Arc::new(RwLock::new(None)),
191        })
192    }
193
194    /// Create a new task queue with custom auto-scaler configuration
195    pub async fn with_autoscaler_config(
196        redis_url: &str,
197        autoscaler_config: AutoScalerConfig,
198    ) -> Result<Self, TaskQueueError> {
199        let broker = Arc::new(RedisBroker::new(redis_url).await?);
200        let scheduler = Arc::new(TaskScheduler::new(broker.clone()));
201        let autoscaler = Arc::new(AutoScaler::with_config(broker.clone(), autoscaler_config));
202        let metrics = Arc::new(MetricsCollector::new());
203
204        Ok(Self {
205            broker,
206            scheduler,
207            autoscaler,
208            metrics,
209            workers: Arc::new(RwLock::new(Vec::new())),
210            scheduler_handle: Arc::new(RwLock::new(None)),
211            shutdown_signal: Arc::new(RwLock::new(None)),
212        })
213    }
214
215    /// Start the specified number of workers with auto-registered tasks (if available)
216    pub async fn start_workers(&self, initial_count: usize) -> Result<(), TaskQueueError> {
217        // Try to use auto-registered tasks if the feature is enabled, otherwise use empty registry
218        let registry = {
219            #[cfg(feature = "auto-register")]
220            {
221                // Check if there's a global config that enables auto-registration
222                if let Ok(config) = TaskQueueConfig::get_or_init() {
223                    if config.auto_register.enabled {
224                        match TaskRegistry::with_auto_registered_and_config(Some(
225                            &config.auto_register,
226                        )) {
227                            Ok(reg) => Arc::new(reg),
228                            Err(e) => {
229                                #[cfg(feature = "tracing")]
230                                tracing::warn!("Failed to create auto-registered task registry: {}, using empty registry", e);
231                                Arc::new(TaskRegistry::new())
232                            }
233                        }
234                    } else {
235                        Arc::new(TaskRegistry::new())
236                    }
237                } else {
238                    // Try to create auto-registered registry anyway (fallback for backward compatibility)
239                    match TaskRegistry::with_auto_registered() {
240                        Ok(reg) => Arc::new(reg),
241                        Err(_) => Arc::new(TaskRegistry::new()),
242                    }
243                }
244            }
245            #[cfg(not(feature = "auto-register"))]
246            {
247                Arc::new(TaskRegistry::new())
248            }
249        };
250
251        self.start_workers_with_registry(initial_count, registry)
252            .await
253    }
254
255    /// Start workers with a custom task registry
256    pub async fn start_workers_with_registry(
257        &self,
258        initial_count: usize,
259        task_registry: Arc<TaskRegistry>,
260    ) -> Result<(), TaskQueueError> {
261        let mut workers = self.workers.write().await;
262
263        for i in 0..initial_count {
264            let worker = Worker::new(
265                format!("worker-{}", i),
266                self.broker.clone(),
267                self.scheduler.clone(),
268            )
269            .with_task_registry(task_registry.clone());
270
271            let worker_handle = worker.start().await?;
272            workers.push(worker_handle);
273        }
274
275        #[cfg(feature = "tracing")]
276        tracing::info!(
277            "Started {} workers with task registry containing {} task types",
278            initial_count,
279            task_registry.registered_tasks().len()
280        );
281
282        Ok(())
283    }
284
285    /// Start the auto-scaler background process
286    pub fn start_autoscaler(&self) -> Result<(), TaskQueueError> {
287        let workers = self.workers.clone();
288        let autoscaler = self.autoscaler.clone();
289        let broker = self.broker.clone();
290        let scheduler = self.scheduler.clone();
291
292        tokio::spawn(async move {
293            loop {
294                tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
295
296                if let Ok(metrics) = autoscaler.collect_metrics().await {
297                    // Clone autoscaler for scaling decision since it requires mutable access
298                    if let Ok(action) = {
299                        let mut autoscaler_clone = (*autoscaler).clone();
300                        autoscaler_clone.decide_scaling_action(&metrics)
301                    } {
302                        match action {
303                            ScalingAction::ScaleUp(count) => {
304                                let mut workers_lock = workers.write().await;
305                                let current_count = workers_lock.len();
306
307                                for i in current_count..current_count + count {
308                                    let worker = Worker::new(
309                                        format!("worker-{}", i),
310                                        broker.clone(),
311                                        scheduler.clone(),
312                                    );
313
314                                    if let Ok(worker_handle) = worker.start().await {
315                                        workers_lock.push(worker_handle);
316                                        #[cfg(feature = "tracing")]
317                                        tracing::info!("Scaled up: added worker-{}", i);
318                                    }
319                                }
320                            }
321                            ScalingAction::ScaleDown(count) => {
322                                let mut workers_lock = workers.write().await;
323                                for _ in 0..count {
324                                    if let Some(worker) = workers_lock.pop() {
325                                        worker.stop().await;
326                                        #[cfg(feature = "tracing")]
327                                        tracing::info!("Scaled down: removed worker");
328                                    }
329                                }
330                            }
331                            ScalingAction::NoAction => {}
332                        }
333                    }
334                }
335            }
336        });
337
338        #[cfg(feature = "tracing")]
339        tracing::info!("Started auto-scaler");
340
341        Ok(())
342    }
343
344    /// Start the task scheduler background process
345    pub async fn start_scheduler(&self) -> Result<(), TaskQueueError> {
346        let mut handle_guard = self.scheduler_handle.write().await;
347
348        // Don't start if already running
349        if handle_guard.is_some() {
350            return Ok(());
351        }
352
353        let handle = TaskScheduler::start(self.broker.clone())?;
354        *handle_guard = Some(handle);
355
356        Ok(())
357    }
358
359    /// Stop the task scheduler
360    pub async fn stop_scheduler(&self) {
361        let mut handle_guard = self.scheduler_handle.write().await;
362        if let Some(handle) = handle_guard.take() {
363            handle.abort();
364
365            #[cfg(feature = "tracing")]
366            tracing::info!("Task scheduler stopped");
367        }
368    }
369
370    /// Enqueue a task for immediate execution
371    #[inline]
372    pub async fn enqueue<T: Task>(&self, task: T, queue: &str) -> Result<TaskId, TaskQueueError> {
373        self.broker.enqueue_task(task, queue).await
374    }
375
376    /// Schedule a task for delayed execution
377    #[inline]
378    pub async fn schedule<T: Task>(
379        &self,
380        task: T,
381        queue: &str,
382        delay: chrono::Duration,
383    ) -> Result<TaskId, TaskQueueError> {
384        self.scheduler.schedule_task(task, queue, delay).await
385    }
386
387    /// Get the current number of active workers
388    #[inline]
389    pub async fn worker_count(&self) -> usize {
390        self.workers.read().await.len()
391    }
392
393    /// Stop all workers
394    pub async fn stop_workers(&self) {
395        let mut workers = self.workers.write().await;
396        while let Some(worker) = workers.pop() {
397            worker.stop().await;
398        }
399
400        #[cfg(feature = "tracing")]
401        tracing::info!("All workers stopped");
402    }
403
404    /// Initiate graceful shutdown of the entire task queue
405    pub async fn shutdown(&self) -> Result<(), TaskQueueError> {
406        self.shutdown_with_timeout(std::time::Duration::from_secs(30))
407            .await
408    }
409
410    /// Shutdown with a custom timeout
411    pub async fn shutdown_with_timeout(
412        &self,
413        timeout: std::time::Duration,
414    ) -> Result<(), TaskQueueError> {
415        #[cfg(feature = "tracing")]
416        tracing::info!("Initiating graceful shutdown of TaskQueue...");
417
418        // Create shutdown signal
419        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
420        {
421            let mut signal = self.shutdown_signal.write().await;
422            *signal = Some(shutdown_tx.clone());
423        }
424
425        // Shutdown components in order
426        let shutdown_future = async {
427            // 1. Stop accepting new tasks (stop scheduler)
428            self.stop_scheduler().await;
429            #[cfg(feature = "tracing")]
430            tracing::info!("Scheduler stopped");
431
432            // 2. Stop autoscaler
433            // (Autoscaler doesn't have a stop method in current implementation)
434            #[cfg(feature = "tracing")]
435            tracing::info!("Autoscaler stopped");
436
437            // 3. Wait for running tasks to complete and stop workers
438            self.stop_workers().await;
439            #[cfg(feature = "tracing")]
440            tracing::info!("All workers stopped");
441
442            // 4. Broadcast shutdown signal
443            if let Err(e) = shutdown_tx.send(()) {
444                #[cfg(feature = "tracing")]
445                tracing::warn!("Failed to send shutdown signal: {}", e);
446            }
447
448            #[cfg(feature = "tracing")]
449            tracing::info!("TaskQueue shutdown completed successfully");
450
451            Ok(())
452        };
453
454        // Apply timeout
455        match tokio::time::timeout(timeout, shutdown_future).await {
456            Ok(result) => result,
457            Err(_) => {
458                #[cfg(feature = "tracing")]
459                tracing::error!("TaskQueue shutdown timed out after {:?}", timeout);
460                Err(TaskQueueError::Worker("Shutdown timeout".to_string()))
461            }
462        }
463    }
464
465    /// Check if shutdown has been initiated
466    pub async fn is_shutting_down(&self) -> bool {
467        self.shutdown_signal.read().await.is_some()
468    }
469
470    /// Get a shutdown signal receiver for external components
471    pub async fn shutdown_receiver(&self) -> Option<tokio::sync::broadcast::Receiver<()>> {
472        self.shutdown_signal
473            .read()
474            .await
475            .as_ref()
476            .map(|tx| tx.subscribe())
477    }
478
479    /// Get comprehensive health status
480    pub async fn health_check(&self) -> Result<HealthStatus, TaskQueueError> {
481        let mut health = HealthStatus {
482            status: "healthy".to_string(),
483            timestamp: chrono::Utc::now(),
484            components: std::collections::HashMap::new(),
485        };
486
487        // Check Redis connection
488        match self.broker.pool.get().await {
489            Ok(mut conn) => {
490                match redis::cmd("PING")
491                    .query_async::<_, String>(&mut *conn)
492                    .await
493                {
494                    Ok(_) => {
495                        health.components.insert(
496                            "redis".to_string(),
497                            ComponentHealth {
498                                status: "healthy".to_string(),
499                                message: Some("Connection successful".to_string()),
500                            },
501                        );
502                    }
503                    Err(e) => {
504                        health.status = "unhealthy".to_string();
505                        health.components.insert(
506                            "redis".to_string(),
507                            ComponentHealth {
508                                status: "unhealthy".to_string(),
509                                message: Some(format!("Ping failed: {}", e)),
510                            },
511                        );
512                    }
513                }
514            }
515            Err(e) => {
516                health.status = "unhealthy".to_string();
517                health.components.insert(
518                    "redis".to_string(),
519                    ComponentHealth {
520                        status: "unhealthy".to_string(),
521                        message: Some(format!("Connection failed: {}", e)),
522                    },
523                );
524            }
525        }
526
527        // Check workers
528        let worker_count = self.worker_count().await;
529        health.components.insert(
530            "workers".to_string(),
531            ComponentHealth {
532                status: if worker_count > 0 {
533                    "healthy"
534                } else {
535                    "warning"
536                }
537                .to_string(),
538                message: Some(format!("{} active workers", worker_count)),
539            },
540        );
541
542        // Check scheduler
543        let scheduler_running = self.scheduler_handle.read().await.is_some();
544        health.components.insert(
545            "scheduler".to_string(),
546            ComponentHealth {
547                status: if scheduler_running {
548                    "healthy"
549                } else {
550                    "stopped"
551                }
552                .to_string(),
553                message: Some(
554                    if scheduler_running {
555                        "Running"
556                    } else {
557                        "Stopped"
558                    }
559                    .to_string(),
560                ),
561            },
562        );
563
564        Ok(health)
565    }
566
567    /// Get comprehensive metrics
568    pub async fn get_metrics(&self) -> Result<TaskQueueMetrics, TaskQueueError> {
569        let queues = ["default", "high_priority", "low_priority"];
570        let mut queue_metrics = Vec::new();
571        let mut total_pending = 0;
572        let mut total_processed = 0;
573        let mut total_failed = 0;
574
575        for queue in &queues {
576            let metrics = self.broker.get_queue_metrics(queue).await?;
577            total_pending += metrics.pending_tasks;
578            total_processed += metrics.processed_tasks;
579            total_failed += metrics.failed_tasks;
580            queue_metrics.push(metrics);
581        }
582
583        let autoscaler_metrics = self.autoscaler.collect_metrics().await?;
584
585        Ok(TaskQueueMetrics {
586            timestamp: chrono::Utc::now(),
587            total_pending_tasks: total_pending,
588            total_processed_tasks: total_processed,
589            total_failed_tasks: total_failed,
590            active_workers: autoscaler_metrics.active_workers,
591            tasks_per_worker: autoscaler_metrics.queue_pressure_score,
592            queue_metrics,
593        })
594    }
595
596    /// Get enhanced system metrics with memory and performance data
597    pub async fn get_system_metrics(&self) -> SystemMetrics {
598        self.metrics.get_system_metrics().await
599    }
600
601    /// Get a quick metrics summary for debugging
602    pub async fn get_metrics_summary(&self) -> String {
603        self.metrics.get_metrics_summary().await
604    }
605}
606
607/// Builder for configuring and creating TaskQueue instances
608pub struct TaskQueueBuilder {
609    config: TaskQueueConfig,
610    task_registry: Option<Arc<TaskRegistry>>,
611    override_redis_url: Option<String>,
612}
613
614impl TaskQueueBuilder {
615    /// Create a new TaskQueue builder with default configuration
616    #[inline]
617    pub fn new(redis_url: impl Into<String>) -> Self {
618        let mut config = TaskQueueConfig::default();
619        config.redis.url = redis_url.into();
620
621        Self {
622            config,
623            task_registry: None,
624            override_redis_url: None,
625        }
626    }
627
628    /// Create a TaskQueue builder from global configuration
629    #[inline]
630    pub fn from_global_config() -> Result<Self, TaskQueueError> {
631        let config = TaskQueueConfig::get_or_init()?.clone();
632        Ok(Self {
633            config,
634            task_registry: None,
635            override_redis_url: None,
636        })
637    }
638
639    /// Create a TaskQueue builder from a specific configuration
640    #[must_use]
641    #[inline]
642    pub fn from_config(config: TaskQueueConfig) -> Self {
643        Self {
644            config,
645            task_registry: None,
646            override_redis_url: None,
647        }
648    }
649
650    /// Create a TaskQueue builder that auto-loads configuration
651    #[inline]
652    pub fn auto() -> Result<Self, TaskQueueError> {
653        let config = TaskQueueConfig::load()?;
654        Ok(Self {
655            config,
656            task_registry: None,
657            override_redis_url: None,
658        })
659    }
660
661    /// Override Redis URL (useful for testing or special cases)
662    #[must_use]
663    #[inline]
664    pub fn redis_url(mut self, url: impl Into<String>) -> Self {
665        self.override_redis_url = Some(url.into());
666        self
667    }
668
669    /// Set the auto-scaler configuration
670    #[must_use]
671    #[inline]
672    pub fn autoscaler_config(mut self, config: AutoScalerConfig) -> Self {
673        self.config.autoscaler = config;
674        self
675    }
676
677    /// Set the initial number of workers to start
678    #[must_use]
679    #[inline]
680    pub fn initial_workers(mut self, count: usize) -> Self {
681        self.config.workers.initial_count = count;
682        self
683    }
684
685    /// Set a custom task registry
686    #[must_use]
687    #[inline]
688    pub fn task_registry(mut self, registry: Arc<TaskRegistry>) -> Self {
689        self.task_registry = Some(registry);
690        self
691    }
692
693    /// Enable automatic task registration using inventory pattern
694    #[cfg(feature = "auto-register")]
695    #[must_use]
696    #[inline]
697    pub fn auto_register_tasks(mut self) -> Self {
698        self.config.auto_register.enabled = true;
699        self
700    }
701
702    /// Disable automatic task registration
703    #[cfg(feature = "auto-register")]
704    #[must_use]
705    #[inline]
706    pub fn disable_auto_register(mut self) -> Self {
707        self.config.auto_register.enabled = false;
708        self
709    }
710
711    /// Start the task scheduler automatically
712    #[must_use]
713    #[inline]
714    pub fn with_scheduler(mut self) -> Self {
715        self.config.scheduler.enabled = true;
716        self
717    }
718
719    /// Start the auto-scaler automatically
720    #[must_use]
721    #[inline]
722    pub fn with_autoscaler(self) -> Self {
723        // Auto-scaler is always available, we just don't start it by default
724        // This is a no-op now since we determine if to start it during build
725        self
726    }
727
728    /// Update the full configuration
729    #[must_use]
730    #[inline]
731    pub fn config(mut self, config: TaskQueueConfig) -> Self {
732        self.config = config;
733        self
734    }
735
736    /// Build the `TaskQueue` instance
737    ///
738    /// # Errors
739    /// Returns `TaskQueueError` if Redis connection fails or configuration is invalid
740    #[inline]
741    pub async fn build(self) -> Result<TaskQueue, TaskQueueError> {
742        // Use override Redis URL if provided, otherwise use config
743        let redis_url = self
744            .override_redis_url
745            .as_ref()
746            .unwrap_or(&self.config.redis.url);
747
748        // Create TaskQueue with autoscaler config
749        let task_queue =
750            TaskQueue::with_autoscaler_config(redis_url, self.config.autoscaler.clone()).await?;
751
752        // Handle task registry setup
753        let registry = if self.config.auto_register.enabled {
754            #[cfg(feature = "auto-register")]
755            {
756                if let Some(custom_registry) = self.task_registry {
757                    // Auto-register tasks into the provided registry using configuration
758                    custom_registry
759                        .auto_register_tasks_with_config(Some(&self.config.auto_register))
760                        .map_err(|e| {
761                            TaskQueueError::Configuration(format!("Auto-registration failed: {e}"))
762                        })?;
763                    custom_registry
764                } else {
765                    // Create a new registry with auto-registered tasks using configuration
766                    Arc::new(
767                        TaskRegistry::with_auto_registered_and_config(Some(
768                            &self.config.auto_register,
769                        ))
770                        .map_err(|e| {
771                            TaskQueueError::Configuration(format!("Auto-registration failed: {e}"))
772                        })?,
773                    )
774                }
775            }
776            #[cfg(not(feature = "auto-register"))]
777            {
778                return Err(TaskQueueError::Configuration(
779                    "Auto-registration requested but 'auto-register' feature is not enabled"
780                        .to_string(),
781                ));
782            }
783        } else {
784            self.task_registry
785                .unwrap_or_else(|| Arc::new(TaskRegistry::new()))
786        };
787
788        // Start workers
789        let worker_count = self.config.workers.initial_count;
790        if worker_count > 0 {
791            task_queue
792                .start_workers_with_registry(worker_count, registry)
793                .await?;
794        }
795
796        // Start scheduler if enabled
797        if self.config.scheduler.enabled {
798            task_queue.start_scheduler().await?;
799        }
800
801        // Auto-scaler is always available but we start it based on configuration
802        // For now, we'll start it if there are specific auto-scaling settings
803        if self.config.autoscaler.max_workers > self.config.autoscaler.min_workers {
804            task_queue.start_autoscaler()?;
805        }
806
807        Ok(task_queue)
808    }
809}
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814    // use tokio_test;
815
816    #[test]
817    fn test_autoscaler_config() {
818        let config = AutoScalerConfig::default();
819        assert_eq!(config.min_workers, 1);
820        assert_eq!(config.max_workers, 20);
821        assert_eq!(config.scaling_triggers.queue_pressure_threshold, 0.75);
822    }
823
824    #[test]
825    fn test_queue_manager() {
826        let manager = QueueManager::new();
827        let queues = manager.get_queue_names();
828
829        assert!(queues.contains(&"default".to_owned()));
830        assert!(queues.contains(&"high_priority".to_owned()));
831        assert!(queues.contains(&"low_priority".to_owned()));
832    }
833
834    #[test]
835    fn test_builder_pattern() {
836        let builder = TaskQueueBuilder::new("redis://localhost:6379")
837            .initial_workers(4)
838            .with_scheduler()
839            .with_autoscaler();
840
841        assert_eq!(builder.config.redis.url, "redis://localhost:6379");
842        assert_eq!(builder.config.workers.initial_count, 4);
843        assert!(builder.config.scheduler.enabled);
844    }
845}
846
847/// Health check status for monitoring
848#[derive(Debug, Clone, Serialize, Deserialize)]
849pub struct HealthStatus {
850    pub status: String,
851    pub timestamp: chrono::DateTime<chrono::Utc>,
852    pub components: std::collections::HashMap<String, ComponentHealth>,
853}
854
855#[derive(Debug, Clone, Serialize, Deserialize)]
856pub struct ComponentHealth {
857    pub status: String,
858    pub message: Option<String>,
859}
860
861/// Comprehensive metrics for monitoring
862#[derive(Debug, Clone, Serialize, Deserialize)]
863pub struct TaskQueueMetrics {
864    pub timestamp: chrono::DateTime<chrono::Utc>,
865    pub total_pending_tasks: i64,
866    pub total_processed_tasks: i64,
867    pub total_failed_tasks: i64,
868    pub active_workers: i64,
869    pub tasks_per_worker: f64,
870    pub queue_metrics: Vec<QueueMetrics>,
871}