rexecutor/
lib.rs

1//! A robust job processing library for rust.
2//!
3//! # Setting up `Rexecutor`
4//!
5//! To create an instance of [`Rexecutor`] you will need to have an implementation of [`Backend`].
6//! The only one provided in this crate is [`backend::memory::InMemoryBackend`] which is primarily
7//! provided for testing purposes. Instead a backend should be used from one of the implementations
8//! provided by other crates.
9//!
10//! # Creating executors
11//!
12//! Jobs are defined by creating a struct/enum and implementing `Executor` for it.
13//!
14//! ## Example defining an executor
15//!
16//! You can define and enqueue a job as follows:
17//!
18//! ```
19//! use rexecutor::prelude::*;
20//! use chrono::{Utc, TimeDelta};
21//! use rexecutor::backend::memory::InMemoryBackend;
22//! use rexecutor::assert_enqueued;
23//! let backend = InMemoryBackend::new().paused();
24//! Rexecutor::new(backend).set_global_backend().unwrap();
25//! struct EmailJob;
26//!
27//! #[async_trait::async_trait]
28//! impl Executor for EmailJob {
29//!     type Data = String;
30//!     type Metadata = String;
31//!     const NAME: &'static str = "email_job";
32//!     const MAX_ATTEMPTS: u16 = 2;
33//!     async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
34//!         println!("{} running, with args: {}", Self::NAME, job.data);
35//!         /// Do something important with an email
36//!         ExecutionResult::Done
37//!     }
38//! }
39//!
40//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
41//! let _ = EmailJob::builder()
42//!     .with_data("bob.shuruncle@example.com".to_owned())
43//!     .schedule_in(TimeDelta::hours(3))
44//!     .enqueue()
45//!     .await;
46//!
47//! assert_enqueued!(
48//!     with_data: "bob.shuruncle@example.com".to_owned(),
49//!     scheduled_after: Utc::now() + TimeDelta::minutes(170),
50//!     scheduled_before: Utc::now() + TimeDelta::minutes(190),
51//!     for_executor: EmailJob
52//! );
53//! # });
54//! ```
55//!
56//! ## Unique jobs
57//!
58//! It is possible to ensure uniqueness of jobs based on certain criteria. This can be defined as
59//! part of the implementation of `Executor` via [`Executor::UNIQUENESS_CRITERIA`] or when
60//! inserting the job via [`job::builder::JobBuilder::unique`].
61//!
62//! For example to ensure that only one unique job is ran every five minutes it is possible to use
63//! the following uniqueness criteria.
64//!
65//! ```
66//! # use rexecutor::prelude::*;
67//! # use chrono::{Utc, TimeDelta};
68//! # use rexecutor::backend::memory::InMemoryBackend;
69//! # use rexecutor::assert_enqueued;
70//! # let backend = InMemoryBackend::new().paused();
71//! # Rexecutor::new(backend).set_global_backend().unwrap();
72//! struct UniqueJob;
73//!
74//! #[async_trait::async_trait]
75//! impl Executor for UniqueJob {
76//!     type Data = ();
77//!     type Metadata = ();
78//!     const NAME: &'static str = "unique_job";
79//!     const MAX_ATTEMPTS: u16 = 1;
80//!     const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
81//!         UniquenessCriteria::by_executor()
82//!             .and_within(TimeDelta::seconds(300))
83//!             .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
84//!     );
85//!     async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
86//!         println!("{} running, with args: {:?}", Self::NAME, job.data);
87//!         // Do something important
88//!         ExecutionResult::Done
89//!     }
90//! }
91//!
92//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
93//! let _ = UniqueJob::builder().enqueue().await;
94//! let _ = UniqueJob::builder().enqueue().await;
95//!
96//! // Only one of jobs was enqueued
97//! assert_enqueued!(
98//!     1 job,
99//!     scheduled_before: Utc::now(),
100//!     for_executor: UniqueJob
101//! );
102//! # });
103//! ```
104//!
105//! Additionally it is possible to specify what action should be taken when there is a conflicting
106//! job. In the example above the priority is override. For more details of how to use uniqueness
107//! see [`job::uniqueness_criteria::UniquenessCriteria`].
108//!
109//!
110//! # Overriding [`Executor`] default values
111//!
112//! When defining an [`Executor`] you specify the maximum number of attempts via
113//! [`Executor::MAX_ATTEMPTS`]. However, when inserting a job it is possible to override this value
114//! by calling [`job::builder::JobBuilder::with_max_attempts`] (if not called the max attempts will be equal to
115//! [`Executor::MAX_ATTEMPTS`]).
116//!
117//! Similarly, the executor can define a job uniqueness criteria via
118//! [`Executor::UNIQUENESS_CRITERIA`]. However, using [`job::builder::JobBuilder::unique`] it is possible to
119//! override this value for a specific job.
120//!
121//! # Setting up executors to run
122//!
123//! For each executor you would like to run [`Rexecutor::with_executor`] should be called. Being
124//! explicit about this opens the possibility of having specific nodes in a cluster running as
125//! worker nodes for certain enqueued jobs while other node not responsible for their execution.
126//!
127//! ## Example setting up executors
128//!
129//! ```
130//! # use rexecutor::prelude::*;
131//! # use std::str::FromStr;
132//! # use chrono::TimeDelta;
133//! # use rexecutor::backend::memory::InMemoryBackend;
134//! # pub(crate) struct RefreshWorker;
135//! # pub(crate) struct EmailScheduler;
136//! # pub(crate) struct RegistrationWorker;
137//! #
138//! # #[async_trait::async_trait]
139//! # impl Executor for RefreshWorker {
140//! #     type Data = String;
141//! #     type Metadata = String;
142//! #     const NAME: &'static str = "refresh_worker";
143//! #     const MAX_ATTEMPTS: u16 = 2;
144//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
145//! #         ExecutionResult::Done
146//! #     }
147//! # }
148//! # #[async_trait::async_trait]
149//! # impl Executor for EmailScheduler {
150//! #     type Data = String;
151//! #     type Metadata = String;
152//! #     const NAME: &'static str = "email_scheduler";
153//! #     const MAX_ATTEMPTS: u16 = 2;
154//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
155//! #         ExecutionResult::Done
156//! #     }
157//! # }
158//! # #[async_trait::async_trait]
159//! # impl Executor for RegistrationWorker {
160//! #     type Data = String;
161//! #     type Metadata = String;
162//! #     const NAME: &'static str = "registration_worker";
163//! #     const MAX_ATTEMPTS: u16 = 2;
164//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
165//! #         ExecutionResult::Done
166//! #     }
167//! # }
168//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
169//! let backend = InMemoryBackend::new();
170//! Rexecutor::new(backend)
171//!     .with_executor::<RefreshWorker>()
172//!     .with_executor::<EmailScheduler>()
173//!     .with_executor::<RegistrationWorker>();
174//! # });
175//! ```
176//!
177//! # Enqueuing jobs
178//!
179//! Generally jobs will be enqueued using the [`job::builder::JobBuilder`] returned by [`Executor::builder`].
180//!
181//! When enqueuing jobs the data and metadata of the job can be specified. Additionally, the
182//! default value of the [`Executor`] can be overriden.
183//!
184//! ## Overriding [`Executor`] default values
185//!
186//! When defining an [`Executor`] you specify the maximum number of attempts via
187//! [`Executor::MAX_ATTEMPTS`]. However, when inserting a job it is possible to override this value
188//! by calling [`job::builder::JobBuilder::with_max_attempts`] (if not called the max attempts will be equal to
189//! [`Executor::MAX_ATTEMPTS`]).
190//!
191//! Similarly, the executor can define a job uniqueness criteria via
192//! [`Executor::UNIQUENESS_CRITERIA`]. However, using [`job::builder::JobBuilder::unique`] it is possible to
193//! override this value for a specific job.
194//!
195//! ## Example enqueuing a job
196//!
197//! ```
198//! # use rexecutor::prelude::*;
199//! # use std::sync::Arc;
200//! # use chrono::{Utc, TimeDelta};
201//! # use rexecutor::backend::memory::InMemoryBackend;
202//! # use rexecutor::assert_enqueued;
203//! # pub(crate) struct ExampleExecutor;
204//! #
205//! # #[async_trait::async_trait]
206//! # impl Executor for ExampleExecutor {
207//! #     type Data = String;
208//! #     type Metadata = String;
209//! #     const NAME: &'static str = "simple_executor";
210//! #     const MAX_ATTEMPTS: u16 = 2;
211//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
212//! #         ExecutionResult::Done
213//! #     }
214//! # }
215//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
216//! let backend = Arc::new(InMemoryBackend::new().paused());
217//! Rexecutor::new(backend.clone()).set_global_backend().unwrap();
218//!
219//! ExampleExecutor::builder()
220//!     .with_max_attempts(2)
221//!     .with_tags(vec!["initial_job", "delayed"])
222//!     .with_data("First job".into())
223//!     .schedule_in(TimeDelta::hours(2))
224//!     .enqueue_to_backend(&backend)
225//!     .await
226//!     .unwrap();
227//!
228//! assert_enqueued!(
229//!     to: backend,
230//!     with_data: "First job".to_owned(),
231//!     tagged_with: ["initial_job", "delayed"],
232//!     scheduled_after: Utc::now() + TimeDelta::minutes(110),
233//!     scheduled_before: Utc::now() + TimeDelta::minutes(130),
234//!     for_executor: ExampleExecutor
235//! );
236//! # });
237//! ```
238//!
239//! # Compile time scheduling of cron jobs
240//!
241//! It can be useful to have jobs that run on a given schedule. Jobs like this can be setup using
242//! either [`Rexecutor::with_cron_executor`] or [`Rexecutor::with_cron_executor_for_timezone`]. The
243//! later is use to specify the specific timezone that the jobs should be scheduled to run in.
244//!
245//! ## Example setting up a UTC cron job
246//!
247//! To setup a cron jobs to run every day at midnight you can use the following code.
248//!
249//! ```
250//! # use rexecutor::prelude::*;
251//! # use rexecutor::backend::{Backend, memory::InMemoryBackend};
252//! struct CronJob;
253//! #[async_trait::async_trait]
254//! impl Executor for CronJob {
255//!     type Data = String;
256//!     type Metadata = ();
257//!     const NAME: &'static str = "cron_job";
258//!     const MAX_ATTEMPTS: u16 = 1;
259//!     async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
260//!         /// Do something important
261//!         ExecutionResult::Done
262//!     }
263//! }
264//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
265//! let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
266//!
267//! let backend = InMemoryBackend::new();
268//! Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
269//! # });
270//! ```
271//!
272//! # Pruning jobs
273//!
274//! After jobs have completed, been cancelled, or discarded it is useful to be able to clean up.
275//! To setup the job pruner [`Rexecutor::with_job_pruner`] should be called passing in the given
276//! [`PrunerConfig`].
277//!
278//! Given the different ways in which jobs can finish it is often useful to be able to have fine
279//! grained control over how old jobs should be cleaned up. [`PrunerConfig`] enables such control.
280//!
281//! When constructing [`PrunerConfig`] a [`cron::Schedule`] is provided to specify when the pruner
282//! should run.
283//!
284//! Depending on the load/throughput of the system the pruner can be scheduled to run anywhere
285//! from once a year through to multiple times per hour.
286//!
287//! ## Example configuring the job pruner
288//!
289//! ```
290//! # use rexecutor::prelude::*;
291//! # use std::str::FromStr;
292//! # use chrono::TimeDelta;
293//! # use rexecutor::backend::memory::InMemoryBackend;
294//! # pub(crate) struct RefreshWorker;
295//! # pub(crate) struct EmailScheduler;
296//! # pub(crate) struct RegistrationWorker;
297//! #
298//! # #[async_trait::async_trait]
299//! # impl Executor for RefreshWorker {
300//! #     type Data = String;
301//! #     type Metadata = String;
302//! #     const NAME: &'static str = "refresh_worker";
303//! #     const MAX_ATTEMPTS: u16 = 2;
304//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
305//! #         ExecutionResult::Done
306//! #     }
307//! # }
308//! # #[async_trait::async_trait]
309//! # impl Executor for EmailScheduler {
310//! #     type Data = String;
311//! #     type Metadata = String;
312//! #     const NAME: &'static str = "email_scheduler";
313//! #     const MAX_ATTEMPTS: u16 = 2;
314//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
315//! #         ExecutionResult::Done
316//! #     }
317//! # }
318//! # #[async_trait::async_trait]
319//! # impl Executor for RegistrationWorker {
320//! #     type Data = String;
321//! #     type Metadata = String;
322//! #     const NAME: &'static str = "registration_worker";
323//! #     const MAX_ATTEMPTS: u16 = 2;
324//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
325//! #         ExecutionResult::Done
326//! #     }
327//! # }
328//! let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
329//!     .with_max_concurrency(Some(2))
330//!     .with_pruner(
331//!         Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
332//!             .only::<RefreshWorker>()
333//!             .and::<EmailScheduler>(),
334//!     )
335//!     .with_pruner(
336//!         Pruner::max_length(200, JobStatus::Discarded)
337//!             .except::<RefreshWorker>()
338//!             .and::<EmailScheduler>(),
339//!     );
340//!
341//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
342//! let backend = InMemoryBackend::new();
343//! Rexecutor::new(backend)
344//!     .with_executor::<RefreshWorker>()
345//!     .with_executor::<EmailScheduler>()
346//!     .with_executor::<RegistrationWorker>()
347//!     .with_job_pruner(config);
348//! # });
349//! ```
350//!
351//! # Shutting rexecutor down
352//!
353//! To avoid jobs getting killed mid way through their executions it is important to make use of
354//! graceful shutdown. This can either explicitly be called using [`Rexecutor::graceful_shutdown`],
355//! or via use of the [`DropGuard`] obtained via [`Rexecutor::drop_guard`].
356//!
357//! Using [`Rexecutor::graceful_shutdown`] or [`Rexecutor::drop_guard`] will ensure that all
358//! currently executing jobs will be allowed time to complete before shutting rexecutor down.
359//!
360//! ## Example using the `DropGuard`
361//!
362//! ```
363//! # use rexecutor::prelude::*;
364//! # use std::str::FromStr;
365//! # use chrono::TimeDelta;
366//! # use rexecutor::backend::memory::InMemoryBackend;
367//! # pub(crate) struct RefreshWorker;
368//! # pub(crate) struct EmailScheduler;
369//! # pub(crate) struct RegistrationWorker;
370//! #
371//! # #[async_trait::async_trait]
372//! # impl Executor for RefreshWorker {
373//! #     type Data = String;
374//! #     type Metadata = String;
375//! #     const NAME: &'static str = "refresh_worker";
376//! #     const MAX_ATTEMPTS: u16 = 2;
377//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
378//! #         ExecutionResult::Done
379//! #     }
380//! # }
381//! # #[async_trait::async_trait]
382//! # impl Executor for EmailScheduler {
383//! #     type Data = String;
384//! #     type Metadata = String;
385//! #     const NAME: &'static str = "email_scheduler";
386//! #     const MAX_ATTEMPTS: u16 = 2;
387//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
388//! #         ExecutionResult::Done
389//! #     }
390//! # }
391//! # #[async_trait::async_trait]
392//! # impl Executor for RegistrationWorker {
393//! #     type Data = String;
394//! #     type Metadata = String;
395//! #     const NAME: &'static str = "registration_worker";
396//! #     const MAX_ATTEMPTS: u16 = 2;
397//! #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
398//! #         ExecutionResult::Done
399//! #     }
400//! # }
401//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
402//! let backend = InMemoryBackend::new();
403//! // Note this must be given a name to ensure it is dropped at the end of the scope.
404//! // See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
405//! let _guard = Rexecutor::new(backend)
406//!     .with_executor::<RefreshWorker>()
407//!     .with_executor::<EmailScheduler>()
408//!     .with_executor::<RegistrationWorker>()
409//!     .drop_guard();
410//! # });
411//! ```
412//!
413//! # Global backend
414//!
415//! Rexecutor can be ran either with use of a global backend. This enables the use of the
416//! convenience [`job::builder::JobBuilder::enqueue`] method which does not require a reference to
417//! the backend to be passed down to the code that needs to enqueue a job.
418//!
419//! The global backend can be set using [`Rexecutor::set_global_backend`] this should only be
420//! called once otherwise it will return an error.
421//!
422//! In fact for a single [`Rexecutor`] instance it is impossible to call this twice
423//!
424//! ```compile_fail
425//! # use rexecutor::prelude::*;
426//! let backend = rexecutor::backend::memory::InMemoryBackend::new();
427//! Rexecutor::new(backend).set_global_backend().set_global_backend();
428//! ```
429//!
430//! Note, using a global backend has many of the same drawbacks of any global variable in
431//! particular it can make unit testing more difficult.
432#![deny(missing_docs)]
433use std::{hash::Hash, marker::PhantomData};
434
435pub mod backend;
436pub mod backoff;
437mod cron_runner;
438pub mod executor;
439#[doc(hidden)]
440pub mod global_backend;
441pub mod job;
442pub mod prelude;
443pub mod pruner;
444pub mod testing;
445
446use backend::{Backend, BackendError};
447use chrono::{TimeZone, Utc};
448use cron_runner::CronRunner;
449use executor::Executor;
450use global_backend::GlobalBackend;
451use job::runner::JobRunner;
452use pruner::{PrunerConfig, runner::PrunerRunner};
453use serde::{Serialize, de::DeserializeOwned};
454use thiserror::Error;
455use tokio_util::sync::CancellationToken;
456
457trait InternalRexecutorState {}
458
459/// The state of [`Rexecutor`] after the global backend has been set, this makes it possible to
460/// avoid calling [`Rexecutor::set_global_backend`] multiple times from the same instance.
461#[doc(hidden)]
462pub struct GlobalSet;
463
464/// The state of [`Rexecutor`] before the global backend has been set. Using different states,
465/// makes it possible to avoid calling [`Rexecutor::set_global_backend`] multiple times from the
466/// same instance.
467#[doc(hidden)]
468pub struct GlobalUnset;
469impl InternalRexecutorState for GlobalUnset {}
470impl InternalRexecutorState for GlobalSet {}
471
472/// The entry point for setting up rexecutor.
473///
474/// To create an instance of [`Rexecutor`] you will need to have an implementation of [`Backend`].
475/// The only one provided in this crate is [`backend::memory::InMemoryBackend`] which is primarily
476/// provided for testing purposes. Instead a backend should be used from one of the implementations
477/// provided by other crates.
478///
479/// # Setting up executors
480///
481/// For each executor you would like to run [`Rexecutor::with_executor`] should be called. Being
482/// explicit about this opens the possibility of having specific nodes in a cluster running as
483/// worker nodes for certain enqueued jobs while other node not responsible for their execution.
484///
485/// ## Example setting up executors
486///
487/// ```
488/// # use rexecutor::prelude::*;
489/// # use std::str::FromStr;
490/// # use chrono::TimeDelta;
491/// # use rexecutor::backend::memory::InMemoryBackend;
492/// # pub(crate) struct RefreshWorker;
493/// # pub(crate) struct EmailScheduler;
494/// # pub(crate) struct RegistrationWorker;
495/// #
496/// # #[async_trait::async_trait]
497/// # impl Executor for RefreshWorker {
498/// #     type Data = String;
499/// #     type Metadata = String;
500/// #     const NAME: &'static str = "refresh_worker";
501/// #     const MAX_ATTEMPTS: u16 = 2;
502/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
503/// #         ExecutionResult::Done
504/// #     }
505/// # }
506/// # #[async_trait::async_trait]
507/// # impl Executor for EmailScheduler {
508/// #     type Data = String;
509/// #     type Metadata = String;
510/// #     const NAME: &'static str = "email_scheduler";
511/// #     const MAX_ATTEMPTS: u16 = 2;
512/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
513/// #         ExecutionResult::Done
514/// #     }
515/// # }
516/// # #[async_trait::async_trait]
517/// # impl Executor for RegistrationWorker {
518/// #     type Data = String;
519/// #     type Metadata = String;
520/// #     const NAME: &'static str = "registration_worker";
521/// #     const MAX_ATTEMPTS: u16 = 2;
522/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
523/// #         ExecutionResult::Done
524/// #     }
525/// # }
526/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
527/// let backend = InMemoryBackend::new();
528/// Rexecutor::new(backend)
529///     .with_executor::<RefreshWorker>()
530///     .with_executor::<EmailScheduler>()
531///     .with_executor::<RegistrationWorker>();
532/// # });
533/// ```
534///
535/// # Compile time scheduling of cron jobs
536///
537/// It can be useful to have jobs that run on a given schedule. Jobs like this can be setup using
538/// either [`Rexecutor::with_cron_executor`] or [`Rexecutor::with_cron_executor_for_timezone`]. The
539/// later is use to specify the specific timezone that the jobs should be scheduled to run in.
540///
541/// ## Example setting up a UTC cron job
542///
543/// To setup a cron jobs to run every day at midnight you can use the following code.
544///
545/// ```
546/// # use rexecutor::prelude::*;
547/// # use rexecutor::backend::{Backend, memory::InMemoryBackend};
548/// struct CronJob;
549/// #[async_trait::async_trait]
550/// impl Executor for CronJob {
551///     type Data = String;
552///     type Metadata = ();
553///     const NAME: &'static str = "cron_job";
554///     const MAX_ATTEMPTS: u16 = 1;
555///     async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
556///         /// Do something important
557///         ExecutionResult::Done
558///     }
559/// }
560/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
561/// let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
562///
563/// let backend = InMemoryBackend::new();
564/// Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
565/// # });
566/// ```
567///
568/// # Pruning jobs
569///
570/// After jobs have completed, been cancelled, or discarded it is useful to be able to clean up.
571/// To setup the job pruner [`Rexecutor::with_job_pruner`] should be called passing in the given
572/// [`PrunerConfig`].
573///
574/// Given the different ways in which jobs can finish it is often useful to be able to have fine
575/// grained control over how old jobs should be cleaned up. [`PrunerConfig`] enables such control.
576///
577/// When constructing [`PrunerConfig`] a [`cron::Schedule`] is provided to specify when the pruner
578/// should run.
579///
580/// Depending on the load/throughput of the system the pruner can be scheduled to run anywhere
581/// from once a year through to multiple times per hour.
582///
583/// ## Example configuring the job pruner
584///
585/// ```
586/// # use rexecutor::prelude::*;
587/// # use std::str::FromStr;
588/// # use chrono::TimeDelta;
589/// # use rexecutor::backend::memory::InMemoryBackend;
590/// # pub(crate) struct RefreshWorker;
591/// # pub(crate) struct EmailScheduler;
592/// # pub(crate) struct RegistrationWorker;
593/// #
594/// # #[async_trait::async_trait]
595/// # impl Executor for RefreshWorker {
596/// #     type Data = String;
597/// #     type Metadata = String;
598/// #     const NAME: &'static str = "refresh_worker";
599/// #     const MAX_ATTEMPTS: u16 = 2;
600/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
601/// #         ExecutionResult::Done
602/// #     }
603/// # }
604/// # #[async_trait::async_trait]
605/// # impl Executor for EmailScheduler {
606/// #     type Data = String;
607/// #     type Metadata = String;
608/// #     const NAME: &'static str = "email_scheduler";
609/// #     const MAX_ATTEMPTS: u16 = 2;
610/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
611/// #         ExecutionResult::Done
612/// #     }
613/// # }
614/// # #[async_trait::async_trait]
615/// # impl Executor for RegistrationWorker {
616/// #     type Data = String;
617/// #     type Metadata = String;
618/// #     const NAME: &'static str = "registration_worker";
619/// #     const MAX_ATTEMPTS: u16 = 2;
620/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
621/// #         ExecutionResult::Done
622/// #     }
623/// # }
624/// let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
625///     .with_max_concurrency(Some(2))
626///     .with_pruner(
627///         Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
628///             .only::<RefreshWorker>()
629///             .and::<EmailScheduler>(),
630///     )
631///     .with_pruner(
632///         Pruner::max_length(200, JobStatus::Discarded)
633///             .except::<RefreshWorker>()
634///             .and::<EmailScheduler>(),
635///     );
636///
637/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
638/// let backend = InMemoryBackend::new();
639/// Rexecutor::new(backend)
640///     .with_executor::<RefreshWorker>()
641///     .with_executor::<EmailScheduler>()
642///     .with_executor::<RegistrationWorker>()
643///     .with_job_pruner(config);
644/// # });
645/// ```
646///
647/// # Shutting rexecutor down
648///
649/// To avoid jobs getting killed mid way through their executions it is important to make use of
650/// graceful shutdown. This can either explicitly be called using [`Rexecutor::graceful_shutdown`],
651/// or via use of the [`DropGuard`] obtained via [`Rexecutor::drop_guard`].
652///
653/// Using [`Rexecutor::graceful_shutdown`] or [`Rexecutor::drop_guard`] will ensure that all
654/// currently executing jobs will be allowed time to complete before shutting rexecutor down.
655///
656/// ## Example using the `DropGuard`
657///
658/// ```
659/// # use rexecutor::prelude::*;
660/// # use std::str::FromStr;
661/// # use chrono::TimeDelta;
662/// # use rexecutor::backend::memory::InMemoryBackend;
663/// # pub(crate) struct RefreshWorker;
664/// # pub(crate) struct EmailScheduler;
665/// # pub(crate) struct RegistrationWorker;
666/// #
667/// # #[async_trait::async_trait]
668/// # impl Executor for RefreshWorker {
669/// #     type Data = String;
670/// #     type Metadata = String;
671/// #     const NAME: &'static str = "refresh_worker";
672/// #     const MAX_ATTEMPTS: u16 = 2;
673/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
674/// #         ExecutionResult::Done
675/// #     }
676/// # }
677/// # #[async_trait::async_trait]
678/// # impl Executor for EmailScheduler {
679/// #     type Data = String;
680/// #     type Metadata = String;
681/// #     const NAME: &'static str = "email_scheduler";
682/// #     const MAX_ATTEMPTS: u16 = 2;
683/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
684/// #         ExecutionResult::Done
685/// #     }
686/// # }
687/// # #[async_trait::async_trait]
688/// # impl Executor for RegistrationWorker {
689/// #     type Data = String;
690/// #     type Metadata = String;
691/// #     const NAME: &'static str = "registration_worker";
692/// #     const MAX_ATTEMPTS: u16 = 2;
693/// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
694/// #         ExecutionResult::Done
695/// #     }
696/// # }
697/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
698/// let backend = InMemoryBackend::new();
699/// // Note this must be given a name to ensure it is dropped at the end of the scope.
700/// // See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
701/// let _guard = Rexecutor::new(backend)
702///     .with_executor::<RefreshWorker>()
703///     .with_executor::<EmailScheduler>()
704///     .with_executor::<RegistrationWorker>()
705///     .drop_guard();
706/// # });
707/// ```
708///
709/// # Global backend
710///
711/// Rexecutor can be ran either with use of a global backend. This enables the use of the
712/// convenience [`job::builder::JobBuilder::enqueue`] method which does not require a reference to
713/// the backend to be passed down to the code that needs to enqueue a job.
714///
715/// The global backend can be set using [`Rexecutor::set_global_backend`] this should only be
716/// called once otherwise it will return an error.
717///
718/// In fact for a single [`Rexecutor`] instance it is impossible to call this twice
719///
720/// ```compile_fail
721/// # use rexecutor::prelude::*;
722/// let backend = rexecutor::backend::memory::InMemoryBackend::new();
723/// Rexecutor::new(backend).set_global_backend().set_global_backend();
724/// ```
725///
726/// Note, using a global backend has many of the same drawbacks of any global variable in
727/// particular it can make unit testing more difficult.
728#[derive(Debug)]
729#[allow(private_bounds)]
730pub struct Rexecutor<B: Backend, State: InternalRexecutorState> {
731    backend: B,
732    cancellation_token: CancellationToken,
733    _state: PhantomData<State>,
734}
735
736impl<B> Default for Rexecutor<B, GlobalUnset>
737where
738    B: Backend + Default,
739{
740    fn default() -> Self {
741        Self::new(Default::default())
742    }
743}
744
745impl<B> Rexecutor<B, GlobalUnset>
746where
747    B: Backend,
748{
749    /// Create an instance of [`Rexecutor`] using the given `backend`.
750    pub fn new(backend: B) -> Self {
751        Self {
752            cancellation_token: Default::default(),
753            backend,
754            _state: PhantomData,
755        }
756    }
757}
758
759/// To enable automatic clean up this guard will shut down rexucutor and all associated tasks when
760/// dropped.
761#[allow(dead_code)]
762pub struct DropGuard(tokio_util::sync::DropGuard);
763
764impl<B> Rexecutor<B, GlobalUnset>
765where
766    B: Backend + Send + 'static + Sync + Clone,
767{
768    /// Sets the global backend to the backend associated with the current instance of
769    /// [`Rexecutor`].
770    ///
771    /// This should only be called once. If called a second time it will return
772    /// [`RexecutorError::GlobalBackend`].
773    ///
774    /// Calling this makes is possible to enqueue jobs without maintaining a reference to the
775    /// backend throughout the codebase and enables the use of
776    /// [`job::builder::JobBuilder::enqueue`].
777    ///
778    /// Note is is not possible to call this twice for the same [`Rexecutor`] instance
779    ///
780    /// ```compile_fail
781    /// # use rexecutor::prelude::*;
782    /// let backend = rexecutor::backend::memory::InMemoryBackend::new();
783    /// Rexecutor::new(backend).set_global_backend().set_global_backend();
784    /// ```
785    pub fn set_global_backend(self) -> Result<Rexecutor<B, GlobalSet>, RexecutorError> {
786        GlobalBackend::set(self.backend.clone())?;
787
788        Ok(Rexecutor {
789            cancellation_token: self.cancellation_token,
790            backend: self.backend,
791            _state: PhantomData,
792        })
793    }
794}
795
796#[allow(private_bounds)]
797impl<B, State> Rexecutor<B, State>
798where
799    B: Backend + Send + 'static + Sync + Clone,
800    State: InternalRexecutorState,
801{
802    /// Enable the execution of jobs for the provided [`Executor`].
803    ///
804    /// If this isn't called for an executor, then it's jobs will not be ran on this instance.
805    /// This can be used to only run jobs on specific executor nodes.
806    ///
807    /// Jobs can still be enqueued to the backend without calling this method, but they will not be
808    /// executed unless this method has been called for at least one instance of the running
809    /// application.
810    pub fn with_executor<E>(self) -> Self
811    where
812        E: Executor + 'static + Sync + Send,
813        E::Data: Send + DeserializeOwned,
814        E::Metadata: Serialize + DeserializeOwned + Send,
815    {
816        JobRunner::<B, E>::new(self.backend.clone()).spawn(self.cancellation_token.clone());
817        self
818    }
819
820    /// Setup a cron job to run on the given schedule with the given data.
821    ///
822    /// Note this will run the schedule according to UTC. To schedule the job in another timezone use
823    /// [`Rexecutor::with_cron_executor_for_timezone`].
824    ///
825    /// # Example
826    ///
827    /// To setup a cron jobs to run every day at midnight you can use the following code.
828    ///
829    /// ```
830    /// # use rexecutor::prelude::*;
831    /// # use rexecutor::backend::{Backend, memory::InMemoryBackend};
832    /// struct CronJob;
833    /// #[async_trait::async_trait]
834    /// impl Executor for CronJob {
835    ///     type Data = String;
836    ///     type Metadata = ();
837    ///     const NAME: &'static str = "cron_job";
838    ///     const MAX_ATTEMPTS: u16 = 1;
839    ///     async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
840    ///         /// Do something important
841    ///         ExecutionResult::Done
842    ///     }
843    /// }
844    /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
845    /// let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
846    ///
847    /// let backend = InMemoryBackend::new();
848    /// Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
849    /// # });
850    /// ```
851    pub fn with_cron_executor<E>(self, schedule: cron::Schedule, data: E::Data) -> Self
852    where
853        E: Executor + 'static + Sync + Send,
854        E::Data: Send + Sync + Serialize + DeserializeOwned + Clone + Hash,
855        E::Metadata: Serialize + DeserializeOwned + Send + Sync,
856    {
857        self.with_cron_executor_for_timezone::<E, _>(schedule, data, Utc)
858    }
859
860    /// Setup a cron job to run on the given schedule with the given data in the given timezome.
861    ///
862    /// # Example
863    ///
864    /// To setup a cron jobs to run every day at midnight you can use the following code.
865    ///
866    /// ```
867    /// # use rexecutor::prelude::*;
868    /// # use rexecutor::backend::{Backend, memory::InMemoryBackend};
869    /// struct CronJob;
870    /// #[async_trait::async_trait]
871    /// impl Executor for CronJob {
872    ///     type Data = String;
873    ///     type Metadata = ();
874    ///     const NAME: &'static str = "cron_job";
875    ///     const MAX_ATTEMPTS: u16 = 1;
876    ///     async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
877    ///         /// Do something important
878    ///         ExecutionResult::Done
879    ///     }
880    /// }
881    /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
882    /// let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
883    ///
884    /// let backend = InMemoryBackend::new();
885    /// Rexecutor::new(backend).with_cron_executor_for_timezone::<CronJob, _>(
886    ///     schedule,
887    ///     "important data".to_owned(),
888    ///     chrono::Local,
889    /// );
890    /// # });
891    /// ```
892    pub fn with_cron_executor_for_timezone<E, Z>(
893        self,
894        schedule: cron::Schedule,
895        data: E::Data,
896        timezone: Z,
897    ) -> Self
898    where
899        E: Executor + 'static + Sync + Send,
900        E::Data: Send + Sync + Serialize + DeserializeOwned + Clone + Hash,
901        E::Metadata: Serialize + DeserializeOwned + Send + Sync,
902        Z: TimeZone + Send + 'static,
903    {
904        CronRunner::<B, E>::new(self.backend.clone(), schedule, data)
905            .spawn(timezone, self.cancellation_token.clone());
906
907        self.with_executor::<E>()
908    }
909
910    /// Set the job pruner config.
911    ///
912    /// After jobs have completed, been cancelled, or discarded it is useful to be able to clean up.
913    ///
914    /// Given the different ways in which jobs can finish it is often useful to be able to have fine
915    /// grained control over how old jobs should be cleaned up. [`PrunerConfig`] enables such control.
916    ///
917    /// When constructing [`PrunerConfig`] a [`cron::Schedule`] is provided to specify when the pruner
918    /// should run.
919    ///
920    /// Depending on the load/throughput of the system the pruner can be scheduled to run anywhere
921    /// from once a year through to multiple times per hour.
922    ///
923    /// # Example
924    ///
925    /// To remove all completed jobs more than a month old for both the `RefreshWorker` and
926    /// `EmailScheduler` while only maintaining the last 200 discarded jobs for all executors expect
927    /// the `EmailScheduler` and `RefreshWorker`, you can do the following:
928    ///
929    /// ```
930    /// # use rexecutor::prelude::*;
931    /// # use std::str::FromStr;
932    /// # use chrono::TimeDelta;
933    /// # use rexecutor::backend::memory::InMemoryBackend;
934    /// # pub(crate) struct RefreshWorker;
935    /// # pub(crate) struct EmailScheduler;
936    /// # pub(crate) struct RegistrationWorker;
937    /// #
938    /// # #[async_trait::async_trait]
939    /// # impl Executor for RefreshWorker {
940    /// #     type Data = String;
941    /// #     type Metadata = String;
942    /// #     const NAME: &'static str = "refresh_worker";
943    /// #     const MAX_ATTEMPTS: u16 = 2;
944    /// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
945    /// #         ExecutionResult::Done
946    /// #     }
947    /// # }
948    /// # #[async_trait::async_trait]
949    /// # impl Executor for EmailScheduler {
950    /// #     type Data = String;
951    /// #     type Metadata = String;
952    /// #     const NAME: &'static str = "email_scheduler";
953    /// #     const MAX_ATTEMPTS: u16 = 2;
954    /// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
955    /// #         ExecutionResult::Done
956    /// #     }
957    /// # }
958    /// # #[async_trait::async_trait]
959    /// # impl Executor for RegistrationWorker {
960    /// #     type Data = String;
961    /// #     type Metadata = String;
962    /// #     const NAME: &'static str = "registration_worker";
963    /// #     const MAX_ATTEMPTS: u16 = 2;
964    /// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
965    /// #         ExecutionResult::Done
966    /// #     }
967    /// # }
968    /// let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
969    ///     .with_max_concurrency(Some(2))
970    ///     .with_pruner(
971    ///         Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
972    ///             .only::<RefreshWorker>()
973    ///             .and::<EmailScheduler>(),
974    ///     )
975    ///     .with_pruner(
976    ///         Pruner::max_length(200, JobStatus::Discarded)
977    ///             .except::<RefreshWorker>()
978    ///             .and::<EmailScheduler>(),
979    ///     );
980    ///
981    /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
982    /// let backend = InMemoryBackend::new();
983    /// Rexecutor::new(backend)
984    ///     .with_executor::<RefreshWorker>()
985    ///     .with_executor::<EmailScheduler>()
986    ///     .with_executor::<RegistrationWorker>()
987    ///     .with_job_pruner(config);
988    /// # });
989    /// ```
990    pub fn with_job_pruner(self, config: PrunerConfig) -> Self {
991        PrunerRunner::new(self.backend.clone(), config).spawn(self.cancellation_token.clone());
992        self
993    }
994
995    /// Instruct rexecutor to shutdown gracefully giving time for any currently executing jobs to
996    /// complete before shutting down.
997    pub fn graceful_shutdown(self) {
998        tracing::debug!("Shutting down Rexecutor tasks");
999        self.cancellation_token.cancel();
1000    }
1001
1002    /// Returns a drop guard which will gracefully shutdown rexecutor when droped.
1003    ///
1004    /// # Example
1005    /// ```
1006    /// # use rexecutor::prelude::*;
1007    /// # use std::str::FromStr;
1008    /// # use chrono::TimeDelta;
1009    /// # use rexecutor::backend::memory::InMemoryBackend;
1010    /// # pub(crate) struct RefreshWorker;
1011    /// # pub(crate) struct EmailScheduler;
1012    /// # pub(crate) struct RegistrationWorker;
1013    /// #
1014    /// # #[async_trait::async_trait]
1015    /// # impl Executor for RefreshWorker {
1016    /// #     type Data = String;
1017    /// #     type Metadata = String;
1018    /// #     const NAME: &'static str = "refresh_worker";
1019    /// #     const MAX_ATTEMPTS: u16 = 2;
1020    /// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
1021    /// #         ExecutionResult::Done
1022    /// #     }
1023    /// # }
1024    /// # #[async_trait::async_trait]
1025    /// # impl Executor for EmailScheduler {
1026    /// #     type Data = String;
1027    /// #     type Metadata = String;
1028    /// #     const NAME: &'static str = "email_scheduler";
1029    /// #     const MAX_ATTEMPTS: u16 = 2;
1030    /// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
1031    /// #         ExecutionResult::Done
1032    /// #     }
1033    /// # }
1034    /// # #[async_trait::async_trait]
1035    /// # impl Executor for RegistrationWorker {
1036    /// #     type Data = String;
1037    /// #     type Metadata = String;
1038    /// #     const NAME: &'static str = "registration_worker";
1039    /// #     const MAX_ATTEMPTS: u16 = 2;
1040    /// #     async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
1041    /// #         ExecutionResult::Done
1042    /// #     }
1043    /// # }
1044    /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
1045    /// let backend = InMemoryBackend::new();
1046    /// // Note this must be given a name to ensure it is dropped at the end of the scope.
1047    /// // See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
1048    /// let _guard = Rexecutor::new(backend)
1049    ///     .with_executor::<RefreshWorker>()
1050    ///     .with_executor::<EmailScheduler>()
1051    ///     .with_executor::<RegistrationWorker>()
1052    ///     .drop_guard();
1053    /// # });
1054    #[must_use]
1055    pub fn drop_guard(self) -> DropGuard {
1056        DropGuard(self.cancellation_token.drop_guard())
1057    }
1058}
1059
1060/// Errors that can occur when working with rexecutor.
1061// TODO: split errors
1062#[derive(Debug, Error)]
1063pub enum RexecutorError {
1064    /// Errors that result from the specific backend chosen.
1065    #[error("Error communicating with the backend")]
1066    BackendError(#[from] BackendError),
1067
1068    /// This error results from trying to enqueue a job to the global backend when it is not set or
1069    /// when trying to set the global backend multiple times.
1070    #[error("Error setting global backend")]
1071    GlobalBackend,
1072
1073    /// Error encoding or decoding data from json.
1074    #[error("Error encoding or decoding value")]
1075    EncodeError(#[from] serde_json::Error),
1076}
1077
1078#[cfg(test)]
1079mod tests {
1080    use std::{sync::Arc, time::Duration};
1081
1082    use super::*;
1083    use crate::{
1084        backend::{Job, MockBackend},
1085        executor::test::{MockError, MockExecutionResult, MockReturnExecutor},
1086        job::JobStatus,
1087        pruner::Pruner,
1088    };
1089
1090    #[tokio::test]
1091    async fn run_job_error_in_stream() {
1092        let mut backend = MockBackend::default();
1093        let sender = backend.expect_subscribe_to_ready_jobs_with_stream();
1094        let backend = Arc::new(backend);
1095
1096        let _guard = Rexecutor::<_, _>::new(backend.clone())
1097            .with_executor::<MockReturnExecutor>()
1098            .drop_guard();
1099
1100        tokio::task::yield_now().await;
1101
1102        sender.send(Err(BackendError::BadState)).unwrap();
1103
1104        tokio::task::yield_now().await;
1105    }
1106
1107    #[tokio::test]
1108    async fn run_job_success() {
1109        let mut backend = MockBackend::default();
1110        backend.expect_mark_job_complete().returning(|_| Ok(()));
1111
1112        let job = Job::mock_job::<MockReturnExecutor>().with_data(MockExecutionResult::Done);
1113
1114        run_job(backend, job).await;
1115    }
1116
1117    #[tokio::test]
1118    async fn run_job_success_error_marking_success() {
1119        let mut backend = MockBackend::default();
1120        backend
1121            .expect_mark_job_complete()
1122            .returning(|_| Err(BackendError::BadState));
1123
1124        let job = Job::mock_job::<MockReturnExecutor>().with_data(MockExecutionResult::Done);
1125
1126        run_job(backend, job).await;
1127    }
1128
1129    #[tokio::test]
1130    async fn run_job_retryable() {
1131        let mut backend = MockBackend::default();
1132        backend
1133            .expect_mark_job_retryable()
1134            .returning(|_, _, _| Ok(()));
1135
1136        let job = Job::mock_job::<MockReturnExecutor>().with_data(MockExecutionResult::Error {
1137            error: MockError("oh no".to_owned()),
1138        });
1139
1140        run_job(backend, job).await;
1141    }
1142
1143    #[tokio::test]
1144    async fn run_job_retryable_error_marking_retryable() {
1145        let mut backend = MockBackend::default();
1146        backend
1147            .expect_mark_job_retryable()
1148            .returning(|_, _, _| Err(BackendError::BadState));
1149
1150        let job = Job::mock_job::<MockReturnExecutor>().with_data(MockExecutionResult::Error {
1151            error: MockError("oh no".to_owned()),
1152        });
1153
1154        run_job(backend, job).await;
1155    }
1156
1157    #[tokio::test]
1158    async fn run_job_retryable_timeout() {
1159        let mut backend = MockBackend::default();
1160        backend
1161            .expect_mark_job_retryable()
1162            .returning(|_, _, _| Ok(()));
1163
1164        let job = Job::mock_job::<MockReturnExecutor>().with_data(MockExecutionResult::Timeout);
1165
1166        run_job(backend, job).await;
1167    }
1168
1169    #[tokio::test]
1170    async fn run_job_discarded() {
1171        let mut backend = MockBackend::default();
1172        backend.expect_mark_job_discarded().returning(|_, _| Ok(()));
1173
1174        let job = Job::mock_job::<MockReturnExecutor>()
1175            .with_data(MockExecutionResult::Error {
1176                error: MockError("oh no".to_owned()),
1177            })
1178            .with_max_attempts(1)
1179            .with_attempt(1);
1180
1181        run_job(backend, job).await;
1182    }
1183
1184    #[tokio::test]
1185    async fn run_job_discarded_error_marking_job_discarded() {
1186        let mut backend = MockBackend::default();
1187        backend
1188            .expect_mark_job_discarded()
1189            .returning(|_, _| Err(BackendError::BadState));
1190
1191        let job = Job::mock_job::<MockReturnExecutor>()
1192            .with_data(MockExecutionResult::Error {
1193                error: MockError("oh no".to_owned()),
1194            })
1195            .with_max_attempts(1)
1196            .with_attempt(1);
1197
1198        run_job(backend, job).await;
1199    }
1200
1201    #[tokio::test]
1202    async fn run_job_discarded_panic() {
1203        let mut backend = MockBackend::default();
1204        backend.expect_mark_job_discarded().returning(|_, _| Ok(()));
1205
1206        let job = Job::mock_job::<MockReturnExecutor>()
1207            .with_data(MockExecutionResult::Panic)
1208            .with_max_attempts(1)
1209            .with_attempt(1);
1210
1211        run_job(backend, job).await;
1212    }
1213
1214    #[tokio::test]
1215    async fn run_job_discarded_panic_error_marking_job_discarded() {
1216        let mut backend = MockBackend::default();
1217        backend
1218            .expect_mark_job_discarded()
1219            .returning(|_, _| Err(BackendError::BadState));
1220
1221        let job = Job::mock_job::<MockReturnExecutor>()
1222            .with_data(MockExecutionResult::Panic)
1223            .with_max_attempts(1)
1224            .with_attempt(1);
1225
1226        run_job(backend, job).await;
1227    }
1228
1229    #[tokio::test]
1230    async fn run_job_snoozed() {
1231        let mut backend = MockBackend::default();
1232        backend.expect_mark_job_snoozed().returning(|_, _| Ok(()));
1233
1234        let job = Job::mock_job::<MockReturnExecutor>()
1235            .with_data(MockExecutionResult::Snooze {
1236                delay: Duration::from_secs(10),
1237            })
1238            .with_max_attempts(1)
1239            .with_attempt(1);
1240
1241        run_job(backend, job).await;
1242    }
1243
1244    #[tokio::test]
1245    async fn run_job_snoozed_error_marking_job_snoozed() {
1246        let mut backend = MockBackend::default();
1247        backend
1248            .expect_mark_job_snoozed()
1249            .returning(|_, _| Err(BackendError::BadState));
1250
1251        let job = Job::mock_job::<MockReturnExecutor>()
1252            .with_data(MockExecutionResult::Snooze {
1253                delay: Duration::from_secs(10),
1254            })
1255            .with_max_attempts(1)
1256            .with_attempt(1);
1257
1258        run_job(backend, job).await;
1259    }
1260
1261    #[tokio::test]
1262    async fn run_job_cancelled() {
1263        let mut backend = MockBackend::default();
1264        backend.expect_mark_job_cancelled().returning(|_, _| Ok(()));
1265
1266        let job = Job::mock_job::<MockReturnExecutor>()
1267            .with_data(MockExecutionResult::Cancelled {
1268                reason: "No need anymore".to_owned(),
1269            })
1270            .with_max_attempts(1)
1271            .with_attempt(1);
1272
1273        run_job(backend, job).await;
1274    }
1275
1276    #[tokio::test]
1277    async fn run_job_cancelled_error_marking_job_cancelled() {
1278        let mut backend = MockBackend::default();
1279        backend
1280            .expect_mark_job_cancelled()
1281            .returning(|_, _| Err(BackendError::BadState));
1282
1283        let job = Job::mock_job::<MockReturnExecutor>()
1284            .with_data(MockExecutionResult::Cancelled {
1285                reason: "No need anymore".to_owned(),
1286            })
1287            .with_max_attempts(1)
1288            .with_attempt(1);
1289
1290        run_job(backend, job).await;
1291    }
1292
1293    #[tokio::test]
1294    async fn cron_job() {
1295        let every_second = cron::Schedule::try_from("* * * * * *").unwrap();
1296        let mut backend = MockBackend::default();
1297        let _sender = backend.expect_subscribe_to_ready_jobs_with_stream();
1298        backend.expect_enqueue().returning(|_| Ok(0.into()));
1299        let backend = Arc::new(backend);
1300
1301        let _guard = Rexecutor::new(backend.clone())
1302            .with_cron_executor::<MockReturnExecutor>(every_second, MockExecutionResult::Done)
1303            .drop_guard();
1304
1305        tokio::task::yield_now().await;
1306        tokio::time::sleep(Duration::from_secs(1)).await;
1307    }
1308
1309    #[tokio::test]
1310    async fn cron_job_error() {
1311        let every_second = cron::Schedule::try_from("* * * * * *").unwrap();
1312        let mut backend = MockBackend::default();
1313        let _sender = backend.expect_subscribe_to_ready_jobs_with_stream();
1314        backend
1315            .expect_enqueue()
1316            .returning(|_| Err(BackendError::BadState));
1317        let backend = Arc::new(backend);
1318
1319        let _guard = Rexecutor::new(backend.clone())
1320            .with_cron_executor::<MockReturnExecutor>(every_second, MockExecutionResult::Done)
1321            .drop_guard();
1322
1323        tokio::task::yield_now().await;
1324        tokio::time::sleep(Duration::from_secs(1)).await;
1325    }
1326
1327    #[tokio::test]
1328    async fn with_pruner() {
1329        let schedule = cron::Schedule::try_from("* * * * * *").unwrap();
1330        let mut backend = MockBackend::default();
1331        backend.expect_prune_jobs().returning(|_| Ok(()));
1332        let backend = Arc::new(backend);
1333
1334        let pruner = PrunerConfig::new(schedule)
1335            .with_pruner(Pruner::max_length(5, JobStatus::Complete).only::<MockReturnExecutor>());
1336
1337        let _guard = Rexecutor::new(backend.clone())
1338            .with_job_pruner(pruner)
1339            .drop_guard();
1340
1341        tokio::task::yield_now().await;
1342        tokio::time::sleep(Duration::from_secs(1)).await;
1343    }
1344
1345    #[tokio::test]
1346    async fn with_pruner_error() {
1347        let schedule = cron::Schedule::try_from("* * * * * *").unwrap();
1348        let mut backend = MockBackend::default();
1349        backend
1350            .expect_prune_jobs()
1351            .returning(|_| Err(BackendError::BadState));
1352        let backend = Arc::new(backend);
1353
1354        let pruner = PrunerConfig::new(schedule)
1355            .with_pruner(Pruner::max_length(5, JobStatus::Complete).only::<MockReturnExecutor>());
1356
1357        let _guard = Rexecutor::new(backend.clone())
1358            .with_job_pruner(pruner)
1359            .drop_guard();
1360
1361        tokio::task::yield_now().await;
1362        tokio::time::sleep(Duration::from_secs(1)).await;
1363    }
1364
1365    async fn run_job(mut backend: MockBackend, job: Job) {
1366        let sender = backend.expect_subscribe_to_ready_jobs_with_stream();
1367        let backend = Arc::new(backend);
1368        let _guard = Rexecutor::new(backend.clone())
1369            .with_executor::<MockReturnExecutor>()
1370            .drop_guard();
1371
1372        tokio::task::yield_now().await;
1373        sender.send(Ok(job)).unwrap();
1374        tokio::task::yield_now().await;
1375    }
1376}