distributed_scheduler/
cron.rs

1use std::{collections::HashMap, future::Future, sync::Arc};
2
3use job_scheduler_ng::{Schedule, Uuid};
4use tokio::sync::Mutex;
5use tracing::Instrument;
6
7use crate::{driver::Driver, node_pool};
8
9/// The `Cron` struct is the main entry point for the library, providing the ability to add and
10/// remove jobs.
11pub struct Cron<'a, D>
12where
13    D: Driver + Send + Sync,
14{
15    node_pool: Arc<node_pool::NodePool<D>>,
16    jobs: Mutex<HashMap<String, Uuid>>,
17    scheduler: Arc<Mutex<job_scheduler_ng::JobScheduler<'a>>>,
18}
19
20#[derive(Debug, thiserror::Error)]
21pub enum Error<D>
22where
23    D: Driver + Send + Sync + 'static,
24{
25    #[error("schedule stopped")]
26    SchedulerStopped,
27    #[error("job name already exists")]
28    JobNameConflict,
29    #[error("node pool error: {0}")]
30    NodePool(node_pool::Error<D>),
31}
32
33/// Run the scheduler in a separate task, return a Future
34async fn run_scheduler(job_scheduler_ng: Arc<Mutex<job_scheduler_ng::JobScheduler<'_>>>) {
35    let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
36
37    loop {
38        job_scheduler_ng.lock().await.tick();
39        interval.tick().await;
40    }
41}
42
43impl<'a, D> Cron<'a, D>
44where
45    D: Driver + Send + Sync + 'static,
46{
47    /// Create a new cron with the given node pool.
48    pub async fn new(np: node_pool::NodePool<D>) -> Arc<Self> {
49        Arc::new(Self {
50            node_pool: Arc::new(np),
51            jobs: Mutex::new(HashMap::new()),
52            scheduler: Arc::new(Mutex::new(job_scheduler_ng::JobScheduler::new())),
53        })
54    }
55
56    /// Start the cron, blocking the current thread.
57    pub async fn start(&self) -> Result<(), Error<D>> {
58        let np = self.node_pool.as_ref();
59
60        tokio::select! {
61            _ = run_scheduler(self.scheduler.clone()) => {
62                Err(Error::SchedulerStopped)
63            },
64            Err(err) = np.start() => {
65                Err(Error::NodePool(err))
66            },
67        }
68    }
69
70    /// Register a job in the scheduler
71    #[tracing::instrument(skip(self, job), err)]
72    async fn register_job(
73        &self,
74        job_name: &str,
75        job: job_scheduler_ng::Job<'a>,
76    ) -> Result<(), Error<D>> {
77        // check if job name is conflict
78        if self.jobs.lock().await.contains_key(job_name) {
79            return Err(Error::JobNameConflict);
80        }
81
82        let mut cron = self.scheduler.lock().await;
83        let id = cron.add(job);
84        self.jobs.lock().await.insert(job_name.to_string(), id);
85
86        tracing::info!(
87            name: "job_registered",
88            message = format!("Job {} registered", job_name),
89            scheduler.uuid = id.to_string(),
90            job.name = job_name
91        );
92
93        Ok(())
94    }
95
96    /// Add a job to the cron
97    ///
98    /// # Arguments
99    ///
100    /// * `job_name` - The unique name of the job
101    /// * `schedule` - The schedule of the job
102    /// * `run` - The function to run
103    #[tracing::instrument(skip(self, run), err)]
104    pub async fn add_job<F>(
105        &self,
106        job_name: &str,
107        schedule: Schedule,
108        run: F,
109    ) -> Result<(), Error<D>>
110    where
111        F: 'static + Sync + Send + Fn(),
112    {
113        let run = Arc::new(run);
114
115        let job = job_scheduler_ng::Job::new(schedule, {
116            let job_name = job_name.to_string();
117            let np = self.node_pool.clone();
118
119            move || {
120                let job_name = job_name.clone();
121                let np = np.clone();
122                let run = run.clone();
123
124                let job_name_trace = job_name.clone();
125
126                tokio::spawn(
127                    async move {
128                        match np.check_job_available(&job_name).await {
129                            Ok(is_this_node) if is_this_node => run(),
130                            Ok(_) => {
131                                tracing::trace!("Job is not available on this node")
132                            }
133                            Err(e) => tracing::error!("Failed to check job availability: {}", e),
134                        }
135                    }
136                    .instrument(tracing::info_span!(
137                        parent: None,
138                        "job_run",
139                        otel.name = format!("cronjob run: {}", job_name_trace),
140                        job.name = job_name_trace,
141                    )),
142                );
143            }
144        });
145
146        self.register_job(job_name, job).await?;
147
148        Ok(())
149    }
150
151    /// Add an async job to the cron
152    ///
153    /// # Arguments
154    ///
155    /// * `job_name` - The unique name of the job
156    /// * `schedule` - The schedule of the job
157    /// * `run` - The async function to run
158    #[tracing::instrument(skip(self, run), err)]
159    pub async fn add_async_job<F, Fut>(
160        &self,
161        job_name: &str,
162        schedule: Schedule,
163        run: F,
164    ) -> Result<(), Error<D>>
165    where
166        F: 'static + Sync + Send + Fn() -> Fut,
167        Fut: Future<Output = ()> + Send,
168    {
169        let run = Arc::new(run);
170
171        let job = job_scheduler_ng::Job::new(schedule, {
172            let job_name = job_name.to_string();
173            let np = Arc::clone(&self.node_pool);
174            let run = Arc::clone(&run);
175
176            let job_name_trace = job_name.clone();
177
178            move || {
179                let job_name = job_name.clone();
180                let np = Arc::clone(&np);
181                let run = Arc::clone(&run);
182
183                // spawn the async job
184                tokio::spawn(
185                    async move {
186                        // check if the job is available
187                        if np
188                            .check_job_available(&job_name)
189                            .await
190                            .is_ok_and(|is_this_node| is_this_node)
191                        {
192                            run().await;
193                        }
194                    }
195                    .instrument(tracing::info_span!(
196                        parent: None,
197                        "job_run",
198                        otel.name = format!("cronjob run: {}", job_name_trace),
199                        job.name = job_name_trace,
200                    )),
201                );
202            }
203        });
204
205        self.register_job(job_name, job).await?;
206
207        Ok(())
208    }
209
210    /// Remove a job from the cron
211    ///
212    /// # Arguments
213    ///
214    /// * `job_name` - The unique name of the job
215    #[tracing::instrument(skip(self), err)]
216    pub async fn remove_job(
217        &self,
218        job_name: &str,
219    ) -> Result<(), Error<D>> {
220        if let Some(id) = self.jobs.lock().await.remove(job_name) {
221            self.scheduler.lock().await.remove(id);
222
223            tracing::info!(
224                name: "job_removed",
225                message = format!("Job {} removed from scheduler", job_name),
226                scheduler.uuid = id.to_string(),
227                job.name = job_name,
228            );
229        }
230
231        Ok(())
232    }
233}
234
235
236impl<D> Drop for Cron<'_, D>
237where
238    D: Driver + Send + Sync,
239{
240    fn drop(&mut self) {
241        self.node_pool.stop();
242    }
243}