1use crate::core::JobParameter;
2use crate::models::{AmqpCommand, Job, RecurringJob};
3use crate::models::{DelayedStage, EnqueuedStage, JobConfig, Stage, WaitingStage};
4use crate::mq::MqClient;
5use crate::persist::Persist;
6use crate::stats::{Event, EventsHandler, NoOpStats, Stats};
7use crate::storage::Storage;
8use crate::{encoder, RecurringJobId};
9use crate::{metrics, BackgroundJobServerPublisher, JobId, UtcDateTime};
10use anyhow::Context;
11use std::str::FromStr;
12use std::sync::Arc;
13use std::time::Duration;
14
15impl BackgroundJobServerPublisher {
16 pub async fn new(
17 id: String,
18 mq_client: Arc<Box<dyn MqClient>>,
19 storage: Box<dyn Storage>,
20 ) -> anyhow::Result<Self> {
21 let routing_key = format!("later-{}", id);
22 let publisher = mq_client.new_publisher(&routing_key).await?;
23 let persist = Arc::new(Persist::new(storage, routing_key.clone()));
24 let stats: Box<dyn EventsHandler> = {
25 if cfg!(feature = "dashboard") {
26 tracing::info!("Enabling Dashboard Stats Collector");
27 Box::new(
28 Stats::new(&routing_key, persist.clone(), &mq_client.clone())
29 .await
30 .expect("configure stats"),
31 )
32 } else {
33 Box::new(NoOpStats {})
34 }
35 };
36
37 Ok(Self {
38 storage: persist.clone(),
39
40 stats,
41 publisher,
42 routing_key,
43 })
44 }
45
46 pub async fn ensure_worker_ready(&self) -> anyhow::Result<()> {
49 Ok(self.publisher.ensure_consumer().await?)
50 }
51
52 pub fn get_metrics(&self) -> anyhow::Result<String> {
53 metrics::COUNTER.output()
54 }
55
56 pub async fn enqueue_continue(
57 &self,
58 parent_job_id: JobId,
59 message: impl JobParameter,
60 ) -> anyhow::Result<JobId> {
61 self.enqueue_internal(message, Some(parent_job_id), None, None)
62 .await
63 }
64
65 pub async fn enqueue_delayed(
66 &self,
67 message: impl JobParameter,
68 delay: std::time::Duration,
69 ) -> anyhow::Result<JobId> {
70 let enqueue_time = chrono::Utc::now()
71 .checked_add_signed(chrono::Duration::from_std(delay)?)
72 .ok_or(anyhow::anyhow!("Error calculating enqueue time"))?;
73
74 self.enqueue_delayed_at(message, enqueue_time).await
75 }
76
77 pub async fn enqueue_delayed_at(
78 &self,
79 message: impl JobParameter,
80 time: chrono::DateTime<chrono::Utc>,
81 ) -> anyhow::Result<JobId> {
82 if time <= chrono::Utc::now() {
83 return Err(anyhow::anyhow!("Time must be in the future"));
84 }
85 self.enqueue_internal(message, None, Some(time), None).await
86 }
87
88 pub async fn enqueue(&self, message: impl JobParameter) -> anyhow::Result<JobId> {
89 self.enqueue_internal(message, None, None, None).await
90 }
91
92 #[tracing::instrument(skip(self, message), fields(ptype = message.get_ptype(), job_id), name = "init_create_job")]
93 async fn enqueue_internal(
94 &self,
95 message: impl JobParameter,
96 parent_job_id: Option<JobId>,
97 delay_until: Option<UtcDateTime>,
98 recurring_job_id: Option<RecurringJobId>,
99 ) -> anyhow::Result<JobId> {
100 let job = create_job(message, parent_job_id, delay_until, recurring_job_id)?;
101
102 tracing::Span::current().record("job_id", job.id.to_string());
103
104 Ok(self.enqueue_internal_job(job).await?)
105 }
106
107 pub(crate) async fn enqueue_internal_job(&self, job: Job) -> Result<JobId, anyhow::Error> {
108 let id = job.id.clone();
109
110 self.save(&job).await?;
111 Event::NewJob((&job).into()).publish(&self).await;
112 self.handle_job_enqueue_initial(job).await?;
113 Ok(id)
114 }
115
116 pub async fn enqueue_recurring(
117 &self,
118 identifier: String,
119 message: impl JobParameter,
120 cron: String,
121 ) -> anyhow::Result<JobId> {
122 let _ = cron::Schedule::from_str(&cron).context("error parsing cron expression")?;
124
125 let recurring_job = RecurringJob {
126 id: RecurringJobId(identifier),
127 payload_type: message.get_ptype(),
128 payload: message
129 .to_bytes()
130 .context("Unable to serialize the message to bytes")?,
131 cron_schedule: cron,
132 date_added: chrono::Utc::now(),
133 config: JobConfig::default(),
134 };
135
136 self.storage.save_recurring_job(&recurring_job).await?;
137
138 let first_job = recurring_job.try_into()?;
139 let id = self.enqueue_internal_job(first_job).await?;
140
141 Ok(id)
142 }
143
144 pub(crate) async fn save(&self, job: &Job) -> anyhow::Result<()> {
145 if job.stage.is_polling_required() {
146 self.storage.save_job_id(&job.id, &job.stage).await?;
147 }
148 if let Stage::Waiting(w) = &job.stage {
149 self.storage
150 .save_continuation(&job.id, w.parent_id.clone())
151 .await?;
152 }
153 self.storage.save_job(job).await
154 }
155
156 pub(crate) async fn expire(&self, job: &Job, _duration: Duration) -> anyhow::Result<()> {
157 self.storage.expire(job.id.clone()).await
159 }
160
161 #[async_recursion::async_recursion]
162 pub(crate) async fn handle_job_enqueue_initial(&self, job: Job) -> anyhow::Result<()> {
163 tracing::debug!(
164 "handle_job_enqueue_initial: Id: {}, Stage: {:?}",
165 &job.id,
166 &job.stage
167 );
168
169 match &job.stage {
170 Stage::Delayed(delayed) => {
171 if delayed.is_time() {
175 let job = job.transition(); self.save(&job).await?;
177
178 self.handle_job_enqueue_initial(job).await?;
179 }
180 }
181 Stage::Waiting(waiting) => {
182 if let Some(parent_job) = self.storage.get_job(waiting.parent_id.clone()).await {
187 if !parent_job.stage.is_success() {
188 return Ok(());
189 }
190
191 tracing::info!(
192 "Parent job {} is already completed, enqueuing this job immediately",
193 parent_job.id
194 );
195 }
196
197 let job = job.transition();
199 self.save(&job).await?;
200
201 self.handle_job_enqueue_initial(job).await?;
202 }
203 Stage::Enqueued(_) => {
204 tracing::debug!("Enqueue job {}", job.id);
205
206 self.publish_amqp_command(AmqpCommand::ExecuteJob(job.into()))
207 .await?
208 }
209 Stage::Running(_) | Stage::Requeued(_) | Stage::Success(_) | Stage::Failed(_) => {
210 tracing::warn!("Invalid job here {}, Stage {:?}", job.id, &job.stage);
211 }
213 }
214
215 Ok(())
216 }
217
218 pub(crate) async fn publish_amqp_command(&self, cmd: AmqpCommand) -> anyhow::Result<()> {
219 let message_bytes = encoder::encode(&cmd)?;
220
221 self.publisher.publish(&message_bytes).await?;
222
223 Ok(())
224 }
225}
226
227fn create_job(
228 message: impl JobParameter,
229 parent_job_id: Option<JobId>,
230 delay_until: Option<chrono::DateTime<chrono::Utc>>,
231 recurring_job_id: Option<RecurringJobId>,
232) -> anyhow::Result<Job> {
233 let id = crate::generate_id();
234 let job = Job {
235 id: JobId(id.clone()),
236 payload_type: message.get_ptype(),
237 payload: message
238 .to_bytes()
239 .context("Unable to serialize the message to bytes")?,
240 stage: {
241 if let Some(parent_job_id) = parent_job_id {
242 Stage::Waiting(WaitingStage {
243 date: chrono::Utc::now(),
244 parent_id: parent_job_id,
245 })
246 } else if let Some(delay_until) = delay_until {
247 Stage::Delayed(DelayedStage {
248 date: chrono::Utc::now(),
249 not_before: delay_until,
250 })
251 } else {
252 Stage::Enqueued(EnqueuedStage {
253 date: chrono::Utc::now(),
254 })
255 }
256 },
257 previous_stages: Vec::default(),
258 config: JobConfig::default(),
259 recurring_job_id: recurring_job_id,
260 };
261 Ok(job)
262}