Crate apalis_core

apalis-core

Utilities for building job and message processing tools. This crate contains traits for working with workers.

async fn run() {
    Monitor::new()
        .register_with_count(2, move |c| {
            WorkerBuilder::new(format!("tasty-banana-{c}"))
                .layer(TraceLayer::new())
                .with_storage(sqlite.clone())
                .build_fn(send_email)
        })
        .shutdown_timeout(Duration::from_secs(1))
        // Here you could use tokio::signal::ctrl_c etc
        .run_with_signal(async { Ok(()) }).await
}

How Workers Are Run and Monitored

apalis employs a robust system for running and monitoring workers, ensuring efficient and reliable execution of tasks. This section provides an overview of the underlying mechanism and how it can be utilized effectively.

  1. Worker Initialization. To begin, the Monitor::new() function is called to create a new instance of the worker monitor. The monitor acts as a central control unit for managing and supervising the worker threads.
  2. Worker Registration. Once the monitor is instantiated, workers can be registered using the .register() and .register_with_count() methods. The former takes a single worker, while the latter takes two parameters: the desired number of workers (count) and a closure (move |c| { ... }) that builds each worker. Within the closure, a WorkerBuilder is used to construct individual worker instances. The WorkerBuilder provides a flexible way to set up worker-specific configuration, such as providing dependencies or applying additional layers to the worker.
  3. Worker Configuration. In the example code snippet, the WorkerBuilder is configured with a job source (such as a storage, message queue or stream), e.g. SqliteStorage, and a TraceLayer to enable tracing capabilities. These configurations are specific to the needs of the workers being created.
  4. Worker Construction. The .build_fn(fn) or .build(service) method of the WorkerBuilder can then be invoked, e.g. specifying the function (send_email) that the worker will execute. This function represents the actual work to be performed by each worker. It can be a custom-defined function or a predefined function provided by the library.
  5. Worker Execution. Upon completing the worker configuration, the worker is ready to be executed. The worker instance is added to the internal thread pool managed by the monitor. The monitor will ensure that the specified number of worker threads (count) are created and available for processing tasks.
  6. Worker Monitoring. The monitor continuously monitors the worker threads to ensure their smooth operation. It keeps track of the workers’ status, manages their lifecycle, and restarts any workers that may have encountered errors or terminated unexpectedly.
  7. Asynchronous Execution. To facilitate asynchronous execution, the .run().await method is invoked on the monitor. This call suspends the current task until all workers have completed their execution. This is particularly useful when integrating the library into asynchronous Rust applications or frameworks.
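
The registration and execution steps above can be illustrated with a toy model. This is a minimal sketch using only the standard library, not apalis's implementation: ToyMonitor, Worker and run_toy_monitor are hypothetical names, plain threads stand in for whatever executor the real monitor uses, and the assumption that the closure receives indices starting at 0 is ours.

```rust
use std::thread;

// Hypothetical stand-in for a built worker: a name plus the job function.
struct Worker {
    name: String,
    job: fn(&str) -> String,
}

// Hypothetical stand-in for the monitor: it only collects workers and runs them.
struct ToyMonitor {
    workers: Vec<Worker>,
}

impl ToyMonitor {
    fn new() -> Self {
        Self { workers: Vec::new() }
    }

    // Calls `factory` once per index in `0..count` (assumed numbering), similar
    // to how the closure in the example receives `c` and uses it in the name.
    fn register_with_count(mut self, count: usize, factory: impl Fn(usize) -> Worker) -> Self {
        for index in 0..count {
            self.workers.push(factory(index));
        }
        self
    }

    // Spawns one thread per worker and blocks until all of them have finished.
    // Results are returned in registration order.
    fn run(self) -> Vec<String> {
        let handles: Vec<_> = self
            .workers
            .into_iter()
            .map(|w| thread::spawn(move || (w.job)(&w.name)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    }
}

// Toy job function; the real `send_email` would receive a job, not a worker name.
fn send_email(worker: &str) -> String {
    format!("{worker} sent an email")
}

fn run_toy_monitor() -> Vec<String> {
    ToyMonitor::new()
        .register_with_count(2, |c| Worker {
            name: format!("tasty-banana-{c}"),
            job: send_email,
        })
        .run()
}
```

Calling run_toy_monitor() yields one result per registered worker. The real monitor additionally supervises workers while they run, which this sketch leaves out.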

Middleware aka Layering

apalis prefers a functional approach to job handling and uses tower::Layer to model services as jobs.

First, we need to define a tower service.

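use std::task::{Context, Poll};
use tower::Service;
use tracing::info; // assumption: any logging macro with this shape would do

// This service implements the Log behavior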
pub struct LogService<S> {
   target: &'static str,
   service: S,
}

impl<S, Request> Service<Request> for LogService<S>
where
    S: Service<Request>,
    Request: std::fmt::Debug,
{
   type Response = S::Response;
   type Error = S::Error;
   type Future = S::Future;

   fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
       self.service.poll_ready(cx)
   }

   fn call(&mut self, request: Request) -> Self::Future {
       // Use the service to apply middleware before and/or after a request
       info!("request = {:?}, target = {:?}", request, self.target);
       self.service.call(request)
       // Also possible to do something after
   }
}

Then we define a layer.

use tower::Layer;

pub struct LogLayer {
    target: &'static str,
}

impl LogLayer {
   pub fn new(target: &'static str) -> Self {
       Self { target }
   }
}

impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;

    fn layer(&self, service: S) -> Self::Service {
        LogService {
            target: self.target,
            service,
        }
    }
}

Layers are executed sequentially.

.layer(LogLayer::new("log-layer-1"))
.layer(LogLayer::new("log-layer-2"))

log-layer-1 would be logged before log-layer-2. This means you should put your general layers first, e.g. TraceLayer and CatchPanicLayer should come before something like AckLayer. Ordering can also affect how other layers behave. For example, any layer before TraceLayer may do some tracing, but those traces would not appear in that job’s tracing span.
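
The ordering rule can be checked with a small, self-contained sketch. The Service and Layer traits below are simplified synchronous stand-ins defined locally for illustration, not the real tower traits, and Handler and run_layered are hypothetical names. The sketch assumes, as described above, that the layer added first ends up outermost.

```rust
// Toy, synchronous stand-ins for tower's `Service` and `Layer` traits.
// These are NOT the real tower traits; they only model wrapping order.
trait Service {
    fn call(&mut self, request: &str, log: &mut Vec<String>);
}

trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

// Innermost service: stands in for the job handler itself.
struct Handler;

impl Service for Handler {
    fn call(&mut self, request: &str, log: &mut Vec<String>) {
        log.push(format!("handler: {request}"));
    }
}

struct LogService<S> {
    target: &'static str,
    service: S,
}

impl<S: Service> Service for LogService<S> {
    fn call(&mut self, request: &str, log: &mut Vec<String>) {
        // Record an entry before and after delegating to the inner service.
        log.push(format!("{}: before", self.target));
        self.service.call(request, log);
        log.push(format!("{}: after", self.target));
    }
}

struct LogLayer {
    target: &'static str,
}

impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        LogService { target: self.target, service: inner }
    }
}

// Runs one request through two layers and returns the recorded order.
fn run_layered(request: &str) -> Vec<String> {
    let first = LogLayer { target: "log-layer-1" };
    let second = LogLayer { target: "log-layer-2" };
    // The layer added first is outermost, so `second` wraps the handler
    // and `first` wraps the result.
    let mut service = first.layer(second.layer(Handler));
    let mut log = Vec::new();
    service.call("send_email", &mut log);
    log
}
```

In this model the outer layer sees the request first and the response last, which is why a layer placed before TraceLayer runs outside the job's tracing span.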

Graceful Shutdown

Graceful shutdown is opt-in in apalis. Pass a shutdown signal, which can be any future, to Monitor::run_with_signal. We highly recommend using tokio::signal::ctrl_c or something similar.
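
The idea of a shutdown signal combined with a shutdown timeout can be sketched with the standard library alone. This is a toy model under stated assumptions, not how apalis implements it: toy_run_with_signal is a hypothetical function, a blocking closure stands in for the signal future, and threads stand in for workers.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};

// Runs `workers` toy workers until `signal` returns, then asks them to stop
// and waits at most `timeout` for them to report completion.
// Returns how many workers finished within the timeout.
fn toy_run_with_signal(workers: usize, timeout: Duration, signal: impl FnOnce()) -> usize {
    let stop = Arc::new(AtomicBool::new(false));
    let (done_tx, done_rx) = mpsc::channel();

    for id in 0..workers {
        let stop = Arc::clone(&stop);
        let done_tx = done_tx.clone();
        thread::spawn(move || {
            // Pretend to poll for jobs until shutdown is requested.
            while !stop.load(Ordering::SeqCst) {
                thread::sleep(Duration::from_millis(5));
            }
            // Report completion; ignore the error if nobody is listening anymore.
            let _ = done_tx.send(id);
        });
    }
    // Drop the original sender so only worker threads hold senders.
    drop(done_tx);

    // Block until shutdown is requested, then tell the workers to stop.
    signal();
    stop.store(true, Ordering::SeqCst);

    // Give the workers up to `timeout` to finish what they are doing.
    let deadline = Instant::now() + timeout;
    let mut finished = 0;
    while finished < workers {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match done_rx.recv_timeout(remaining) {
            Ok(_) => finished += 1,
            Err(_) => break, // timed out or all senders are gone
        }
    }
    finished
}
```

For example, toy_run_with_signal(2, Duration::from_secs(1), || {}) requests shutdown immediately and then waits for both workers to stop. In a real application the signal would be a future such as the one returned by tokio::signal::ctrl_c.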

Modules

  • Represents utilities for creating worker instances.
  • Represents the JobContext.
  • Includes all possible error types.
  • Represents an executor. Currently tokio is implemented as the default.
  • expose
    Utilities to expose workers and jobs to external tools, e.g. web frameworks and CLI tools
  • Includes the utilities for a job.
  • Represents a service that is created from a function.
  • Represents middleware offered through tower::Layer
  • apalis mocking utilities
  • Represents monitoring of running workers
  • mq
    Message queuing utilities
  • Represents the job bytes.
  • Represents different possible responses.
  • storage
    Represents ability to persist and consume jobs from storages.
  • Represents extra utils needed for runtime agnostic approach
  • Represents the utils for building workers.