polly_scheduler/core/context.rs
1use crate::core::cleaner::TaskCleaner;
2use crate::core::cron::next_run;
3use crate::core::handlers::TaskHandlers;
4use crate::core::store::TaskStore;
5use crate::core::task::Task;
6use crate::core::task_kind::TaskKind;
7use crate::core::worker::process_task_worker;
8use crate::utc_now;
9use std::time::Duration;
10use std::{collections::HashMap, sync::Arc};
11use tokio::signal;
12use tokio::sync::RwLock;
13
14pub struct TaskContext<S>
15where
16 S: TaskStore, // Ensures that S is a type that implements the TaskStore trait
17{
18 queue_concurrency: HashMap<String, u32>, // Stores the concurrency level for each task queue
19 handlers: TaskHandlers, // Collection of task handlers to process different task types
20 shutdown: Arc<RwLock<bool>>, // Shared state to control the shutdown of the system
21 store: Arc<S>, // Arc wrapper around the task store, allowing shared ownership across threads
22}
23
24impl<S> TaskContext<S>
25where
26 S: TaskStore + Sync + Send + 'static, // S must implement TaskStore, and be Sync and Send
27{
28 /// Creates a new TaskContext with the provided store.
29 pub fn new(store: S) -> Self {
30 Self {
31 queue_concurrency: HashMap::new(), // Initialize concurrency map as empty
32 handlers: TaskHandlers::new(), // Create a new TaskHandlers instance
33 store: Arc::new(store), // Wrap the store in an Arc for shared ownership
34 shutdown: Arc::new(RwLock::new(false)), // Initialize shutdown state to false
35 }
36 }
37
38 /// Registers a new task type in the context.
39 pub fn register<T>(mut self) -> Self
40 where
41 T: Task, // T must implement the Task trait
42 {
43 self.handlers.register::<T>(); // Register the task handler
44 self.queue_concurrency.insert(T::TASK_QUEUE.to_owned(), 4); // Set default concurrency for the task queue
45 self
46 }
47
48 /// Sets the concurrency level for a specified queue.
49 pub fn set_concurrency(mut self, queue: &str, count: u32) -> Self {
50 self.queue_concurrency.insert(queue.to_owned(), count); // Update the concurrency level for the queue
51 self
52 }
53
54 /// Starts the task cleaner to periodically clean up tasks.
55 fn start_task_cleaner(&self) {
56 let cleaner = Arc::new(TaskCleaner::new(self.store.clone())); // Create a new TaskCleaner
57 cleaner.start(Duration::from_secs(60 * 10)); // Start the cleaner to run every 10 minutes
58 }
59
60 /// Starts worker threads for processing tasks in each queue.
61 fn start_worker(&self) {
62 let Self {
63 queue_concurrency,
64 handlers,
65 store,
66 shutdown,
67 } = self;
68 // Spawn workers based on the concurrency settings for each queue
69 for (queue, concurrency) in queue_concurrency.iter() {
70 for _ in 0..*concurrency {
71 let handlers = handlers.clone(); // Clone the handlers for the worker
72 let task_store = store.clone(); // Clone the task store for the worker
73 let shutdown = shutdown.clone(); // Clone the shutdown flag for the worker
74 let queue = queue.clone(); // Clone the queue name for the worker
75 tokio::spawn(async move {
76 process_task_worker(queue.as_str(), handlers, task_store, shutdown).await;
77 // Start processing tasks in the worker
78 });
79 }
80 }
81 }
82
83 /// Starts a signal listener to handle shutdown signals.
84 pub fn start_signal_listener(&self) {
85 let shutdown = self.shutdown.clone(); // Clone the shutdown flag
86 tokio::spawn(async move {
87 match signal::ctrl_c().await {
88 Ok(()) => {
89 println!("Shutdown signal received (Ctrl+C). Terminating all scheduled tasks and shutting down the system...");
90 let mut triggered = shutdown.write().await; // Acquire write lock to set shutdown state
91 *triggered = true; // Set the shutdown state to true
92 }
93 Err(err) => {
94 eprintln!("Error listening for shutdown signal: {:?}", err);
95 // Log any errors encountered while listening
96 }
97 }
98 });
99 }
100
101 /// Starts the task context, including workers and the task cleaner.
102 pub fn start(self) -> Self {
103 self.start_worker(); // Start task workers
104 self.start_task_cleaner(); // Start the task cleaner
105 self.start_signal_listener(); // Start the signal listener
106 self
107 }
108
109 /// Adds a new task to the context for execution.
110 pub async fn add_task<T>(&self, task: T) -> Result<(), String>
111 where
112 T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
113 {
114 let mut task_meta = task.new_meta(); // Create metadata for the new task
115 let next_run = match T::TASK_KIND {
116 TaskKind::Once | TaskKind::Repeat => {
117 utc_now!() + (task_meta.delay_seconds * 1000) as i64
118 } // Set next run time to now for once or repeat tasks
119 TaskKind::Cron => {
120 let schedule = T::SCHEDULE
121 .ok_or_else(|| "Cron schedule is required for TaskKind::Cron".to_string())?; // Ensure a cron schedule is provided
122
123 let timezone = T::TIMEZONE
124 .ok_or_else(|| "Timezone is required for TaskKind::Cron".to_string())?; // Ensure a timezone is provided
125
126 // Calculate the next run time based on the cron schedule and timezone
127 next_run(schedule, timezone, 0).ok_or_else(|| {
128 format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
129 })?
130 }
131 };
132
133 task_meta.next_run = next_run; // Set the next run time in the task metadata
134 task_meta.last_run = next_run; // Set the last run time in the task metadata
135 self.store
136 .store_task(task_meta) // Store the task metadata in the task store
137 .await
138 .map_err(|e| e.to_string()) // Handle any errors during the store operation
139 }
140}