Expand description
Actively manage and observe workers
§Monitor
Monitor provides centralized coordination and lifecycle management for multiple workers.
It is responsible for executing, monitoring, and gracefully shutting down all registered workers in a robust and customizable manner.
§Features
- Register and run one or more workers.
- Handle graceful shutdown with optional timeout.
- Register custom event handlers to observe worker events (e.g. task received, completed, errored).
- Integrate shutdown with system signals (e.g.
SIGINT,SIGTERM) or custom triggers. - Restart Strategy: Configure custom logic to automatically restart workers on failure.
§Usage
use tower::service_fn;
#[tokio::main]
async fn main() {
async fn task(req: u32) -> Result<u32, std::io::Error> {
println!("Processing task: {:?}", req);
Ok::<_, std::io::Error>(req)
}
let monitor = Monitor::new()
.on_event(|ctx, event| println!("{}: {:?}", ctx.name(), event))
.register(move |_| {
WorkerBuilder::new("demo-worker")
.backend(MemoryStorage::new())
.build(task)
});
// Start monitor and run all registered workers
monitor.run().await.unwrap();
}§Graceful Shutdown with Timeout
To force shutdown after a certain duration, use the shutdown_timeout method:
let monitor = Monitor::new()
.shutdown_timeout(Duration::from_secs(10))
.run_with_signal(signal::ctrl_c())
.await
.unwrap();This ensures that if any worker hangs or takes too long to finish, the monitor will shut down after 10 seconds.
§Restarting Workers on Failure
You can configure the monitor to restart workers when they fail, using custom logic:
use apalis_core::monitor::Monitor;
let monitor = Monitor::new()
.should_restart(|_ctx, error, attempt| {
println!("Worker failed: {error:?}, attempt: {attempt}");
attempt < 3 // Restart up to 3 times
});§Observing Worker Events
Register event handlers to observe worker lifecycle events:
use apalis_core::monitor::Monitor;
let monitor = Monitor::new()
.on_event(|ctx, event| println!("Worker {}: {:?}", ctx.name(), event));§Registering Multiple Workers
You can register multiple workers using the register method. Each worker can be customized by index:
let monitor = Monitor::new()
.register(|index|
WorkerBuilder::new(format!("worker-{index}"))
.backend(MemoryStorage::new())
.build(task)
);§Example: Full Monitor Usage
let monitor = Monitor::new()
.register(|index|
WorkerBuilder::new(format!("worker-{index}"))
.backend(MemoryStorage::new())
.build(task)
)
.should_restart(|_, _, attempt| attempt < 5)
.on_event(|ctx, event| println!("Event: {:?}", event))
.shutdown_timeout(Duration::from_secs(10));
monitor.run().await.unwrap();§Error Handling
If any worker fails, the monitor will return a MonitorError containing details about the failure.
You can inspect the error to see which workers failed and why.
Modules§
- shutdown
- A shutdown monitor that can be used to signal shutdown to tasks and workers.
Structs§
- Exit
Error - Represents errors that occurred in a monitored worker, including its context and the error itself.
- Monitor
- A monitor for coordinating and managing a collection of workers.
- Monitored
Worker Error - Represents errors that occurred in a monitored worker, including its context and the error itself.
Enums§
- Monitor
Error - Error type for monitor operations.