rexecutor/lib.rs
1//! A robust job processing library for rust.
2//!
3//! # Setting up `Rexecutor`
4//!
5//! To create an instance of [`Rexecutor`] you will need to have an implementation of [`Backend`].
6//! The only one provided in this crate is [`backend::memory::InMemoryBackend`] which is primarily
7//! provided for testing purposes. Instead a backend should be used from one of the implementations
8//! provided by other crates.
9//!
10//! # Creating executors
11//!
12//! Jobs are defined by creating a struct/enum and implementing `Executor` for it.
13//!
14//! ## Example defining an executor
15//!
16//! You can define and enqueue a job as follows:
17//!
18//! ```
19//! use rexecutor::prelude::*;
20//! use chrono::{Utc, TimeDelta};
21//! use rexecutor::backend::memory::InMemoryBackend;
22//! use rexecutor::assert_enqueued;
23//! let backend = InMemoryBackend::new().paused();
24//! Rexecutor::new(backend).set_global_backend().unwrap();
25//! struct EmailJob;
26//!
27//! #[async_trait::async_trait]
28//! impl Executor for EmailJob {
29//! type Data = String;
30//! type Metadata = String;
31//! const NAME: &'static str = "email_job";
32//! const MAX_ATTEMPTS: u16 = 2;
33//! async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
34//! println!("{} running, with args: {}", Self::NAME, job.data);
35//! // Do something important with an email
36//! ExecutionResult::Done
37//! }
38//! }
39//!
40//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
41//! let _ = EmailJob::builder()
42//! .with_data("bob.shuruncle@example.com".to_owned())
43//! .schedule_in(TimeDelta::hours(3))
44//! .enqueue()
45//! .await;
46//!
47//! assert_enqueued!(
48//! with_data: "bob.shuruncle@example.com".to_owned(),
49//! scheduled_after: Utc::now() + TimeDelta::minutes(170),
50//! scheduled_before: Utc::now() + TimeDelta::minutes(190),
51//! for_executor: EmailJob
52//! );
53//! # });
54//! ```
55//!
56//! ## Unique jobs
57//!
58//! It is possible to ensure uniqueness of jobs based on certain criteria. This can be defined as
59//! part of the implementation of `Executor` via [`Executor::UNIQUENESS_CRITERIA`] or when
60//! inserting the job via [`job::builder::JobBuilder::unique`].
61//!
62//! For example, to ensure that only one unique job is run every five minutes it is possible to use
63//! the following uniqueness criteria.
64//!
65//! ```
66//! # use rexecutor::prelude::*;
67//! # use chrono::{Utc, TimeDelta};
68//! # use rexecutor::backend::memory::InMemoryBackend;
69//! # use rexecutor::assert_enqueued;
70//! # let backend = InMemoryBackend::new().paused();
71//! # Rexecutor::new(backend).set_global_backend().unwrap();
72//! struct UniqueJob;
73//!
74//! #[async_trait::async_trait]
75//! impl Executor for UniqueJob {
76//! type Data = ();
77//! type Metadata = ();
78//! const NAME: &'static str = "unique_job";
79//! const MAX_ATTEMPTS: u16 = 1;
80//! const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
81//! UniquenessCriteria::by_executor()
82//! .and_within(TimeDelta::seconds(300))
83//! .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
84//! );
85//! async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
86//! println!("{} running, with args: {:?}", Self::NAME, job.data);
87//! // Do something important
88//! ExecutionResult::Done
89//! }
90//! }
91//!
92//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
93//! let _ = UniqueJob::builder().enqueue().await;
94//! let _ = UniqueJob::builder().enqueue().await;
95//!
96//! // Only one of the jobs was enqueued
97//! assert_enqueued!(
98//! 1 job,
99//! scheduled_before: Utc::now(),
100//! for_executor: UniqueJob
101//! );
102//! # });
103//! ```
104//!
105//! Additionally it is possible to specify what action should be taken when there is a conflicting
106//! job. In the example above the priority is overridden. For more details of how to use uniqueness
107//! see [`job::uniqueness_criteria::UniquenessCriteria`].
108//!
109//!
110//! # Overriding [`Executor`] default values
111//!
112//! When defining an [`Executor`] you specify the maximum number of attempts via
113//! [`Executor::MAX_ATTEMPTS`]. However, when inserting a job it is possible to override this value
114//! by calling [`job::builder::JobBuilder::with_max_attempts`] (if not called the max attempts will be equal to
115//! [`Executor::MAX_ATTEMPTS`]).
116//!
117//! Similarly, the executor can define a job uniqueness criteria via
118//! [`Executor::UNIQUENESS_CRITERIA`]. However, using [`job::builder::JobBuilder::unique`] it is possible to
119//! override this value for a specific job.
120//!
121//! # Setting up executors to run
122//!
123//! For each executor you would like to run [`Rexecutor::with_executor`] should be called. Being
124//! explicit about this opens the possibility of having specific nodes in a cluster running as
125//! worker nodes for certain enqueued jobs while other nodes are not responsible for their execution.
126//!
127//! ## Example setting up executors
128//!
129//! ```
130//! # use rexecutor::prelude::*;
131//! # use std::str::FromStr;
132//! # use chrono::TimeDelta;
133//! # use rexecutor::backend::memory::InMemoryBackend;
134//! # pub(crate) struct RefreshWorker;
135//! # pub(crate) struct EmailScheduler;
136//! # pub(crate) struct RegistrationWorker;
137//! #
138//! # #[async_trait::async_trait]
139//! # impl Executor for RefreshWorker {
140//! # type Data = String;
141//! # type Metadata = String;
142//! # const NAME: &'static str = "refresh_worker";
143//! # const MAX_ATTEMPTS: u16 = 2;
144//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
145//! # ExecutionResult::Done
146//! # }
147//! # }
148//! # #[async_trait::async_trait]
149//! # impl Executor for EmailScheduler {
150//! # type Data = String;
151//! # type Metadata = String;
152//! # const NAME: &'static str = "email_scheduler";
153//! # const MAX_ATTEMPTS: u16 = 2;
154//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
155//! # ExecutionResult::Done
156//! # }
157//! # }
158//! # #[async_trait::async_trait]
159//! # impl Executor for RegistrationWorker {
160//! # type Data = String;
161//! # type Metadata = String;
162//! # const NAME: &'static str = "registration_worker";
163//! # const MAX_ATTEMPTS: u16 = 2;
164//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
165//! # ExecutionResult::Done
166//! # }
167//! # }
168//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
169//! let backend = InMemoryBackend::new();
170//! Rexecutor::new(backend)
171//! .with_executor::<RefreshWorker>()
172//! .with_executor::<EmailScheduler>()
173//! .with_executor::<RegistrationWorker>();
174//! # });
175//! ```
176//!
177//! # Enqueuing jobs
178//!
179//! Generally jobs will be enqueued using the [`job::builder::JobBuilder`] returned by [`Executor::builder`].
180//!
181//! When enqueuing jobs the data and metadata of the job can be specified. Additionally, the
182//! default values of the [`Executor`] can be overridden.
183//!
184//! ## Overriding [`Executor`] default values
185//!
186//! When defining an [`Executor`] you specify the maximum number of attempts via
187//! [`Executor::MAX_ATTEMPTS`]. However, when inserting a job it is possible to override this value
188//! by calling [`job::builder::JobBuilder::with_max_attempts`] (if not called the max attempts will be equal to
189//! [`Executor::MAX_ATTEMPTS`]).
190//!
191//! Similarly, the executor can define a job uniqueness criteria via
192//! [`Executor::UNIQUENESS_CRITERIA`]. However, using [`job::builder::JobBuilder::unique`] it is possible to
193//! override this value for a specific job.
194//!
195//! ## Example enqueuing a job
196//!
197//! ```
198//! # use rexecutor::prelude::*;
199//! # use std::sync::Arc;
200//! # use chrono::{Utc, TimeDelta};
201//! # use rexecutor::backend::memory::InMemoryBackend;
202//! # use rexecutor::assert_enqueued;
203//! # pub(crate) struct ExampleExecutor;
204//! #
205//! # #[async_trait::async_trait]
206//! # impl Executor for ExampleExecutor {
207//! # type Data = String;
208//! # type Metadata = String;
209//! # const NAME: &'static str = "simple_executor";
210//! # const MAX_ATTEMPTS: u16 = 2;
211//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
212//! # ExecutionResult::Done
213//! # }
214//! # }
215//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
216//! let backend = Arc::new(InMemoryBackend::new().paused());
217//! Rexecutor::new(backend.clone()).set_global_backend().unwrap();
218//!
219//! ExampleExecutor::builder()
220//! .with_max_attempts(2)
221//! .with_tags(vec!["initial_job", "delayed"])
222//! .with_data("First job".into())
223//! .schedule_in(TimeDelta::hours(2))
224//! .enqueue_to_backend(&backend)
225//! .await
226//! .unwrap();
227//!
228//! assert_enqueued!(
229//! to: backend,
230//! with_data: "First job".to_owned(),
231//! tagged_with: ["initial_job", "delayed"],
232//! scheduled_after: Utc::now() + TimeDelta::minutes(110),
233//! scheduled_before: Utc::now() + TimeDelta::minutes(130),
234//! for_executor: ExampleExecutor
235//! );
236//! # });
237//! ```
238//!
239//! # Compile time scheduling of cron jobs
240//!
241//! It can be useful to have jobs that run on a given schedule. Jobs like this can be setup using
242//! either [`Rexecutor::with_cron_executor`] or [`Rexecutor::with_cron_executor_for_timezone`]. The
243//! latter is used to specify the timezone that the jobs should be scheduled to run in.
244//!
245//! ## Example setting up a UTC cron job
246//!
247//! To set up a cron job to run every day at midnight you can use the following code.
248//!
249//! ```
250//! # use rexecutor::prelude::*;
251//! # use rexecutor::backend::{Backend, memory::InMemoryBackend};
252//! struct CronJob;
253//! #[async_trait::async_trait]
254//! impl Executor for CronJob {
255//! type Data = String;
256//! type Metadata = ();
257//! const NAME: &'static str = "cron_job";
258//! const MAX_ATTEMPTS: u16 = 1;
259//! async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
260//! // Do something important
261//! ExecutionResult::Done
262//! }
263//! }
264//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
265//! let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
266//!
267//! let backend = InMemoryBackend::new();
268//! Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
269//! # });
270//! ```
271//!
272//! # Pruning jobs
273//!
274//! After jobs have completed, been cancelled, or discarded it is useful to be able to clean up.
275//! To setup the job pruner [`Rexecutor::with_job_pruner`] should be called passing in the given
276//! [`PrunerConfig`].
277//!
278//! Given the different ways in which jobs can finish it is often useful to be able to have fine
279//! grained control over how old jobs should be cleaned up. [`PrunerConfig`] enables such control.
280//!
281//! When constructing [`PrunerConfig`] a [`cron::Schedule`] is provided to specify when the pruner
282//! should run.
283//!
284//! Depending on the load/throughput of the system the pruner can be scheduled to run anywhere
285//! from once a year through to multiple times per hour.
286//!
287//! ## Example configuring the job pruner
288//!
289//! ```
290//! # use rexecutor::prelude::*;
291//! # use std::str::FromStr;
292//! # use chrono::TimeDelta;
293//! # use rexecutor::backend::memory::InMemoryBackend;
294//! # pub(crate) struct RefreshWorker;
295//! # pub(crate) struct EmailScheduler;
296//! # pub(crate) struct RegistrationWorker;
297//! #
298//! # #[async_trait::async_trait]
299//! # impl Executor for RefreshWorker {
300//! # type Data = String;
301//! # type Metadata = String;
302//! # const NAME: &'static str = "refresh_worker";
303//! # const MAX_ATTEMPTS: u16 = 2;
304//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
305//! # ExecutionResult::Done
306//! # }
307//! # }
308//! # #[async_trait::async_trait]
309//! # impl Executor for EmailScheduler {
310//! # type Data = String;
311//! # type Metadata = String;
312//! # const NAME: &'static str = "email_scheduler";
313//! # const MAX_ATTEMPTS: u16 = 2;
314//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
315//! # ExecutionResult::Done
316//! # }
317//! # }
318//! # #[async_trait::async_trait]
319//! # impl Executor for RegistrationWorker {
320//! # type Data = String;
321//! # type Metadata = String;
322//! # const NAME: &'static str = "registration_worker";
323//! # const MAX_ATTEMPTS: u16 = 2;
324//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
325//! # ExecutionResult::Done
326//! # }
327//! # }
328//! let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
329//! .with_max_concurrency(Some(2))
330//! .with_pruner(
331//! Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
332//! .only::<RefreshWorker>()
333//! .and::<EmailScheduler>(),
334//! )
335//! .with_pruner(
336//! Pruner::max_length(200, JobStatus::Discarded)
337//! .except::<RefreshWorker>()
338//! .and::<EmailScheduler>(),
339//! );
340//!
341//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
342//! let backend = InMemoryBackend::new();
343//! Rexecutor::new(backend)
344//! .with_executor::<RefreshWorker>()
345//! .with_executor::<EmailScheduler>()
346//! .with_executor::<RegistrationWorker>()
347//! .with_job_pruner(config);
348//! # });
349//! ```
350//!
351//! # Shutting rexecutor down
352//!
353//! To avoid jobs getting killed mid way through their executions it is important to make use of
354//! graceful shutdown. This can either explicitly be called using [`Rexecutor::graceful_shutdown`],
355//! or via use of the [`DropGuard`] obtained via [`Rexecutor::drop_guard`].
356//!
357//! Using [`Rexecutor::graceful_shutdown`] or [`Rexecutor::drop_guard`] will ensure that all
358//! currently executing jobs will be allowed time to complete before shutting rexecutor down.
359//!
360//! ## Example using the `DropGuard`
361//!
362//! ```
363//! # use rexecutor::prelude::*;
364//! # use std::str::FromStr;
365//! # use chrono::TimeDelta;
366//! # use rexecutor::backend::memory::InMemoryBackend;
367//! # pub(crate) struct RefreshWorker;
368//! # pub(crate) struct EmailScheduler;
369//! # pub(crate) struct RegistrationWorker;
370//! #
371//! # #[async_trait::async_trait]
372//! # impl Executor for RefreshWorker {
373//! # type Data = String;
374//! # type Metadata = String;
375//! # const NAME: &'static str = "refresh_worker";
376//! # const MAX_ATTEMPTS: u16 = 2;
377//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
378//! # ExecutionResult::Done
379//! # }
380//! # }
381//! # #[async_trait::async_trait]
382//! # impl Executor for EmailScheduler {
383//! # type Data = String;
384//! # type Metadata = String;
385//! # const NAME: &'static str = "email_scheduler";
386//! # const MAX_ATTEMPTS: u16 = 2;
387//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
388//! # ExecutionResult::Done
389//! # }
390//! # }
391//! # #[async_trait::async_trait]
392//! # impl Executor for RegistrationWorker {
393//! # type Data = String;
394//! # type Metadata = String;
395//! # const NAME: &'static str = "registration_worker";
396//! # const MAX_ATTEMPTS: u16 = 2;
397//! # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
398//! # ExecutionResult::Done
399//! # }
400//! # }
401//! # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
402//! let backend = InMemoryBackend::new();
403//! // Note this must be given a name to ensure it is dropped at the end of the scope.
404//! // See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
405//! let _guard = Rexecutor::new(backend)
406//! .with_executor::<RefreshWorker>()
407//! .with_executor::<EmailScheduler>()
408//! .with_executor::<RegistrationWorker>()
409//! .drop_guard();
410//! # });
411//! ```
412//!
413//! # Global backend
414//!
415//! Rexecutor can be run either with or without a global backend. Setting one enables the use of the
416//! convenience [`job::builder::JobBuilder::enqueue`] method which does not require a reference to
417//! the backend to be passed down to the code that needs to enqueue a job.
418//!
419//! The global backend can be set using [`Rexecutor::set_global_backend`]. This should only be
420//! called once, otherwise it will return an error.
421//!
422//! In fact, for a single [`Rexecutor`] instance it is impossible to call this twice:
423//!
424//! ```compile_fail
425//! # use rexecutor::prelude::*;
426//! let backend = rexecutor::backend::memory::InMemoryBackend::new();
427//! Rexecutor::new(backend).set_global_backend().unwrap().set_global_backend();
428//! ```
429//!
430//! Note, using a global backend has many of the same drawbacks as any global variable; in
431//! particular, it can make unit testing more difficult.
432#![deny(missing_docs)]
433use std::{hash::Hash, marker::PhantomData};
434
435pub mod backend;
436pub mod backoff;
437mod cron_runner;
438pub mod executor;
439#[doc(hidden)]
440pub mod global_backend;
441pub mod job;
442pub mod prelude;
443pub mod pruner;
444pub mod testing;
445
446use backend::{Backend, BackendError};
447use chrono::{TimeZone, Utc};
448use cron_runner::CronRunner;
449use executor::Executor;
450use global_backend::GlobalBackend;
451use job::runner::JobRunner;
452use pruner::{PrunerConfig, runner::PrunerRunner};
453use serde::{Serialize, de::DeserializeOwned};
454use thiserror::Error;
455use tokio_util::sync::CancellationToken;
456
/// Marker trait bounding the type-state parameter of [`Rexecutor`].
///
/// The trait is private to this crate, so downstream code cannot introduce additional states;
/// the only implementors are [`GlobalUnset`] and [`GlobalSet`].
trait InternalRexecutorState {}

/// The state of [`Rexecutor`] after the global backend has been set, this makes it possible to
/// avoid calling [`Rexecutor::set_global_backend`] multiple times from the same instance.
///
/// `Debug` is derived because `#[derive(Debug)]` on [`Rexecutor`] requires every type parameter
/// to be `Debug`; without it the derived implementation could never apply.
#[doc(hidden)]
#[derive(Debug)]
pub struct GlobalSet;

/// The state of [`Rexecutor`] before the global backend has been set. Using different states,
/// makes it possible to avoid calling [`Rexecutor::set_global_backend`] multiple times from the
/// same instance.
///
/// `Debug` is derived because `#[derive(Debug)]` on [`Rexecutor`] requires every type parameter
/// to be `Debug`; without it the derived implementation could never apply.
#[doc(hidden)]
#[derive(Debug)]
pub struct GlobalUnset;

impl InternalRexecutorState for GlobalUnset {}
impl InternalRexecutorState for GlobalSet {}
471
472/// The entry point for setting up rexecutor.
473///
474/// To create an instance of [`Rexecutor`] you will need to have an implementation of [`Backend`].
475/// The only one provided in this crate is [`backend::memory::InMemoryBackend`] which is primarily
476/// provided for testing purposes. Instead a backend should be used from one of the implementations
477/// provided by other crates.
478///
479/// # Setting up executors
480///
481/// For each executor you would like to run [`Rexecutor::with_executor`] should be called. Being
482/// explicit about this opens the possibility of having specific nodes in a cluster running as
483/// worker nodes for certain enqueued jobs while other nodes are not responsible for their execution.
484///
485/// ## Example setting up executors
486///
487/// ```
488/// # use rexecutor::prelude::*;
489/// # use std::str::FromStr;
490/// # use chrono::TimeDelta;
491/// # use rexecutor::backend::memory::InMemoryBackend;
492/// # pub(crate) struct RefreshWorker;
493/// # pub(crate) struct EmailScheduler;
494/// # pub(crate) struct RegistrationWorker;
495/// #
496/// # #[async_trait::async_trait]
497/// # impl Executor for RefreshWorker {
498/// # type Data = String;
499/// # type Metadata = String;
500/// # const NAME: &'static str = "refresh_worker";
501/// # const MAX_ATTEMPTS: u16 = 2;
502/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
503/// # ExecutionResult::Done
504/// # }
505/// # }
506/// # #[async_trait::async_trait]
507/// # impl Executor for EmailScheduler {
508/// # type Data = String;
509/// # type Metadata = String;
510/// # const NAME: &'static str = "email_scheduler";
511/// # const MAX_ATTEMPTS: u16 = 2;
512/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
513/// # ExecutionResult::Done
514/// # }
515/// # }
516/// # #[async_trait::async_trait]
517/// # impl Executor for RegistrationWorker {
518/// # type Data = String;
519/// # type Metadata = String;
520/// # const NAME: &'static str = "registration_worker";
521/// # const MAX_ATTEMPTS: u16 = 2;
522/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
523/// # ExecutionResult::Done
524/// # }
525/// # }
526/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
527/// let backend = InMemoryBackend::new();
528/// Rexecutor::new(backend)
529/// .with_executor::<RefreshWorker>()
530/// .with_executor::<EmailScheduler>()
531/// .with_executor::<RegistrationWorker>();
532/// # });
533/// ```
534///
535/// # Compile time scheduling of cron jobs
536///
537/// It can be useful to have jobs that run on a given schedule. Jobs like this can be setup using
538/// either [`Rexecutor::with_cron_executor`] or [`Rexecutor::with_cron_executor_for_timezone`]. The
539/// latter is used to specify the timezone that the jobs should be scheduled to run in.
540///
541/// ## Example setting up a UTC cron job
542///
543/// To set up a cron job to run every day at midnight you can use the following code.
544///
545/// ```
546/// # use rexecutor::prelude::*;
547/// # use rexecutor::backend::{Backend, memory::InMemoryBackend};
548/// struct CronJob;
549/// #[async_trait::async_trait]
550/// impl Executor for CronJob {
551/// type Data = String;
552/// type Metadata = ();
553/// const NAME: &'static str = "cron_job";
554/// const MAX_ATTEMPTS: u16 = 1;
555/// async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
556/// // Do something important
557/// ExecutionResult::Done
558/// }
559/// }
560/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
561/// let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
562///
563/// let backend = InMemoryBackend::new();
564/// Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
565/// # });
566/// ```
567///
568/// # Pruning jobs
569///
570/// After jobs have completed, been cancelled, or discarded it is useful to be able to clean up.
571/// To setup the job pruner [`Rexecutor::with_job_pruner`] should be called passing in the given
572/// [`PrunerConfig`].
573///
574/// Given the different ways in which jobs can finish it is often useful to be able to have fine
575/// grained control over how old jobs should be cleaned up. [`PrunerConfig`] enables such control.
576///
577/// When constructing [`PrunerConfig`] a [`cron::Schedule`] is provided to specify when the pruner
578/// should run.
579///
580/// Depending on the load/throughput of the system the pruner can be scheduled to run anywhere
581/// from once a year through to multiple times per hour.
582///
583/// ## Example configuring the job pruner
584///
585/// ```
586/// # use rexecutor::prelude::*;
587/// # use std::str::FromStr;
588/// # use chrono::TimeDelta;
589/// # use rexecutor::backend::memory::InMemoryBackend;
590/// # pub(crate) struct RefreshWorker;
591/// # pub(crate) struct EmailScheduler;
592/// # pub(crate) struct RegistrationWorker;
593/// #
594/// # #[async_trait::async_trait]
595/// # impl Executor for RefreshWorker {
596/// # type Data = String;
597/// # type Metadata = String;
598/// # const NAME: &'static str = "refresh_worker";
599/// # const MAX_ATTEMPTS: u16 = 2;
600/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
601/// # ExecutionResult::Done
602/// # }
603/// # }
604/// # #[async_trait::async_trait]
605/// # impl Executor for EmailScheduler {
606/// # type Data = String;
607/// # type Metadata = String;
608/// # const NAME: &'static str = "email_scheduler";
609/// # const MAX_ATTEMPTS: u16 = 2;
610/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
611/// # ExecutionResult::Done
612/// # }
613/// # }
614/// # #[async_trait::async_trait]
615/// # impl Executor for RegistrationWorker {
616/// # type Data = String;
617/// # type Metadata = String;
618/// # const NAME: &'static str = "registration_worker";
619/// # const MAX_ATTEMPTS: u16 = 2;
620/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
621/// # ExecutionResult::Done
622/// # }
623/// # }
624/// let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
625/// .with_max_concurrency(Some(2))
626/// .with_pruner(
627/// Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
628/// .only::<RefreshWorker>()
629/// .and::<EmailScheduler>(),
630/// )
631/// .with_pruner(
632/// Pruner::max_length(200, JobStatus::Discarded)
633/// .except::<RefreshWorker>()
634/// .and::<EmailScheduler>(),
635/// );
636///
637/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
638/// let backend = InMemoryBackend::new();
639/// Rexecutor::new(backend)
640/// .with_executor::<RefreshWorker>()
641/// .with_executor::<EmailScheduler>()
642/// .with_executor::<RegistrationWorker>()
643/// .with_job_pruner(config);
644/// # });
645/// ```
646///
647/// # Shutting rexecutor down
648///
649/// To avoid jobs getting killed mid way through their executions it is important to make use of
650/// graceful shutdown. This can either explicitly be called using [`Rexecutor::graceful_shutdown`],
651/// or via use of the [`DropGuard`] obtained via [`Rexecutor::drop_guard`].
652///
653/// Using [`Rexecutor::graceful_shutdown`] or [`Rexecutor::drop_guard`] will ensure that all
654/// currently executing jobs will be allowed time to complete before shutting rexecutor down.
655///
656/// ## Example using the `DropGuard`
657///
658/// ```
659/// # use rexecutor::prelude::*;
660/// # use std::str::FromStr;
661/// # use chrono::TimeDelta;
662/// # use rexecutor::backend::memory::InMemoryBackend;
663/// # pub(crate) struct RefreshWorker;
664/// # pub(crate) struct EmailScheduler;
665/// # pub(crate) struct RegistrationWorker;
666/// #
667/// # #[async_trait::async_trait]
668/// # impl Executor for RefreshWorker {
669/// # type Data = String;
670/// # type Metadata = String;
671/// # const NAME: &'static str = "refresh_worker";
672/// # const MAX_ATTEMPTS: u16 = 2;
673/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
674/// # ExecutionResult::Done
675/// # }
676/// # }
677/// # #[async_trait::async_trait]
678/// # impl Executor for EmailScheduler {
679/// # type Data = String;
680/// # type Metadata = String;
681/// # const NAME: &'static str = "email_scheduler";
682/// # const MAX_ATTEMPTS: u16 = 2;
683/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
684/// # ExecutionResult::Done
685/// # }
686/// # }
687/// # #[async_trait::async_trait]
688/// # impl Executor for RegistrationWorker {
689/// # type Data = String;
690/// # type Metadata = String;
691/// # const NAME: &'static str = "registration_worker";
692/// # const MAX_ATTEMPTS: u16 = 2;
693/// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
694/// # ExecutionResult::Done
695/// # }
696/// # }
697/// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
698/// let backend = InMemoryBackend::new();
699/// // Note this must be given a name to ensure it is dropped at the end of the scope.
700/// // See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
701/// let _guard = Rexecutor::new(backend)
702/// .with_executor::<RefreshWorker>()
703/// .with_executor::<EmailScheduler>()
704/// .with_executor::<RegistrationWorker>()
705/// .drop_guard();
706/// # });
707/// ```
708///
709/// # Global backend
710///
711/// Rexecutor can be run either with or without a global backend. Setting one enables the use of the
712/// convenience [`job::builder::JobBuilder::enqueue`] method which does not require a reference to
713/// the backend to be passed down to the code that needs to enqueue a job.
714///
715/// The global backend can be set using [`Rexecutor::set_global_backend`]. This should only be
716/// called once, otherwise it will return an error.
717///
718/// In fact, for a single [`Rexecutor`] instance it is impossible to call this twice:
719///
720/// ```compile_fail
721/// # use rexecutor::prelude::*;
722/// let backend = rexecutor::backend::memory::InMemoryBackend::new();
723/// Rexecutor::new(backend).set_global_backend().unwrap().set_global_backend();
724/// ```
725///
726/// Note, using a global backend has many of the same drawbacks as any global variable; in
727/// particular, it can make unit testing more difficult.
728#[derive(Debug)]
729#[allow(private_bounds)]
730pub struct Rexecutor<B: Backend, State: InternalRexecutorState> {
731 backend: B,
732 cancellation_token: CancellationToken,
733 _state: PhantomData<State>,
734}
735
736impl<B> Default for Rexecutor<B, GlobalUnset>
737where
738 B: Backend + Default,
739{
740 fn default() -> Self {
741 Self::new(Default::default())
742 }
743}
744
745impl<B> Rexecutor<B, GlobalUnset>
746where
747 B: Backend,
748{
749 /// Create an instance of [`Rexecutor`] using the given `backend`.
750 pub fn new(backend: B) -> Self {
751 Self {
752 cancellation_token: Default::default(),
753 backend,
754 _state: PhantomData,
755 }
756 }
757}
758
759/// To enable automatic clean up this guard will shut down rexucutor and all associated tasks when
760/// dropped.
761#[allow(dead_code)]
762pub struct DropGuard(tokio_util::sync::DropGuard);
763
764impl<B> Rexecutor<B, GlobalUnset>
765where
766 B: Backend + Send + 'static + Sync + Clone,
767{
768 /// Sets the global backend to the backend associated with the current instance of
769 /// [`Rexecutor`].
770 ///
771 /// This should only be called once. If called a second time it will return
772 /// [`RexecutorError::GlobalBackend`].
773 ///
774 /// Calling this makes is possible to enqueue jobs without maintaining a reference to the
775 /// backend throughout the codebase and enables the use of
776 /// [`job::builder::JobBuilder::enqueue`].
777 ///
778 /// Note is is not possible to call this twice for the same [`Rexecutor`] instance
779 ///
780 /// ```compile_fail
781 /// # use rexecutor::prelude::*;
782 /// let backend = rexecutor::backend::memory::InMemoryBackend::new();
783 /// Rexecutor::new(backend).set_global_backend().set_global_backend();
784 /// ```
785 pub fn set_global_backend(self) -> Result<Rexecutor<B, GlobalSet>, RexecutorError> {
786 GlobalBackend::set(self.backend.clone())?;
787
788 Ok(Rexecutor {
789 cancellation_token: self.cancellation_token,
790 backend: self.backend,
791 _state: PhantomData,
792 })
793 }
794}
795
796#[allow(private_bounds)]
797impl<B, State> Rexecutor<B, State>
798where
799 B: Backend + Send + 'static + Sync + Clone,
800 State: InternalRexecutorState,
801{
802 /// Enable the execution of jobs for the provided [`Executor`].
803 ///
804 /// If this isn't called for an executor, then it's jobs will not be ran on this instance.
805 /// This can be used to only run jobs on specific executor nodes.
806 ///
807 /// Jobs can still be enqueued to the backend without calling this method, but they will not be
808 /// executed unless this method has been called for at least one instance of the running
809 /// application.
810 pub fn with_executor<E>(self) -> Self
811 where
812 E: Executor + 'static + Sync + Send,
813 E::Data: Send + DeserializeOwned,
814 E::Metadata: Serialize + DeserializeOwned + Send,
815 {
816 JobRunner::<B, E>::new(self.backend.clone()).spawn(self.cancellation_token.clone());
817 self
818 }
819
820 /// Setup a cron job to run on the given schedule with the given data.
821 ///
822 /// Note this will run the schedule according to UTC. To schedule the job in another timezone use
823 /// [`Rexecutor::with_cron_executor_for_timezone`].
824 ///
825 /// # Example
826 ///
827 /// To setup a cron jobs to run every day at midnight you can use the following code.
828 ///
829 /// ```
830 /// # use rexecutor::prelude::*;
831 /// # use rexecutor::backend::{Backend, memory::InMemoryBackend};
832 /// struct CronJob;
833 /// #[async_trait::async_trait]
834 /// impl Executor for CronJob {
835 /// type Data = String;
836 /// type Metadata = ();
837 /// const NAME: &'static str = "cron_job";
838 /// const MAX_ATTEMPTS: u16 = 1;
839 /// async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
840 /// /// Do something important
841 /// ExecutionResult::Done
842 /// }
843 /// }
844 /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
845 /// let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
846 ///
847 /// let backend = InMemoryBackend::new();
848 /// Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
849 /// # });
850 /// ```
851 pub fn with_cron_executor<E>(self, schedule: cron::Schedule, data: E::Data) -> Self
852 where
853 E: Executor + 'static + Sync + Send,
854 E::Data: Send + Sync + Serialize + DeserializeOwned + Clone + Hash,
855 E::Metadata: Serialize + DeserializeOwned + Send + Sync,
856 {
857 self.with_cron_executor_for_timezone::<E, _>(schedule, data, Utc)
858 }
859
860 /// Setup a cron job to run on the given schedule with the given data in the given timezome.
861 ///
862 /// # Example
863 ///
864 /// To setup a cron jobs to run every day at midnight you can use the following code.
865 ///
866 /// ```
867 /// # use rexecutor::prelude::*;
868 /// # use rexecutor::backend::{Backend, memory::InMemoryBackend};
869 /// struct CronJob;
870 /// #[async_trait::async_trait]
871 /// impl Executor for CronJob {
872 /// type Data = String;
873 /// type Metadata = ();
874 /// const NAME: &'static str = "cron_job";
875 /// const MAX_ATTEMPTS: u16 = 1;
876 /// async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
877 /// /// Do something important
878 /// ExecutionResult::Done
879 /// }
880 /// }
881 /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
882 /// let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
883 ///
884 /// let backend = InMemoryBackend::new();
885 /// Rexecutor::new(backend).with_cron_executor_for_timezone::<CronJob, _>(
886 /// schedule,
887 /// "important data".to_owned(),
888 /// chrono::Local,
889 /// );
890 /// # });
891 /// ```
892 pub fn with_cron_executor_for_timezone<E, Z>(
893 self,
894 schedule: cron::Schedule,
895 data: E::Data,
896 timezone: Z,
897 ) -> Self
898 where
899 E: Executor + 'static + Sync + Send,
900 E::Data: Send + Sync + Serialize + DeserializeOwned + Clone + Hash,
901 E::Metadata: Serialize + DeserializeOwned + Send + Sync,
902 Z: TimeZone + Send + 'static,
903 {
904 CronRunner::<B, E>::new(self.backend.clone(), schedule, data)
905 .spawn(timezone, self.cancellation_token.clone());
906
907 self.with_executor::<E>()
908 }
909
910 /// Set the job pruner config.
911 ///
912 /// After jobs have completed, been cancelled, or discarded it is useful to be able to clean up.
913 ///
914 /// Given the different ways in which jobs can finish it is often useful to be able to have fine
915 /// grained control over how old jobs should be cleaned up. [`PrunerConfig`] enables such control.
916 ///
917 /// When constructing [`PrunerConfig`] a [`cron::Schedule`] is provided to specify when the pruner
918 /// should run.
919 ///
920 /// Depending on the load/throughput of the system the pruner can be scheduled to run anywhere
921 /// from once a year through to multiple times per hour.
922 ///
923 /// # Example
924 ///
925 /// To remove all completed jobs more than a month old for both the `RefreshWorker` and
926 /// `EmailScheduler` while only maintaining the last 200 discarded jobs for all executors expect
927 /// the `EmailScheduler` and `RefreshWorker`, you can do the following:
928 ///
929 /// ```
930 /// # use rexecutor::prelude::*;
931 /// # use std::str::FromStr;
932 /// # use chrono::TimeDelta;
933 /// # use rexecutor::backend::memory::InMemoryBackend;
934 /// # pub(crate) struct RefreshWorker;
935 /// # pub(crate) struct EmailScheduler;
936 /// # pub(crate) struct RegistrationWorker;
937 /// #
938 /// # #[async_trait::async_trait]
939 /// # impl Executor for RefreshWorker {
940 /// # type Data = String;
941 /// # type Metadata = String;
942 /// # const NAME: &'static str = "refresh_worker";
943 /// # const MAX_ATTEMPTS: u16 = 2;
944 /// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
945 /// # ExecutionResult::Done
946 /// # }
947 /// # }
948 /// # #[async_trait::async_trait]
949 /// # impl Executor for EmailScheduler {
950 /// # type Data = String;
951 /// # type Metadata = String;
952 /// # const NAME: &'static str = "email_scheduler";
953 /// # const MAX_ATTEMPTS: u16 = 2;
954 /// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
955 /// # ExecutionResult::Done
956 /// # }
957 /// # }
958 /// # #[async_trait::async_trait]
959 /// # impl Executor for RegistrationWorker {
960 /// # type Data = String;
961 /// # type Metadata = String;
962 /// # const NAME: &'static str = "registration_worker";
963 /// # const MAX_ATTEMPTS: u16 = 2;
964 /// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
965 /// # ExecutionResult::Done
966 /// # }
967 /// # }
968 /// let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
969 /// .with_max_concurrency(Some(2))
970 /// .with_pruner(
971 /// Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
972 /// .only::<RefreshWorker>()
973 /// .and::<EmailScheduler>(),
974 /// )
975 /// .with_pruner(
976 /// Pruner::max_length(200, JobStatus::Discarded)
977 /// .except::<RefreshWorker>()
978 /// .and::<EmailScheduler>(),
979 /// );
980 ///
981 /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
982 /// let backend = InMemoryBackend::new();
983 /// Rexecutor::new(backend)
984 /// .with_executor::<RefreshWorker>()
985 /// .with_executor::<EmailScheduler>()
986 /// .with_executor::<RegistrationWorker>()
987 /// .with_job_pruner(config);
988 /// # });
989 /// ```
990 pub fn with_job_pruner(self, config: PrunerConfig) -> Self {
991 PrunerRunner::new(self.backend.clone(), config).spawn(self.cancellation_token.clone());
992 self
993 }
994
995 /// Instruct rexecutor to shutdown gracefully giving time for any currently executing jobs to
996 /// complete before shutting down.
997 pub fn graceful_shutdown(self) {
998 tracing::debug!("Shutting down Rexecutor tasks");
999 self.cancellation_token.cancel();
1000 }
1001
1002 /// Returns a drop guard which will gracefully shutdown rexecutor when droped.
1003 ///
1004 /// # Example
1005 /// ```
1006 /// # use rexecutor::prelude::*;
1007 /// # use std::str::FromStr;
1008 /// # use chrono::TimeDelta;
1009 /// # use rexecutor::backend::memory::InMemoryBackend;
1010 /// # pub(crate) struct RefreshWorker;
1011 /// # pub(crate) struct EmailScheduler;
1012 /// # pub(crate) struct RegistrationWorker;
1013 /// #
1014 /// # #[async_trait::async_trait]
1015 /// # impl Executor for RefreshWorker {
1016 /// # type Data = String;
1017 /// # type Metadata = String;
1018 /// # const NAME: &'static str = "refresh_worker";
1019 /// # const MAX_ATTEMPTS: u16 = 2;
1020 /// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
1021 /// # ExecutionResult::Done
1022 /// # }
1023 /// # }
1024 /// # #[async_trait::async_trait]
1025 /// # impl Executor for EmailScheduler {
1026 /// # type Data = String;
1027 /// # type Metadata = String;
1028 /// # const NAME: &'static str = "email_scheduler";
1029 /// # const MAX_ATTEMPTS: u16 = 2;
1030 /// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
1031 /// # ExecutionResult::Done
1032 /// # }
1033 /// # }
1034 /// # #[async_trait::async_trait]
1035 /// # impl Executor for RegistrationWorker {
1036 /// # type Data = String;
1037 /// # type Metadata = String;
1038 /// # const NAME: &'static str = "registration_worker";
1039 /// # const MAX_ATTEMPTS: u16 = 2;
1040 /// # async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
1041 /// # ExecutionResult::Done
1042 /// # }
1043 /// # }
1044 /// # tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
1045 /// let backend = InMemoryBackend::new();
1046 /// // Note this must be given a name to ensure it is dropped at the end of the scope.
1047 /// // See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
1048 /// let _guard = Rexecutor::new(backend)
1049 /// .with_executor::<RefreshWorker>()
1050 /// .with_executor::<EmailScheduler>()
1051 /// .with_executor::<RegistrationWorker>()
1052 /// .drop_guard();
1053 /// # });
1054 #[must_use]
1055 pub fn drop_guard(self) -> DropGuard {
1056 DropGuard(self.cancellation_token.drop_guard())
1057 }
1058}
1059
1060/// Errors that can occur when working with rexecutor.
1061// TODO: split errors
1062#[derive(Debug, Error)]
1063pub enum RexecutorError {
1064 /// Errors that result from the specific backend chosen.
1065 #[error("Error communicating with the backend")]
1066 BackendError(#[from] BackendError),
1067
1068 /// This error results from trying to enqueue a job to the global backend when it is not set or
1069 /// when trying to set the global backend multiple times.
1070 #[error("Error setting global backend")]
1071 GlobalBackend,
1072
1073 /// Error encoding or decoding data from json.
1074 #[error("Error encoding or decoding value")]
1075 EncodeError(#[from] serde_json::Error),
1076}
1077
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use super::*;
    use crate::{
        backend::{Job, MockBackend},
        executor::test::{MockError, MockExecutionResult, MockReturnExecutor},
        job::JobStatus,
        pruner::Pruner,
    };

    // ---------------------------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------------------------

    /// A mock job for [`MockReturnExecutor`] whose execution produces `outcome`.
    fn job_returning(outcome: MockExecutionResult) -> Job {
        Job::mock_job::<MockReturnExecutor>().with_data(outcome)
    }

    /// Puts `job` on its only permitted attempt.
    fn on_final_attempt(job: Job) -> Job {
        job.with_max_attempts(1).with_attempt(1)
    }

    /// The execution outcome used by the tests for a failed job.
    fn failure() -> MockExecutionResult {
        MockExecutionResult::Error {
            error: MockError("oh no".to_owned()),
        }
    }

    /// The execution outcome used by the tests for a snoozed job.
    fn snooze() -> MockExecutionResult {
        MockExecutionResult::Snooze {
            delay: Duration::from_secs(10),
        }
    }

    /// The execution outcome used by the tests for a cancelled job.
    fn cancellation() -> MockExecutionResult {
        MockExecutionResult::Cancelled {
            reason: "No need anymore".to_owned(),
        }
    }

    /// A schedule which fires every second.
    fn every_second() -> cron::Schedule {
        cron::Schedule::try_from("* * * * * *").unwrap()
    }

    // ---------------------------------------------------------------------------------------
    // Drivers
    // ---------------------------------------------------------------------------------------

    /// Starts a rexecutor for [`MockReturnExecutor`] on `backend` and pushes `item` through the
    /// ready jobs stream, yielding before and after so the spawned runner gets to act.
    async fn deliver(mut backend: MockBackend, item: Result<Job, BackendError>) {
        let sender = backend.expect_subscribe_to_ready_jobs_with_stream();
        let backend = Arc::new(backend);
        let _guard = Rexecutor::new(backend.clone())
            .with_executor::<MockReturnExecutor>()
            .drop_guard();

        tokio::task::yield_now().await;
        sender.send(item).unwrap();
        tokio::task::yield_now().await;
    }

    /// Runs a single `job` against `backend`.
    async fn run_job(backend: MockBackend, job: Job) {
        deliver(backend, Ok(job)).await;
    }

    /// Yields and then waits a second, long enough for an every-second schedule to fire.
    async fn let_background_tasks_run() {
        tokio::task::yield_now().await;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    /// Starts a rexecutor with an every-second cron job on `backend` and lets it run.
    async fn run_cron_job(mut backend: MockBackend) {
        // The sender has to outlive the rexecutor so that the ready jobs stream stays open.
        let _sender = backend.expect_subscribe_to_ready_jobs_with_stream();
        let backend = Arc::new(backend);

        let _guard = Rexecutor::new(backend.clone())
            .with_cron_executor::<MockReturnExecutor>(every_second(), MockExecutionResult::Done)
            .drop_guard();

        let_background_tasks_run().await;
    }

    /// Starts a rexecutor with an every-second pruner on `backend` and lets it run.
    async fn run_pruner(backend: MockBackend) {
        let backend = Arc::new(backend);
        let config = PrunerConfig::new(every_second())
            .with_pruner(Pruner::max_length(5, JobStatus::Complete).only::<MockReturnExecutor>());

        let _guard = Rexecutor::new(backend.clone())
            .with_job_pruner(config)
            .drop_guard();

        let_background_tasks_run().await;
    }

    // ---------------------------------------------------------------------------------------
    // Job runner
    // ---------------------------------------------------------------------------------------

    #[tokio::test]
    async fn run_job_error_in_stream() {
        deliver(MockBackend::default(), Err(BackendError::BadState)).await;
    }

    #[tokio::test]
    async fn run_job_success() {
        let mut backend = MockBackend::default();
        backend.expect_mark_job_complete().returning(|_| Ok(()));

        run_job(backend, job_returning(MockExecutionResult::Done)).await;
    }

    #[tokio::test]
    async fn run_job_success_error_marking_success() {
        let mut backend = MockBackend::default();
        backend
            .expect_mark_job_complete()
            .returning(|_| Err(BackendError::BadState));

        run_job(backend, job_returning(MockExecutionResult::Done)).await;
    }

    #[tokio::test]
    async fn run_job_retryable() {
        let mut backend = MockBackend::default();
        backend
            .expect_mark_job_retryable()
            .returning(|_, _, _| Ok(()));

        run_job(backend, job_returning(failure())).await;
    }

    #[tokio::test]
    async fn run_job_retryable_error_marking_retryable() {
        let mut backend = MockBackend::default();
        backend
            .expect_mark_job_retryable()
            .returning(|_, _, _| Err(BackendError::BadState));

        run_job(backend, job_returning(failure())).await;
    }

    #[tokio::test]
    async fn run_job_retryable_timeout() {
        let mut backend = MockBackend::default();
        backend
            .expect_mark_job_retryable()
            .returning(|_, _, _| Ok(()));

        run_job(backend, job_returning(MockExecutionResult::Timeout)).await;
    }

    #[tokio::test]
    async fn run_job_discarded() {
        let mut backend = MockBackend::default();
        backend.expect_mark_job_discarded().returning(|_, _| Ok(()));

        run_job(backend, on_final_attempt(job_returning(failure()))).await;
    }

    #[tokio::test]
    async fn run_job_discarded_error_marking_job_discarded() {
        let mut backend = MockBackend::default();
        backend
            .expect_mark_job_discarded()
            .returning(|_, _| Err(BackendError::BadState));

        run_job(backend, on_final_attempt(job_returning(failure()))).await;
    }

    #[tokio::test]
    async fn run_job_discarded_panic() {
        let mut backend = MockBackend::default();
        backend.expect_mark_job_discarded().returning(|_, _| Ok(()));

        let job = on_final_attempt(job_returning(MockExecutionResult::Panic));

        run_job(backend, job).await;
    }

    #[tokio::test]
    async fn run_job_discarded_panic_error_marking_job_discarded() {
        let mut backend = MockBackend::default();
        backend
            .expect_mark_job_discarded()
            .returning(|_, _| Err(BackendError::BadState));

        let job = on_final_attempt(job_returning(MockExecutionResult::Panic));

        run_job(backend, job).await;
    }

    #[tokio::test]
    async fn run_job_snoozed() {
        let mut backend = MockBackend::default();
        backend.expect_mark_job_snoozed().returning(|_, _| Ok(()));

        run_job(backend, on_final_attempt(job_returning(snooze()))).await;
    }

    #[tokio::test]
    async fn run_job_snoozed_error_marking_job_snoozed() {
        let mut backend = MockBackend::default();
        backend
            .expect_mark_job_snoozed()
            .returning(|_, _| Err(BackendError::BadState));

        run_job(backend, on_final_attempt(job_returning(snooze()))).await;
    }

    #[tokio::test]
    async fn run_job_cancelled() {
        let mut backend = MockBackend::default();
        backend.expect_mark_job_cancelled().returning(|_, _| Ok(()));

        run_job(backend, on_final_attempt(job_returning(cancellation()))).await;
    }

    #[tokio::test]
    async fn run_job_cancelled_error_marking_job_cancelled() {
        let mut backend = MockBackend::default();
        backend
            .expect_mark_job_cancelled()
            .returning(|_, _| Err(BackendError::BadState));

        run_job(backend, on_final_attempt(job_returning(cancellation()))).await;
    }

    // ---------------------------------------------------------------------------------------
    // Cron runner
    // ---------------------------------------------------------------------------------------

    #[tokio::test]
    async fn cron_job() {
        let mut backend = MockBackend::default();
        backend.expect_enqueue().returning(|_| Ok(0.into()));

        run_cron_job(backend).await;
    }

    #[tokio::test]
    async fn cron_job_error() {
        let mut backend = MockBackend::default();
        backend
            .expect_enqueue()
            .returning(|_| Err(BackendError::BadState));

        run_cron_job(backend).await;
    }

    // ---------------------------------------------------------------------------------------
    // Pruner
    // ---------------------------------------------------------------------------------------

    #[tokio::test]
    async fn with_pruner() {
        let mut backend = MockBackend::default();
        backend.expect_prune_jobs().returning(|_| Ok(()));

        run_pruner(backend).await;
    }

    #[tokio::test]
    async fn with_pruner_error() {
        let mut backend = MockBackend::default();
        backend
            .expect_prune_jobs()
            .returning(|_| Err(BackendError::BadState));

        run_pruner(backend).await;
    }
}