Crate apalis_core
source ·Expand description
apalis-core
Utilities for building job and message processing tools. This crate contains traits for working with workers.
async fn run() {
Monitor::new()
.register_with_count(2, move |c| {
WorkerBuilder::new(format!("tasty-banana-{c}"))
.layer(TraceLayer::new())
.with_storage(sqlite.clone())
.build_fn(send_email)
})
.shutdown_timeout(Duration::from_secs(1))
/// Here you could use tokio::ctrl_c etc
.run_with_signal(async { Ok(()) }).await
}
How Workers Run and Monitored
apalis
employs a robust system for running and monitoring workers, ensuring efficient and reliable execution of tasks. This section provides an overview of the underlying mechanism and how it can be utilized effectively.
- Worker Initialization.
To begin, the
Monitor::new()
function is called to create a new instance of the worker monitor. The monitor acts as a central control unit for managing and supervising the worker threads. - Worker Registration.
Once the monitor is instantiated, workers can be registered using
register()
and.register_with_count()
methods. The former takes in a single worker while the former method takes two parameters: the desired number of workers (count) and a closure(move |_| { ... })
that specifies the worker logic. Within the closure, a WorkerBuilder is utilized to construct individual worker instances. The WorkerBuilder provides a flexible and configurable way to set up worker-specific configurations, such as providing dependencies or applying additional layers to the worker. - Worker Configuration
In the example code snippet, the WorkerBuilder is configured with a job source (like storage, message queue or stream) eg
SqliteStorage
and a TraceLayer to enable tracing capabilities. These configurations are specific to the needs of the workers being created. - Worker Construction.
The
.build_fn(fn)
and.build(service)
methods of the WorkerBuilder can then be invoked, eg specifying the function (send_email) that the worker will execute. This function represents the actual work to be performed by each worker. It can be a custom-defined function or a predefined function provided by the library. - Worker Execution. Upon completing the worker configuration, the worker is ready to be executed. The worker instance is added to the internal thread pool managed by the monitor. The monitor will ensure that the specified number of worker threads (count) are created and available for processing tasks.
- Worker Monitoring. The monitor continuously monitors the worker threads to ensure their smooth operation. It keeps track of the workers’ status, manages their lifecycle, and restarts any workers that may have encountered errors or terminated unexpectedly.
- Asynchronous Execution.
To facilitate asynchronous execution, the
.run().await
method is invoked on the monitor. This call suspends the current task until all workers have completed their execution. This is particularly useful when integrating the library into asynchronous Rust applications or frameworks.
Middleware aka Layering
apalis
prefers a functional approach to job handling and uses tower::Layer
to model services as jobs.
First, we need to define a tower service.
// This service implements the Log behavior
pub struct LogService<S> {
target: &'static str,
service: S,
}
impl<S, Request> Service<Request> for LogService<S>
where
S: Service<Request>,
Request: std::fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, request: Request) -> Self::Future {
// Use service to apply middleware before or(and) after a request
info!("request = {:?}, target = {:?}", request, self.target);
self.service.call(request)
// Also possible to do something after
}
}
Then we define a layer.
pub struct LogLayer {
target: &'static str,
}
impl LogLayer {
pub fn new(target: &'static str) -> Self {
Self { target }
}
}
impl<S> Layer<S> for LogLayer {
type Service = LogService<S>;
fn layer(&self, service: S) -> Self::Service {
LogService {
target: self.target,
service,
}
}
}
Layers are executed sequentially.
.layer(LogLayer::new("log-layer-1"))
.layer(LogLayer::new("log-layer-2"))
log-layer-1
would be logged before log-layer-2
.
This the means you should put your general layers first eg, TraceLayer
and CatchPanicLayer
should be before something like AckLayer
This also can affect how other layers behave. Eg Any layer before TraceLayer
may do some tracing, but those traces would not appear in that job’s tracing span.
Graceful Shutdown
apalis
allows optional opt-in to graceful shutdown. This can be added to Monitor::run_with_signal
and this can be any future. We highly recommend using tokio::signal::ctrl_c
or something similar.
Modules
- Represent utilities for creating worker instances.
- Represents the [
JobContext
]. - Includes all possible error types.
- Represents an executor. Currently tokio is implemented as default
- expose
expose
Utilities to expose workers and jobs to external tools eg web frameworks and cli tools - Includes the utilities for a job.
- Represents a service that is created from a function.
- Represents middleware offered through [
tower::Layer
] - apalis mocking utilities
- Represents monitoring of running workers
- mq
mq
Message queuing utilities - Represents the job bytes.
- Represents different possible responses.
- storage
storage
Represents ability to persist and consume jobs from storages. - Represents extra utils needed for runtime agnostic approach
- Represents the utils for building workers.