1#![cfg_attr(docsrs, feature(doc_cfg))]
108
109pub mod autoscaler;
110pub mod broker;
111pub mod config;
112pub mod error;
113pub mod metrics;
114pub mod queue;
115pub mod scheduler;
116pub mod task;
117pub mod worker;
118
119#[cfg(feature = "tracing")]
120pub mod tracing_utils;
121
122#[cfg(feature = "actix-integration")]
123#[cfg_attr(docsrs, doc(cfg(feature = "actix-integration")))]
124pub mod actix;
125
126#[cfg(feature = "axum-integration")]
127#[cfg_attr(docsrs, doc(cfg(feature = "axum-integration")))]
128pub mod axum;
129
130#[cfg(feature = "cli")]
131#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
132pub mod cli;
133
134pub use autoscaler::*;
135pub use broker::*;
136#[cfg(feature = "cli")]
137pub use cli::*;
138pub use config::*;
139pub use error::*;
140pub use metrics::*;
141pub use queue::*;
142pub use scheduler::*;
143pub use task::*;
144pub use worker::*;
145
146#[cfg(feature = "auto-register")]
148pub use rust_task_queue_macro::{register_task as proc_register_task, AutoRegisterTask};
149
150#[cfg(not(feature = "auto-register"))]
152pub use std::marker::PhantomData as AutoRegisterTask;
153#[cfg(not(feature = "auto-register"))]
154pub use std::marker::PhantomData as proc_register_task;
155
156#[cfg(feature = "auto-register")]
158#[cfg_attr(docsrs, doc(cfg(feature = "auto-register")))]
159pub use inventory;
160
161pub mod prelude;
162
163use serde::{Deserialize, Serialize};
164use std::sync::Arc;
165use tokio::sync::RwLock;
166
167#[derive(Clone)]
169pub struct TaskQueue {
170 pub broker: Arc<RedisBroker>,
171 pub scheduler: Arc<TaskScheduler>,
172 pub autoscaler: Arc<AutoScaler>,
173 pub metrics: Arc<MetricsCollector>,
174 workers: Arc<RwLock<Vec<Worker>>>,
175 scheduler_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
176 shutdown_signal: Arc<RwLock<Option<tokio::sync::broadcast::Sender<()>>>>,
177}
178
179impl TaskQueue {
180 pub async fn new(redis_url: &str) -> Result<Self, TaskQueueError> {
182 let broker = Arc::new(RedisBroker::new(redis_url).await?);
183 let scheduler = Arc::new(TaskScheduler::new(broker.clone()));
184 let autoscaler = Arc::new(AutoScaler::new(broker.clone()));
185 let metrics = Arc::new(MetricsCollector::new());
186
187 Ok(Self {
188 broker,
189 scheduler,
190 autoscaler,
191 metrics,
192 workers: Arc::new(RwLock::new(Vec::new())),
193 scheduler_handle: Arc::new(RwLock::new(None)),
194 shutdown_signal: Arc::new(RwLock::new(None)),
195 })
196 }
197
198 pub async fn with_autoscaler_config(
200 redis_url: &str,
201 autoscaler_config: AutoScalerConfig,
202 ) -> Result<Self, TaskQueueError> {
203 let broker = Arc::new(RedisBroker::new(redis_url).await?);
204 let scheduler = Arc::new(TaskScheduler::new(broker.clone()));
205 let autoscaler = Arc::new(AutoScaler::with_config(broker.clone(), autoscaler_config));
206 let metrics = Arc::new(MetricsCollector::new());
207
208 Ok(Self {
209 broker,
210 scheduler,
211 autoscaler,
212 metrics,
213 workers: Arc::new(RwLock::new(Vec::new())),
214 scheduler_handle: Arc::new(RwLock::new(None)),
215 shutdown_signal: Arc::new(RwLock::new(None)),
216 })
217 }
218
219 pub async fn start_workers(&self, initial_count: usize) -> Result<(), TaskQueueError> {
221 let registry = {
223 #[cfg(feature = "auto-register")]
224 {
225 if let Ok(config) = TaskQueueConfig::get_or_init() {
227 if config.auto_register.enabled {
228 match TaskRegistry::with_auto_registered_and_config(Some(
229 &config.auto_register,
230 )) {
231 Ok(reg) => Arc::new(reg),
232 Err(e) => {
233 #[cfg(feature = "tracing")]
234 tracing::warn!("Failed to create auto-registered task registry: {}, using empty registry", e);
235 Arc::new(TaskRegistry::new())
236 }
237 }
238 } else {
239 Arc::new(TaskRegistry::new())
240 }
241 } else {
242 match TaskRegistry::with_auto_registered() {
244 Ok(reg) => Arc::new(reg),
245 Err(_) => Arc::new(TaskRegistry::new()),
246 }
247 }
248 }
249 #[cfg(not(feature = "auto-register"))]
250 {
251 Arc::new(TaskRegistry::new())
252 }
253 };
254
255 self.start_workers_with_registry(initial_count, registry)
256 .await
257 }
258
259 pub async fn start_workers_with_registry(
261 &self,
262 initial_count: usize,
263 task_registry: Arc<TaskRegistry>,
264 ) -> Result<(), TaskQueueError> {
265 let mut workers = self.workers.write().await;
266
267 for i in 0..initial_count {
268 let worker = Worker::new(
269 format!("worker-{}", i),
270 self.broker.clone(),
271 self.scheduler.clone(),
272 )
273 .with_task_registry(task_registry.clone());
274
275 let worker_handle = worker.start().await?;
276 workers.push(worker_handle);
277 }
278
279 #[cfg(feature = "tracing")]
280 tracing::info!(
281 "Started {} workers with task registry containing {} task types",
282 initial_count,
283 task_registry.registered_tasks().len()
284 );
285
286 Ok(())
287 }
288
289 pub fn start_autoscaler(&self) -> Result<(), TaskQueueError> {
291 let workers = self.workers.clone();
292 let autoscaler = self.autoscaler.clone();
293 let broker = self.broker.clone();
294 let scheduler = self.scheduler.clone();
295
296 tokio::spawn(async move {
297 loop {
298 tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
299
300 if let Ok(metrics) = autoscaler.collect_metrics().await {
301 if let Ok(action) = {
303 let mut autoscaler_clone = (*autoscaler).clone();
304 autoscaler_clone.decide_scaling_action(&metrics)
305 } {
306 match action {
307 ScalingAction::ScaleUp(count) => {
308 let mut workers_lock = workers.write().await;
309 let current_count = workers_lock.len();
310
311 for i in current_count..current_count + count {
312 let worker = Worker::new(
313 format!("worker-{}", i),
314 broker.clone(),
315 scheduler.clone(),
316 );
317
318 if let Ok(worker_handle) = worker.start().await {
319 workers_lock.push(worker_handle);
320 #[cfg(feature = "tracing")]
321 tracing::info!("Scaled up: added worker-{}", i);
322 }
323 }
324 }
325 ScalingAction::ScaleDown(count) => {
326 let mut workers_lock = workers.write().await;
327 for _ in 0..count {
328 if let Some(worker) = workers_lock.pop() {
329 worker.stop().await;
330 #[cfg(feature = "tracing")]
331 tracing::info!("Scaled down: removed worker");
332 }
333 }
334 }
335 ScalingAction::NoAction => {}
336 }
337 }
338 }
339 }
340 });
341
342 #[cfg(feature = "tracing")]
343 tracing::info!("Started auto-scaler");
344
345 Ok(())
346 }
347
348 pub async fn start_scheduler(&self) -> Result<(), TaskQueueError> {
350 let mut handle_guard = self.scheduler_handle.write().await;
351
352 if handle_guard.is_some() {
354 return Ok(());
355 }
356
357 let handle = TaskScheduler::start(self.broker.clone())?;
358 *handle_guard = Some(handle);
359
360 Ok(())
361 }
362
363 pub async fn stop_scheduler(&self) {
365 let mut handle_guard = self.scheduler_handle.write().await;
366 if let Some(handle) = handle_guard.take() {
367 handle.abort();
368
369 #[cfg(feature = "tracing")]
370 tracing::info!("Task scheduler stopped");
371 }
372 }
373
374 #[inline]
376 pub async fn enqueue<T: Task>(&self, task: T, queue: &str) -> Result<TaskId, TaskQueueError> {
377 self.broker.enqueue_task(task, queue).await
378 }
379
380 #[inline]
382 pub async fn schedule<T: Task>(
383 &self,
384 task: T,
385 queue: &str,
386 delay: chrono::Duration,
387 ) -> Result<TaskId, TaskQueueError> {
388 self.scheduler.schedule_task(task, queue, delay).await
389 }
390
391 #[inline]
393 pub async fn worker_count(&self) -> usize {
394 self.workers.read().await.len()
395 }
396
397 pub async fn stop_workers(&self) {
399 let mut workers = self.workers.write().await;
400 while let Some(worker) = workers.pop() {
401 worker.stop().await;
402 }
403
404 #[cfg(feature = "tracing")]
405 tracing::info!("All workers stopped");
406 }
407
408 pub async fn shutdown(&self) -> Result<(), TaskQueueError> {
410 self.shutdown_with_timeout(std::time::Duration::from_secs(30))
411 .await
412 }
413
414 pub async fn shutdown_with_timeout(
416 &self,
417 timeout: std::time::Duration,
418 ) -> Result<(), TaskQueueError> {
419 #[cfg(feature = "tracing")]
420 tracing::info!("Initiating graceful shutdown of TaskQueue...");
421
422 let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
424 {
425 let mut signal = self.shutdown_signal.write().await;
426 *signal = Some(shutdown_tx.clone());
427 }
428
429 let shutdown_future = async {
431 self.stop_scheduler().await;
433 #[cfg(feature = "tracing")]
434 tracing::info!("Scheduler stopped");
435
436 #[cfg(feature = "tracing")]
439 tracing::info!("Autoscaler stopped");
440
441 self.stop_workers().await;
443 #[cfg(feature = "tracing")]
444 tracing::info!("All workers stopped");
445
446 if let Err(e) = shutdown_tx.send(()) {
448 #[cfg(feature = "tracing")]
449 tracing::warn!("Failed to send shutdown signal: {}", e);
450 }
451
452 #[cfg(feature = "tracing")]
453 tracing::info!("TaskQueue shutdown completed successfully");
454
455 Ok(())
456 };
457
458 match tokio::time::timeout(timeout, shutdown_future).await {
460 Ok(result) => result,
461 Err(_) => {
462 #[cfg(feature = "tracing")]
463 tracing::error!("TaskQueue shutdown timed out after {:?}", timeout);
464 Err(TaskQueueError::Worker("Shutdown timeout".to_string()))
465 }
466 }
467 }
468
469 pub async fn is_shutting_down(&self) -> bool {
471 self.shutdown_signal.read().await.is_some()
472 }
473
474 pub async fn shutdown_receiver(&self) -> Option<tokio::sync::broadcast::Receiver<()>> {
476 self.shutdown_signal
477 .read()
478 .await
479 .as_ref()
480 .map(|tx| tx.subscribe())
481 }
482
483 pub async fn health_check(&self) -> Result<HealthStatus, TaskQueueError> {
485 let mut health = HealthStatus {
486 status: "healthy".to_string(),
487 timestamp: chrono::Utc::now(),
488 components: std::collections::HashMap::new(),
489 };
490
491 match self.broker.pool.get().await {
493 Ok(mut conn) => {
494 match redis::cmd("PING")
495 .query_async::<_, String>(&mut *conn)
496 .await
497 {
498 Ok(_) => {
499 health.components.insert(
500 "redis".to_string(),
501 ComponentHealth {
502 status: "healthy".to_string(),
503 message: Some("Connection successful".to_string()),
504 },
505 );
506 }
507 Err(e) => {
508 health.status = "unhealthy".to_string();
509 health.components.insert(
510 "redis".to_string(),
511 ComponentHealth {
512 status: "unhealthy".to_string(),
513 message: Some(format!("Ping failed: {}", e)),
514 },
515 );
516 }
517 }
518 }
519 Err(e) => {
520 health.status = "unhealthy".to_string();
521 health.components.insert(
522 "redis".to_string(),
523 ComponentHealth {
524 status: "unhealthy".to_string(),
525 message: Some(format!("Connection failed: {}", e)),
526 },
527 );
528 }
529 }
530
531 let worker_count = self.worker_count().await;
533 health.components.insert(
534 "workers".to_string(),
535 ComponentHealth {
536 status: if worker_count > 0 {
537 "healthy"
538 } else {
539 "warning"
540 }
541 .to_string(),
542 message: Some(format!("{} active workers", worker_count)),
543 },
544 );
545
546 let scheduler_running = self.scheduler_handle.read().await.is_some();
548 health.components.insert(
549 "scheduler".to_string(),
550 ComponentHealth {
551 status: if scheduler_running {
552 "healthy"
553 } else {
554 "stopped"
555 }
556 .to_string(),
557 message: Some(
558 if scheduler_running {
559 "Running"
560 } else {
561 "Stopped"
562 }
563 .to_string(),
564 ),
565 },
566 );
567
568 Ok(health)
569 }
570
571 pub async fn get_metrics(&self) -> Result<TaskQueueMetrics, TaskQueueError> {
573 let queues = ["default", "high_priority", "low_priority"];
574 let mut queue_metrics = Vec::new();
575 let mut total_pending = 0;
576 let mut total_processed = 0;
577 let mut total_failed = 0;
578
579 for queue in &queues {
580 let metrics = self.broker.get_queue_metrics(queue).await?;
581 total_pending += metrics.pending_tasks;
582 total_processed += metrics.processed_tasks;
583 total_failed += metrics.failed_tasks;
584 queue_metrics.push(metrics);
585 }
586
587 let autoscaler_metrics = self.autoscaler.collect_metrics().await?;
588
589 Ok(TaskQueueMetrics {
590 timestamp: chrono::Utc::now(),
591 total_pending_tasks: total_pending,
592 total_processed_tasks: total_processed,
593 total_failed_tasks: total_failed,
594 active_workers: autoscaler_metrics.active_workers,
595 tasks_per_worker: autoscaler_metrics.queue_pressure_score,
596 queue_metrics,
597 })
598 }
599
600 pub async fn get_system_metrics(&self) -> SystemMetrics {
602 self.metrics.get_system_metrics().await
603 }
604
605 pub async fn get_metrics_summary(&self) -> String {
607 self.metrics.get_metrics_summary().await
608 }
609}
610
611pub struct TaskQueueBuilder {
613 config: TaskQueueConfig,
614 task_registry: Option<Arc<TaskRegistry>>,
615 override_redis_url: Option<String>,
616}
617
618impl TaskQueueBuilder {
619 #[inline]
621 pub fn new(redis_url: impl Into<String>) -> Self {
622 let mut config = TaskQueueConfig::default();
623 config.redis.url = redis_url.into();
624
625 Self {
626 config,
627 task_registry: None,
628 override_redis_url: None,
629 }
630 }
631
632 #[inline]
634 pub fn from_global_config() -> Result<Self, TaskQueueError> {
635 let config = TaskQueueConfig::get_or_init()?.clone();
636 Ok(Self {
637 config,
638 task_registry: None,
639 override_redis_url: None,
640 })
641 }
642
643 #[must_use]
645 #[inline]
646 pub fn from_config(config: TaskQueueConfig) -> Self {
647 Self {
648 config,
649 task_registry: None,
650 override_redis_url: None,
651 }
652 }
653
654 #[inline]
656 pub fn auto() -> Result<Self, TaskQueueError> {
657 let config = TaskQueueConfig::load()?;
658 Ok(Self {
659 config,
660 task_registry: None,
661 override_redis_url: None,
662 })
663 }
664
665 #[must_use]
667 #[inline]
668 pub fn redis_url(mut self, url: impl Into<String>) -> Self {
669 self.override_redis_url = Some(url.into());
670 self
671 }
672
673 #[must_use]
675 #[inline]
676 pub fn autoscaler_config(mut self, config: AutoScalerConfig) -> Self {
677 self.config.autoscaler = config;
678 self
679 }
680
681 #[must_use]
683 #[inline]
684 pub fn initial_workers(mut self, count: usize) -> Self {
685 self.config.workers.initial_count = count;
686 self
687 }
688
689 #[must_use]
691 #[inline]
692 pub fn task_registry(mut self, registry: Arc<TaskRegistry>) -> Self {
693 self.task_registry = Some(registry);
694 self
695 }
696
697 #[cfg(feature = "auto-register")]
699 #[must_use]
700 #[inline]
701 pub fn auto_register_tasks(mut self) -> Self {
702 self.config.auto_register.enabled = true;
703 self
704 }
705
706 #[cfg(feature = "auto-register")]
708 #[must_use]
709 #[inline]
710 pub fn disable_auto_register(mut self) -> Self {
711 self.config.auto_register.enabled = false;
712 self
713 }
714
715 #[must_use]
717 #[inline]
718 pub fn with_scheduler(mut self) -> Self {
719 self.config.scheduler.enabled = true;
720 self
721 }
722
723 #[must_use]
725 #[inline]
726 pub fn with_autoscaler(self) -> Self {
727 self
730 }
731
732 #[must_use]
734 #[inline]
735 pub fn config(mut self, config: TaskQueueConfig) -> Self {
736 self.config = config;
737 self
738 }
739
740 #[inline]
745 pub async fn build(self) -> Result<TaskQueue, TaskQueueError> {
746 let redis_url = self
748 .override_redis_url
749 .as_ref()
750 .unwrap_or(&self.config.redis.url);
751
752 let task_queue =
754 TaskQueue::with_autoscaler_config(redis_url, self.config.autoscaler.clone()).await?;
755
756 let registry = if self.config.auto_register.enabled {
758 #[cfg(feature = "auto-register")]
759 {
760 if let Some(custom_registry) = self.task_registry {
761 custom_registry
763 .auto_register_tasks_with_config(Some(&self.config.auto_register))
764 .map_err(|e| {
765 TaskQueueError::Configuration(format!("Auto-registration failed: {e}"))
766 })?;
767 custom_registry
768 } else {
769 Arc::new(
771 TaskRegistry::with_auto_registered_and_config(Some(
772 &self.config.auto_register,
773 ))
774 .map_err(|e| {
775 TaskQueueError::Configuration(format!("Auto-registration failed: {e}"))
776 })?,
777 )
778 }
779 }
780 #[cfg(not(feature = "auto-register"))]
781 {
782 return Err(TaskQueueError::Configuration(
783 "Auto-registration requested but 'auto-register' feature is not enabled"
784 .to_string(),
785 ));
786 }
787 } else {
788 self.task_registry
789 .unwrap_or_else(|| Arc::new(TaskRegistry::new()))
790 };
791
792 let worker_count = self.config.workers.initial_count;
794 if worker_count > 0 {
795 task_queue
796 .start_workers_with_registry(worker_count, registry)
797 .await?;
798 }
799
800 if self.config.scheduler.enabled {
802 task_queue.start_scheduler().await?;
803 }
804
805 if self.config.autoscaler.max_workers > self.config.autoscaler.min_workers {
808 task_queue.start_autoscaler()?;
809 }
810
811 Ok(task_queue)
812 }
813}
814
815#[cfg(test)]
816mod tests {
817 use super::*;
818 #[test]
821 fn test_autoscaler_config() {
822 let config = AutoScalerConfig::default();
823 assert_eq!(config.min_workers, 1);
824 assert_eq!(config.max_workers, 20);
825 assert_eq!(config.scaling_triggers.queue_pressure_threshold, 0.75);
826 }
827
828 #[test]
829 fn test_queue_manager() {
830 let manager = QueueManager::new();
831 let queues = manager.get_queue_names();
832
833 assert!(queues.contains(&"default".to_owned()));
834 assert!(queues.contains(&"high_priority".to_owned()));
835 assert!(queues.contains(&"low_priority".to_owned()));
836 }
837
838 #[test]
839 fn test_builder_pattern() {
840 let builder = TaskQueueBuilder::new("redis://localhost:6379")
841 .initial_workers(4)
842 .with_scheduler()
843 .with_autoscaler();
844
845 assert_eq!(builder.config.redis.url, "redis://localhost:6379");
846 assert_eq!(builder.config.workers.initial_count, 4);
847 assert!(builder.config.scheduler.enabled);
848 }
849}
850
851#[derive(Debug, Clone, Serialize, Deserialize)]
853pub struct HealthStatus {
854 pub status: String,
855 pub timestamp: chrono::DateTime<chrono::Utc>,
856 pub components: std::collections::HashMap<String, ComponentHealth>,
857}
858
859#[derive(Debug, Clone, Serialize, Deserialize)]
860pub struct ComponentHealth {
861 pub status: String,
862 pub message: Option<String>,
863}
864
865#[derive(Debug, Clone, Serialize, Deserialize)]
867pub struct TaskQueueMetrics {
868 pub timestamp: chrono::DateTime<chrono::Utc>,
869 pub total_pending_tasks: i64,
870 pub total_processed_tasks: i64,
871 pub total_failed_tasks: i64,
872 pub active_workers: i64,
873 pub tasks_per_worker: f64,
874 pub queue_metrics: Vec<QueueMetrics>,
875}