spring_job/
lib.rs

1//! [![spring-rs](https://img.shields.io/github/stars/spring-rs/spring-rs)](https://spring-rs.github.io/docs/plugins/spring-job)
2#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4
5pub mod extractor;
6pub mod handler;
7pub mod job;
8
9/////////////////job-macros/////////////////////
/// To use these procedural macros, add the `spring-job` dependency to your project.
11pub use spring_macros::cron;
12pub use spring_macros::fix_delay;
13pub use spring_macros::fix_rate;
14pub use spring_macros::one_shot;
15
16use anyhow::Context;
17use job::Job;
18use spring::async_trait;
19use spring::error::Result;
20use spring::plugin::component::ComponentRef;
21use spring::plugin::ComponentRegistry;
22use spring::plugin::MutableComponentRegistry;
23use spring::{
24    app::{App, AppBuilder},
25    plugin::Plugin,
26};
27use std::ops::Deref;
28use std::sync::Arc;
29use tokio_cron_scheduler::JobSchedulerError;
30use uuid::Uuid;
31
32#[derive(Clone, Default)]
33pub struct Jobs(Vec<Job>);
34
35impl Jobs {
36    pub fn new() -> Self {
37        Self::default()
38    }
39    fn single(job: Job) -> Self {
40        Self(vec![job])
41    }
42
43    pub fn add_job(mut self, job: Job) -> Self {
44        self.0.push(job);
45        self
46    }
47
48    pub fn add_jobs(mut self, jobs: Jobs) -> Self {
49        for job in jobs.0 {
50            self.0.push(job);
51        }
52        self
53    }
54
55    fn merge(&mut self, jobs: Jobs) {
56        for job in jobs.0 {
57            self.0.push(job);
58        }
59    }
60}
61
62impl Deref for Jobs {
63    type Target = Vec<Job>;
64
65    fn deref(&self) -> &Self::Target {
66        &self.0
67    }
68}
69
70pub type JobId = Uuid;
71pub type JobScheduler = tokio_cron_scheduler::JobScheduler;
72
73pub trait JobConfigurator {
74    fn add_job(&mut self, job: Job) -> &mut Self;
75    fn add_jobs(&mut self, job: Jobs) -> &mut Self;
76}
77
78impl JobConfigurator for AppBuilder {
79    fn add_job(&mut self, job: Job) -> &mut Self {
80        if let Some(jobs) = self.get_component_ref::<Jobs>() {
81            unsafe {
82                let raw_ptr = ComponentRef::into_raw(jobs);
83                let jobs = &mut *(raw_ptr as *mut Vec<Job>);
84                jobs.push(job);
85            }
86            self
87        } else {
88            self.add_component(Jobs::single(job))
89        }
90    }
91
92    fn add_jobs(&mut self, new_jobs: Jobs) -> &mut Self {
93        if let Some(jobs) = self.get_component_ref::<Jobs>() {
94            unsafe {
95                let raw_ptr = ComponentRef::into_raw(jobs);
96                let jobs = &mut *(raw_ptr as *mut Jobs);
97                jobs.merge(new_jobs);
98            }
99            self
100        } else {
101            self.add_component(new_jobs)
102        }
103    }
104}
105
106pub struct JobPlugin;
107
108#[async_trait]
109impl Plugin for JobPlugin {
110    async fn build(&self, app: &mut AppBuilder) {
111        let sched = Self::new_scheduler().await.expect("build scheduler failed");
112        app.add_component(sched)
113            .add_scheduler(|app: Arc<App>| Box::new(Self::schedule(app)));
114    }
115}
116
117impl JobPlugin {
118    async fn new_scheduler() -> std::result::Result<JobScheduler, JobSchedulerError> {
119        #[cfg(feature = "postgres_storage")]
120        {
121            let metadata_storage = Box::new(tokio_cron_scheduler::PostgresMetadataStore::default());
122            let notification_storage =
123                Box::new(tokio_cron_scheduler::PostgresNotificationStore::default());
124            let job_code = Box::new(tokio_cron_scheduler::SimpleJobCode::default());
125            let notification_code =
126                Box::new(tokio_cron_scheduler::SimpleNotificationCode::default());
127            JobScheduler::new_with_storage_and_code(
128                metadata_storage,
129                notification_storage,
130                job_code,
131                notification_code,
132                200,
133            )
134            .await
135        }
136        #[cfg(all(not(feature = "postgres_storage"), feature = "nats_storage"))]
137        {
138            let metadata_storage = Box::new(tokio_cron_scheduler::NatsMetadataStore::default());
139            let notification_storage =
140                Box::new(tokio_cron_scheduler::NatsNotificationStore::default());
141            let job_code = Box::new(tokio_cron_scheduler::SimpleJobCode::default());
142            let notification_code =
143                Box::new(tokio_cron_scheduler::SimpleNotificationCode::default());
144            JobScheduler::new_with_storage_and_code(
145                metadata_storage,
146                notification_storage,
147                job_code,
148                notification_code,
149                200,
150            )
151            .await
152        }
153        #[cfg(all(not(feature = "postgres_storage"), not(feature = "nats_storage")))]
154        {
155            JobScheduler::new().await
156        }
157    }
158
159    async fn schedule(app: Arc<App>) -> Result<String> {
160        let mut sched = app.get_expect_component::<JobScheduler>();
161        let jobs = app.get_component_ref::<Jobs>();
162
163        let jobs = match jobs {
164            None => {
165                let msg = "No tasks are registered, so the task scheduler does not start.";
166                tracing::info!(msg);
167                return Ok(msg.to_string());
168            }
169            Some(jobs) => jobs,
170        };
171
172        for job in jobs.deref().iter() {
173            sched
174                .add(job.to_owned().build(app.clone()))
175                .await
176                .context("add job failed")?;
177        }
178
179        sched.shutdown_on_ctrl_c();
180
181        // Add code to be run during/after shutdown
182        sched.set_shutdown_handler(Box::new(|| {
183            Box::pin(async move {
184                tracing::info!("Shut down done");
185            })
186        }));
187
188        // Start the scheduler
189        sched.start().await.context("job scheduler start failed")?;
190
191        Ok("job schedule started".to_string())
192    }
193}