1#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4
5pub mod extractor;
6pub mod handler;
7pub mod job;
8
9use spring::signal;
10pub use spring_macros::cron;
13pub use spring_macros::fix_delay;
14pub use spring_macros::fix_rate;
15pub use spring_macros::one_shot;
16
17use anyhow::Context;
18use job::Job;
19use spring::async_trait;
20use spring::error::Result;
21use spring::plugin::component::ComponentRef;
22use spring::plugin::ComponentRegistry;
23use spring::plugin::MutableComponentRegistry;
24use spring::{
25 app::{App, AppBuilder},
26 plugin::Plugin,
27};
28use std::ops::Deref;
29use std::sync::Arc;
30use tokio_cron_scheduler::JobSchedulerError;
31use uuid::Uuid;
32
33#[derive(Clone, Default)]
34pub struct Jobs(Vec<Job>);
35
36impl Jobs {
37 pub fn new() -> Self {
38 Self::default()
39 }
40 fn single(job: Job) -> Self {
41 Self(vec![job])
42 }
43
44 pub fn add_job(mut self, job: Job) -> Self {
45 self.0.push(job);
46 self
47 }
48
49 pub fn add_jobs(mut self, jobs: Jobs) -> Self {
50 for job in jobs.0 {
51 self.0.push(job);
52 }
53 self
54 }
55
56 fn merge(&mut self, jobs: Jobs) {
57 for job in jobs.0 {
58 self.0.push(job);
59 }
60 }
61}
62
63impl Deref for Jobs {
64 type Target = Vec<Job>;
65
66 fn deref(&self) -> &Self::Target {
67 &self.0
68 }
69}
70
/// Job identifier type: an alias for [`Uuid`].
pub type JobId = Uuid;
/// Scheduler type that `JobPlugin` creates and registers as a component:
/// an alias for [`tokio_cron_scheduler::JobScheduler`].
pub type JobScheduler = tokio_cron_scheduler::JobScheduler;
73
74pub trait JobConfigurator {
75 fn add_job(&mut self, job: Job) -> &mut Self;
76 fn add_jobs(&mut self, job: Jobs) -> &mut Self;
77}
78
79impl JobConfigurator for AppBuilder {
80 fn add_job(&mut self, job: Job) -> &mut Self {
81 if let Some(jobs) = self.get_component_ref::<Jobs>() {
82 unsafe {
83 let raw_ptr = ComponentRef::into_raw(jobs);
84 let jobs = &mut *(raw_ptr as *mut Vec<Job>);
85 jobs.push(job);
86 }
87 self
88 } else {
89 self.add_component(Jobs::single(job))
90 }
91 }
92
93 fn add_jobs(&mut self, new_jobs: Jobs) -> &mut Self {
94 if let Some(jobs) = self.get_component_ref::<Jobs>() {
95 unsafe {
96 let raw_ptr = ComponentRef::into_raw(jobs);
97 let jobs = &mut *(raw_ptr as *mut Jobs);
98 jobs.merge(new_jobs);
99 }
100 self
101 } else {
102 self.add_component(new_jobs)
103 }
104 }
105}
106
107pub struct JobPlugin;
108
109#[async_trait]
110impl Plugin for JobPlugin {
111 async fn build(&self, app: &mut AppBuilder) {
112 let sched = Self::new_scheduler().await.expect("build scheduler failed");
113 app.add_component(sched)
114 .add_scheduler(|app: Arc<App>| Box::new(Self::schedule(app)));
115 }
116}
117
118impl JobPlugin {
119 async fn new_scheduler() -> std::result::Result<JobScheduler, JobSchedulerError> {
120 #[cfg(feature = "postgres_storage")]
121 {
122 let metadata_storage = Box::new(tokio_cron_scheduler::PostgresMetadataStore::default());
123 let notification_storage =
124 Box::new(tokio_cron_scheduler::PostgresNotificationStore::default());
125 let job_code = Box::new(tokio_cron_scheduler::SimpleJobCode::default());
126 let notification_code =
127 Box::new(tokio_cron_scheduler::SimpleNotificationCode::default());
128 JobScheduler::new_with_storage_and_code(
129 metadata_storage,
130 notification_storage,
131 job_code,
132 notification_code,
133 200,
134 )
135 .await
136 }
137 #[cfg(all(not(feature = "postgres_storage"), feature = "nats_storage"))]
138 {
139 let metadata_storage = Box::new(tokio_cron_scheduler::NatsMetadataStore::default());
140 let notification_storage =
141 Box::new(tokio_cron_scheduler::NatsNotificationStore::default());
142 let job_code = Box::new(tokio_cron_scheduler::SimpleJobCode::default());
143 let notification_code =
144 Box::new(tokio_cron_scheduler::SimpleNotificationCode::default());
145 JobScheduler::new_with_storage_and_code(
146 metadata_storage,
147 notification_storage,
148 job_code,
149 notification_code,
150 200,
151 )
152 .await
153 }
154 #[cfg(all(not(feature = "postgres_storage"), not(feature = "nats_storage")))]
155 {
156 JobScheduler::new().await
157 }
158 }
159
160 async fn schedule(app: Arc<App>) -> Result<String> {
161 let mut sched = app.get_expect_component::<JobScheduler>();
162 let jobs = app.get_component_ref::<Jobs>();
163
164 let jobs = match jobs {
165 None => {
166 let msg = "No tasks are registered, so the task scheduler does not start.";
167 tracing::info!(msg);
168 return Ok(msg.to_string());
169 }
170 Some(jobs) => jobs,
171 };
172
173 for job in jobs.deref().iter() {
174 sched
175 .add(job.to_owned().build(app.clone()))
176 .await
177 .context("add job failed")?;
178 }
179
180 let mut l = sched.clone();
181 tokio::spawn(async move {
183 let _ = signal::shutdown_signal("job").await;
184
185 if let Err(err) = l.shutdown().await {
186 tracing::error!("{:?}", err);
187 }
188 });
189
190 sched.set_shutdown_handler(Box::new(|| {
192 Box::pin(async move {
193 tracing::info!("Shut down done");
194 })
195 }));
196
197 sched.start().await.context("job scheduler start failed")?;
199
200 Ok("job schedule started".to_string())
201 }
202}