persistent_scheduler/core/
context.rs

1use crate::core::cleaner::TaskCleaner;
2use crate::core::cron::next_run;
3use crate::core::handlers::TaskHandlers;
4use crate::core::model::TaskMeta;
5use crate::core::status_updater::TaskStatusUpdater;
6use crate::core::store::TaskStore;
7use crate::core::task::Task;
8use crate::core::task_kind::TaskKind;
9use crate::utc_now;
10use ahash::AHashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use super::flow::TaskFlow;
15
16pub struct TaskContext<S>
17where
18    S: TaskStore + Send + Sync + Clone + 'static, // Ensures that S is a type that implements the TaskStore trait
19{
20    queue_concurrency: AHashMap<String, usize>, // Stores the concurrency level for each task queue
21    handlers: TaskHandlers, // Collection of task handlers to process different task types
22    store: Arc<S>, // Arc wrapper around the task store, allowing shared ownership across threads
23}
24
25impl<S> TaskContext<S>
26where
27    S: TaskStore + Send + Sync + Clone + 'static, // S must implement TaskStore, and be Sync and Send
28{
29    /// Creates a new TaskContext with the provided store.
30    pub fn new(store: S) -> Self {
31        let store = Arc::new(store);
32        Self {
33            queue_concurrency: AHashMap::new(), // Initialize concurrency map as empty
34            handlers: TaskHandlers::new(),      // Create a new TaskHandlers instance
35            store: store.clone(),               // Wrap the store in an Arc for shared ownership
36        }
37    }
38
39    /// Creates a new TaskContext with the provided Arc-wrapped store.
40    pub fn with_arc_store(store: Arc<S>) -> Self {
41        Self {
42            queue_concurrency: AHashMap::new(), // Initialize concurrency map as empty
43            handlers: TaskHandlers::new(),      // Create a new TaskHandlers instance
44            store,                              // Use the provided Arc directly
45        }
46    }
47
48    /// Registers a new task type in the context.
49    pub fn register<T>(mut self) -> Self
50    where
51        T: Task, // T must implement the Task trait
52    {
53        self.handlers.register::<T>(); // Register the task handler
54        self.queue_concurrency.insert(T::TASK_QUEUE.to_owned(), 4); // Set default concurrency for the task queue
55        self
56    }
57
58    /// Sets the concurrency level for a specified queue.
59    pub fn set_concurrency(mut self, queue: &str, count: usize) -> Self {
60        self.queue_concurrency.insert(queue.to_owned(), count); // Update the concurrency level for the queue
61        self
62    }
63
64    /// Starts the task cleaner to periodically clean up tasks.
65    fn start_task_cleaner(&self) {
66        let cleaner = Arc::new(TaskCleaner::new(self.store.clone())); // Create a new TaskCleaner
67        cleaner.start(Duration::from_secs(60 * 10)); // Start the cleaner to run every 10 minutes
68    }
69
70    /// Starts worker threads for processing tasks in each queue.
71    async fn start_flow(&self) {
72        let status_updater = Arc::new(TaskStatusUpdater::new(
73            self.store.clone(),
74            self.queue_concurrency.len(),
75        ));
76
77        let flow = Arc::new(TaskFlow::new(
78            self.store.clone(),
79            &self.queue_concurrency,
80            Arc::new(self.handlers.clone()),
81            status_updater,
82        ));
83
84        flow.start().await;
85    }
86
87    /// Starts the task context, including workers and the task cleaner.
88    pub async fn start(self) -> Self {
89        self.start_flow().await; // Start task workers
90        self.start_task_cleaner(); // Start the task cleaner
91        self
92    }
93
94    /// Adds a new task to the context for execution.
95    pub async fn add_task<T>(
96        &self,
97        task: T,
98        kind: TaskKind,
99        delay_seconds: Option<u32>,
100    ) -> Result<(), String>
101    where
102        T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
103    {
104        let mut task_meta = task.new_meta(kind); // Create metadata for the new task
105        let next_run = match &task_meta.kind {
106            TaskKind::Once | TaskKind::Repeat { .. } => {
107                let delay_seconds = delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
108                utc_now!() + delay_seconds as i64
109            } // Set the next run time by adding a delay to the current time, allowing the task to run at a specified future time.
110            TaskKind::Cron { schedule, timezone } => {
111                // Calculate the next run time based on the cron schedule and timezone
112                next_run(&*schedule, &*timezone, 0).ok_or_else(|| {
113                    format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
114                })?
115            }
116        };
117
118        task_meta.next_run = next_run;
119        task_meta.last_run = next_run;
120        self.store
121            .store_task(task_meta) // Store the task metadata in the task store
122            .await
123            .map_err(|e| e.to_string()) // Handle any errors during the store operation
124    }
125
126    /// Adds a new task to the context for execution.
127    pub async fn add_tasks<T>(&self, tasks: Vec<TaskConfiguration<T>>) -> Result<(), String>
128    where
129        T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
130    {
131        let mut batch: Vec<TaskMeta> = Vec::new();
132
133        for task in tasks {
134            let mut task_meta = task.inner.new_meta(task.kind); // Create metadata for the new task
135            let next_run = match &task_meta.kind {
136                TaskKind::Once | TaskKind::Repeat { .. } => {
137                    let delay_seconds =
138                        task.delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
139                    utc_now!() + delay_seconds as i64
140                } // Set the next run time by adding a delay to the current time, allowing the task to run at a specified future time.
141                TaskKind::Cron { schedule, timezone } => {
142                    // Calculate the next run time based on the cron schedule and timezone
143                    next_run(&*schedule, &*timezone, 0).ok_or_else(|| {
144                    format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
145                })?
146                }
147            };
148
149            task_meta.next_run = next_run;
150            task_meta.last_run = next_run;
151            batch.push(task_meta);
152        }
153
154        self.store
155            .store_tasks(batch) // Store the task metadata in the task store
156            .await
157            .map_err(|e| e.to_string()) // Handle any errors during the store operation
158    }
159}
160
161pub struct TaskConfiguration<T: Task> {
162    pub inner: T,
163    pub kind: TaskKind,
164    pub delay_seconds: Option<u32>,
165}