1#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4
5pub mod extractor;
6pub mod handler;
7pub mod job;
8
9pub use spring_macros::cron;
12pub use spring_macros::fix_delay;
13pub use spring_macros::fix_rate;
14pub use spring_macros::one_shot;
15
16use anyhow::Context;
17use job::Job;
18use spring::async_trait;
19use spring::error::Result;
20use spring::plugin::component::ComponentRef;
21use spring::plugin::ComponentRegistry;
22use spring::plugin::MutableComponentRegistry;
23use spring::{
24 app::{App, AppBuilder},
25 plugin::Plugin,
26};
27use std::ops::Deref;
28use std::sync::Arc;
29use tokio_cron_scheduler::JobSchedulerError;
30use uuid::Uuid;
31
32#[derive(Clone, Default)]
33pub struct Jobs(Vec<Job>);
34
35impl Jobs {
36 pub fn new() -> Self {
37 Self::default()
38 }
39 fn single(job: Job) -> Self {
40 Self(vec![job])
41 }
42
43 pub fn add_job(mut self, job: Job) -> Self {
44 self.0.push(job);
45 self
46 }
47
48 pub fn add_jobs(mut self, jobs: Jobs) -> Self {
49 for job in jobs.0 {
50 self.0.push(job);
51 }
52 self
53 }
54
55 fn merge(&mut self, jobs: Jobs) {
56 for job in jobs.0 {
57 self.0.push(job);
58 }
59 }
60}
61
62impl Deref for Jobs {
63 type Target = Vec<Job>;
64
65 fn deref(&self) -> &Self::Target {
66 &self.0
67 }
68}
69
70pub type JobId = Uuid;
71pub type JobScheduler = tokio_cron_scheduler::JobScheduler;
72
73pub trait JobConfigurator {
74 fn add_job(&mut self, job: Job) -> &mut Self;
75 fn add_jobs(&mut self, job: Jobs) -> &mut Self;
76}
77
78impl JobConfigurator for AppBuilder {
79 fn add_job(&mut self, job: Job) -> &mut Self {
80 if let Some(jobs) = self.get_component_ref::<Jobs>() {
81 unsafe {
82 let raw_ptr = ComponentRef::into_raw(jobs);
83 let jobs = &mut *(raw_ptr as *mut Vec<Job>);
84 jobs.push(job);
85 }
86 self
87 } else {
88 self.add_component(Jobs::single(job))
89 }
90 }
91
92 fn add_jobs(&mut self, new_jobs: Jobs) -> &mut Self {
93 if let Some(jobs) = self.get_component_ref::<Jobs>() {
94 unsafe {
95 let raw_ptr = ComponentRef::into_raw(jobs);
96 let jobs = &mut *(raw_ptr as *mut Jobs);
97 jobs.merge(new_jobs);
98 }
99 self
100 } else {
101 self.add_component(new_jobs)
102 }
103 }
104}
105
106pub struct JobPlugin;
107
108#[async_trait]
109impl Plugin for JobPlugin {
110 async fn build(&self, app: &mut AppBuilder) {
111 let sched = Self::new_scheduler().await.expect("build scheduler failed");
112 app.add_component(sched)
113 .add_scheduler(|app: Arc<App>| Box::new(Self::schedule(app)));
114 }
115}
116
117impl JobPlugin {
118 async fn new_scheduler() -> std::result::Result<JobScheduler, JobSchedulerError> {
119 #[cfg(feature = "postgres_storage")]
120 {
121 let metadata_storage = Box::new(tokio_cron_scheduler::PostgresMetadataStore::default());
122 let notification_storage =
123 Box::new(tokio_cron_scheduler::PostgresNotificationStore::default());
124 let job_code = Box::new(tokio_cron_scheduler::SimpleJobCode::default());
125 let notification_code =
126 Box::new(tokio_cron_scheduler::SimpleNotificationCode::default());
127 JobScheduler::new_with_storage_and_code(
128 metadata_storage,
129 notification_storage,
130 job_code,
131 notification_code,
132 200,
133 )
134 .await
135 }
136 #[cfg(all(not(feature = "postgres_storage"), feature = "nats_storage"))]
137 {
138 let metadata_storage = Box::new(tokio_cron_scheduler::NatsMetadataStore::default());
139 let notification_storage =
140 Box::new(tokio_cron_scheduler::NatsNotificationStore::default());
141 let job_code = Box::new(tokio_cron_scheduler::SimpleJobCode::default());
142 let notification_code =
143 Box::new(tokio_cron_scheduler::SimpleNotificationCode::default());
144 JobScheduler::new_with_storage_and_code(
145 metadata_storage,
146 notification_storage,
147 job_code,
148 notification_code,
149 200,
150 )
151 .await
152 }
153 #[cfg(all(not(feature = "postgres_storage"), not(feature = "nats_storage")))]
154 {
155 JobScheduler::new().await
156 }
157 }
158
159 async fn schedule(app: Arc<App>) -> Result<String> {
160 let mut sched = app.get_expect_component::<JobScheduler>();
161 let jobs = app.get_component_ref::<Jobs>();
162
163 let jobs = match jobs {
164 None => {
165 let msg = "No tasks are registered, so the task scheduler does not start.";
166 tracing::info!(msg);
167 return Ok(msg.to_string());
168 }
169 Some(jobs) => jobs,
170 };
171
172 for job in jobs.deref().iter() {
173 sched
174 .add(job.to_owned().build(app.clone()))
175 .await
176 .context("add job failed")?;
177 }
178
179 sched.shutdown_on_ctrl_c();
180
181 sched.set_shutdown_handler(Box::new(|| {
183 Box::pin(async move {
184 tracing::info!("Shut down done");
185 })
186 }));
187
188 sched.start().await.context("job scheduler start failed")?;
190
191 Ok("job schedule started".to_string())
192 }
193}