Struct Rexecutor

pub struct Rexecutor<B: Backend, State: InternalRexecutorState> { /* private fields */ }

The entry point for setting up rexecutor.

To create an instance of Rexecutor you need an implementation of Backend. The only one provided in this crate is backend::memory::InMemoryBackend, which is primarily intended for testing. For anything else, use a backend from one of the implementations provided by other crates.

§Setting up executors

For each executor you would like to run, Rexecutor::with_executor should be called. Being explicit about this makes it possible to have specific nodes in a cluster act as worker nodes for certain enqueued jobs while other nodes are not responsible for their execution.

§Example setting up executors

let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>();

§Compile time scheduling of cron jobs

It can be useful to have jobs that run on a given schedule. Jobs like this can be set up using either Rexecutor::with_cron_executor or Rexecutor::with_cron_executor_for_timezone. The latter is used to specify the timezone in which the jobs should be scheduled to run.

§Example setting up a UTC cron job

To set up a cron job to run every day at midnight UTC you can use the following code.

struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();

let backend = InMemoryBackend::new();
Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
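
The schedule strings used on this page have six fields rather than the five of a classic crontab line. Assuming the field order documented by the cron crate (seconds, minutes, hours, day of month, month, day of week, with an optional year), the following standard-library-only sketch labels each field of an expression. describe_fields and FIELD_NAMES are hypothetical names used for illustration; they are not part of cron or rexecutor, and the sketch does not validate field values.

```rust
/// Field names in the order assumed for the `cron` crate's expressions.
/// The seventh field (year) is optional.
const FIELD_NAMES: [&str; 7] = [
    "second",
    "minute",
    "hour",
    "day of month",
    "month",
    "day of week",
    "year",
];

/// Hypothetical helper: pairs each whitespace-separated field of `expr`
/// with its name. Returns `None` unless there are six or seven fields.
fn describe_fields(expr: &str) -> Option<Vec<(&'static str, &str)>> {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() < 6 || fields.len() > 7 {
        return None;
    }
    Some(FIELD_NAMES.iter().copied().zip(fields).collect())
}
```

Read this way, "0 0 0 * * *" fires at second 0, minute 0, hour 0 of every day, while "0 0 * * * *" (used in the pruner example) fires at the top of every hour.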

§Pruning jobs

After jobs have completed, been cancelled, or been discarded, it is useful to be able to clean up. To set up the job pruner, Rexecutor::with_job_pruner should be called, passing in the given PrunerConfig.

Given the different ways in which jobs can finish, it is often useful to have fine-grained control over how old jobs are cleaned up. PrunerConfig enables such control.

When constructing PrunerConfig a cron::Schedule is provided to specify when the pruner should run.

Depending on the load/throughput of the system the pruner can be scheduled to run anywhere from once a year through to multiple times per hour.

§Example configuring the job pruner

let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );

let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .with_job_pruner(config);

§Shutting rexecutor down

To avoid jobs getting killed midway through their execution it is important to make use of graceful shutdown. This can either be requested explicitly by calling Rexecutor::graceful_shutdown, or via the DropGuard obtained from Rexecutor::drop_guard.

Using Rexecutor::graceful_shutdown or Rexecutor::drop_guard will ensure that all currently executing jobs will be allowed time to complete before shutting rexecutor down.

§Example using the DropGuard

let backend = InMemoryBackend::new();
// Note this must be given a name to ensure it is dropped at the end of the scope.
// See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
let _guard = Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .drop_guard();
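
The comment about naming the guard matters because, in Rust, binding a value to the `_` pattern does not keep it alive: the value is dropped at the end of that statement, whereas a binding such as `_guard` lives until the end of the scope. The sketch below demonstrates this with a stand-in type. ShutdownGuard and both functions are hypothetical and use only the standard library; this is not how DropGuard is implemented.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a guard that requests shutdown when dropped.
struct ShutdownGuard {
    shutdown_requested: Arc<AtomicBool>,
}

impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        // A real guard would tell its workers to finish up here.
        self.shutdown_requested.store(true, Ordering::SeqCst);
    }
}

/// Binds the guard to `_guard`, so it lives until the end of the function.
/// Returns the flag as observed while the guard is still in scope.
fn with_named_binding(flag: &Arc<AtomicBool>) -> bool {
    let _guard = ShutdownGuard { shutdown_requested: Arc::clone(flag) };
    flag.load(Ordering::SeqCst)
}

/// Binds the guard to the `_` pattern, which drops it immediately.
/// Returns the flag as observed on the very next line.
fn with_underscore_pattern(flag: &Arc<AtomicBool>) -> bool {
    let _ = ShutdownGuard { shutdown_requested: Arc::clone(flag) };
    flag.load(Ordering::SeqCst)
}
```

With the named binding, shutdown has not been requested while the function body is still running; with `let _ = ...` it has already been requested by the next line, which for a real guard would mean shutting down straight after start-up.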

§Global backend

Rexecutor can be run either with or without a global backend. Using a global backend enables the convenience method job::builder::JobBuilder::enqueue, which does not require a reference to the backend to be passed down to the code that needs to enqueue a job.

The global backend can be set using Rexecutor::set_global_backend. This should only be called once; otherwise it will return an error.

In fact, for a single Rexecutor instance it is impossible to call this twice. The following does not compile:

let backend = rexecutor::backend::memory::InMemoryBackend::new();
Rexecutor::new(backend).set_global_backend().set_global_backend();
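
The signatures under Implementations (set_global_backend exists only on Rexecutor<B, GlobalUnset> and returns a Rexecutor<B, GlobalSet>) suggest a typestate pattern. The following is a minimal sketch of that pattern using only the standard library. Setup, Unset, Set, and GLOBAL are hypothetical names, a String stands in for a real backend, and the error type is simplified; it is not the crate's actual implementation.

```rust
use std::marker::PhantomData;
use std::sync::OnceLock;

/// Marker types standing in for the `GlobalUnset` / `GlobalSet` states.
struct Unset;
struct Set;

/// Process-wide slot; a `String` stands in for a real backend.
static GLOBAL: OnceLock<String> = OnceLock::new();

struct Setup<State> {
    backend: String,
    _state: PhantomData<State>,
}

impl Setup<Unset> {
    fn new(backend: &str) -> Self {
        Setup { backend: backend.to_owned(), _state: PhantomData }
    }

    /// Only available in the `Unset` state. Consumes `self` and, on success,
    /// returns the `Set` state, which has no `set_global` method.
    fn set_global(self) -> Result<Setup<Set>, String> {
        GLOBAL
            .set(self.backend.clone())
            .map_err(|_| "global backend already set".to_owned())?;
        Ok(Setup { backend: self.backend, _state: PhantomData })
    }
}

impl Setup<Set> {
    fn backend_name(&self) -> &str {
        &self.backend
    }
}
```

In this sketch a second call on the same instance is rejected at compile time, because the value has moved into the Set state, while a second instance trying to set the global is rejected at run time with an error.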

Note that using a global backend has many of the same drawbacks as any global variable. In particular, it can make unit testing more difficult.

Implementations§

impl<B> Rexecutor<B, GlobalUnset>
where B: Backend,

pub fn new(backend: B) -> Self

Create an instance of Rexecutor using the given backend.

impl<B> Rexecutor<B, GlobalUnset>
where B: Backend + Send + 'static + Sync + Clone,

pub fn set_global_backend(self) -> Result<Rexecutor<B, GlobalSet>, RexecutorError>

Sets the global backend to the backend associated with the current instance of Rexecutor.

This should only be called once. If called a second time it will return RexecutorError::GlobalBackend.

Calling this makes it possible to enqueue jobs without maintaining a reference to the backend throughout the codebase and enables the use of job::builder::JobBuilder::enqueue.

Note that it is not possible to call this twice for the same Rexecutor instance. The following does not compile:

let backend = rexecutor::backend::memory::InMemoryBackend::new();
Rexecutor::new(backend).set_global_backend().set_global_backend();

impl<B, State> Rexecutor<B, State>
where B: Backend + Send + 'static + Sync + Clone, State: InternalRexecutorState,

pub fn with_executor<E>(self) -> Self

Enable the execution of jobs for the provided Executor.

If this isn’t called for an executor, then its jobs will not be run on this instance. This can be used to only run jobs on specific executor nodes.

Jobs can still be enqueued to the backend without calling this method, but they will not be executed unless this method has been called for at least one instance of the running application.
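
As a conceptual model of the behaviour described above, the sketch below treats each application instance as a set of opted-in executor names and shows which queued jobs a given instance would pick up. Node, its with_executor method, and runnable are hypothetical stand-ins built on the standard library; the real crate works with executor types and a backend rather than strings.

```rust
use std::collections::HashSet;

/// Hypothetical model of one application instance and the executors it
/// has opted into.
struct Node {
    enabled: HashSet<&'static str>,
}

impl Node {
    fn new() -> Self {
        Node { enabled: HashSet::new() }
    }

    /// Builder-style opt-in, mirroring the shape of `with_executor`.
    fn with_executor(mut self, name: &'static str) -> Self {
        self.enabled.insert(name);
        self
    }

    /// Of the queued executor names, returns those this node would run.
    fn runnable<'a>(&self, queued: &[&'a str]) -> Vec<&'a str> {
        queued
            .iter()
            .copied()
            .filter(|name| self.enabled.contains(*name))
            .collect()
    }
}
```

A node built with `Node::new().with_executor("refresh_worker")` would pick up only refresh_worker jobs from a mixed queue, while a node that registers nothing can still enqueue jobs but runs none of them.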

pub fn with_cron_executor<E>(self, schedule: Schedule, data: E::Data) -> Self

Set up a cron job to run on the given schedule with the given data.

Note this will run the schedule according to UTC. To schedule the job in another timezone use Rexecutor::with_cron_executor_for_timezone.

§Example

To set up a cron job to run every day at midnight UTC you can use the following code.

struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();

let backend = InMemoryBackend::new();
Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());

pub fn with_cron_executor_for_timezone<E, Z>(self, schedule: Schedule, data: E::Data, timezone: Z) -> Self
where E: Executor + 'static + Sync + Send, E::Data: Send + Sync + Serialize + DeserializeOwned + Clone + Hash, E::Metadata: Serialize + DeserializeOwned + Send + Sync, Z: TimeZone + Send + 'static,

Set up a cron job to run on the given schedule with the given data in the given timezone.

§Example

To set up a cron job to run every day at midnight in the local timezone you can use the following code.

struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();

let backend = InMemoryBackend::new();
Rexecutor::new(backend).with_cron_executor_for_timezone::<CronJob, _>(
    schedule,
    "important data".to_owned(),
    chrono::Local,
);
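
To see why the timezone matters for a daily schedule, the sketch below computes when the next midnight falls for a zone with a fixed offset from UTC. It is standard-library arithmetic only, next_midnight is a hypothetical helper, and real timezones such as chrono::Local may change their offset over time (for example with daylight saving), which this sketch deliberately ignores.

```rust
const SECS_PER_DAY: i64 = 86_400;

/// Returns the Unix timestamp (in UTC seconds) of the next midnight strictly
/// after `now_unix`, for a zone with a fixed offset of `offset_secs` from UTC.
fn next_midnight(now_unix: i64, offset_secs: i64) -> i64 {
    // Shift into local wall-clock seconds, then find the start of the next
    // local day. `div_euclid` keeps the result correct for negative values.
    let local = now_unix + offset_secs;
    let next_local_midnight = (local.div_euclid(SECS_PER_DAY) + 1) * SECS_PER_DAY;
    // Shift back to UTC.
    next_local_midnight - offset_secs
}
```

Starting from timestamp 0 (midnight UTC), the next UTC midnight is 86 400 seconds away, but for a zone one hour ahead of UTC the next local midnight arrives after 82 800 seconds, and for a zone five hours behind it arrives after only 18 000 seconds.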

pub fn with_job_pruner(self, config: PrunerConfig) -> Self

Set the job pruner config.

After jobs have completed, been cancelled, or been discarded, it is useful to be able to clean up.

Given the different ways in which jobs can finish, it is often useful to have fine-grained control over how old jobs are cleaned up. PrunerConfig enables such control.

When constructing PrunerConfig a cron::Schedule is provided to specify when the pruner should run.

Depending on the load/throughput of the system the pruner can be scheduled to run anywhere from once a year through to multiple times per hour.

§Example

To remove all completed jobs more than a month old for both the RefreshWorker and the EmailScheduler, while keeping only the last 200 discarded jobs for all executors except the EmailScheduler and RefreshWorker, you can do the following:

let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );

let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .with_job_pruner(config);
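
The two kinds of rule in this configuration select jobs differently: a max-age rule removes matching jobs older than a cutoff, while a max-length rule keeps only the newest N matching jobs. The sketch below models that selection over an in-memory list using only the standard library. All names in it (JobRecord, Scope, Rule, ids_to_prune) are hypothetical, ages are given in whole days for simplicity, and the max-length rule counts across all matching executors, which is an assumption since this page does not say whether the limit applies per executor.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Complete,
    Discarded,
}

#[derive(Debug, Clone, PartialEq)]
struct JobRecord {
    id: u32,
    executor: &'static str,
    status: Status,
    age_days: u32,
}

/// Which executors a rule applies to.
enum Scope {
    Only(Vec<&'static str>),
    Except(Vec<&'static str>),
}

impl Scope {
    fn matches(&self, executor: &str) -> bool {
        match self {
            Scope::Only(names) => names.iter().any(|n| *n == executor),
            Scope::Except(names) => !names.iter().any(|n| *n == executor),
        }
    }
}

enum Rule {
    /// Remove matching jobs older than this many days.
    MaxAgeDays(u32),
    /// Keep only the newest `n` matching jobs.
    MaxLength(usize),
}

/// Returns the ids that a single rule would delete.
fn ids_to_prune(jobs: &[JobRecord], status: Status, scope: &Scope, rule: &Rule) -> Vec<u32> {
    // Select the jobs the rule applies to and order them newest first.
    let mut matching: Vec<&JobRecord> = jobs
        .iter()
        .filter(|j| j.status == status && scope.matches(j.executor))
        .collect();
    matching.sort_by_key(|j| j.age_days);
    match rule {
        Rule::MaxAgeDays(limit) => matching
            .iter()
            .filter(|j| j.age_days > *limit)
            .map(|j| j.id)
            .collect(),
        Rule::MaxLength(keep) => matching.iter().skip(*keep).map(|j| j.id).collect(),
    }
}
```

Under these assumptions, a completed job that is 40 days old is pruned by a 31-day max-age rule scoped to its executor, while jobs from executors outside the scope are left alone regardless of age.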

pub fn graceful_shutdown(self)

Instruct rexecutor to shut down gracefully, giving time for any currently executing jobs to complete before shutting down.
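
The idea of letting in-flight work finish can be illustrated with plain threads: the worker checks a shutdown flag only between jobs, and the caller joins the worker after setting the flag. The sketch below uses only the standard library. run_until_shutdown is a hypothetical function, the two channels exist only to make the timing deterministic, and none of this reflects how rexecutor implements shutdown internally.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

/// Runs up to `total_jobs` jobs on a worker thread, requests shutdown while
/// the first job is still in flight, and returns how many jobs completed.
fn run_until_shutdown(total_jobs: usize) -> usize {
    let shutdown = Arc::new(AtomicBool::new(false));
    let completed = Arc::new(AtomicUsize::new(0));
    let (started_tx, started_rx) = mpsc::channel::<()>();
    let (resume_tx, resume_rx) = mpsc::channel::<()>();

    let worker = {
        let (shutdown, completed) = (Arc::clone(&shutdown), Arc::clone(&completed));
        thread::spawn(move || {
            for job in 0..total_jobs {
                // Check for shutdown between jobs, never in the middle of one.
                if shutdown.load(Ordering::SeqCst) {
                    break;
                }
                if job == 0 {
                    // Report that a job is in flight, then wait so that the
                    // shutdown request is guaranteed to arrive mid-job.
                    started_tx.send(()).unwrap();
                    resume_rx.recv().unwrap();
                }
                completed.fetch_add(1, Ordering::SeqCst);
            }
        })
    };

    if total_jobs > 0 {
        started_rx.recv().unwrap();
    }
    // Request shutdown while the first job (if any) is still running.
    shutdown.store(true, Ordering::SeqCst);
    if total_jobs > 0 {
        resume_tx.send(()).unwrap();
    }
    // Graceful part: wait for the in-flight job to finish before returning.
    worker.join().unwrap();
    completed.load(Ordering::SeqCst)
}
```

Calling `run_until_shutdown(3)` completes exactly one job: the job that was already running is allowed to finish, and the remaining two are never started.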

pub fn drop_guard(self) -> DropGuard

Returns a drop guard which will gracefully shut down rexecutor when dropped.

§Example

let backend = InMemoryBackend::new();
// Note this must be given a name to ensure it is dropped at the end of the scope.
// See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
let _guard = Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .drop_guard();

Trait Implementations§

impl<B: Debug + Backend, State: Debug + InternalRexecutorState> Debug for Rexecutor<B, State>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<B, State> Freeze for Rexecutor<B, State>
where B: Freeze,

impl<B, State> RefUnwindSafe for Rexecutor<B, State>
where B: RefUnwindSafe, State: RefUnwindSafe,

impl<B, State> Send for Rexecutor<B, State>
where B: Send, State: Send,

impl<B, State> Sync for Rexecutor<B, State>
where B: Sync, State: Sync,

impl<B, State> Unpin for Rexecutor<B, State>
where B: Unpin, State: Unpin,

impl<B, State> UnwindSafe for Rexecutor<B, State>
where B: UnwindSafe, State: UnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.