persistent_scheduler/core/
context.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
use crate::core::cleaner::TaskCleaner;
use crate::core::cron::next_run;
use crate::core::handlers::TaskHandlers;
use crate::core::store::TaskStore;
use crate::core::task::Task;
use crate::core::task_kind::TaskKind;
use crate::core::worker::process_task_worker;
use crate::utc_now;
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use tokio::signal;
use tokio::sync::RwLock;

use super::model::TaskMeta;

pub struct TaskContext<S>
where
    S: TaskStore, // Ensures that S is a type that implements the TaskStore trait
{
    queue_concurrency: HashMap<String, u32>, // Stores the concurrency level for each task queue
    handlers: TaskHandlers, // Collection of task handlers to process different task types
    shutdown: Arc<RwLock<bool>>, // Shared state to control the shutdown of the system
    store: Arc<S>, // Arc wrapper around the task store, allowing shared ownership across threads
}

impl<S> TaskContext<S>
where
    S: TaskStore + Sync + Send + 'static, // S must implement TaskStore, and be Sync and Send
{
    /// Creates a new TaskContext with the provided store.
    pub fn new(store: S) -> Self {
        Self {
            queue_concurrency: HashMap::new(), // Initialize concurrency map as empty
            handlers: TaskHandlers::new(),     // Create a new TaskHandlers instance
            store: Arc::new(store),            // Wrap the store in an Arc for shared ownership
            shutdown: Arc::new(RwLock::new(false)), // Initialize shutdown state to false
        }
    }

    /// Registers a new task type in the context.
    pub fn register<T>(mut self) -> Self
    where
        T: Task, // T must implement the Task trait
    {
        self.handlers.register::<T>(); // Register the task handler
        self.queue_concurrency.insert(T::TASK_QUEUE.to_owned(), 4); // Set default concurrency for the task queue
        self
    }

    /// Sets the concurrency level for a specified queue.
    pub fn set_concurrency(mut self, queue: &str, count: u32) -> Self {
        self.queue_concurrency.insert(queue.to_owned(), count); // Update the concurrency level for the queue
        self
    }

    /// Starts the task cleaner to periodically clean up tasks.
    fn start_task_cleaner(&self) {
        let cleaner = Arc::new(TaskCleaner::new(self.store.clone())); // Create a new TaskCleaner
        cleaner.start(Duration::from_secs(60 * 10)); // Start the cleaner to run every 10 minutes
    }

    /// Starts worker threads for processing tasks in each queue.
    fn start_worker(&self) {
        let Self {
            queue_concurrency,
            handlers,
            store,
            shutdown,
        } = self;
        // Spawn workers based on the concurrency settings for each queue
        for (queue, concurrency) in queue_concurrency.iter() {
            for _ in 0..*concurrency {
                let handlers = handlers.clone(); // Clone the handlers for the worker
                let task_store = store.clone(); // Clone the task store for the worker
                let shutdown = shutdown.clone(); // Clone the shutdown flag for the worker
                let queue = queue.clone(); // Clone the queue name for the worker
                tokio::spawn(async move {
                    process_task_worker(queue.as_str(), handlers, task_store, shutdown).await;
                    // Start processing tasks in the worker
                });
            }
        }
    }

    /// Starts a signal listener to handle shutdown signals.
    fn start_signal_listener(&self) {
        let shutdown = self.shutdown.clone(); // Clone the shutdown flag
        tokio::spawn(async move {
            match signal::ctrl_c().await {
                Ok(()) => {
                    println!("Shutdown signal received (Ctrl+C). Terminating all scheduled tasks and shutting down the system...");
                    let mut triggered = shutdown.write().await; // Acquire write lock to set shutdown state
                    *triggered = true; // Set the shutdown state to true
                }
                Err(err) => {
                    eprintln!("Error listening for shutdown signal: {:?}", err);
                    // Log any errors encountered while listening
                }
            }
        });
    }

    /// Starts the task context, including workers and the task cleaner.
    pub fn start(self) -> Self {
        self.start_worker(); // Start task workers
        self.start_task_cleaner(); // Start the task cleaner
        self.start_signal_listener(); // Start the signal listener
        self
    }

    /// Adds a new task to the context for execution.
    pub async fn add_task<T>(&self, task: T, delay_seconds: Option<u32>) -> Result<(), String>
    where
        T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
    {
        let mut task_meta = task.new_meta(); // Create metadata for the new task
        let next_run = match T::TASK_KIND {
            TaskKind::Once | TaskKind::Repeat => {
                let delay_seconds = delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
                utc_now!() + delay_seconds as i64
            } // Set the next run time by adding a delay to the current time, allowing the task to run at a specified future time.
            TaskKind::Cron => {
                let schedule = T::SCHEDULE
                    .ok_or_else(|| "Cron schedule is required for TaskKind::Cron".to_string())?; // Ensure a cron schedule is provided

                let timezone = T::TIMEZONE
                    .ok_or_else(|| "Timezone is required for TaskKind::Cron".to_string())?; // Ensure a timezone is provided

                // Calculate the next run time based on the cron schedule and timezone
                next_run(schedule, timezone, 0).ok_or_else(|| {
                    format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
                })?
            }
        };

        task_meta.next_run = next_run;
        task_meta.last_run = next_run;
        self.store
            .store_task(task_meta) // Store the task metadata in the task store
            .await
            .map_err(|e| e.to_string()) // Handle any errors during the store operation
    }

    /// Adds a new task to the context for execution.
    pub async fn add_tasks<T>(&self, tasks: Vec<TaskAndDelay<T>>) -> Result<(), String>
    where
        T: Task + Send + Sync + 'static, // T must implement the Task trait and be thread-safe
    {
        let mut batch: Vec<TaskMeta> = Vec::new();

        for task in tasks {
            let mut task_meta = task.inner.new_meta(); // Create metadata for the new task
            let next_run = match T::TASK_KIND {
                TaskKind::Once | TaskKind::Repeat => {
                    let delay_seconds =
                        task.delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
                    utc_now!() + delay_seconds as i64
                } // Set the next run time by adding a delay to the current time, allowing the task to run at a specified future time.
                TaskKind::Cron => {
                    let schedule = T::SCHEDULE.ok_or_else(|| {
                        "Cron schedule is required for TaskKind::Cron".to_string()
                    })?; // Ensure a cron schedule is provided

                    let timezone = T::TIMEZONE
                        .ok_or_else(|| "Timezone is required for TaskKind::Cron".to_string())?; // Ensure a timezone is provided

                    // Calculate the next run time based on the cron schedule and timezone
                    next_run(schedule, timezone, 0).ok_or_else(|| {
                    format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
                })?
                }
            };

            task_meta.next_run = next_run;
            task_meta.last_run = next_run;
            batch.push(task_meta);
        }

        self.store
            .store_tasks(batch) // Store the task metadata in the task store
            .await
            .map_err(|e| e.to_string()) // Handle any errors during the store operation
    }
}

pub struct TaskAndDelay<T: Task> {
    pub inner: T,
    pub delay_seconds: Option<u32>,
}