1use crate::app::context::AppContext;
4use crate::config::AppConfig;
5use crate::config::service::worker::{BalanceStrategy, StaleCleanUpBehavior};
6use crate::error::RoadsterResult;
7use crate::util::tracing::optional_trace_field;
8use crate::worker::PeriodicArgsJson;
9use crate::worker::WorkerWrapper;
10use crate::worker::backend::pg::periodic_job::PeriodicJob;
11use crate::worker::backend::pg::{failure_action, retry_delay, success_action};
12use crate::worker::backend::shared_queues;
13use crate::worker::config::CompletedAction;
14use crate::worker::job::{Job, JobMetadata};
15use axum_core::extract::FromRef;
16use builder::PgProcessorBuilder;
17use chrono::{DateTime, TimeDelta, Utc};
18use cron::Schedule;
19use itertools::Itertools;
20use pgmq::{PGMQueue, PgmqError};
21use sqlx::Error;
22use sqlx::error::ErrorKind;
23use std::cmp::{Ordering, max};
24use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashSet};
25use std::sync::Arc;
26use std::time::Duration;
27use thiserror::Error;
28use tokio::task::JoinSet;
29use tokio::time::sleep;
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info, instrument};
32
33pub mod builder;
34
35pub(crate) const PERIODIC_QUEUE_NAME: &str = "periodic";
36
37#[derive(Debug, Error)]
38#[non_exhaustive]
39pub enum PgProcessorError {
40 #[error("The provided `Worker` was already registered: `{0}`")]
43 AlreadyRegistered(String),
44
45 #[error("The provided `Worker` name was already registered for a different type: `{0}`")]
48 AlreadyRegisteredWithDifferentType(String),
49
50 #[error(
53 "The provided periodic worker job was already registered. Worker: `{0}`, schedule: `{1}`, args: `{2}`"
54 )]
55 AlreadyRegisteredPeriodic(String, String, serde_json::Value),
56
57 #[error("No queue configured for worker `{0}`.")]
58 NoQueue(String),
59
60 #[error("{0}")]
61 InvalidBalanceStrategy(String),
62
63 #[error(transparent)]
64 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
65}
66
67#[derive(Clone)]
68#[non_exhaustive]
69pub struct PgProcessor<S>
70where
71 S: Clone + Send + Sync + 'static,
72 AppContext: FromRef<S>,
73{
74 inner: Arc<PgProcessorInner<S>>,
75}
76
77#[non_exhaustive]
78pub(crate) struct PgProcessorInner<S>
79where
80 S: Clone + Send + Sync + 'static,
81 AppContext: FromRef<S>,
82{
83 state: S,
84 queues: BTreeSet<String>,
85 workers: BTreeMap<String, WorkerWrapper<S>>,
86 periodic_workers: HashSet<PeriodicArgsJson>,
87}
88
89impl<S> PgProcessor<S>
90where
91 S: Clone + Send + Sync + 'static,
92 AppContext: FromRef<S>,
93{
94 pub(crate) fn new(inner: PgProcessorInner<S>) -> Self {
95 Self {
96 inner: Arc::new(inner),
97 }
98 }
99
100 pub fn builder(state: &S) -> PgProcessorBuilder<S> {
101 PgProcessorBuilder::new(state)
102 }
103
104 pub async fn before_run(&self, state: &S) -> RoadsterResult<()> {
105 let context = AppContext::from_ref(state);
106 if context
107 .config()
108 .service
109 .worker
110 .pg
111 .custom
112 .common
113 .balance_strategy
114 == BalanceStrategy::None
115 && self.shared_queues(context.config()).len() > 1
116 {
117 return Err(PgProcessorError::InvalidBalanceStrategy(format!(
118 "{:?} is not supported when more than one shared queue is enabled.",
119 BalanceStrategy::None
120 ))
121 .into());
122 }
123
124 self.initialize_queues().await?;
125 self.initialize_periodic(state).await?;
126 Ok(())
127 }
128
129 async fn initialize_queues(&self) -> RoadsterResult<()> {
131 let context = AppContext::from_ref(&self.inner.state);
132 for queue in self.inner.queues.iter() {
133 context.pgmq().create(queue).await?;
134 }
135 Ok(())
136 }
137
138 async fn initialize_periodic(&self, state: &S) -> RoadsterResult<()> {
140 let context = AppContext::from_ref(state);
141
142 context.pgmq().create(PERIODIC_QUEUE_NAME).await?;
144 sqlx::query!(
147 r#"CREATE UNIQUE INDEX IF NOT EXISTS roadster_periodic_hash_idx ON pgmq.q_periodic USING btree ((message->'periodic'->'hash'))"#
148 ).execute(&context.pgmq().connection).await?;
149
150 let periodic_config = &context.config().service.worker.pg.custom.custom.periodic;
151
152 let periodic_jobs = self
153 .inner
154 .periodic_workers
155 .iter()
156 .map(PeriodicJob::from)
157 .collect_vec();
158
159 match periodic_config.stale_cleanup {
160 StaleCleanUpBehavior::Manual => {}
161 StaleCleanUpBehavior::AutoCleanAll => {
162 let rows_affected = context.pgmq().purge(PERIODIC_QUEUE_NAME).await?;
163 info!(
164 count = rows_affected,
165 "Deleted all previously registered periodic jobs"
166 );
167 }
168 StaleCleanUpBehavior::AutoCleanStale => {
169 let current_job_hashes = periodic_jobs
170 .iter()
171 .map(|job| {
172 serde_json::Value::Number(serde_json::Number::from(job.periodic.hash))
173 })
174 .collect_vec();
175 let result = sqlx::query!(
176 r#"DELETE FROM pgmq.q_periodic where message->'periodic'->'hash' != ALL($1)"#,
177 current_job_hashes.as_slice()
178 )
179 .execute(&context.pgmq().connection)
180 .await?;
181 info!(
182 count = result.rows_affected(),
183 "Deleted stale periodic jobs"
184 )
185 }
186 }
187
188 for job in periodic_jobs.iter() {
189 let delay = periodic_next_run_delay(&job.periodic.schedule, None);
190 let result = context
191 .pgmq()
192 .send_delay(PERIODIC_QUEUE_NAME, job, delay.as_secs())
193 .await;
194
195 match result {
196 Ok(_) => Ok(()),
197 Err(PgmqError::DatabaseError(Error::Database(err))) => match err.kind() {
198 ErrorKind::UniqueViolation => Ok(()),
202 _ => Err(PgmqError::DatabaseError(Error::Database(err))),
203 },
204 Err(err) => Err(err),
205 }?;
206 }
207
208 Ok(())
209 }
210
211 pub(crate) fn queues(&self) -> &BTreeSet<String> {
212 &self.inner.queues
213 }
214
215 pub async fn run(self, _state: &S, cancellation_token: CancellationToken) {
216 let mut join_set = JoinSet::new();
217
218 let context = AppContext::from_ref(&self.inner.state);
219 let worker_config = &context.config().service.worker.pg.custom;
220 let dedicated_queues = &worker_config.common.queue_config;
221 let shared_queues = self.shared_queues(context.config());
222
223 if !shared_queues.is_empty() {
224 let total_worker_tasks = worker_config.common.num_workers;
225 for worker_num in 0..total_worker_tasks {
226 join_set.spawn(self.clone().process_queues(
227 cancellation_token.clone(),
228 worker_num + 1,
229 total_worker_tasks,
230 shared_queues.clone(),
231 ));
232 }
233 }
234
235 for (queue, config) in dedicated_queues {
236 let total_worker_tasks = config.num_workers.unwrap_or_default();
237 for worker_num in 0..total_worker_tasks {
238 join_set.spawn(self.clone().process_queues(
239 cancellation_token.clone(),
240 worker_num + 1,
241 total_worker_tasks,
242 vec![queue.to_owned()],
243 ));
244 }
245 }
246
247 if worker_config.custom.periodic.enable && !self.inner.periodic_workers.is_empty() {
248 join_set.spawn(self.clone().process_periodic(cancellation_token.clone()));
249 }
250
251 while let Some(result) = join_set.join_next().await {
252 cancellation_token.cancel();
255 if let Err(join_err) = result {
256 error!(
257 "An error occurred when trying to join on one of the processor's workers. Error: {join_err}"
258 );
259 }
260 }
261 }
262
263 async fn process_queues(
264 self,
265 cancellation_token: CancellationToken,
266 worker_task_num: u32,
267 total_worker_tasks: u32,
268 queues: Vec<String>,
269 ) {
270 let num_queues = queues.len();
271 let queue_name = if num_queues == 1 {
272 queues.first().cloned()
273 } else {
274 None
275 };
276
277 let mut queues: BinaryHeap<QueueItem> = queues
278 .into_iter()
279 .map(|name| QueueItem {
280 name,
281 next_fetch: Utc::now(),
282 })
283 .collect();
284
285 let context = AppContext::from_ref(&self.inner.state);
286 let default_worker_config = &context.config().service.worker.worker_config;
287 let default_max_duration = default_worker_config.max_duration;
288 let default_view_timeout = default_max_duration
289 .as_ref()
290 .and_then(|duration| duration.as_secs().try_into().ok());
291
292 let empty_delay = context
293 .config()
294 .service
295 .worker
296 .pg
297 .custom
298 .custom
299 .queue_fetch_config
300 .as_ref()
301 .and_then(|config| config.empty_delay)
302 .unwrap_or_default();
303
304 let error_delay = context
305 .config()
306 .service
307 .worker
308 .pg
309 .custom
310 .custom
311 .queue_fetch_config
312 .as_ref()
313 .and_then(|config| config.error_delay)
314 .unwrap_or_default();
315
316 let pgmq = context.pgmq();
317 loop {
318 while let Some(mut queue) = queues.peek_mut() {
319 {
320 let diff = max(TimeDelta::zero(), queue.next_fetch - Utc::now());
321 let duration = diff.to_std().unwrap_or_else(|_| Duration::from_secs(0));
322 tokio::select! {
323 biased;
325
326 _ = cancellation_token.cancelled() => {
327 info!(
328 worker_task_num,
329 total_worker_tasks,
330 num_queues,
331 queue = queue_name,
332 "Exiting processor worker loop"
333 );
334 return;
335
336 },
337 _ = sleep(duration) => (),
338 }
339 }
340
341 let msg = match pgmq
351 .read::<serde_json::Value>(&queue.name, default_view_timeout)
352 .await
353 {
354 Ok(Some(msg)) => msg,
355 Ok(None) => {
356 queue.next_fetch = Utc::now() + empty_delay;
357 continue;
358 }
359 Err(err) => {
360 error!(
361 worker.queue.name = queue.name,
362 "An error occurred while reading from pgmq: {err}"
363 );
364 queue.next_fetch = Utc::now() + error_delay;
365 continue;
366 }
367 };
368
369 let job: Job = match serde_json::from_value(msg.message) {
370 Ok(job) => job,
371 Err(err) => {
372 error!(
373 job.msg_id = msg.msg_id,
374 job.read_count = msg.read_ct,
375 worker.queue.name = queue.name,
376 "An error occurred while deserializing message from pgmq: {err}"
377 );
378 self.retry(
379 pgmq,
380 &queue,
381 None,
382 msg.msg_id,
383 msg.read_ct,
384 context.config(),
385 None,
386 )
387 .await;
388
389 queue.next_fetch = Utc::now();
390 continue;
391 }
392 };
393
394 let worker = if let Some(worker) = self.inner.workers.get(&job.metadata.worker_name)
395 {
396 worker
397 } else {
398 error!(
399 job.id = %job.metadata.id,
400 job.msg_id = msg.msg_id,
401 job.read_count = msg.read_ct,
402 worker.queue.name = queue.name,
403 worker.name = job.metadata.worker_name,
404 "Unable to handle job, worker not registered"
405 );
406 self.retry(
407 pgmq,
408 &queue,
409 Some(&job.metadata),
410 msg.msg_id,
411 msg.read_ct,
412 context.config(),
413 None,
414 )
415 .await;
416
417 queue.next_fetch = Utc::now();
418 continue;
419 };
420
421 let max_duration = if let Some((worker_max, default_max)) = worker
424 .inner
425 .worker_config
426 .max_duration
427 .zip(default_max_duration)
428 {
429 if worker_max != default_max {
430 Some(worker_max)
431 } else {
432 None
433 }
434 } else {
435 worker.inner.worker_config.max_duration
436 };
437 if let Some(delay) = max_duration {
438 self.update_job_view_timeout(
439 pgmq,
440 &queue,
441 Some(&job.metadata),
442 msg.msg_id,
443 msg.read_ct,
444 delay,
445 )
446 .await;
447 }
448
449 let result = worker
450 .handle(&self.inner.state, &job.metadata, job.args)
451 .await;
452
453 if let Err(err) = result {
454 error!(
455 job.id = %job.metadata.id,
456 job.msg_id = msg.msg_id,
457 job.read_count = msg.read_ct,
458 worker.queue.name = queue.name,
459 worker.name = job.metadata.worker_name,
460 "An error occurred while handling a job: {err}"
461 );
462 self.retry(
463 pgmq,
464 &queue,
465 Some(&job.metadata),
466 msg.msg_id,
467 msg.read_ct,
468 context.config(),
469 Some(worker),
470 )
471 .await;
472 } else {
473 let action =
474 success_action(context.config(), worker.inner.worker_config.pg.as_ref());
475 self.job_completed(
476 pgmq,
477 &queue,
478 Some(&job.metadata),
479 msg.msg_id,
480 msg.read_ct,
481 action,
482 )
483 .await;
484 }
485
486 #[cfg(feature = "bench")]
487 (worker.inner.on_complete_fn)().await;
488
489 queue.next_fetch = Utc::now();
490 }
491 }
492 }
493
494 async fn process_periodic(self, cancellation_token: CancellationToken) {
495 let context = AppContext::from_ref(&self.inner.state);
496 let default_enqueue_config = &context.config().service.worker.enqueue_config;
497 let default_worker_config = &context.config().service.worker.worker_config;
498 let default_max_duration = default_worker_config.max_duration;
499 let default_view_timeout = default_max_duration
500 .as_ref()
501 .and_then(|duration| duration.as_secs().try_into().ok());
502
503 let empty_delay = context
504 .config()
505 .service
506 .worker
507 .pg
508 .custom
509 .custom
510 .queue_fetch_config
511 .as_ref()
512 .and_then(|config| config.empty_delay)
513 .unwrap_or_default();
514
515 let error_delay = context
516 .config()
517 .service
518 .worker
519 .pg
520 .custom
521 .custom
522 .queue_fetch_config
523 .as_ref()
524 .and_then(|config| config.error_delay)
525 .unwrap_or_default();
526
527 let mut next_fetch = Utc::now();
528
529 let pgmq = context.pgmq();
530 loop {
531 {
532 let diff = max(TimeDelta::zero(), next_fetch - Utc::now());
533 let duration = diff.to_std().unwrap_or_else(|_| Duration::from_secs(0));
534 tokio::select! {
535 biased;
537
538 _ = cancellation_token.cancelled() => {
539 info!("Exiting processor periodic worker loop");
540 return;
541 },
542 _ = sleep(duration) => (),
543 }
544 }
545
546 let msg = match pgmq
556 .read::<serde_json::Value>(PERIODIC_QUEUE_NAME, default_view_timeout)
557 .await
558 {
559 Ok(Some(msg)) => msg,
560 Ok(None) => {
561 next_fetch = Utc::now() + empty_delay;
562 continue;
563 }
564 Err(err) => {
565 error!(
566 worker.queue.name = PERIODIC_QUEUE_NAME,
567 "An error occurred while reading from pgmq: {err}"
568 );
569 next_fetch = Utc::now() + error_delay;
570 continue;
571 }
572 };
573
574 let job: PeriodicJob = match serde_json::from_value(msg.message) {
575 Ok(job) => job,
576 Err(err) => {
577 error!(
578 job.msg_id = msg.msg_id,
579 job.read_count = msg.read_ct,
580 worker.queue.name = PERIODIC_QUEUE_NAME,
581 "An error occurred while deserializing message from pgmq: {err}"
582 );
583 if let Err(err) = context.pgmq().delete(PERIODIC_QUEUE_NAME, msg.msg_id).await {
586 error!(
587 job.msg_id = msg.msg_id,
588 job.read_count = msg.read_ct,
589 worker.queue.name = PERIODIC_QUEUE_NAME,
590 "An error occurred while deleting periodic job: {err}"
591 );
592 next_fetch = Utc::now() + error_delay;
593 } else {
594 next_fetch = Utc::now();
595 }
596 continue;
597 }
598 };
599
600 let worker = self.inner.workers.get(&job.metadata.worker_name);
601 let queue = worker
602 .and_then(|worker| worker.inner.enqueue_config.queue.as_ref())
603 .or(default_enqueue_config.queue.as_ref());
604
605 let (worker, queue) = if let Some((worker, queue)) = worker.zip(queue) {
606 (worker, queue)
607 } else {
608 error!(
609 job.id = %job.metadata.id,
610 job.msg_id = msg.msg_id,
611 job.read_count = msg.read_ct,
612 worker.name = job.metadata.worker_name,
613 worker.queue.name = queue,
614 "Unable to enqueue job; worker not registered or no queue configured"
615 );
616 if let Err(err) = context.pgmq().delete(PERIODIC_QUEUE_NAME, msg.msg_id).await {
619 error!(
620 job.id = %job.metadata.id,
621 job.msg_id = msg.msg_id,
622 job.read_count = msg.read_ct,
623 worker.queue.name = PERIODIC_QUEUE_NAME,
624 "An error occurred while deleting periodic job: {err}"
625 );
626 next_fetch = Utc::now() + error_delay;
627 } else {
628 next_fetch = Utc::now();
629 }
630 continue;
631 };
632
633 let job_to_enqueue = Job::builder()
634 .args(job.args.clone())
635 .metadata(
636 JobMetadata::builder()
637 .worker_name(job.metadata.worker_name)
638 .build(),
639 )
640 .build();
641 if let Err(err) = context.pgmq().send(queue, &job_to_enqueue).await {
642 error!(
643 job.id = %job.metadata.id,
644 job.msg_id = msg.msg_id,
645 job.read_count = msg.read_ct,
646 worker.name = worker.inner.name,
647 worker.queue.name = queue,
648 "An error occurred while enqueuing periodic job: {err}"
649 );
650
651 next_fetch = Utc::now() + error_delay;
652 continue;
653 }
654
655 let delay = periodic_next_run_delay(&job.periodic.schedule, None);
656 if let Err(err) = pgmq
657 .set_vt::<serde_json::Value>(PERIODIC_QUEUE_NAME, msg.msg_id, Utc::now() + delay)
658 .await
659 {
660 error!(
661 job.id = %job.metadata.id,
662 job.msg_id = msg.msg_id,
663 job.read_count = msg.read_ct,
664 job.delay = ?delay,
665 worker.queue.name = PERIODIC_QUEUE_NAME,
666 worker.name = worker.inner.name,
667 "An error occurred while updating periodic job's view timeout: {err}"
668 );
669 next_fetch = Utc::now() + error_delay;
670 continue;
671 }
672
673 next_fetch = Utc::now();
674 }
675 }
676
677 fn shared_queues(&self, config: &AppConfig) -> Vec<String> {
678 let worker_config = &config.service.worker.pg.custom;
679 shared_queues(
680 &worker_config.common.queues,
681 &self.inner.queues,
682 &worker_config.common.queue_config,
683 )
684 .map(|queue| queue.to_owned())
685 .collect_vec()
686 }
687
688 #[instrument(skip_all)]
689 #[allow(clippy::too_many_arguments)]
690 async fn retry(
691 &self,
692 pgmq: &PGMQueue,
693 queue: &QueueItem,
694 job_metadata: Option<&JobMetadata>,
695 msg_id: i64,
696 read_count: i32,
697 app_config: &AppConfig,
698 worker: Option<&WorkerWrapper<S>>,
699 ) {
700 if let Some(delay) = retry_delay(
701 app_config,
702 worker.and_then(|worker| worker.inner.worker_config.retry_config.as_ref()),
703 read_count,
704 ) {
705 self.update_job_view_timeout(pgmq, queue, job_metadata, msg_id, read_count, delay)
707 .await;
708 } else {
709 let action = failure_action(
711 app_config,
712 worker.and_then(|worker| worker.inner.worker_config.pg.as_ref()),
713 );
714 self.job_completed(pgmq, queue, job_metadata, msg_id, read_count, action)
715 .await;
716 }
717 }
718
719 #[instrument(skip_all)]
720 async fn update_job_view_timeout(
721 &self,
722 pgmq: &PGMQueue,
723 queue: &QueueItem,
724 job_metadata: Option<&JobMetadata>,
725 msg_id: i64,
726 read_count: i32,
727 delay: Duration,
728 ) {
729 if let Err(err) = pgmq
730 .set_vt::<serde_json::Value>(&queue.name, msg_id, Utc::now() + delay)
731 .await
732 {
733 error!(
734 job.id = optional_trace_field(job_metadata.map(|meta| meta.id)),
735 job.msg_id = msg_id,
736 job.read_count = read_count,
737 worker.queue.name = queue.name,
738 worker.name = job_metadata.map(|metadata| &metadata.worker_name),
739 "An error occurred while updating job's view timeout: {err}"
740 );
741 }
742 }
743
744 #[instrument(skip_all)]
745 async fn job_completed(
746 &self,
747 pgmq: &PGMQueue,
748 queue: &QueueItem,
749 job_metadata: Option<&JobMetadata>,
750 msg_id: i64,
751 read_count: i32,
752 action: &CompletedAction,
753 ) {
754 debug!(
755 job.id = optional_trace_field(job_metadata.map(|meta| meta.id)),
756 job.msg_id = msg_id,
757 job.read_count = read_count,
758 job.completed_action = ?action,
759 worker.queue.name = queue.name,
760 worker.name = job_metadata.map(|metadata| &metadata.worker_name),
761 "Performing completed action for a job"
762 );
763
764 let result = match action {
765 CompletedAction::Archive => pgmq.archive(&queue.name, msg_id).await,
766 CompletedAction::Delete => pgmq.delete(&queue.name, msg_id).await,
767 };
768
769 if let Err(err) = result {
770 error!(
771 job.id = optional_trace_field(job_metadata.map(|meta| meta.id)),
772 job.msg_id = msg_id,
773 job.read_count = read_count,
774 job.completed_action = ?action,
775 worker.queue.name = queue.name,
776 worker.name = job_metadata.map(|metadata| &metadata.worker_name),
777 "An error occurred while performing completed action for a job: {err}"
778 );
779 }
780 }
781}
782
783struct QueueItem {
784 name: String,
785 next_fetch: DateTime<Utc>,
786}
787
788impl Eq for QueueItem {}
789
790impl PartialEq<Self> for QueueItem {
791 fn eq(&self, other: &Self) -> bool {
792 self.next_fetch == other.next_fetch
793 }
794}
795
796impl PartialOrd<Self> for QueueItem {
797 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
798 Some(self.cmp(other))
799 }
800}
801
802impl Ord for QueueItem {
803 fn cmp(&self, other: &Self) -> Ordering {
804 other.next_fetch.cmp(&self.next_fetch)
807 }
808}
809
810fn periodic_next_run_delay(schedule: &Schedule, now: Option<DateTime<Utc>>) -> Duration {
811 let now = now.unwrap_or_else(Utc::now);
812 let next_run = schedule.after(&now).next().unwrap_or(now);
813 let diff = max(TimeDelta::zero(), next_run - now);
814 diff.to_std().unwrap_or_else(|_| Duration::from_secs(0))
815}
816
#[cfg(test)]
mod tests {
    use chrono::DateTime;
    use chrono::Utc;
    use cron::Schedule;
    use insta::assert_debug_snapshot;
    use std::str::FromStr;

    /// Guards against accidental changes to the periodic queue's name.
    #[test]
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn periodic_queue_name() {
        let expected = "periodic";
        assert_eq!(super::PERIODIC_QUEUE_NAME, expected);
    }

    mod queue_item {
        use crate::worker::backend::pg::processor::QueueItem;
        use chrono::Utc;
        use std::collections::BinaryHeap;
        use std::time::Duration;

        /// Items come out of the heap ordered by ascending `next_fetch`.
        #[test]
        #[cfg_attr(coverage_nightly, coverage(off))]
        fn min_heap() {
            let now = Utc::now();
            let mut items: BinaryHeap<QueueItem> = [("a", 1), ("b", 0), ("c", 10)]
                .into_iter()
                .map(|(name, offset_secs)| QueueItem {
                    name: name.to_owned(),
                    next_fetch: now + Duration::from_secs(offset_secs),
                })
                .collect();

            for expected in ["b", "a", "c"] {
                assert_eq!(items.pop().unwrap().name, expected);
            }
        }

        /// Changing `next_fetch` through `peek_mut` re-orders the heap.
        #[test]
        #[cfg_attr(coverage_nightly, coverage(off))]
        fn peek_mut_change_order() {
            let now = Utc::now();
            let mut items: BinaryHeap<QueueItem> = [("a", 0), ("b", 1)]
                .into_iter()
                .map(|(name, offset_secs)| QueueItem {
                    name: name.to_owned(),
                    next_fetch: now + Duration::from_secs(offset_secs),
                })
                .collect();

            // "a" is on top; pushing it into the future must let "b" overtake it.
            if let Some(mut top) = items.peek_mut() {
                top.next_fetch = now + Duration::from_secs(10);
            }

            for expected in ["b", "a"] {
                assert_eq!(items.pop().unwrap().name, expected);
            }
        }
    }

    /// Snapshot of the delay computed for a fixed point in time.
    #[test]
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn periodic_next_run_delay() {
        let schedule = Schedule::from_str("* 12 * * * *").unwrap();
        let now = DateTime::<Utc>::from_timestamp(1751701139, 0).unwrap();
        let delay = super::periodic_next_run_delay(&schedule, Some(now));
        assert_debug_snapshot!(delay);
    }
}