#![cfg_attr(docsrs, feature(doc_cfg))]
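//! Redis-backed task queue with configurable workers, delayed scheduling,
//! autoscaling, and health/metrics reporting.
//!
//! A minimal end-to-end sketch. It assumes this crate is published as
//! `rust_task_queue`, that `tokio` (with the `macros` feature) is available,
//! and that a Redis instance is reachable at the given URL:
//!
//! ```rust,no_run
//! use rust_task_queue::{TaskQueue, TaskQueueError};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), TaskQueueError> {
//!     // Connect to Redis and start four workers with an empty registry.
//!     let queue = TaskQueue::new("redis://localhost:6379").await?;
//!     queue.start_workers(4).await?;
//!
//!     // Inspect component health, then shut down gracefully.
//!     let health = queue.health_check().await?;
//!     println!("status: {}", health.status);
//!     queue.shutdown().await?;
//!     Ok(())
//! }
//! ```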

pub mod autoscaler;
pub mod broker;
pub mod config;
pub mod error;
pub mod metrics;
pub mod queue;
pub mod scheduler;
pub mod task;
pub mod worker;

#[cfg(feature = "tracing")]
pub mod tracing_utils;

#[cfg(feature = "actix-integration")]
#[cfg_attr(docsrs, doc(cfg(feature = "actix-integration")))]
pub mod actix;

#[cfg(feature = "cli")]
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
pub mod cli;

pub use autoscaler::*;
pub use broker::*;
#[cfg(feature = "cli")]
pub use cli::*;
pub use config::*;
pub use error::*;
pub use metrics::*;
pub use queue::*;
pub use scheduler::*;
pub use task::*;
pub use worker::*;

#[cfg(feature = "auto-register")]
#[cfg_attr(docsrs, doc(cfg(feature = "auto-register")))]
pub use inventory;

pub mod prelude;

use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

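/// Central handle over the queue's moving parts: the Redis broker, the
/// delayed-task scheduler, the autoscaler, the metrics collector, and the
/// worker pool. Every field is behind an `Arc`, so cloning a `TaskQueue` is
/// cheap and all clones share the same state.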
#[derive(Clone)]
pub struct TaskQueue {
    pub broker: Arc<RedisBroker>,
    pub scheduler: Arc<TaskScheduler>,
    pub autoscaler: Arc<AutoScaler>,
    pub metrics: Arc<MetricsCollector>,
    workers: Arc<RwLock<Vec<Worker>>>,
    scheduler_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    shutdown_signal: Arc<RwLock<Option<tokio::sync::broadcast::Sender<()>>>>,
}

impl TaskQueue {
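    /// Connects to Redis at `redis_url` and assembles a queue with the
    /// default autoscaler configuration. No workers are started yet.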
    pub async fn new(redis_url: &str) -> Result<Self, TaskQueueError> {
        let broker = Arc::new(RedisBroker::new(redis_url).await?);
        let scheduler = Arc::new(TaskScheduler::new(broker.clone()));
        let autoscaler = Arc::new(AutoScaler::new(broker.clone()));
        let metrics = Arc::new(MetricsCollector::new());

        Ok(Self {
            broker,
            scheduler,
            autoscaler,
            metrics,
            workers: Arc::new(RwLock::new(Vec::new())),
            scheduler_handle: Arc::new(RwLock::new(None)),
            shutdown_signal: Arc::new(RwLock::new(None)),
        })
    }

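    /// Like [`new`](Self::new), but with an explicit [`AutoScalerConfig`].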
    pub async fn with_autoscaler_config(
        redis_url: &str,
        autoscaler_config: AutoScalerConfig,
    ) -> Result<Self, TaskQueueError> {
        let broker = Arc::new(RedisBroker::new(redis_url).await?);
        let scheduler = Arc::new(TaskScheduler::new(broker.clone()));
        let autoscaler = Arc::new(AutoScaler::with_config(broker.clone(), autoscaler_config));
        let metrics = Arc::new(MetricsCollector::new());

        Ok(Self {
            broker,
            scheduler,
            autoscaler,
            metrics,
            workers: Arc::new(RwLock::new(Vec::new())),
            scheduler_handle: Arc::new(RwLock::new(None)),
            shutdown_signal: Arc::new(RwLock::new(None)),
        })
    }

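    /// Starts `initial_count` workers. With the `auto-register` feature
    /// enabled, the task registry is populated from auto-registered tasks
    /// (falling back to an empty registry on failure); otherwise the workers
    /// start with an empty registry.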
    pub async fn start_workers(&self, initial_count: usize) -> Result<(), TaskQueueError> {
        let registry = {
            #[cfg(feature = "auto-register")]
            {
                if let Ok(config) = TaskQueueConfig::get_or_init() {
                    if config.auto_register.enabled {
                        match TaskRegistry::with_auto_registered_and_config(Some(
                            &config.auto_register,
                        )) {
                            Ok(reg) => Arc::new(reg),
                            Err(e) => {
                                #[cfg(feature = "tracing")]
                                tracing::warn!(
                                    "Failed to create auto-registered task registry: {}, using empty registry",
                                    e
                                );
                                Arc::new(TaskRegistry::new())
                            }
                        }
                    } else {
                        Arc::new(TaskRegistry::new())
                    }
                } else {
                    // No global config available: fall back to plain auto-registration.
                    match TaskRegistry::with_auto_registered() {
                        Ok(reg) => Arc::new(reg),
                        Err(_) => Arc::new(TaskRegistry::new()),
                    }
                }
            }
            #[cfg(not(feature = "auto-register"))]
            {
                Arc::new(TaskRegistry::new())
            }
        };

        self.start_workers_with_registry(initial_count, registry)
            .await
    }

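    /// Starts `initial_count` workers that resolve incoming tasks through
    /// the given registry.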
    pub async fn start_workers_with_registry(
        &self,
        initial_count: usize,
        task_registry: Arc<TaskRegistry>,
    ) -> Result<(), TaskQueueError> {
        let mut workers = self.workers.write().await;

        for i in 0..initial_count {
            let worker = Worker::new(
                format!("worker-{}", i),
                self.broker.clone(),
                self.scheduler.clone(),
            )
            .with_task_registry(task_registry.clone());

            let worker_handle = worker.start().await?;
            workers.push(worker_handle);
        }

        #[cfg(feature = "tracing")]
        tracing::info!(
            "Started {} workers with task registry containing {} task types",
            initial_count,
            task_registry.registered_tasks().len()
        );

        Ok(())
    }

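    /// Spawns a background loop that samples autoscaler metrics every 30
    /// seconds and scales the worker pool up or down accordingly. The loop
    /// is detached: it keeps no join handle and is not stopped by
    /// [`shutdown`](Self::shutdown).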
    pub fn start_autoscaler(&self) -> Result<(), TaskQueueError> {
        let workers = self.workers.clone();
        let autoscaler = self.autoscaler.clone();
        let broker = self.broker.clone();
        let scheduler = self.scheduler.clone();

        tokio::spawn(async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;

                if let Ok(metrics) = autoscaler.collect_metrics().await {
                    if let Ok(action) = {
                        // `decide_scaling_action` needs mutable state, so work
                        // on a clone of the shared autoscaler.
                        let mut autoscaler_clone = (*autoscaler).clone();
                        autoscaler_clone.decide_scaling_action(&metrics)
                    } {
                        match action {
                            ScalingAction::ScaleUp(count) => {
                                let mut workers_lock = workers.write().await;
                                let current_count = workers_lock.len();

                                for i in current_count..current_count + count {
                                    // Note: unlike `start_workers_with_registry`,
                                    // no task registry is attached to workers
                                    // created here.
                                    let worker = Worker::new(
                                        format!("worker-{}", i),
                                        broker.clone(),
                                        scheduler.clone(),
                                    );

                                    if let Ok(worker_handle) = worker.start().await {
                                        workers_lock.push(worker_handle);
                                        #[cfg(feature = "tracing")]
                                        tracing::info!("Scaled up: added worker-{}", i);
                                    }
                                }
                            }
                            ScalingAction::ScaleDown(count) => {
                                let mut workers_lock = workers.write().await;
                                for _ in 0..count {
                                    if let Some(worker) = workers_lock.pop() {
                                        worker.stop().await;
                                        #[cfg(feature = "tracing")]
                                        tracing::info!("Scaled down: removed worker");
                                    }
                                }
                            }
                            ScalingAction::NoAction => {}
                        }
                    }
                }
            }
        });

        #[cfg(feature = "tracing")]
        tracing::info!("Started auto-scaler");

        Ok(())
    }

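    /// Starts the delayed-task scheduler. Calling this while the scheduler
    /// is already running is a no-op.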
    pub async fn start_scheduler(&self) -> Result<(), TaskQueueError> {
        let mut handle_guard = self.scheduler_handle.write().await;

        if handle_guard.is_some() {
            return Ok(());
        }

        let handle = TaskScheduler::start(self.broker.clone())?;
        *handle_guard = Some(handle);

        Ok(())
    }

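    /// Stops the scheduler by aborting its background task, if one is running.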
    pub async fn stop_scheduler(&self) {
        let mut handle_guard = self.scheduler_handle.write().await;
        if let Some(handle) = handle_guard.take() {
            handle.abort();

            #[cfg(feature = "tracing")]
            tracing::info!("Task scheduler stopped");
        }
    }

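    /// Enqueues `task` for immediate execution on the named queue, returning
    /// its [`TaskId`].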
    #[inline]
    pub async fn enqueue<T: Task>(&self, task: T, queue: &str) -> Result<TaskId, TaskQueueError> {
        self.broker.enqueue_task(task, queue).await
    }

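    /// Schedules `task` to run on the named queue after `delay`.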
    #[inline]
    pub async fn schedule<T: Task>(
        &self,
        task: T,
        queue: &str,
        delay: chrono::Duration,
    ) -> Result<TaskId, TaskQueueError> {
        self.scheduler.schedule_task(task, queue, delay).await
    }

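    /// Returns the number of currently running workers.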
    #[inline]
    pub async fn worker_count(&self) -> usize {
        self.workers.read().await.len()
    }

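    /// Stops all workers, waiting for each one to finish shutting down.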
    pub async fn stop_workers(&self) {
        let mut workers = self.workers.write().await;
        while let Some(worker) = workers.pop() {
            worker.stop().await;
        }

        #[cfg(feature = "tracing")]
        tracing::info!("All workers stopped");
    }

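    /// Gracefully shuts down the queue with a default 30-second timeout.
    /// See [`shutdown_with_timeout`](Self::shutdown_with_timeout).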
    pub async fn shutdown(&self) -> Result<(), TaskQueueError> {
        self.shutdown_with_timeout(std::time::Duration::from_secs(30))
            .await
    }

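    /// Gracefully shuts down the queue: stops the scheduler, stops all
    /// workers, and broadcasts the shutdown signal to any subscribers.
    /// Returns an error if the sequence does not complete within `timeout`.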
    pub async fn shutdown_with_timeout(
        &self,
        timeout: std::time::Duration,
    ) -> Result<(), TaskQueueError> {
        #[cfg(feature = "tracing")]
        tracing::info!("Initiating graceful shutdown of TaskQueue...");

        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
        {
            let mut signal = self.shutdown_signal.write().await;
            *signal = Some(shutdown_tx.clone());
        }

        let shutdown_future = async {
            self.stop_scheduler().await;
            #[cfg(feature = "tracing")]
            tracing::info!("Scheduler stopped");

            // The autoscaler loop spawned by `start_autoscaler` is detached
            // and keeps no stop handle, so there is nothing to abort here.

            self.stop_workers().await;
            #[cfg(feature = "tracing")]
            tracing::info!("All workers stopped");

            if let Err(e) = shutdown_tx.send(()) {
                #[cfg(feature = "tracing")]
                tracing::warn!("Failed to send shutdown signal: {}", e);
            }

            #[cfg(feature = "tracing")]
            tracing::info!("TaskQueue shutdown completed successfully");

            Ok(())
        };

        match tokio::time::timeout(timeout, shutdown_future).await {
            Ok(result) => result,
            Err(_) => {
                #[cfg(feature = "tracing")]
                tracing::error!("TaskQueue shutdown timed out after {:?}", timeout);
                Err(TaskQueueError::Worker("Shutdown timeout".to_string()))
            }
        }
    }

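    /// Returns `true` once a shutdown has been initiated.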
    pub async fn is_shutting_down(&self) -> bool {
        self.shutdown_signal.read().await.is_some()
    }

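    /// Returns a receiver for the shutdown broadcast, or `None` if shutdown
    /// has not been initiated.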
    pub async fn shutdown_receiver(&self) -> Option<tokio::sync::broadcast::Receiver<()>> {
        self.shutdown_signal
            .read()
            .await
            .as_ref()
            .map(|tx| tx.subscribe())
    }

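    /// Reports per-component health: Redis connectivity (via `PING`), the
    /// worker count, and whether the scheduler is running. The overall
    /// status becomes `"unhealthy"` only when Redis is unreachable.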
    pub async fn health_check(&self) -> Result<HealthStatus, TaskQueueError> {
        let mut health = HealthStatus {
            status: "healthy".to_string(),
            timestamp: chrono::Utc::now(),
            components: std::collections::HashMap::new(),
        };

        match self.broker.pool.get().await {
            Ok(mut conn) => {
                match redis::cmd("PING")
                    .query_async::<_, String>(&mut *conn)
                    .await
                {
                    Ok(_) => {
                        health.components.insert(
                            "redis".to_string(),
                            ComponentHealth {
                                status: "healthy".to_string(),
                                message: Some("Connection successful".to_string()),
                            },
                        );
                    }
                    Err(e) => {
                        health.status = "unhealthy".to_string();
                        health.components.insert(
                            "redis".to_string(),
                            ComponentHealth {
                                status: "unhealthy".to_string(),
                                message: Some(format!("Ping failed: {}", e)),
                            },
                        );
                    }
                }
            }
            Err(e) => {
                health.status = "unhealthy".to_string();
                health.components.insert(
                    "redis".to_string(),
                    ComponentHealth {
                        status: "unhealthy".to_string(),
                        message: Some(format!("Connection failed: {}", e)),
                    },
                );
            }
        }

        let worker_count = self.worker_count().await;
        health.components.insert(
            "workers".to_string(),
            ComponentHealth {
                status: if worker_count > 0 { "healthy" } else { "warning" }.to_string(),
                message: Some(format!("{} active workers", worker_count)),
            },
        );

        let scheduler_running = self.scheduler_handle.read().await.is_some();
        health.components.insert(
            "scheduler".to_string(),
            ComponentHealth {
                status: if scheduler_running { "healthy" } else { "stopped" }.to_string(),
                message: Some(if scheduler_running { "Running" } else { "Stopped" }.to_string()),
            },
        );

        Ok(health)
    }

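    /// Aggregates task counts across the built-in `default`, `high_priority`,
    /// and `low_priority` queues and combines them with autoscaler metrics.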
    pub async fn get_metrics(&self) -> Result<TaskQueueMetrics, TaskQueueError> {
        // Aggregate over the built-in queues only.
        let queues = ["default", "high_priority", "low_priority"];
        let mut queue_metrics = Vec::new();
        let mut total_pending = 0;
        let mut total_processed = 0;
        let mut total_failed = 0;

        for queue in &queues {
            let metrics = self.broker.get_queue_metrics(queue).await?;
            total_pending += metrics.pending_tasks;
            total_processed += metrics.processed_tasks;
            total_failed += metrics.failed_tasks;
            queue_metrics.push(metrics);
        }

        let autoscaler_metrics = self.autoscaler.collect_metrics().await?;

        Ok(TaskQueueMetrics {
            timestamp: chrono::Utc::now(),
            total_pending_tasks: total_pending,
            total_processed_tasks: total_processed,
            total_failed_tasks: total_failed,
            active_workers: autoscaler_metrics.active_workers,
            // The autoscaler's queue pressure score doubles as the
            // tasks-per-worker figure here.
            tasks_per_worker: autoscaler_metrics.queue_pressure_score,
            queue_metrics,
        })
    }

    pub async fn get_system_metrics(&self) -> SystemMetrics {
        self.metrics.get_system_metrics().await
    }

    pub async fn get_metrics_summary(&self) -> String {
        self.metrics.get_metrics_summary().await
    }
}

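/// Fluent builder for [`TaskQueue`].
///
/// A minimal sketch (assumes the crate is named `rust_task_queue` and a
/// Redis instance is reachable at the given URL):
///
/// ```rust,no_run
/// # use rust_task_queue::{TaskQueueBuilder, TaskQueueError};
/// # async fn demo() -> Result<(), TaskQueueError> {
/// let queue = TaskQueueBuilder::new("redis://localhost:6379")
///     .initial_workers(4)
///     .with_scheduler()
///     .build()
///     .await?;
/// # Ok(())
/// # }
/// ```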
pub struct TaskQueueBuilder {
    config: TaskQueueConfig,
    task_registry: Option<Arc<TaskRegistry>>,
    override_redis_url: Option<String>,
}

impl TaskQueueBuilder {
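    /// Creates a builder targeting the given Redis URL, leaving every other
    /// setting at its default.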
    #[inline]
    pub fn new(redis_url: impl Into<String>) -> Self {
        let mut config = TaskQueueConfig::default();
        config.redis.url = redis_url.into();

        Self {
            config,
            task_registry: None,
            override_redis_url: None,
        }
    }

    #[inline]
    pub fn from_global_config() -> Result<Self, TaskQueueError> {
        let config = TaskQueueConfig::get_or_init()?.clone();
        Ok(Self {
            config,
            task_registry: None,
            override_redis_url: None,
        })
    }

    #[must_use]
    #[inline]
    pub fn from_config(config: TaskQueueConfig) -> Self {
        Self {
            config,
            task_registry: None,
            override_redis_url: None,
        }
    }

    #[inline]
    pub fn auto() -> Result<Self, TaskQueueError> {
        let config = TaskQueueConfig::load()?;
        Ok(Self {
            config,
            task_registry: None,
            override_redis_url: None,
        })
    }

    #[must_use]
    #[inline]
    pub fn redis_url(mut self, url: impl Into<String>) -> Self {
        self.override_redis_url = Some(url.into());
        self
    }

    #[must_use]
    #[inline]
    pub fn autoscaler_config(mut self, config: AutoScalerConfig) -> Self {
        self.config.autoscaler = config;
        self
    }

    #[must_use]
    #[inline]
    pub fn initial_workers(mut self, count: usize) -> Self {
        self.config.workers.initial_count = count;
        self
    }

    #[must_use]
    #[inline]
    pub fn task_registry(mut self, registry: Arc<TaskRegistry>) -> Self {
        self.task_registry = Some(registry);
        self
    }

    #[cfg(feature = "auto-register")]
    #[must_use]
    #[inline]
    pub fn auto_register_tasks(mut self) -> Self {
        self.config.auto_register.enabled = true;
        self
    }

    #[cfg(feature = "auto-register")]
    #[must_use]
    #[inline]
    pub fn disable_auto_register(mut self) -> Self {
        self.config.auto_register.enabled = false;
        self
    }

    #[must_use]
    #[inline]
    pub fn with_scheduler(mut self) -> Self {
        self.config.scheduler.enabled = true;
        self
    }

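    /// Kept for fluent-API symmetry: [`build`](Self::build) starts the
    /// autoscaler automatically whenever `autoscaler.max_workers` exceeds
    /// `autoscaler.min_workers`, so this method has no further effect.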
    #[must_use]
    #[inline]
    pub fn with_autoscaler(self) -> Self {
        self
    }

    #[must_use]
    #[inline]
    pub fn config(mut self, config: TaskQueueConfig) -> Self {
        self.config = config;
        self
    }

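    /// Builds the [`TaskQueue`]: connects to Redis (honoring any URL
    /// override), resolves the task registry (running auto-registration when
    /// enabled), starts the configured number of workers, and enables the
    /// scheduler and autoscaler according to the configuration.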
    #[inline]
    pub async fn build(self) -> Result<TaskQueue, TaskQueueError> {
        let redis_url = self
            .override_redis_url
            .as_ref()
            .unwrap_or(&self.config.redis.url);

        let task_queue =
            TaskQueue::with_autoscaler_config(redis_url, self.config.autoscaler.clone()).await?;

        let registry = if self.config.auto_register.enabled {
            #[cfg(feature = "auto-register")]
            {
                if let Some(custom_registry) = self.task_registry {
                    custom_registry
                        .auto_register_tasks_with_config(Some(&self.config.auto_register))
                        .map_err(|e| {
                            TaskQueueError::Configuration(format!("Auto-registration failed: {e}"))
                        })?;
                    custom_registry
                } else {
                    Arc::new(
                        TaskRegistry::with_auto_registered_and_config(Some(
                            &self.config.auto_register,
                        ))
                        .map_err(|e| {
                            TaskQueueError::Configuration(format!("Auto-registration failed: {e}"))
                        })?,
                    )
                }
            }
            #[cfg(not(feature = "auto-register"))]
            {
                return Err(TaskQueueError::Configuration(
                    "Auto-registration requested but 'auto-register' feature is not enabled"
                        .to_string(),
                ));
            }
        } else {
            self.task_registry
                .unwrap_or_else(|| Arc::new(TaskRegistry::new()))
        };

        let worker_count = self.config.workers.initial_count;
        if worker_count > 0 {
            task_queue
                .start_workers_with_registry(worker_count, registry)
                .await?;
        }

        if self.config.scheduler.enabled {
            task_queue.start_scheduler().await?;
        }

        // Start the autoscaler automatically whenever there is scaling
        // headroom between the configured min and max worker counts.
        if self.config.autoscaler.max_workers > self.config.autoscaler.min_workers {
            task_queue.start_autoscaler()?;
        }

        Ok(task_queue)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_autoscaler_config() {
        let config = AutoScalerConfig::default();
        assert_eq!(config.min_workers, 1);
        assert_eq!(config.max_workers, 20);
        assert_eq!(config.scaling_triggers.queue_pressure_threshold, 0.75);
    }

    #[test]
    fn test_queue_manager() {
        let manager = QueueManager::new();
        let queues = manager.get_queue_names();

        assert!(queues.contains(&"default".to_owned()));
        assert!(queues.contains(&"high_priority".to_owned()));
        assert!(queues.contains(&"low_priority".to_owned()));
    }

    #[test]
    fn test_builder_pattern() {
        let builder = TaskQueueBuilder::new("redis://localhost:6379")
            .initial_workers(4)
            .with_scheduler()
            .with_autoscaler();

        assert_eq!(builder.config.redis.url, "redis://localhost:6379");
        assert_eq!(builder.config.workers.initial_count, 4);
        assert!(builder.config.scheduler.enabled);
    }
}

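/// Point-in-time health report produced by [`TaskQueue::health_check`].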
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
    pub status: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub components: std::collections::HashMap<String, ComponentHealth>,
}

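/// Health of a single component, e.g. `"redis"`, `"workers"`, or `"scheduler"`.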
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    pub status: String,
    pub message: Option<String>,
}

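/// Aggregated metrics snapshot returned by [`TaskQueue::get_metrics`].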
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskQueueMetrics {
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub total_pending_tasks: i64,
    pub total_processed_tasks: i64,
    pub total_failed_tasks: i64,
    pub active_workers: i64,
    pub tasks_per_worker: f64,
    pub queue_metrics: Vec<QueueMetrics>,
}