spring_job/
lib.rs

1//! [![spring-rs](https://img.shields.io/github/stars/spring-rs/spring-rs)](https://spring-rs.github.io/docs/plugins/spring-job)
2#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4
5pub mod extractor;
6pub mod handler;
7pub mod job;
8
9use spring::signal;
10/////////////////job-macros/////////////////////
/// To use these procedural macros, you need to add the `spring-job` dependency.
12pub use spring_macros::cron;
13pub use spring_macros::fix_delay;
14pub use spring_macros::fix_rate;
15pub use spring_macros::one_shot;
16
17use anyhow::Context;
18use job::Job;
19use spring::async_trait;
20use spring::error::Result;
21use spring::plugin::component::ComponentRef;
22use spring::plugin::ComponentRegistry;
23use spring::plugin::MutableComponentRegistry;
24use spring::{
25    app::{App, AppBuilder},
26    plugin::Plugin,
27};
28use std::ops::Deref;
29use std::sync::Arc;
30use tokio_cron_scheduler::JobSchedulerError;
31use uuid::Uuid;
32
33#[derive(Clone, Default)]
34pub struct Jobs(Vec<Job>);
35
36impl Jobs {
37    pub fn new() -> Self {
38        Self::default()
39    }
40    fn single(job: Job) -> Self {
41        Self(vec![job])
42    }
43
44    pub fn add_job(mut self, job: Job) -> Self {
45        self.0.push(job);
46        self
47    }
48
49    pub fn add_jobs(mut self, jobs: Jobs) -> Self {
50        for job in jobs.0 {
51            self.0.push(job);
52        }
53        self
54    }
55
56    fn merge(&mut self, jobs: Jobs) {
57        for job in jobs.0 {
58            self.0.push(job);
59        }
60    }
61}
62
63impl Deref for Jobs {
64    type Target = Vec<Job>;
65
66    fn deref(&self) -> &Self::Target {
67        &self.0
68    }
69}
70
/// Alias for [`Uuid`], exported under a job-specific name.
// NOTE(review): presumably the identifier the scheduler assigns to an added job — confirm.
pub type JobId = Uuid;
/// Alias for [`tokio_cron_scheduler::JobScheduler`], the scheduler type that
/// `JobPlugin` creates and registers as a component.
pub type JobScheduler = tokio_cron_scheduler::JobScheduler;
73
/// Registers jobs on an application builder.
///
/// Both methods return `&mut Self` so that calls can be chained.
pub trait JobConfigurator {
    /// Registers a single [`Job`].
    fn add_job(&mut self, job: Job) -> &mut Self;
    /// Registers every job contained in the given [`Jobs`] collection.
    fn add_jobs(&mut self, job: Jobs) -> &mut Self;
}
78
79impl JobConfigurator for AppBuilder {
80    fn add_job(&mut self, job: Job) -> &mut Self {
81        if let Some(jobs) = self.get_component_ref::<Jobs>() {
82            unsafe {
83                let raw_ptr = ComponentRef::into_raw(jobs);
84                let jobs = &mut *(raw_ptr as *mut Vec<Job>);
85                jobs.push(job);
86            }
87            self
88        } else {
89            self.add_component(Jobs::single(job))
90        }
91    }
92
93    fn add_jobs(&mut self, new_jobs: Jobs) -> &mut Self {
94        if let Some(jobs) = self.get_component_ref::<Jobs>() {
95            unsafe {
96                let raw_ptr = ComponentRef::into_raw(jobs);
97                let jobs = &mut *(raw_ptr as *mut Jobs);
98                jobs.merge(new_jobs);
99            }
100            self
101        } else {
102            self.add_component(new_jobs)
103        }
104    }
105}
106
107pub struct JobPlugin;
108
109#[async_trait]
110impl Plugin for JobPlugin {
111    async fn build(&self, app: &mut AppBuilder) {
112        let sched = Self::new_scheduler().await.expect("build scheduler failed");
113        app.add_component(sched)
114            .add_scheduler(|app: Arc<App>| Box::new(Self::schedule(app)));
115    }
116}
117
118impl JobPlugin {
119    async fn new_scheduler() -> std::result::Result<JobScheduler, JobSchedulerError> {
120        #[cfg(feature = "postgres_storage")]
121        {
122            let metadata_storage = Box::new(tokio_cron_scheduler::PostgresMetadataStore::default());
123            let notification_storage =
124                Box::new(tokio_cron_scheduler::PostgresNotificationStore::default());
125            let job_code = Box::new(tokio_cron_scheduler::SimpleJobCode::default());
126            let notification_code =
127                Box::new(tokio_cron_scheduler::SimpleNotificationCode::default());
128            JobScheduler::new_with_storage_and_code(
129                metadata_storage,
130                notification_storage,
131                job_code,
132                notification_code,
133                200,
134            )
135            .await
136        }
137        #[cfg(all(not(feature = "postgres_storage"), feature = "nats_storage"))]
138        {
139            let metadata_storage = Box::new(tokio_cron_scheduler::NatsMetadataStore::default());
140            let notification_storage =
141                Box::new(tokio_cron_scheduler::NatsNotificationStore::default());
142            let job_code = Box::new(tokio_cron_scheduler::SimpleJobCode::default());
143            let notification_code =
144                Box::new(tokio_cron_scheduler::SimpleNotificationCode::default());
145            JobScheduler::new_with_storage_and_code(
146                metadata_storage,
147                notification_storage,
148                job_code,
149                notification_code,
150                200,
151            )
152            .await
153        }
154        #[cfg(all(not(feature = "postgres_storage"), not(feature = "nats_storage")))]
155        {
156            JobScheduler::new().await
157        }
158    }
159
160    async fn schedule(app: Arc<App>) -> Result<String> {
161        let mut sched = app.get_expect_component::<JobScheduler>();
162        let jobs = app.get_component_ref::<Jobs>();
163
164        let jobs = match jobs {
165            None => {
166                let msg = "No tasks are registered, so the task scheduler does not start.";
167                tracing::info!(msg);
168                return Ok(msg.to_string());
169            }
170            Some(jobs) => jobs,
171        };
172
173        for job in jobs.deref().iter() {
174            sched
175                .add(job.to_owned().build(app.clone()))
176                .await
177                .context("add job failed")?;
178        }
179
180        let mut l = sched.clone();
181        // customize shutdown signal
182        tokio::spawn(async move {
183            let _ = signal::shutdown_signal("job").await;
184
185            if let Err(err) = l.shutdown().await {
186                tracing::error!("{:?}", err);
187            }
188        });
189
190        // Add code to be run during/after shutdown
191        sched.set_shutdown_handler(Box::new(|| {
192            Box::pin(async move {
193                tracing::info!("Shut down done");
194            })
195        }));
196
197        // Start the scheduler
198        sched.start().await.context("job scheduler start failed")?;
199
200        Ok("job schedule started".to_string())
201    }
202}