pub struct Rexecutor<B: Backend, State: InternalRexecutorState> { /* private fields */ }
The entry point for setting up rexecutor.
To create an instance of Rexecutor you will need an implementation of Backend.
The only one provided in this crate is backend::memory::InMemoryBackend, which is primarily provided for testing purposes. For anything else, use a backend implementation provided by another crate.
§Setting up executors
Rexecutor::with_executor should be called for each executor you would like to run. Being explicit about this makes it possible to have specific nodes in a cluster act as worker nodes for certain enqueued jobs while other nodes are not responsible for their execution.
§Example setting up executors
let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>();
§Compile time scheduling of cron jobs
It can be useful to have jobs that run on a given schedule. Jobs like this can be set up using either Rexecutor::with_cron_executor or Rexecutor::with_cron_executor_for_timezone. The latter is used to specify the timezone in which the jobs should be scheduled to run.
§Example setting up a UTC cron job
To set up a cron job to run every day at midnight you can use the following code.
struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
let backend = InMemoryBackend::new();
Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
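The schedule strings used on this page have six fields because the cron crate places a seconds field first. As a reading aid, the following standard-library-only sketch labels each field. `label_fields` and `FIELD_NAMES` are hypothetical names, not part of cron or rexecutor, and the helper does no validation beyond counting fields.

```rust
/// Names of the fields in a six-field cron expression (seconds first).
const FIELD_NAMES: [&str; 6] = [
    "second",
    "minute",
    "hour",
    "day of month",
    "month",
    "day of week",
];

/// Pairs each whitespace-separated field with its name.
/// Returns `None` unless the expression has exactly six fields.
fn label_fields(expr: &str) -> Option<Vec<(&'static str, &str)>> {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != FIELD_NAMES.len() {
        return None;
    }
    Some(FIELD_NAMES.iter().copied().zip(fields).collect())
}
```

Read this way, "0 0 0 * * *" pins second, minute, and hour to zero, which is midnight every day, while "0 0 * * * *" leaves the hour open and so fires at the top of every hour.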
§Pruning jobs
After jobs have completed, been cancelled, or been discarded it is useful to be able to clean up.
To set up the job pruner, call Rexecutor::with_job_pruner, passing in a PrunerConfig.
Given the different ways in which jobs can finish, it is often useful to have fine-grained control over how old jobs are cleaned up. PrunerConfig enables such control.
When constructing a PrunerConfig, a cron::Schedule is provided to specify when the pruner should run.
Depending on the load/throughput of the system, the pruner can be scheduled to run anywhere from once a year through to multiple times per hour.
§Example configuring the job pruner
let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );
let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .with_job_pruner(config);
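To make the two pruning strategies concrete, here is a standard-library-only sketch of what pruning by maximum age and by maximum length could mean for a list of finished jobs. `JobRecord`, `prune_max_age`, and `prune_max_length` are hypothetical names, and timestamps are plain seconds. The real pruner operates on the backend, so this only illustrates the intended semantics under those assumptions and is not the crate's implementation.

```rust
/// Hypothetical record of a finished job.
#[derive(Debug, Clone, PartialEq)]
struct JobRecord {
    id: u32,
    /// Seconds since an arbitrary epoch (an assumption made for this sketch).
    finished_at: u64,
}

/// Keeps only the jobs that finished within `max_age_secs` of `now`.
fn prune_max_age(jobs: &mut Vec<JobRecord>, now: u64, max_age_secs: u64) {
    jobs.retain(|job| now.saturating_sub(job.finished_at) <= max_age_secs);
}

/// Keeps only the `max_len` most recently finished jobs.
fn prune_max_length(jobs: &mut Vec<JobRecord>, max_len: usize) {
    // Sort newest first, then drop everything past the limit.
    jobs.sort_by(|a, b| b.finished_at.cmp(&a.finished_at));
    jobs.truncate(max_len);
}
```

Age-based pruning bounds how far back history reaches, while length-based pruning bounds how much storage a given status can consume regardless of throughput.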
§Shutting rexecutor down
To avoid jobs getting killed midway through their execution it is important to make use of graceful shutdown. This can either be called explicitly using Rexecutor::graceful_shutdown, or happen via the DropGuard obtained from Rexecutor::drop_guard.
Using Rexecutor::graceful_shutdown or Rexecutor::drop_guard ensures that all currently executing jobs are allowed time to complete before rexecutor shuts down.
§Example using the DropGuard
let backend = InMemoryBackend::new();
// Note this must be given a name to ensure it is dropped at the end of the scope.
// See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
let _guard = Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .drop_guard();
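The comment about naming the guard matters because of how Rust treats the `_` pattern: `let _ = value;` never binds the value, so it is dropped immediately, whereas `let _guard = value;` keeps it alive until the end of the scope. The following standard-library-only sketch shows the difference with a hypothetical `LoggingGuard` that records when it is dropped. It stands in for any drop guard and is not rexecutor's DropGuard.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical guard that records a message when it is dropped.
struct LoggingGuard {
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl Drop for LoggingGuard {
    fn drop(&mut self) {
        self.log.borrow_mut().push("guard dropped");
    }
}

/// Binds the guard to `_guard`, so it lives until the end of the inner scope.
fn named_binding() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        let _guard = LoggingGuard { log: Rc::clone(&log) };
        log.borrow_mut().push("work");
    } // `_guard` is dropped here, after the work.
    let events = log.borrow().clone();
    events
}

/// Uses the `_` pattern, which does not bind, so the guard is dropped at once.
fn underscore_pattern() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        let _ = LoggingGuard { log: Rc::clone(&log) };
        log.borrow_mut().push("work");
    }
    let events = log.borrow().clone();
    events
}
```

With a named binding the drop is recorded after the work; with `_` it is recorded before. For a shutdown guard, the second form would start shutting down as soon as the statement finishes.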
§Global backend
Rexecutor can be run either with or without a global backend. Using a global backend enables the convenience method job::builder::JobBuilder::enqueue, which does not require a reference to the backend to be passed down to the code that needs to enqueue a job.
The global backend can be set using Rexecutor::set_global_backend. This should only be called once; otherwise it will return an error.
In fact, for a single Rexecutor instance it is impossible to call this twice; the following does not compile:
let backend = rexecutor::backend::memory::InMemoryBackend::new();
Rexecutor::new(backend).set_global_backend().set_global_backend();
Note that using a global backend has many of the same drawbacks as any global variable; in particular, it can make unit testing more difficult.
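The two guarantees described in this section (a compile-time error when the same instance sets the global twice, and a runtime error when a second instance tries) can be sketched with a typestate parameter plus a write-once static. Everything below is hypothetical and standard-library-only: `Runner`, `Unset`, `Set`, and `GLOBAL_NAME` merely echo the shape of `Rexecutor<B, GlobalUnset>` and `Rexecutor<B, GlobalSet>`, and nothing here is taken from the crate's source.

```rust
use std::marker::PhantomData;
use std::sync::OnceLock;

/// Marker types for the two states.
struct Unset;
struct Set;

/// Hypothetical process-wide slot that can be filled only once.
static GLOBAL_NAME: OnceLock<String> = OnceLock::new();

struct Runner<State> {
    backend_name: String,
    _state: PhantomData<State>,
}

impl Runner<Unset> {
    fn new(backend_name: &str) -> Self {
        Runner {
            backend_name: backend_name.to_owned(),
            _state: PhantomData,
        }
    }

    /// Consumes the `Unset` runner and returns a `Set` one.
    /// Fails if another instance has already filled the global slot.
    fn set_global(self) -> Result<Runner<Set>, String> {
        GLOBAL_NAME
            .set(self.backend_name.clone())
            .map_err(|_| "global backend already set".to_owned())?;
        Ok(Runner {
            backend_name: self.backend_name,
            _state: PhantomData,
        })
    }
}

impl<State> Runner<State> {
    fn backend_name(&self) -> &str {
        &self.backend_name
    }
}
```

Because `set_global` is defined only for `Runner<Unset>` and returns a `Runner<Set>`, a second call on the returned value has no method to resolve to and is rejected by the compiler. The `OnceLock` covers the remaining case of two separate instances in the same process.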
Implementations§
impl<B> Rexecutor<B, GlobalUnset>
pub fn set_global_backend(self) -> Result<Rexecutor<B, GlobalSet>, RexecutorError>
Sets the global backend to the backend associated with the current instance of Rexecutor.
This should only be called once. If called a second time it will return RexecutorError::GlobalBackend.
Calling this makes it possible to enqueue jobs without maintaining a reference to the backend throughout the codebase and enables the use of job::builder::JobBuilder::enqueue.
Note that it is not possible to call this twice for the same Rexecutor instance; the following does not compile:
let backend = rexecutor::backend::memory::InMemoryBackend::new();
Rexecutor::new(backend).set_global_backend().set_global_backend();
impl<B, State> Rexecutor<B, State>
pub fn with_executor<E>(self) -> Self
where
    E: Executor + 'static + Sync + Send,
    E::Data: Send + DeserializeOwned,
    E::Metadata: Serialize + DeserializeOwned + Send,
Enable the execution of jobs for the provided Executor.
If this isn’t called for an executor, then its jobs will not be run on this instance. This can be used to only run jobs on specific executor nodes.
Jobs can still be enqueued to the backend without calling this method, but they will not be executed unless this method has been called for at least one instance of the running application.
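The behavior described here, where any instance may enqueue but only instances that registered the executor run the job, can be modeled with a small standard-library-only sketch. `Node`, `QueuedJob`, and `run_pending` are hypothetical names, and a `VecDeque` stands in for the backend. This is a conceptual model only and does not reflect how rexecutor dispatches jobs internally.

```rust
use std::collections::{HashSet, VecDeque};

/// A queued job, identified only by the name of the executor that should run it.
#[derive(Debug, Clone, PartialEq)]
struct QueuedJob {
    executor: &'static str,
}

/// Hypothetical stand-in for one running application instance.
struct Node {
    enabled: HashSet<&'static str>,
}

impl Node {
    fn new() -> Self {
        Node { enabled: HashSet::new() }
    }

    /// Builder-style registration, loosely mirroring `with_executor`.
    fn with_executor(mut self, name: &'static str) -> Self {
        self.enabled.insert(name);
        self
    }

    /// Runs every job this node has an executor for and returns their names.
    /// Jobs for other executors stay in the queue for another node.
    fn run_pending(&self, queue: &mut VecDeque<QueuedJob>) -> Vec<&'static str> {
        let mut ran = Vec::new();
        let mut remaining = VecDeque::new();
        while let Some(job) = queue.pop_front() {
            if self.enabled.contains(job.executor) {
                ran.push(job.executor);
            } else {
                remaining.push_back(job);
            }
        }
        *queue = remaining;
        ran
    }
}
```

A node built with no executors still sees the queue but leaves every job untouched, which matches the case of an instance that only enqueues.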
pub fn with_cron_executor<E>(self, schedule: Schedule, data: E::Data) -> Self
Set up a cron job to run on the given schedule with the given data.
Note this will run the schedule according to UTC. To schedule the job in another timezone use Rexecutor::with_cron_executor_for_timezone.
§Example
To set up a cron job to run every day at midnight you can use the following code.
struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
let backend = InMemoryBackend::new();
Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
pub fn with_cron_executor_for_timezone<E, Z>(
    self,
    schedule: Schedule,
    data: E::Data,
    timezone: Z,
) -> Self
Set up a cron job to run on the given schedule with the given data in the given timezone.
§Example
To set up a cron job to run every day at midnight in the local timezone you can use the following code.
struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();
let backend = InMemoryBackend::new();
Rexecutor::new(backend).with_cron_executor_for_timezone::<CronJob, _>(
    schedule,
    "important data".to_owned(),
    chrono::Local,
);
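To see why the timezone matters, consider when "midnight" actually falls in UTC terms. The sketch below is a deliberate simplification using only the standard library: `utc_hour_for` is a hypothetical helper that assumes a fixed whole-hour offset from UTC and ignores daylight saving time, which a real timezone such as `chrono::Local` would account for.

```rust
/// Returns the UTC hour (0..=23) at which a job scheduled for `local_hour`
/// fires, for a zone with a fixed whole-hour offset from UTC.
/// Simplification: no daylight saving time and no fractional offsets.
fn utc_hour_for(local_hour: u32, utc_offset_hours: i32) -> u32 {
    (local_hour as i32 - utc_offset_hours).rem_euclid(24) as u32
}
```

For a zone two hours ahead of UTC, a midnight job fires at 22:00 UTC on the previous day; for a zone five hours behind, it fires at 05:00 UTC.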
pub fn with_job_pruner(self, config: PrunerConfig) -> Self
Set the job pruner config.
After jobs have completed, been cancelled, or been discarded it is useful to be able to clean up.
Given the different ways in which jobs can finish, it is often useful to have fine-grained control over how old jobs are cleaned up. PrunerConfig enables such control.
When constructing a PrunerConfig, a cron::Schedule is provided to specify when the pruner should run.
Depending on the load/throughput of the system, the pruner can be scheduled to run anywhere from once a year through to multiple times per hour.
§Example
To remove all completed jobs more than a month old for both the RefreshWorker and EmailScheduler, while only keeping the last 200 discarded jobs for all executors except the EmailScheduler and RefreshWorker, you can do the following:
let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );
let backend = InMemoryBackend::new();
Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .with_job_pruner(config);
pub fn graceful_shutdown(self)
Instruct rexecutor to shut down gracefully, giving time for any currently executing jobs to complete before shutting down.
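The general idea behind a graceful shutdown is to signal workers to stop picking up new work and then wait for whatever is in flight to finish. The sketch below shows that pattern with OS threads and atomics from the standard library, purely for illustration. `Pool` and its fields are hypothetical, the "jobs" are short sleeps, and nothing here describes how rexecutor implements shutdown.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical worker pool; each worker runs short "jobs" until told to stop.
struct Pool {
    stop: Arc<AtomicBool>,
    started: Arc<AtomicUsize>,
    finished: Arc<AtomicUsize>,
    workers: Vec<JoinHandle<()>>,
}

impl Pool {
    fn start(worker_count: usize) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let started = Arc::new(AtomicUsize::new(0));
        let finished = Arc::new(AtomicUsize::new(0));
        let workers: Vec<JoinHandle<()>> = (0..worker_count)
            .map(|_| {
                let stop = Arc::clone(&stop);
                let started = Arc::clone(&started);
                let finished = Arc::clone(&finished);
                thread::spawn(move || {
                    // The stop flag is only checked between jobs, never mid-job.
                    while !stop.load(Ordering::SeqCst) {
                        started.fetch_add(1, Ordering::SeqCst);
                        thread::sleep(Duration::from_millis(5)); // simulated work
                        finished.fetch_add(1, Ordering::SeqCst);
                    }
                })
            })
            .collect();
        Pool { stop, started, finished, workers }
    }

    /// Signals the workers to stop, then waits for in-flight jobs to finish.
    /// Returns the `(started, finished)` job counts.
    fn graceful_shutdown(self) -> (usize, usize) {
        self.stop.store(true, Ordering::SeqCst);
        for worker in self.workers {
            worker.join().expect("worker panicked");
        }
        (
            self.started.load(Ordering::SeqCst),
            self.finished.load(Ordering::SeqCst),
        )
    }
}
```

Because every worker is joined before the counts are read, every job that started has also finished. An abrupt exit would offer no such guarantee.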
pub fn drop_guard(self) -> DropGuard
Returns a drop guard which will gracefully shut down rexecutor when dropped.
§Example
let backend = InMemoryBackend::new();
// Note this must be given a name to ensure it is dropped at the end of the scope.
// See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
let _guard = Rexecutor::new(backend)
    .with_executor::<RefreshWorker>()
    .with_executor::<EmailScheduler>()
    .with_executor::<RegistrationWorker>()
    .drop_guard();