later/
bg_job_server_publisher.rs

1use crate::core::JobParameter;
2use crate::models::{AmqpCommand, Job, RecurringJob};
3use crate::models::{DelayedStage, EnqueuedStage, JobConfig, Stage, WaitingStage};
4use crate::mq::MqClient;
5use crate::persist::Persist;
6use crate::stats::{Event, EventsHandler, NoOpStats, Stats};
7use crate::storage::Storage;
8use crate::{encoder, RecurringJobId};
9use crate::{metrics, BackgroundJobServerPublisher, JobId, UtcDateTime};
10use anyhow::Context;
11use std::str::FromStr;
12use std::sync::Arc;
13use std::time::Duration;
14
15impl BackgroundJobServerPublisher {
16    pub async fn new(
17        id: String,
18        mq_client: Arc<Box<dyn MqClient>>,
19        storage: Box<dyn Storage>,
20    ) -> anyhow::Result<Self> {
21        let routing_key = format!("later-{}", id);
22        let publisher = mq_client.new_publisher(&routing_key).await?;
23        let persist = Arc::new(Persist::new(storage, routing_key.clone()));
24        let stats: Box<dyn EventsHandler> = {
25            if cfg!(feature = "dashboard") {
26                tracing::info!("Enabling Dashboard Stats Collector");
27                Box::new(
28                    Stats::new(&routing_key, persist.clone(), &mq_client.clone())
29                        .await
30                        .expect("configure stats"),
31                )
32            } else {
33                Box::new(NoOpStats {})
34            }
35        };
36
37        Ok(Self {
38            storage: persist.clone(),
39
40            stats,
41            publisher,
42            routing_key,
43        })
44    }
45
46    /// Blocks until there is at least worker available.
47    /// This is used during startup to ensure readiness.
48    pub async fn ensure_worker_ready(&self) -> anyhow::Result<()> {
49        Ok(self.publisher.ensure_consumer().await?)
50    }
51
52    pub fn get_metrics(&self) -> anyhow::Result<String> {
53        metrics::COUNTER.output()
54    }
55
56    pub async fn enqueue_continue(
57        &self,
58        parent_job_id: JobId,
59        message: impl JobParameter,
60    ) -> anyhow::Result<JobId> {
61        self.enqueue_internal(message, Some(parent_job_id), None, None)
62            .await
63    }
64
65    pub async fn enqueue_delayed(
66        &self,
67        message: impl JobParameter,
68        delay: std::time::Duration,
69    ) -> anyhow::Result<JobId> {
70        let enqueue_time = chrono::Utc::now()
71            .checked_add_signed(chrono::Duration::from_std(delay)?)
72            .ok_or(anyhow::anyhow!("Error calculating enqueue time"))?;
73
74        self.enqueue_delayed_at(message, enqueue_time).await
75    }
76
77    pub async fn enqueue_delayed_at(
78        &self,
79        message: impl JobParameter,
80        time: chrono::DateTime<chrono::Utc>,
81    ) -> anyhow::Result<JobId> {
82        if time <= chrono::Utc::now() {
83            return Err(anyhow::anyhow!("Time must be in the future"));
84        }
85        self.enqueue_internal(message, None, Some(time), None).await
86    }
87
88    pub async fn enqueue(&self, message: impl JobParameter) -> anyhow::Result<JobId> {
89        self.enqueue_internal(message, None, None, None).await
90    }
91
92    #[tracing::instrument(skip(self, message), fields(ptype = message.get_ptype(), job_id), name = "init_create_job")]
93    async fn enqueue_internal(
94        &self,
95        message: impl JobParameter,
96        parent_job_id: Option<JobId>,
97        delay_until: Option<UtcDateTime>,
98        recurring_job_id: Option<RecurringJobId>,
99    ) -> anyhow::Result<JobId> {
100        let job = create_job(message, parent_job_id, delay_until, recurring_job_id)?;
101
102        tracing::Span::current().record("job_id", job.id.to_string());
103
104        Ok(self.enqueue_internal_job(job).await?)
105    }
106
107    pub(crate) async fn enqueue_internal_job(&self, job: Job) -> Result<JobId, anyhow::Error> {
108        let id = job.id.clone();
109
110        self.save(&job).await?;
111        Event::NewJob((&job).into()).publish(&self).await;
112        self.handle_job_enqueue_initial(job).await?;
113        Ok(id)
114    }
115
116    pub async fn enqueue_recurring(
117        &self,
118        identifier: String,
119        message: impl JobParameter,
120        cron: String,
121    ) -> anyhow::Result<JobId> {
122        // validate
123        let _ = cron::Schedule::from_str(&cron).context("error parsing cron expression")?;
124
125        let recurring_job = RecurringJob {
126            id: RecurringJobId(identifier),
127            payload_type: message.get_ptype(),
128            payload: message
129                .to_bytes()
130                .context("Unable to serialize the message to bytes")?,
131            cron_schedule: cron,
132            date_added: chrono::Utc::now(),
133            config: JobConfig::default(),
134        };
135
136        self.storage.save_recurring_job(&recurring_job).await?;
137
138        let first_job = recurring_job.try_into()?;
139        let id = self.enqueue_internal_job(first_job).await?;
140
141        Ok(id)
142    }
143
144    pub(crate) async fn save(&self, job: &Job) -> anyhow::Result<()> {
145        if job.stage.is_polling_required() {
146            self.storage.save_job_id(&job.id, &job.stage).await?;
147        }
148        if let Stage::Waiting(w) = &job.stage {
149            self.storage
150                .save_continuation(&job.id, w.parent_id.clone())
151                .await?;
152        }
153        self.storage.save_job(job).await
154    }
155
156    pub(crate) async fn expire(&self, job: &Job, _duration: Duration) -> anyhow::Result<()> {
157        // ToDo: expire properly
158        self.storage.expire(job.id.clone()).await
159    }
160
161    #[async_recursion::async_recursion]
162    pub(crate) async fn handle_job_enqueue_initial(&self, job: Job) -> anyhow::Result<()> {
163        tracing::debug!(
164            "handle_job_enqueue_initial: Id: {}, Stage: {:?}",
165            &job.id,
166            &job.stage
167        );
168
169        match &job.stage {
170            Stage::Delayed(delayed) => {
171                // delayed job
172                // should be polled
173
174                if delayed.is_time() {
175                    let job = job.transition(); // Delayed -> Enqueued
176                    self.save(&job).await?;
177
178                    self.handle_job_enqueue_initial(job).await?;
179                }
180            }
181            Stage::Waiting(waiting) => {
182                // continuation
183                // - enqueue if parent is already complete
184                // - schedule self message to check an enqueue later (to prevent race)
185
186                if let Some(parent_job) = self.storage.get_job(waiting.parent_id.clone()).await {
187                    if !parent_job.stage.is_success() {
188                        return Ok(());
189                    }
190
191                    tracing::info!(
192                        "Parent job {} is already completed, enqueuing this job immediately",
193                        parent_job.id
194                    );
195                }
196
197                // parent job is success or not found (means successful long time ago)
198                let job = job.transition();
199                self.save(&job).await?;
200
201                self.handle_job_enqueue_initial(job).await?;
202            }
203            Stage::Enqueued(_) => {
204                tracing::debug!("Enqueue job {}", job.id);
205
206                self.publish_amqp_command(AmqpCommand::ExecuteJob(job.into()))
207                    .await?
208            }
209            Stage::Running(_) | Stage::Requeued(_) | Stage::Success(_) | Stage::Failed(_) => {
210                tracing::warn!("Invalid job here {}, Stage {:?}", job.id, &job.stage);
211                //unreachable!("stage is handled in consumer")
212            }
213        }
214
215        Ok(())
216    }
217
218    pub(crate) async fn publish_amqp_command(&self, cmd: AmqpCommand) -> anyhow::Result<()> {
219        let message_bytes = encoder::encode(&cmd)?;
220
221        self.publisher.publish(&message_bytes).await?;
222
223        Ok(())
224    }
225}
226
227fn create_job(
228    message: impl JobParameter,
229    parent_job_id: Option<JobId>,
230    delay_until: Option<chrono::DateTime<chrono::Utc>>,
231    recurring_job_id: Option<RecurringJobId>,
232) -> anyhow::Result<Job> {
233    let id = crate::generate_id();
234    let job = Job {
235        id: JobId(id.clone()),
236        payload_type: message.get_ptype(),
237        payload: message
238            .to_bytes()
239            .context("Unable to serialize the message to bytes")?,
240        stage: {
241            if let Some(parent_job_id) = parent_job_id {
242                Stage::Waiting(WaitingStage {
243                    date: chrono::Utc::now(),
244                    parent_id: parent_job_id,
245                })
246            } else if let Some(delay_until) = delay_until {
247                Stage::Delayed(DelayedStage {
248                    date: chrono::Utc::now(),
249                    not_before: delay_until,
250                })
251            } else {
252                Stage::Enqueued(EnqueuedStage {
253                    date: chrono::Utc::now(),
254                })
255            }
256        },
257        previous_stages: Vec::default(),
258        config: JobConfig::default(),
259        recurring_job_id: recurring_job_id,
260    };
261    Ok(job)
262}