distributed_scheduler/
cron.rs1use std::{collections::HashMap, future::Future, sync::Arc};
2
3use job_scheduler_ng::{Schedule, Uuid};
4use tokio::sync::Mutex;
5use tracing::Instrument;
6
7use crate::{driver::Driver, node_pool};
8
9pub struct Cron<'a, D>
12where
13 D: Driver + Send + Sync,
14{
15 node_pool: Arc<node_pool::NodePool<D>>,
16 jobs: Mutex<HashMap<String, Uuid>>,
17 scheduler: Arc<Mutex<job_scheduler_ng::JobScheduler<'a>>>,
18}
19
20#[derive(Debug, thiserror::Error)]
21pub enum Error<D>
22where
23 D: Driver + Send + Sync + 'static,
24{
25 #[error("schedule stopped")]
26 SchedulerStopped,
27 #[error("job name already exists")]
28 JobNameConflict,
29 #[error("node pool error: {0}")]
30 NodePool(node_pool::Error<D>),
31}
32
33async fn run_scheduler(job_scheduler_ng: Arc<Mutex<job_scheduler_ng::JobScheduler<'_>>>) {
35 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
36
37 loop {
38 job_scheduler_ng.lock().await.tick();
39 interval.tick().await;
40 }
41}
42
43impl<'a, D> Cron<'a, D>
44where
45 D: Driver + Send + Sync + 'static,
46{
47 pub async fn new(np: node_pool::NodePool<D>) -> Arc<Self> {
49 Arc::new(Self {
50 node_pool: Arc::new(np),
51 jobs: Mutex::new(HashMap::new()),
52 scheduler: Arc::new(Mutex::new(job_scheduler_ng::JobScheduler::new())),
53 })
54 }
55
56 pub async fn start(&self) -> Result<(), Error<D>> {
58 let np = self.node_pool.as_ref();
59
60 tokio::select! {
61 _ = run_scheduler(self.scheduler.clone()) => {
62 Err(Error::SchedulerStopped)
63 },
64 Err(err) = np.start() => {
65 Err(Error::NodePool(err))
66 },
67 }
68 }
69
70 #[tracing::instrument(skip(self, job), err)]
72 async fn register_job(
73 &self,
74 job_name: &str,
75 job: job_scheduler_ng::Job<'a>,
76 ) -> Result<(), Error<D>> {
77 if self.jobs.lock().await.contains_key(job_name) {
79 return Err(Error::JobNameConflict);
80 }
81
82 let mut cron = self.scheduler.lock().await;
83 let id = cron.add(job);
84 self.jobs.lock().await.insert(job_name.to_string(), id);
85
86 tracing::info!(
87 name: "job_registered",
88 message = format!("Job {} registered", job_name),
89 scheduler.uuid = id.to_string(),
90 job.name = job_name
91 );
92
93 Ok(())
94 }
95
96 #[tracing::instrument(skip(self, run), err)]
104 pub async fn add_job<F>(
105 &self,
106 job_name: &str,
107 schedule: Schedule,
108 run: F,
109 ) -> Result<(), Error<D>>
110 where
111 F: 'static + Sync + Send + Fn(),
112 {
113 let run = Arc::new(run);
114
115 let job = job_scheduler_ng::Job::new(schedule, {
116 let job_name = job_name.to_string();
117 let np = self.node_pool.clone();
118
119 move || {
120 let job_name = job_name.clone();
121 let np = np.clone();
122 let run = run.clone();
123
124 let job_name_trace = job_name.clone();
125
126 tokio::spawn(
127 async move {
128 match np.check_job_available(&job_name).await {
129 Ok(is_this_node) if is_this_node => run(),
130 Ok(_) => {
131 tracing::trace!("Job is not available on this node")
132 }
133 Err(e) => tracing::error!("Failed to check job availability: {}", e),
134 }
135 }
136 .instrument(tracing::info_span!(
137 parent: None,
138 "job_run",
139 otel.name = format!("cronjob run: {}", job_name_trace),
140 job.name = job_name_trace,
141 )),
142 );
143 }
144 });
145
146 self.register_job(job_name, job).await?;
147
148 Ok(())
149 }
150
151 #[tracing::instrument(skip(self, run), err)]
159 pub async fn add_async_job<F, Fut>(
160 &self,
161 job_name: &str,
162 schedule: Schedule,
163 run: F,
164 ) -> Result<(), Error<D>>
165 where
166 F: 'static + Sync + Send + Fn() -> Fut,
167 Fut: Future<Output = ()> + Send,
168 {
169 let run = Arc::new(run);
170
171 let job = job_scheduler_ng::Job::new(schedule, {
172 let job_name = job_name.to_string();
173 let np = Arc::clone(&self.node_pool);
174 let run = Arc::clone(&run);
175
176 let job_name_trace = job_name.clone();
177
178 move || {
179 let job_name = job_name.clone();
180 let np = Arc::clone(&np);
181 let run = Arc::clone(&run);
182
183 tokio::spawn(
185 async move {
186 if np
188 .check_job_available(&job_name)
189 .await
190 .is_ok_and(|is_this_node| is_this_node)
191 {
192 run().await;
193 }
194 }
195 .instrument(tracing::info_span!(
196 parent: None,
197 "job_run",
198 otel.name = format!("cronjob run: {}", job_name_trace),
199 job.name = job_name_trace,
200 )),
201 );
202 }
203 });
204
205 self.register_job(job_name, job).await?;
206
207 Ok(())
208 }
209
210 #[tracing::instrument(skip(self), err)]
216 pub async fn remove_job(
217 &self,
218 job_name: &str,
219 ) -> Result<(), Error<D>> {
220 if let Some(id) = self.jobs.lock().await.remove(job_name) {
221 self.scheduler.lock().await.remove(id);
222
223 tracing::info!(
224 name: "job_removed",
225 message = format!("Job {} removed from scheduler", job_name),
226 scheduler.uuid = id.to_string(),
227 job.name = job_name,
228 );
229 }
230
231 Ok(())
232 }
233}
234
235
236impl<D> Drop for Cron<'_, D>
237where
238 D: Driver + Send + Sync,
239{
240 fn drop(&mut self) {
241 self.node_pool.stop();
242 }
243}