Skip to main content

roadster/worker/backend/pg/processor/
mod.rs

1//! Background task queue processor backed by Postgres using [pgmq](https://docs.rs/pgmq/latest/pgmq/).
2
3use crate::app::context::AppContext;
4use crate::config::AppConfig;
5use crate::config::service::worker::{BalanceStrategy, StaleCleanUpBehavior};
6use crate::error::RoadsterResult;
7use crate::util::tracing::optional_trace_field;
8use crate::worker::PeriodicArgsJson;
9use crate::worker::WorkerWrapper;
10use crate::worker::backend::pg::periodic_job::PeriodicJob;
11use crate::worker::backend::pg::{failure_action, retry_delay, success_action};
12use crate::worker::backend::shared_queues;
13use crate::worker::config::CompletedAction;
14use crate::worker::job::{Job, JobMetadata};
15use axum_core::extract::FromRef;
16use builder::PgProcessorBuilder;
17use chrono::{DateTime, TimeDelta, Utc};
18use cron::Schedule;
19use itertools::Itertools;
20use pgmq::{PGMQueue, PgmqError};
21use sqlx::Error;
22use sqlx::error::ErrorKind;
23use std::cmp::{Ordering, max};
24use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27use thiserror::Error;
28use tokio::task::JoinSet;
29use tokio::time::sleep;
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, instrument};
32
33pub mod builder;
34
/// Name of the pgmq queue that holds the registered periodic jobs.
///
/// The raw SQL in this module refers to this queue's table directly as `pgmq.q_periodic`, so
/// this name must be kept in sync with those queries.
pub(crate) const PERIODIC_QUEUE_NAME: &str = "periodic";
36
37#[derive(Debug, Error)]
38#[non_exhaustive]
39pub enum PgProcessorError {
40    /// The provided [`crate::worker::Worker`] was already registered. Contains the
41    /// [`crate::worker::Worker::name`] of the provided worker.
42    #[error("The provided `Worker` was already registered: `{0}`")]
43    AlreadyRegistered(String),
44
45    /// A [`crate::worker::Worker`] was previously registered that has the same name but is a
46    /// different type.
47    #[error("The provided `Worker` name was already registered for a different type: `{0}`")]
48    AlreadyRegisteredWithDifferentType(String),
49
50    /// The provided [`crate::worker::Worker`] was already registered. Contains the
51    /// [`crate::worker::Worker::name`] of the provided worker.
52    #[error(
53        "The provided periodic worker job was already registered. Worker: `{0}`, schedule: `{1}`, args: `{2}`"
54    )]
55    AlreadyRegisteredPeriodic(String, String, serde_json::Value),
56
57    #[error("No queue configured for worker `{0}`.")]
58    NoQueue(String),
59
60    #[error("{0}")]
61    InvalidBalanceStrategy(String),
62
63    #[error(transparent)]
64    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
65}
66
67#[derive(Clone)]
68#[non_exhaustive]
69pub struct PgProcessor<S>
70where
71    S: Clone + Send + Sync + 'static,
72    AppContext: FromRef<S>,
73{
74    inner: Arc<PgProcessorInner<S>>,
75}
76
77#[non_exhaustive]
78pub(crate) struct PgProcessorInner<S>
79where
80    S: Clone + Send + Sync + 'static,
81    AppContext: FromRef<S>,
82{
83    state: S,
84    queues: BTreeSet<String>,
85    workers: BTreeMap<String, WorkerWrapper<S>>,
86    periodic_workers: HashSet<PeriodicArgsJson>,
87}
88
89impl<S> PgProcessor<S>
90where
91    S: Clone + Send + Sync + 'static,
92    AppContext: FromRef<S>,
93{
94    pub(crate) fn new(inner: PgProcessorInner<S>) -> Self {
95        Self {
96            inner: Arc::new(inner),
97        }
98    }
99
100    pub fn builder(state: &S) -> PgProcessorBuilder<S> {
101        PgProcessorBuilder::new(state)
102    }
103
104    pub async fn before_run(&self, state: &S) -> RoadsterResult<()> {
105        let context = AppContext::from_ref(state);
106        if context
107            .config()
108            .service
109            .worker
110            .pg
111            .custom
112            .common
113            .balance_strategy
114            == BalanceStrategy::None
115            && self.shared_queues(context.config()).len() > 1
116        {
117            return Err(PgProcessorError::InvalidBalanceStrategy(format!(
118                "{:?} is not supported when more than one shared queue is enabled.",
119                BalanceStrategy::None
120            ))
121            .into());
122        }
123
124        self.initialize_queues().await?;
125        self.initialize_periodic(state).await?;
126        Ok(())
127    }
128
129    /// Ensures all of the workers' queues' tables exist in the Postgres database.
130    async fn initialize_queues(&self) -> RoadsterResult<()> {
131        let context = AppContext::from_ref(&self.inner.state);
132        for queue in self.inner.queues.iter() {
133            context.pgmq().create(queue).await?;
134        }
135        Ok(())
136    }
137
138    /// Initialize the periodic queue tables and enqueue the periodic jobs in the queue.
139    async fn initialize_periodic(&self, state: &S) -> RoadsterResult<()> {
140        let context = AppContext::from_ref(state);
141
142        // Create the queue's tables
143        context.pgmq().create(PERIODIC_QUEUE_NAME).await?;
144        // Create a unique index on the periodic job hash. This ensures we don't enqueue duplicate
145        // periodic jobs.
146        sqlx::query!(
147            r#"CREATE UNIQUE INDEX IF NOT EXISTS roadster_periodic_hash_idx ON pgmq.q_periodic USING btree ((message->'periodic'->'hash'))"#
148        ).execute(&context.pgmq().connection).await?;
149
150        let periodic_config = &context.config().service.worker.pg.custom.custom.periodic;
151
152        let periodic_jobs = self
153            .inner
154            .periodic_workers
155            .iter()
156            .map(PeriodicJob::from)
157            .collect_vec();
158
159        match periodic_config.stale_cleanup {
160            StaleCleanUpBehavior::Manual => {}
161            StaleCleanUpBehavior::AutoCleanAll => {
162                let rows_affected = context.pgmq().purge(PERIODIC_QUEUE_NAME).await?;
163                info!(
164                    count = rows_affected,
165                    "Deleted all previously registered periodic jobs"
166                );
167            }
168            StaleCleanUpBehavior::AutoCleanStale => {
169                let current_job_hashes = periodic_jobs
170                    .iter()
171                    .map(|job| {
172                        serde_json::Value::Number(serde_json::Number::from(job.periodic.hash))
173                    })
174                    .collect_vec();
175                let result = sqlx::query!(
176                    r#"DELETE FROM pgmq.q_periodic where message->'periodic'->'hash' != ALL($1)"#,
177                    current_job_hashes.as_slice()
178                )
179                .execute(&context.pgmq().connection)
180                .await?;
181                info!(
182                    count = result.rows_affected(),
183                    "Deleted stale periodic jobs"
184                )
185            }
186        }
187
188        for job in periodic_jobs.iter() {
189            let delay = periodic_next_run_delay(&job.periodic.schedule, None);
190            let result = context
191                .pgmq()
192                .send_delay(PERIODIC_QUEUE_NAME, job, delay.as_secs())
193                .await;
194
195            match result {
196                Ok(_) => Ok(()),
197                Err(PgmqError::DatabaseError(Error::Database(err))) => match err.kind() {
198                    // We use a unique index constraint to ensure we don't enqueue duplicate periodic
199                    // jobs, so we ignore `UniqueViolation` errors, but allow all other errors
200                    // to be returned.
201                    ErrorKind::UniqueViolation => Ok(()),
202                    _ => Err(PgmqError::DatabaseError(Error::Database(err))),
203                },
204                Err(err) => Err(err),
205            }?;
206        }
207
208        Ok(())
209    }
210
211    pub(crate) fn queues(&self) -> &BTreeSet<String> {
212        &self.inner.queues
213    }
214
215    pub async fn run(self, _state: &S, cancellation_token: CancellationToken) {
216        let mut join_set = JoinSet::new();
217
218        let context = AppContext::from_ref(&self.inner.state);
219        let worker_config = &context.config().service.worker.pg.custom;
220        let dedicated_queues = &worker_config.common.queue_config;
221        let shared_queues = self.shared_queues(context.config());
222
223        if !shared_queues.is_empty() {
224            let total_worker_tasks = worker_config.common.num_workers;
225            for worker_num in 0..total_worker_tasks {
226                join_set.spawn(self.clone().process_queues(
227                    cancellation_token.clone(),
228                    worker_num + 1,
229                    total_worker_tasks,
230                    shared_queues.clone(),
231                ));
232            }
233        }
234
235        for (queue, config) in dedicated_queues {
236            let total_worker_tasks = config.num_workers.unwrap_or_default();
237            for worker_num in 0..total_worker_tasks {
238                join_set.spawn(self.clone().process_queues(
239                    cancellation_token.clone(),
240                    worker_num + 1,
241                    total_worker_tasks,
242                    vec![queue.to_owned()],
243                ));
244            }
245        }
246
247        if worker_config.custom.periodic.enable && !self.inner.periodic_workers.is_empty() {
248            join_set.spawn(self.clone().process_periodic(cancellation_token.clone()));
249        }
250
251        while let Some(result) = join_set.join_next().await {
252            // Once any of the tasks finish, cancel the cancellation token to ensure
253            // the processor and the app shut down gracefully.
254            cancellation_token.cancel();
255            if let Err(join_err) = result {
256                error!(
257                    "An error occurred when trying to join on one of the processor's workers. Error: {join_err}"
258                );
259            }
260        }
261    }
262
263    async fn process_queues(
264        self,
265        cancellation_token: CancellationToken,
266        worker_task_num: u32,
267        total_worker_tasks: u32,
268        queues: Vec<String>,
269    ) {
270        let num_queues = queues.len();
271        let queue_name = if num_queues == 1 {
272            queues.first().cloned()
273        } else {
274            None
275        };
276
277        let mut queues: BinaryHeap<QueueItem> = queues
278            .into_iter()
279            .map(|name| QueueItem {
280                name,
281                next_fetch: Utc::now(),
282            })
283            .collect();
284
285        let context = AppContext::from_ref(&self.inner.state);
286        let default_worker_config = &context.config().service.worker.worker_config;
287        let default_max_duration = default_worker_config.max_duration;
288        let default_view_timeout = default_max_duration
289            .as_ref()
290            .and_then(|duration| duration.as_secs().try_into().ok());
291
292        let empty_delay = context
293            .config()
294            .service
295            .worker
296            .pg
297            .custom
298            .custom
299            .queue_fetch_config
300            .as_ref()
301            .and_then(|config| config.empty_delay)
302            .unwrap_or_default();
303
304        let error_delay = context
305            .config()
306            .service
307            .worker
308            .pg
309            .custom
310            .custom
311            .queue_fetch_config
312            .as_ref()
313            .and_then(|config| config.error_delay)
314            .unwrap_or_default();
315
316        let pgmq = context.pgmq();
317        loop {
318            while let Some(mut queue) = queues.peek_mut() {
319                {
320                    let diff = max(TimeDelta::zero(), queue.next_fetch - Utc::now());
321                    let duration = diff.to_std().unwrap_or_else(|_| Duration::from_secs(0));
322                    tokio::select! {
323                        // `biased` ensures that the cancellation token is polled first
324                        biased;
325
326                        _ = cancellation_token.cancelled() => {
327                            info!(
328                                worker_task_num,
329                                total_worker_tasks,
330                                num_queues,
331                                queue = queue_name,
332                                "Exiting processor worker loop"
333                            );
334                            return;
335
336                        },
337                        _ = sleep(duration) => (),
338                    }
339                }
340
341                /*
342                Deserialize to `serde_json::Value` first. We do this because pgmq does not return
343                the message id if an error occurs when deserializing a custom type. So, if there
344                is a deserialization error, we wouldn't be able to update the view timeout of
345                the message and it will stay at the front of the queue indefinitely, blocking
346                all other work. Deserializing to `serde_json::Value` first will avoid all
347                deserialization errors (aside from those due to corrupted date, which should be
348                rare). Then, we can separately handle any deserialization errors ourselves.
349                 */
350                let msg = match pgmq
351                    .read::<serde_json::Value>(&queue.name, default_view_timeout)
352                    .await
353                {
354                    Ok(Some(msg)) => msg,
355                    Ok(None) => {
356                        queue.next_fetch = Utc::now() + empty_delay;
357                        continue;
358                    }
359                    Err(err) => {
360                        error!(
361                            worker.queue.name = queue.name,
362                            "An error occurred while reading from pgmq: {err}"
363                        );
364                        queue.next_fetch = Utc::now() + error_delay;
365                        continue;
366                    }
367                };
368
369                let job: Job = match serde_json::from_value(msg.message) {
370                    Ok(job) => job,
371                    Err(err) => {
372                        error!(
373                            job.msg_id = msg.msg_id,
374                            job.read_count = msg.read_ct,
375                            worker.queue.name = queue.name,
376                            "An error occurred while deserializing message from pgmq: {err}"
377                        );
378                        self.retry(
379                            pgmq,
380                            &queue,
381                            None,
382                            msg.msg_id,
383                            msg.read_ct,
384                            context.config(),
385                            None,
386                        )
387                        .await;
388
389                        queue.next_fetch = Utc::now();
390                        continue;
391                    }
392                };
393
394                let worker = if let Some(worker) = self.inner.workers.get(&job.metadata.worker_name)
395                {
396                    worker
397                } else {
398                    error!(
399                        job.id = %job.metadata.id,
400                        job.msg_id = msg.msg_id,
401                        job.read_count = msg.read_ct,
402                        worker.queue.name = queue.name,
403                        worker.name = job.metadata.worker_name,
404                        "Unable to handle job, worker not registered"
405                    );
406                    self.retry(
407                        pgmq,
408                        &queue,
409                        Some(&job.metadata),
410                        msg.msg_id,
411                        msg.read_ct,
412                        context.config(),
413                        None,
414                    )
415                    .await;
416
417                    queue.next_fetch = Utc::now();
418                    continue;
419                };
420
421                // Update the view timeout to match the max duration of the worker, if it's
422                // different from the default.
423                let max_duration = if let Some((worker_max, default_max)) = worker
424                    .inner
425                    .worker_config
426                    .max_duration
427                    .zip(default_max_duration)
428                {
429                    if worker_max != default_max {
430                        Some(worker_max)
431                    } else {
432                        None
433                    }
434                } else {
435                    worker.inner.worker_config.max_duration
436                };
437                if let Some(delay) = max_duration {
438                    self.update_job_view_timeout(
439                        pgmq,
440                        &queue,
441                        Some(&job.metadata),
442                        msg.msg_id,
443                        msg.read_ct,
444                        delay,
445                    )
446                    .await;
447                }
448
449                let result = worker
450                    .handle(&self.inner.state, &job.metadata, job.args)
451                    .await;
452
453                if let Err(err) = result {
454                    error!(
455                        job.id = %job.metadata.id,
456                        job.msg_id = msg.msg_id,
457                        job.read_count = msg.read_ct,
458                        worker.queue.name = queue.name,
459                        worker.name = job.metadata.worker_name,
460                        "An error occurred while handling a job: {err}"
461                    );
462                    self.retry(
463                        pgmq,
464                        &queue,
465                        Some(&job.metadata),
466                        msg.msg_id,
467                        msg.read_ct,
468                        context.config(),
469                        Some(worker),
470                    )
471                    .await;
472                } else {
473                    let action =
474                        success_action(context.config(), worker.inner.worker_config.pg.as_ref());
475                    self.job_completed(
476                        pgmq,
477                        &queue,
478                        Some(&job.metadata),
479                        msg.msg_id,
480                        msg.read_ct,
481                        action,
482                    )
483                    .await;
484                }
485
486                #[cfg(feature = "bench")]
487                (worker.inner.on_complete_fn)().await;
488
489                queue.next_fetch = Utc::now();
490            }
491        }
492    }
493
494    async fn process_periodic(self, cancellation_token: CancellationToken) {
495        let context = AppContext::from_ref(&self.inner.state);
496        let default_enqueue_config = &context.config().service.worker.enqueue_config;
497        let default_worker_config = &context.config().service.worker.worker_config;
498        let default_max_duration = default_worker_config.max_duration;
499        let default_view_timeout = default_max_duration
500            .as_ref()
501            .and_then(|duration| duration.as_secs().try_into().ok());
502
503        let empty_delay = context
504            .config()
505            .service
506            .worker
507            .pg
508            .custom
509            .custom
510            .queue_fetch_config
511            .as_ref()
512            .and_then(|config| config.empty_delay)
513            .unwrap_or_default();
514
515        let error_delay = context
516            .config()
517            .service
518            .worker
519            .pg
520            .custom
521            .custom
522            .queue_fetch_config
523            .as_ref()
524            .and_then(|config| config.error_delay)
525            .unwrap_or_default();
526
527        let mut next_fetch = Utc::now();
528
529        let pgmq = context.pgmq();
530        loop {
531            {
532                let diff = max(TimeDelta::zero(), next_fetch - Utc::now());
533                let duration = diff.to_std().unwrap_or_else(|_| Duration::from_secs(0));
534                tokio::select! {
535                    // `biased` ensures that the cancellation token is polled first
536                    biased;
537
538                    _ = cancellation_token.cancelled() => {
539                        info!("Exiting processor periodic worker loop");
540                        return;
541                    },
542                    _ = sleep(duration) => (),
543                }
544            }
545
546            /*
547            Deserialize to `serde_json::Value` first. We do this because pgmq does not return
548            the message id if an error occurs when deserializing a custom type. So, if there
549            is a deserialization error, we wouldn't be able to update the view timeout of
550            the message and it will stay at the front of the queue indefinitely, blocking
551            all other work. Deserializing to `serde_json::Value` first will avoid all
552            deserialization errors (aside from those due to corrupted date, which should be
553            rare). Then, we can separately handle any deserialization errors ourselves.
554             */
555            let msg = match pgmq
556                .read::<serde_json::Value>(PERIODIC_QUEUE_NAME, default_view_timeout)
557                .await
558            {
559                Ok(Some(msg)) => msg,
560                Ok(None) => {
561                    next_fetch = Utc::now() + empty_delay;
562                    continue;
563                }
564                Err(err) => {
565                    error!(
566                        worker.queue.name = PERIODIC_QUEUE_NAME,
567                        "An error occurred while reading from pgmq: {err}"
568                    );
569                    next_fetch = Utc::now() + error_delay;
570                    continue;
571                }
572            };
573
574            let job: PeriodicJob = match serde_json::from_value(msg.message) {
575                Ok(job) => job,
576                Err(err) => {
577                    error!(
578                        job.msg_id = msg.msg_id,
579                        job.read_count = msg.read_ct,
580                        worker.queue.name = PERIODIC_QUEUE_NAME,
581                        "An error occurred while deserializing message from pgmq: {err}"
582                    );
583                    // For periodic jobs, we simply delete the failing msg. It will
584                    // be re-enqueued the next time the app starts
585                    if let Err(err) = context.pgmq().delete(PERIODIC_QUEUE_NAME, msg.msg_id).await {
586                        error!(
587                            job.msg_id = msg.msg_id,
588                            job.read_count = msg.read_ct,
589                            worker.queue.name = PERIODIC_QUEUE_NAME,
590                            "An error occurred while deleting periodic job: {err}"
591                        );
592                        next_fetch = Utc::now() + error_delay;
593                    } else {
594                        next_fetch = Utc::now();
595                    }
596                    continue;
597                }
598            };
599
600            let worker = self.inner.workers.get(&job.metadata.worker_name);
601            let queue = worker
602                .and_then(|worker| worker.inner.enqueue_config.queue.as_ref())
603                .or(default_enqueue_config.queue.as_ref());
604
605            let (worker, queue) = if let Some((worker, queue)) = worker.zip(queue) {
606                (worker, queue)
607            } else {
608                error!(
609                    job.id = %job.metadata.id,
610                    job.msg_id = msg.msg_id,
611                    job.read_count = msg.read_ct,
612                    worker.name = job.metadata.worker_name,
613                    worker.queue.name = queue,
614                    "Unable to enqueue job; worker not registered or no queue configured"
615                );
616                // For periodic jobs, we simply delete the failing msg. It will
617                // be re-enqueued the next time the app starts
618                if let Err(err) = context.pgmq().delete(PERIODIC_QUEUE_NAME, msg.msg_id).await {
619                    error!(
620                        job.id = %job.metadata.id,
621                        job.msg_id = msg.msg_id,
622                        job.read_count = msg.read_ct,
623                        worker.queue.name = PERIODIC_QUEUE_NAME,
624                        "An error occurred while deleting periodic job: {err}"
625                    );
626                    next_fetch = Utc::now() + error_delay;
627                } else {
628                    next_fetch = Utc::now();
629                }
630                continue;
631            };
632
633            let job_to_enqueue = Job::builder()
634                .args(job.args.clone())
635                .metadata(
636                    JobMetadata::builder()
637                        .worker_name(job.metadata.worker_name)
638                        .build(),
639                )
640                .build();
641            if let Err(err) = context.pgmq().send(queue, &job_to_enqueue).await {
642                error!(
643                    job.id = %job.metadata.id,
644                    job.msg_id = msg.msg_id,
645                    job.read_count = msg.read_ct,
646                    worker.name = worker.inner.name,
647                    worker.queue.name = queue,
648                    "An error occurred while enqueuing periodic job: {err}"
649                );
650
651                next_fetch = Utc::now() + error_delay;
652                continue;
653            }
654
655            let delay = periodic_next_run_delay(&job.periodic.schedule, None);
656            if let Err(err) = pgmq
657                .set_vt::<serde_json::Value>(PERIODIC_QUEUE_NAME, msg.msg_id, Utc::now() + delay)
658                .await
659            {
660                error!(
661                    job.id = %job.metadata.id,
662                    job.msg_id = msg.msg_id,
663                    job.read_count = msg.read_ct,
664                    job.delay = ?delay,
665                    worker.queue.name = PERIODIC_QUEUE_NAME,
666                    worker.name = worker.inner.name,
667                    "An error occurred while updating periodic job's view timeout: {err}"
668                );
669                next_fetch = Utc::now() + error_delay;
670                continue;
671            }
672
673            next_fetch = Utc::now();
674        }
675    }
676
677    fn shared_queues(&self, config: &AppConfig) -> Vec<String> {
678        let worker_config = &config.service.worker.pg.custom;
679        shared_queues(
680            &worker_config.common.queues,
681            &self.inner.queues,
682            &worker_config.common.queue_config,
683        )
684        .map(|queue| queue.to_owned())
685        .collect_vec()
686    }
687
688    #[instrument(skip_all)]
689    #[allow(clippy::too_many_arguments)]
690    async fn retry(
691        &self,
692        pgmq: &PGMQueue,
693        queue: &QueueItem,
694        job_metadata: Option<&JobMetadata>,
695        msg_id: i64,
696        read_count: i32,
697        app_config: &AppConfig,
698        worker: Option<&WorkerWrapper<S>>,
699    ) {
700        if let Some(delay) = retry_delay(
701            app_config,
702            worker.and_then(|worker| worker.inner.worker_config.retry_config.as_ref()),
703            read_count,
704        ) {
705            // If the job can retry, update its view timeout by the calculated delay.
706            self.update_job_view_timeout(pgmq, queue, job_metadata, msg_id, read_count, delay)
707                .await;
708        } else {
709            // Otherwise, perform the failure action for the worker.
710            let action = failure_action(
711                app_config,
712                worker.and_then(|worker| worker.inner.worker_config.pg.as_ref()),
713            );
714            self.job_completed(pgmq, queue, job_metadata, msg_id, read_count, action)
715                .await;
716        }
717    }
718
719    #[instrument(skip_all)]
720    async fn update_job_view_timeout(
721        &self,
722        pgmq: &PGMQueue,
723        queue: &QueueItem,
724        job_metadata: Option<&JobMetadata>,
725        msg_id: i64,
726        read_count: i32,
727        delay: Duration,
728    ) {
729        if let Err(err) = pgmq
730            .set_vt::<serde_json::Value>(&queue.name, msg_id, Utc::now() + delay)
731            .await
732        {
733            error!(
734                job.id = optional_trace_field(job_metadata.map(|meta| meta.id)),
735                job.msg_id = msg_id,
736                job.read_count = read_count,
737                worker.queue.name = queue.name,
738                worker.name = job_metadata.map(|metadata| &metadata.worker_name),
739                "An error occurred while updating job's view timeout: {err}"
740            );
741        }
742    }
743
744    #[instrument(skip_all)]
745    async fn job_completed(
746        &self,
747        pgmq: &PGMQueue,
748        queue: &QueueItem,
749        job_metadata: Option<&JobMetadata>,
750        msg_id: i64,
751        read_count: i32,
752        action: &CompletedAction,
753    ) {
754        debug!(
755            job.id = optional_trace_field(job_metadata.map(|meta| meta.id)),
756            job.msg_id = msg_id,
757            job.read_count = read_count,
758            job.completed_action = ?action,
759            worker.queue.name = queue.name,
760            worker.name = job_metadata.map(|metadata| &metadata.worker_name),
761            "Performing completed action for a job"
762        );
763
764        let result = match action {
765            CompletedAction::Archive => pgmq.archive(&queue.name, msg_id).await,
766            CompletedAction::Delete => pgmq.delete(&queue.name, msg_id).await,
767        };
768
769        if let Err(err) = result {
770            error!(
771                job.id = optional_trace_field(job_metadata.map(|meta| meta.id)),
772                job.msg_id = msg_id,
773                job.read_count = read_count,
774                job.completed_action = ?action,
775                worker.queue.name = queue.name,
776                worker.name = job_metadata.map(|metadata| &metadata.worker_name),
777                "An error occurred while performing completed action for a job: {err}"
778            );
779        }
780    }
781}
782
783struct QueueItem {
784    name: String,
785    next_fetch: DateTime<Utc>,
786}
787
788impl Eq for QueueItem {}
789
790impl PartialEq<Self> for QueueItem {
791    fn eq(&self, other: &Self) -> bool {
792        self.next_fetch == other.next_fetch
793    }
794}
795
796impl PartialOrd<Self> for QueueItem {
797    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
798        Some(self.cmp(other))
799    }
800}
801
802impl Ord for QueueItem {
803    fn cmp(&self, other: &Self) -> Ordering {
804        // This is intentionally reversed so that `QueueItem` forms a min heap when used in
805        // a binary heap.
806        other.next_fetch.cmp(&self.next_fetch)
807    }
808}
809
810fn periodic_next_run_delay(schedule: &Schedule, now: Option<DateTime<Utc>>) -> Duration {
811    let now = now.unwrap_or_else(Utc::now);
812    let next_run = schedule.after(&now).next().unwrap_or(now);
813    let diff = max(TimeDelta::zero(), next_run - now);
814    diff.to_std().unwrap_or_else(|_| Duration::from_secs(0))
815}
816
#[cfg(test)]
mod tests {
    use chrono::DateTime;
    use chrono::Utc;
    use cron::Schedule;
    use insta::assert_debug_snapshot;
    use std::str::FromStr;

    #[test]
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn periodic_queue_name() {
        // Raw SQL in the processor refers to `pgmq.q_periodic`, so the name must not drift.
        let expected = "periodic";
        assert_eq!(super::PERIODIC_QUEUE_NAME, expected);
    }

    mod queue_item {
        use crate::worker::backend::pg::processor::QueueItem;
        use chrono::Utc;
        use std::collections::BinaryHeap;
        use std::time::Duration;

        /// Pops every item off `heap` and returns their names in pop order.
        #[cfg_attr(coverage_nightly, coverage(off))]
        fn drain_names(mut heap: BinaryHeap<QueueItem>) -> Vec<String> {
            std::iter::from_fn(|| heap.pop().map(|item| item.name)).collect()
        }

        #[test]
        #[cfg_attr(coverage_nightly, coverage(off))]
        fn min_heap() {
            let now = Utc::now();
            let items: BinaryHeap<QueueItem> = vec![("a", 1), ("b", 0), ("c", 10)]
                .into_iter()
                .map(|(name, secs)| QueueItem {
                    name: name.to_owned(),
                    next_fetch: now + Duration::from_secs(secs),
                })
                .collect();

            // The item with the earliest `next_fetch` must come out first.
            assert_eq!(drain_names(items), ["b", "a", "c"]);
        }

        #[test]
        #[cfg_attr(coverage_nightly, coverage(off))]
        fn peek_mut_change_order() {
            let now = Utc::now();
            let mut items = BinaryHeap::from(vec![
                QueueItem {
                    name: "a".to_owned(),
                    next_fetch: now,
                },
                QueueItem {
                    name: "b".to_owned(),
                    next_fetch: now + Duration::from_secs(1),
                },
            ]);

            // Pushing the front item's fetch time past the other item's must re-order the heap
            // once the `PeekMut` guard is dropped.
            if let Some(mut front) = items.peek_mut() {
                front.next_fetch = now + Duration::from_secs(10);
            }

            assert_eq!(drain_names(items), ["b", "a"]);
        }
    }

    #[test]
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn periodic_next_run_delay() {
        let now = DateTime::<Utc>::from_timestamp(1751701139, 0).unwrap();
        let schedule = Schedule::from_str("* 12 * * * *").unwrap();
        let delay = super::periodic_next_run_delay(&schedule, Some(now));
        assert_debug_snapshot!(delay);
    }
}