persistent_scheduler/core/context.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
use crate::core::cleaner::TaskCleaner;
use crate::core::cron::next_run;
use crate::core::handlers::TaskHandlers;
use crate::core::store::TaskStore;
use crate::core::task::Task;
use crate::core::task_kind::TaskKind;
use crate::core::worker::process_task_worker;
use crate::utc_now;
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use tokio::signal;
use tokio::sync::RwLock;
pub struct TaskContext<S>
where
S: TaskStore, // Ensures that S is a type that implements the TaskStore trait
{
queue_concurrency: HashMap<String, u32>, // Stores the concurrency level for each task queue
handlers: TaskHandlers, // Collection of task handlers to process different task types
shutdown: Arc<RwLock<bool>>, // Shared state to control the shutdown of the system
store: Arc<S>, // Arc wrapper around the task store, allowing shared ownership across threads
}
impl<S> TaskContext<S>
where
S: TaskStore + Sync + Send + 'static, // S must implement TaskStore, and be Sync and Send
{
/// Creates a new TaskContext with the provided store.
pub fn new(store: S) -> Self {
Self {
queue_concurrency: HashMap::new(), // Initialize concurrency map as empty
handlers: TaskHandlers::new(), // Create a new TaskHandlers instance
store: Arc::new(store), // Wrap the store in an Arc for shared ownership
shutdown: Arc::new(RwLock::new(false)), // Initialize shutdown state to false
}
}
/// Registers a new task type in the context.
pub fn register<T>(mut self) -> Self
where
T: Task, // T must implement the Task trait
{
self.handlers.register::<T>(); // Register the task handler
self.queue_concurrency.insert(T::TASK_QUEUE.to_owned(), 4); // Set default concurrency for the task queue
self
}
/// Sets the concurrency level for a specified queue.
pub fn set_concurrency(mut self, queue: &str, count: u32) -> Self {
self.queue_concurrency.insert(queue.to_owned(), count); // Update the concurrency level for the queue
self
}
/// Starts the task cleaner to periodically clean up tasks.
fn start_task_cleaner(&self) {
let cleaner = Arc::new(TaskCleaner::new(self.store.clone())); // Create a new TaskCleaner
cleaner.start(Duration::from_secs(60 * 10)); // Start the cleaner to run every 10 minutes
}
/// Starts worker threads for processing tasks in each queue.
fn start_worker(&self) {
let Self {
queue_concurrency,
handlers,
store,
shutdown,
} = self;
// Spawn workers based on the concurrency settings for each queue
for (queue, concurrency) in queue_concurrency.iter() {
for _ in 0..*concurrency {
let handlers = handlers.clone(); // Clone the handlers for the worker
let task_store = store.clone(); // Clone the task store for the worker
let shutdown = shutdown.clone(); // Clone the shutdown flag for the worker
let queue = queue.clone(); // Clone the queue name for the worker
tokio::spawn(async move {
process_task_worker(queue.as_str(), handlers, task_store, shutdown).await;
// Start processing tasks in the worker
});
}
}
}
/// Starts a signal listener to handle shutdown signals.
fn start_signal_listener(&self) {
let shutdown = self.shutdown.clone(); // Clone the shutdown flag
tokio::spawn(async move {
match signal::ctrl_c().await {
Ok(()) => {
println!("Shutdown signal received (Ctrl+C). Terminating all scheduled tasks and shutting down the system...");
let mut triggered = shutdown.write().await; // Acquire write lock to set shutdown state
*triggered = true; // Set the shutdown state to true
}
Err(err) => {
eprintln!("Error listening for shutdown signal: {:?}", err);
// Log any errors encountered while listening
}
}
});
}
/// Starts the task context, including workers and the task cleaner.
pub fn start(self) -> Self {
self.start_worker(); // Start task workers
self.start_task_cleaner(); // Start the task cleaner
self.start_signal_listener(); // Start the signal listener
self
}
/// Adds a new task to the context for execution.
pub async fn add_task<T>(&self, task: T, delay_seconds: Option<u32>) -> Result<(), String>
where
T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
{
let mut task_meta = task.new_meta(); // Create metadata for the new task
let next_run = match T::TASK_KIND {
TaskKind::Once | TaskKind::Repeat => {
let delay_seconds = delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
utc_now!() + delay_seconds as i64
} // Set the next run time by adding a delay to the current time, allowing the task to run at a specified future time.
TaskKind::Cron => {
let schedule = T::SCHEDULE
.ok_or_else(|| "Cron schedule is required for TaskKind::Cron".to_string())?; // Ensure a cron schedule is provided
let timezone = T::TIMEZONE
.ok_or_else(|| "Timezone is required for TaskKind::Cron".to_string())?; // Ensure a timezone is provided
// Calculate the next run time based on the cron schedule and timezone
next_run(schedule, timezone, 0).ok_or_else(|| {
format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
})?
}
};
task_meta.next_run = next_run;
task_meta.last_run = next_run;
self.store
.store_task(task_meta) // Store the task metadata in the task store
.await
.map_err(|e| e.to_string()) // Handle any errors during the store operation
}
}