Expand description
A high-performance, type-safe task processing framework for rust.
apalis-core provides the fundamental abstractions and runtime components for building
scalable background task systems with middleware support, graceful shutdown, and monitoring capabilities.
This is advanced documentation, for guide level documentation is found on the website.
§Core Concepts
apalis-core is built around four primary abstractions that provide a flexible and extensible task processing system:
Tasks: Type-safe task data structures with processing metadataBackends: Pluggable task storage and streaming implementationsWorkers: Task processing engines with lifecycle managementMonitor: Multi-worker coordination and observability
The framework leverages the tower service abstraction to provide a rich middleware
ecosystem like error handling, timeouts, rate limiting,
and observability.
§Tasks
The task struct provides type-safe components for task data and metadata:
Args- The primary structure for the taskParts- Wrapper type for information for task execution includes context, status, attempts, task_id and metadataContext- contextual information with the task provided by the backendStatus- Represents the current state of a taskTaskId- Unique identifier for task trackingAttempt- Retry tracking and attempt informationExtensions- Type-safe storage for additional task dataMetadata- metadata associated with the task
§Example: Using TaskBuilder
let task: Task<String, ()> = TaskBuilder::new("my-task".to_string())
.id("task-123".into())
.attempts(3)
.timeout(Duration::from_secs(30))
.run_in_minutes(10)
.build();Specific documentation for tasks can be found in the task and task::builder modules.
§Relevant Guides:
- Defining Task arguments - Creating effective task arguments that are scalable and type-safe
§Backends
The Backend trait serves as the core abstraction for all task sources.
It defines task polling mechanisms, streaming interfaces, and middleware integration points.
Associated Types:
Stream- Defines the task stream type for polling operationsLayer- Specifies the middleware layer stack for the backendCodec- Determines serialization format for task data persistenceBeat- Heartbeat stream for worker liveness checksIdType- Type used for unique task identifiersCtx- Context associated with tasksError- Error type for backend operations
§Inbuilt Implementations
MemoryStorage: In-memory storage based on channelsPipe: Pipe-based backend for a stream-to-backend pipelineCustomBackend: Flexible backend composition allowing custom functions for task management
Backends handle task persistence, distribution, and reliability concerns while providing a uniform interface for worker consumption.
§Workers
The Worker is the core runtime component responsible for task polling, execution, and lifecycle management:
§Worker Lifecycle
- Workers are responsible for task polling, processing, and lifecycle management.
- Workers can be run as a future or as a stream of events.
- Workers readiness is conditioned on the backend and service (and middleware) being ready.
- This means any blocking middleware eg (concurrency) will block the worker from polling tasks.
§Worker Components
The following are the main components the worker module:
WorkerBuilder- Fluent builder for configuring and constructing workersWorker- Actual worker implementation that processes tasksWorkerContext- Runtime state including task counts and execution statusEvent- Worker event enumeration (Start,Engage,Idle,Error,Stop)Ext- Extension traits and middleware for adding functionality to workers
§Example: Building and Running a Worker
#[tokio::main]
async fn main() {
let mut in_memory = MemoryStorage::new();
in_memory.push(1u32).await.unwrap();
async fn task(
task: u32,
worker: WorkerContext,
) -> Result<(), BoxDynError> {
/// Do some work
tokio::time::sleep(Duration::from_secs(1)).await;
worker.stop().unwrap();
Ok(())
}
let worker = WorkerBuilder::new("rango-tango")
.backend(in_memory)
.on_event(|ctx, ev| {
println!("On Event = {:?}, {:?}", ev, ctx.name());
})
.build(task);
worker.run().await.unwrap();
}Learn more about workers in the worker and worker::builder modules.
§Relevant Tutorials:
- Creating task handlers - Defining task processing functions using the
TaskFntrait - Testing task handlers with
TestWorker- Specialized worker implementation for unit and integration testing
§Monitor
The Monitor helps manage and coordinate multiple workers:
Main Features:
- Worker Registry - Keeps track of active workers
- Event Handling - Handles and processes worker events
- Graceful Shutdown - Stops all workers together safely
- Health Monitoring - Restarts and manages worker health
§Example: Using Monitor with a Worker
#[tokio::main]
async fn main() {
let mut storage = JsonStorage::new_temp().unwrap();
storage.push(1u32).await.unwrap();
let monitor = Monitor::new()
.on_event(|ctx, event| println!("{}: {:?}", ctx.name(), event))
.register(move |_| {
WorkerBuilder::new("demo-worker")
.backend(storage.clone())
.build(|req: u32, ctx: WorkerContext| async move {
println!("Processing task: {:?}", req);
Ok::<_, std::io::Error>(req)
})
});
// Start monitor and run all registered workers
monitor.run().await.unwrap();
}Learn more about the monitor in the monitor module.
§Middleware
Built on the tower ecosystem, apalis-core provides extensive middleware support like error handling, timeouts, rate limiting, and observability.
§Core Middleware
The following middleware layers are included with their worker extensions:
AcknowledgmentLayer- Task acknowledgment after processingEventListenerLayer- Worker event emission and handlingCircuitBreakerLayer- Circuit breaker pattern for failure handlingLongRunningLayer- Support for tracking long-running tasks
§Extending with middleware
You can write your own middleware to run code before or after a task is processed.
Creating Custom Middleware
Here’s a simple example of a logging middleware layer:
use apalis_core::task::Task;
use tower::{Layer, Service};
use std::task::{Context, Poll};
// Define a logging service that wraps another service
pub struct LoggingService<S> {
inner: S,
}
impl<S, Req, Res, Err> Service<Task<Req, ()>> for LoggingService<S>
where
S: Service<Task<Req, ()>, Response = Res, Error = Err>,
Req: std::fmt::Debug,
{
type Response = Res;
type Error = Err;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Task<Req, ()>) -> Self::Future {
println!("Processing task: {:?}", req.args);
self.inner.call(req)
}
}
// Define a layer that wraps services with LoggingService
pub struct LoggingLayer;
impl<S> Layer<S> for LoggingLayer {
type Service = LoggingService<S>;
fn layer(&self, service: S) -> Self::Service {
LoggingService { inner: service }
}
}If you want your middleware to do more than just intercept requests and responses, you can use extension traits. See the worker::ext module for examples.
§Error Handling
apalis-core defines a comprehensive error taxonomy for robust error handling:
AbortError- Non-retryable fatal errors requiring immediate terminationRetryAfterError- Retryable execution errors triggering retry mechanisms after a delayDeferredError- Retryable execution errors triggering immediate retry
This error classification enables precise error handling strategies and appropriate retry behavior for different failure scenarios.
§Graceful Shutdown
apalis-core has a reliable graceful shutdown system that makes sure
workers stop safely and all tasks finish before shutting down:
Key Features:
- Task tracking: Workers keep track of how many tasks are running.
- Shutdown control: The system waits until all tasks are finished before shutting down.
- Monitor coordination: A shared
Shutdowntoken helps all workers stop together. - Timeout: You can set a time limit for shutdown using
with_terminator.
Learn more about the graceful shutdown process in the monitor module.
§Feature flags
sleep(enabled by default) — Sleep feature for backends that need to waitserde— Enable serde supportjson— Enable serde_json supporttracing(enabled by default) — Enable tracing supporttest-utils— Enable test utilities
§Development
apalis-core provides comprehensive extensibility mechanisms such as middleware and ext traits.
Beyond there one may want to dive deeper into the following topics:
- Using CustomBackend - using custom backend to integrate with already existing systems
- Implementing Backends - implementing the
Backendtrait from scratch - Extending Workers using extension traits - implementing custom worker functionality via extension traits
§Observability
You can track tasks using apalis-board.

Modules§
- backend
- Core traits for interacting with backends
- error
- Includes internal error types.
- layers
- Layers for building middleware stacks
- monitor
- Actively manage and observe workers
- task
- Utilities for creating and managing tasks.
- task_fn
- Utilities for adapting async functions into a task handler.
- timer
sleep - Timing and delaying utilities
- worker
- Utilities for building and running workers.
Macros§
- features_
table - Macro to generate feature tables for backend documentation with standardized assert functions