polly_scheduler/core/
context.rs

1use crate::core::cleaner::TaskCleaner;
2use crate::core::cron::next_run;
3use crate::core::handlers::TaskHandlers;
4use crate::core::store::TaskStore;
5use crate::core::task::Task;
6use crate::core::task_kind::TaskKind;
7use crate::core::worker::process_task_worker;
8use crate::utc_now;
9use std::time::Duration;
10use std::{collections::HashMap, sync::Arc};
11use tokio::signal;
12use tokio::sync::RwLock;
13
14pub struct TaskContext<S>
15where
16    S: TaskStore, // Ensures that S is a type that implements the TaskStore trait
17{
18    queue_concurrency: HashMap<String, u32>, // Stores the concurrency level for each task queue
19    handlers: TaskHandlers, // Collection of task handlers to process different task types
20    shutdown: Arc<RwLock<bool>>, // Shared state to control the shutdown of the system
21    store: Arc<S>, // Arc wrapper around the task store, allowing shared ownership across threads
22}
23
24impl<S> TaskContext<S>
25where
26    S: TaskStore + Sync + Send + 'static, // S must implement TaskStore, and be Sync and Send
27{
28    /// Creates a new TaskContext with the provided store.
29    pub fn new(store: S) -> Self {
30        Self {
31            queue_concurrency: HashMap::new(), // Initialize concurrency map as empty
32            handlers: TaskHandlers::new(),     // Create a new TaskHandlers instance
33            store: Arc::new(store),            // Wrap the store in an Arc for shared ownership
34            shutdown: Arc::new(RwLock::new(false)), // Initialize shutdown state to false
35        }
36    }
37
38    /// Registers a new task type in the context.
39    pub fn register<T>(mut self) -> Self
40    where
41        T: Task, // T must implement the Task trait
42    {
43        self.handlers.register::<T>(); // Register the task handler
44        self.queue_concurrency.insert(T::TASK_QUEUE.to_owned(), 4); // Set default concurrency for the task queue
45        self
46    }
47
48    /// Sets the concurrency level for a specified queue.
49    pub fn set_concurrency(mut self, queue: &str, count: u32) -> Self {
50        self.queue_concurrency.insert(queue.to_owned(), count); // Update the concurrency level for the queue
51        self
52    }
53
54    /// Starts the task cleaner to periodically clean up tasks.
55    fn start_task_cleaner(&self) {
56        let cleaner = Arc::new(TaskCleaner::new(self.store.clone())); // Create a new TaskCleaner
57        cleaner.start(Duration::from_secs(60 * 10)); // Start the cleaner to run every 10 minutes
58    }
59
60    /// Starts worker threads for processing tasks in each queue.
61    fn start_worker(&self) {
62        let Self {
63            queue_concurrency,
64            handlers,
65            store,
66            shutdown,
67        } = self;
68        // Spawn workers based on the concurrency settings for each queue
69        for (queue, concurrency) in queue_concurrency.iter() {
70            for _ in 0..*concurrency {
71                let handlers = handlers.clone(); // Clone the handlers for the worker
72                let task_store = store.clone(); // Clone the task store for the worker
73                let shutdown = shutdown.clone(); // Clone the shutdown flag for the worker
74                let queue = queue.clone(); // Clone the queue name for the worker
75                tokio::spawn(async move {
76                    process_task_worker(queue.as_str(), handlers, task_store, shutdown).await;
77                    // Start processing tasks in the worker
78                });
79            }
80        }
81    }
82
83    /// Starts a signal listener to handle shutdown signals.
84    pub fn start_signal_listener(&self) {
85        let shutdown = self.shutdown.clone(); // Clone the shutdown flag
86        tokio::spawn(async move {
87            match signal::ctrl_c().await {
88                Ok(()) => {
89                    println!("Shutdown signal received (Ctrl+C). Terminating all scheduled tasks and shutting down the system...");
90                    let mut triggered = shutdown.write().await; // Acquire write lock to set shutdown state
91                    *triggered = true; // Set the shutdown state to true
92                }
93                Err(err) => {
94                    eprintln!("Error listening for shutdown signal: {:?}", err);
95                    // Log any errors encountered while listening
96                }
97            }
98        });
99    }
100
101    /// Starts the task context, including workers and the task cleaner.
102    pub fn start(self) -> Self {
103        self.start_worker(); // Start task workers
104        self.start_task_cleaner(); // Start the task cleaner
105        self.start_signal_listener(); // Start the signal listener
106        self
107    }
108
109    /// Adds a new task to the context for execution.
110    pub async fn add_task<T>(&self, task: T) -> Result<(), String>
111    where
112        T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
113    {
114        let mut task_meta = task.new_meta(); // Create metadata for the new task
115        let next_run = match T::TASK_KIND {
116            TaskKind::Once | TaskKind::Repeat => {
117                utc_now!() + (task_meta.delay_seconds * 1000) as i64
118            } // Set next run time to now for once or repeat tasks
119            TaskKind::Cron => {
120                let schedule = T::SCHEDULE
121                    .ok_or_else(|| "Cron schedule is required for TaskKind::Cron".to_string())?; // Ensure a cron schedule is provided
122
123                let timezone = T::TIMEZONE
124                    .ok_or_else(|| "Timezone is required for TaskKind::Cron".to_string())?; // Ensure a timezone is provided
125
126                // Calculate the next run time based on the cron schedule and timezone
127                next_run(schedule, timezone, 0).ok_or_else(|| {
128                    format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
129                })?
130            }
131        };
132
133        task_meta.next_run = next_run; // Set the next run time in the task metadata
134        task_meta.last_run = next_run; // Set the last run time in the task metadata
135        self.store
136            .store_task(task_meta) // Store the task metadata in the task store
137            .await
138            .map_err(|e| e.to_string()) // Handle any errors during the store operation
139    }
140}