persistent_scheduler/core/
context.rs

1use crate::core::cleaner::TaskCleaner;
2use crate::core::cron::next_run;
3use crate::core::handlers::TaskHandlers;
4use crate::core::model::TaskMeta;
5use crate::core::status_updater::TaskStatusUpdater;
6use crate::core::store::TaskStore;
7use crate::core::task::Task;
8use crate::core::task_kind::TaskKind;
9use crate::utc_now;
10use ahash::AHashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use super::flow::TaskFlow;
15
16pub struct TaskContext<S>
17where
18    S: TaskStore + Send + Sync + Clone + 'static, // Ensures that S is a type that implements the TaskStore trait
19{
20    queue_concurrency: AHashMap<String, usize>, // Stores the concurrency level for each task queue
21    handlers: TaskHandlers, // Collection of task handlers to process different task types
22    store: Arc<S>, // Arc wrapper around the task store, allowing shared ownership across threads
23}
24
25impl<S> TaskContext<S>
26where
27    S: TaskStore + Send + Sync + Clone + 'static, // S must implement TaskStore, and be Sync and Send
28{
29    /// Creates a new TaskContext with the provided store.
30    pub fn new(store: S) -> Self {
31        let store = Arc::new(store);
32        Self {
33            queue_concurrency: AHashMap::new(), // Initialize concurrency map as empty
34            handlers: TaskHandlers::new(),      // Create a new TaskHandlers instance
35            store: store.clone(),               // Wrap the store in an Arc for shared ownership
36        }
37    }
38
39    /// Creates a new TaskContext with the provided Arc-wrapped store.
40    pub fn with_arc_store(store: Arc<S>) -> Self {
41        Self {
42            queue_concurrency: AHashMap::new(), // Initialize concurrency map as empty
43            handlers: TaskHandlers::new(),      // Create a new TaskHandlers instance
44            store,                              // Use the provided Arc directly
45        }
46    }
47
48    /// Registers a new task type in the context.
49    pub fn register<T>(mut self) -> Self
50    where
51        T: Task, // T must implement the Task trait
52    {
53        self.handlers.register::<T>(); // Register the task handler
54        self.queue_concurrency.insert(T::TASK_QUEUE.to_owned(), 4); // Set default concurrency for the task queue
55        self
56    }
57
58    /// Sets the concurrency level for a specified queue.
59    pub fn set_concurrency(mut self, queue: &str, count: usize) -> Self {
60        self.queue_concurrency.insert(queue.to_owned(), count); // Update the concurrency level for the queue
61        self
62    }
63
64    /// Starts the task cleaner to periodically clean up tasks.
65    fn start_task_cleaner(&self) {
66        let cleaner = Arc::new(TaskCleaner::new(self.store.clone())); // Create a new TaskCleaner
67        cleaner.start(Duration::from_secs(60 * 10)); // Start the cleaner to run every 10 minutes
68    }
69
70    /// Starts worker threads for processing tasks in each queue.
71    async fn start_flow(&self) {
72        let status_updater = Arc::new(TaskStatusUpdater::new(
73            self.store.clone(),
74            self.queue_concurrency.len(),
75        ));
76
77        let flow = Arc::new(TaskFlow::new(
78            self.store.clone(),
79            &self.queue_concurrency,
80            Arc::new(self.handlers.clone()),
81            status_updater,
82        ));
83
84        flow.start().await;
85    }
86
87    /// Starts the task context with workers, leaving task cleanup to be handled manually by the user.
88    pub async fn start(self) -> Self {
89        self.start_flow().await; // Start task workers
90        self
91    }
92
93    /// Runs the task context, enabling workers and the task cleaner.
94    pub async fn start_with_cleaner(self) -> Self {
95        self.start_flow().await; // Start task workers
96        self.start_task_cleaner(); // Start the task cleaner
97        self
98    }
99
100    /// Adds a new task to the context for execution.
101    pub async fn add_task<T>(
102        &self,
103        task: T,
104        kind: TaskKind,
105        delay_seconds: Option<u32>,
106    ) -> Result<(), String>
107    where
108        T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
109    {
110        let mut task_meta = task.new_meta(kind); // Create metadata for the new task
111        let next_run = match &task_meta.kind {
112            TaskKind::Once | TaskKind::Repeat { .. } => {
113                let delay_seconds = delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
114                utc_now!() + delay_seconds as i64
115            } // Set the next run time by adding a delay to the current time, allowing the task to run at a specified future time.
116            TaskKind::Cron { schedule, timezone } => {
117                // Calculate the next run time based on the cron schedule and timezone
118                next_run(&*schedule, &*timezone, 0).ok_or_else(|| {
119                    format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
120                })?
121            }
122        };
123
124        task_meta.next_run = next_run;
125        task_meta.last_run = next_run;
126        self.store
127            .store_task(task_meta) // Store the task metadata in the task store
128            .await
129            .map_err(|e| e.to_string()) // Handle any errors during the store operation
130    }
131
132    /// Adds a new task to the context for execution.
133    pub async fn add_tasks<T>(&self, tasks: Vec<TaskConfiguration<T>>) -> Result<(), String>
134    where
135        T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
136    {
137        let mut batch: Vec<TaskMeta> = Vec::new();
138
139        for task in tasks {
140            let mut task_meta = task.inner.new_meta(task.kind); // Create metadata for the new task
141            let next_run = match &task_meta.kind {
142                TaskKind::Once | TaskKind::Repeat { .. } => {
143                    let delay_seconds =
144                        task.delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
145                    utc_now!() + delay_seconds as i64
146                } // Set the next run time by adding a delay to the current time, allowing the task to run at a specified future time.
147                TaskKind::Cron { schedule, timezone } => {
148                    // Calculate the next run time based on the cron schedule and timezone
149                    next_run(&*schedule, &*timezone, 0).ok_or_else(|| {
150                    format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
151                })?
152                }
153            };
154
155            task_meta.next_run = next_run;
156            task_meta.last_run = next_run;
157            batch.push(task_meta);
158        }
159
160        self.store
161            .store_tasks(batch) // Store the task metadata in the task store
162            .await
163            .map_err(|e| e.to_string()) // Handle any errors during the store operation
164    }
165}
166
167pub struct TaskConfiguration<T: Task> {
168    pub inner: T,
169    pub kind: TaskKind,
170    pub delay_seconds: Option<u32>,
171}