Module monitor

Module monitor 

Source
Expand description

Actively manage and observe workers

§Monitor

Monitor provides centralized coordination and lifecycle management for multiple workers. It is responsible for executing, monitoring, and gracefully shutting down all registered workers in a robust and customizable manner.

§Features

  • Register and run one or more workers.
  • Handle graceful shutdown with optional timeout.
  • Register custom event handlers to observe worker events (e.g. task received, completed, errored).
  • Integrate shutdown with system signals (e.g. SIGINT, SIGTERM) or custom triggers.
  • Restart Strategy: Configure custom logic to automatically restart workers on failure.

§Usage

use tower::service_fn;

#[tokio::main]
async fn main() {

    async fn task(req: u32) -> Result<u32, std::io::Error> {
        println!("Processing task: {:?}", req);
        Ok::<_, std::io::Error>(req)
    }

    let monitor = Monitor::new()
        .on_event(|ctx, event| println!("{}: {:?}", ctx.name(), event))
        .register(move |_| {
            WorkerBuilder::new("demo-worker")
                .backend(MemoryStorage::new())
                .build(task)
         });

    // Start monitor and run all registered workers
    monitor.run().await.unwrap();
}

§Graceful Shutdown with Timeout

To force shutdown after a certain duration, use the shutdown_timeout method:

let monitor = Monitor::new()
    .shutdown_timeout(Duration::from_secs(10))
    .run_with_signal(signal::ctrl_c())
    .await
    .unwrap();

This ensures that if any worker hangs or takes too long to finish, the monitor will shut down after 10 seconds.

§Restarting Workers on Failure

You can configure the monitor to restart workers when they fail, using custom logic:

use apalis_core::monitor::Monitor;

let monitor = Monitor::new()
    .should_restart(|_ctx, error, attempt| {
        println!("Worker failed: {error:?}, attempt: {attempt}");
        attempt < 3 // Restart up to 3 times
    });

§Observing Worker Events

Register event handlers to observe worker lifecycle events:

use apalis_core::monitor::Monitor;

let monitor = Monitor::new()
    .on_event(|ctx, event| println!("Worker {}: {:?}", ctx.name(), event));

§Registering Multiple Workers

You can register multiple workers using the register method. Each worker can be customized by index:

let monitor = Monitor::new()
    .register(|index|
       WorkerBuilder::new(format!("worker-{index}"))
        .backend(MemoryStorage::new())
        .build(task)
    );

§Example: Full Monitor Usage

let monitor = Monitor::new()
    .register(|index|
        WorkerBuilder::new(format!("worker-{index}"))
        .backend(MemoryStorage::new())
        .build(task)
    )
    .should_restart(|_, _, attempt| attempt < 5)
    .on_event(|ctx, event| println!("Event: {:?}", event))
    .shutdown_timeout(Duration::from_secs(10));

monitor.run().await.unwrap();

§Error Handling

If any worker fails, the monitor will return a MonitorError containing details about the failure. You can inspect the error to see which workers failed and why.

Modules§

shutdown
A shutdown monitor that can be used to signal shutdown to tasks and workers.

Structs§

ExitError
Represents errors that occurred in a monitored worker, including its context and the error itself.
Monitor
A monitor for coordinating and managing a collection of workers.
MonitoredWorkerError
Represents errors that occurred in a monitored worker, including its context and the error itself.

Enums§

MonitorError
Error type for monitor operations.