#![crate_name = "apalis_core"]
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
//! # apalis-core
//! Utilities for building job and message processing tools.
//! This crate contains traits for working with workers.
//! ````rust
//! async fn run() {
//!     Monitor::new()
//!         .register_with_count(2, move |c| {
//!             WorkerBuilder::new(format!("tasty-banana-{c}"))
//!                 .layer(TraceLayer::new())
//!                 .with_storage(sqlite.clone())
//!                 .build_fn(send_email)
//!         })
//!         .shutdown_timeout(Duration::from_secs(1))
//!         // Here you could use tokio::signal::ctrl_c etc
//!         .run_with_signal(async { Ok(()) }).await
//! }
//! ````
//! ## How Workers Are Run and Monitored
//! `apalis` runs workers under a `Monitor` that supervises them. This section gives an overview of the underlying mechanism and how to use it effectively.
//! 1. Worker Initialization.
//! To begin, the `Monitor::new()` function is called to create a new instance of the worker monitor. The monitor acts as a central control unit for managing and supervising the worker threads.
//! 2. Worker Registration.
//! Once the monitor is instantiated, workers can be registered using the `.register()` and `.register_with_count()` methods. The former takes a single worker, while the latter takes two parameters: the desired number of workers (count) and a closure `(move |_| { ... })` that specifies the worker logic.
//! Within the closure, a `WorkerBuilder` is used to construct individual worker instances. The `WorkerBuilder` provides a flexible way to set up worker-specific configuration, such as providing dependencies or applying additional layers to the worker.
//! 3. Worker Configuration.
//! In the example code snippet, the `WorkerBuilder` is configured with a job source (such as a storage, message queue or stream), e.g. `SqliteStorage`, and a `TraceLayer` to enable tracing. These configurations are specific to the needs of the workers being created.
//! 4. Worker Construction.
//! The `.build_fn(fn)` and `.build(service)` methods of the `WorkerBuilder` can then be invoked, for example to specify the function (`send_email`) that the worker will execute. This function represents the actual work to be performed by each worker. It can be a custom-defined function or a predefined function provided by the library.
//! 5. Worker Execution.
//! Upon completing the worker configuration, the worker is ready to be executed. The worker instance is added to the internal thread pool managed by the monitor. The monitor will ensure that the specified number of worker threads (count) are created and available for processing tasks.
//! 6. Worker Monitoring.
//! The monitor continuously monitors the worker threads to ensure their smooth operation. It keeps track of the workers' status, manages their lifecycle, and restarts any workers that may have encountered errors or terminated unexpectedly.
//! 7. Asynchronous Execution.
//! To facilitate asynchronous execution, the `.run().await` method is invoked on the monitor. This call suspends the current task until all workers have completed their execution. This is particularly useful when integrating the library into asynchronous Rust applications or frameworks.
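//!
//! As a rough illustration of step 2, the sketch below models count-based registration using only the standard library. It is not how `apalis` is implemented: `Worker`, this free-standing `register_with_count` and `worker_names` are hypothetical stand-ins, and the zero-based index passed to the closure is an assumption of the sketch.
//!
//! ```rust
//! // Hypothetical stand-in for a worker; real workers are produced by `WorkerBuilder`.
//! #[derive(Debug)]
//! struct Worker {
//!     name: String,
//! }
//!
//! // Toy model of count-based registration: call the factory once per worker,
//! // passing the worker's index. Assumption: indices start at zero.
//! fn register_with_count(count: usize, factory: impl Fn(usize) -> Worker) -> Vec<Worker> {
//!     (0..count).map(factory).collect()
//! }
//!
//! // Registers two workers and returns their names.
//! fn worker_names() -> Vec<String> {
//!     register_with_count(2, |c| Worker {
//!         name: format!("tasty-banana-{c}"),
//!     })
//!     .into_iter()
//!     .map(|worker| worker.name)
//!     .collect()
//! }
//!
//! fn main() {
//!     println!("{:?}", worker_names());
//! }
//! ```
//!
//! The point of passing the index to the closure is that every worker built from the same closure can still receive a distinct name.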
//!
//! ## Middleware aka Layering
//! `apalis` prefers a functional approach to job handling and uses `tower::Layer` to model services as jobs.
//!
//! First, we need to define a tower service.
//! ```rust
//! use std::task::{Context, Poll};
//! use tower::Service;
//! use tracing::info;
//!
//! // This service implements the Log behavior
//! pub struct LogService<S> {
//!     target: &'static str,
//!     service: S,
//! }
//!
//! impl<S, Request> Service<Request> for LogService<S>
//! where
//!     S: Service<Request>,
//!     Request: std::fmt::Debug,
//! {
//!     type Response = S::Response;
//!     type Error = S::Error;
//!     type Future = S::Future;
//!
//!     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
//!         self.service.poll_ready(cx)
//!     }
//!
//!     fn call(&mut self, request: Request) -> Self::Future {
//!         // Use the service to apply middleware before and/or after a request
//!         info!("request = {:?}, target = {:?}", request, self.target);
//!         self.service.call(request)
//!         // It is also possible to do something after the call
//!     }
//! }
//! ```
//!
//! Then we define a layer.
//!
//! ```rust
//! use tower::Layer;
//!
//! pub struct LogLayer {
//!     target: &'static str,
//! }
//!
//! impl LogLayer {
//!     pub fn new(target: &'static str) -> Self {
//!         Self { target }
//!     }
//! }
//!
//! impl<S> Layer<S> for LogLayer {
//!     type Service = LogService<S>;
//!
//!     fn layer(&self, service: S) -> Self::Service {
//!         LogService {
//!             target: self.target,
//!             service,
//!         }
//!     }
//! }
//! ```
//! Layers are executed sequentially.
//!
//! ```rust
//! .layer(LogLayer::new("log-layer-1"))
//! .layer(LogLayer::new("log-layer-2"))
//! ```
//! `log-layer-1` would be logged before `log-layer-2`.
//! This means you should put your general layers first, e.g. `TraceLayer` and `CatchPanicLayer` should come before something like `AckLayer`.
//! This can also affect how other layers behave. For example, any layer before `TraceLayer` may do some tracing, but those traces would not appear in that job's tracing span.
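//!
//! The ordering rule can be checked with a small standard-library model. The `Service` and `Layer` traits below are simplified, synchronous stand-ins for the `tower` traits (an assumption made so the sketch compiles on its own), and `Handler` and `call_order` are hypothetical helpers.
//!
//! ```rust
//! use std::cell::RefCell;
//! use std::rc::Rc;
//!
//! // Simplified, synchronous stand-ins for `tower::Service` and `tower::Layer`.
//! trait Service {
//!     fn call(&mut self, request: &str);
//! }
//!
//! trait Layer<S> {
//!     type Service;
//!     fn layer(&self, inner: S) -> Self::Service;
//! }
//!
//! // Shared event log, used to observe the order in which things run.
//! type Log = Rc<RefCell<Vec<String>>>;
//!
//! struct LogLayer {
//!     target: &'static str,
//!     log: Log,
//! }
//!
//! struct LogService<S> {
//!     target: &'static str,
//!     log: Log,
//!     inner: S,
//! }
//!
//! impl<S> Layer<S> for LogLayer {
//!     type Service = LogService<S>;
//!
//!     fn layer(&self, inner: S) -> Self::Service {
//!         LogService { target: self.target, log: self.log.clone(), inner }
//!     }
//! }
//!
//! impl<S: Service> Service for LogService<S> {
//!     fn call(&mut self, request: &str) {
//!         // Record the request first, then hand it to the wrapped service.
//!         self.log.borrow_mut().push(format!("{} saw {}", self.target, request));
//!         self.inner.call(request);
//!     }
//! }
//!
//! // Hypothetical innermost service that stands in for the job handler.
//! struct Handler {
//!     log: Log,
//! }
//!
//! impl Service for Handler {
//!     fn call(&mut self, request: &str) {
//!         self.log.borrow_mut().push(format!("handler ran {request}"));
//!     }
//! }
//!
//! // Builds `log-layer-1(log-layer-2(handler))` and returns the recorded events.
//! fn call_order() -> Vec<String> {
//!     let log: Log = Rc::new(RefCell::new(Vec::new()));
//!     let handler = Handler { log: log.clone() };
//!     // The layer listed first ends up outermost, so it wraps everything else.
//!     let inner = LogLayer { target: "log-layer-2", log: log.clone() }.layer(handler);
//!     let mut service = LogLayer { target: "log-layer-1", log: log.clone() }.layer(inner);
//!     service.call("job");
//!     let events = log.borrow().clone();
//!     events
//! }
//!
//! fn main() {
//!     for event in call_order() {
//!         println!("{event}");
//!     }
//! }
//! ```
//!
//! In this model the outermost layer sees the request first, which is why a layer that sets up context for the others belongs early in the chain.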
//!
//! ## Graceful Shutdown
//! `apalis` supports opt-in graceful shutdown. Pass a shutdown signal, which can be any future, to `Monitor::run_with_signal`. We highly recommend using `tokio::signal::ctrl_c` or something similar.
/// Represents utilities for creating worker instances.
pub mod builder;
/// Represents the [`JobContext`].
pub mod context;
/// Includes all possible error types.
pub mod error;
/// Includes the utilities for a job.
pub mod job;
/// Represents a service that is created from a function.
pub mod job_fn;
/// Represents middleware offered through [`tower::Layer`]
pub mod layers;
/// Represents the job bytes.
pub mod request;
/// Represents different possible responses.
pub mod response;
#[cfg(feature = "storage")]
#[cfg_attr(docsrs, doc(cfg(feature = "storage")))]
/// Represents ability to persist and consume jobs from storages.
pub mod storage;
/// Represents an executor. Currently tokio is implemented as the default.
pub mod executor;
/// Represents monitoring of running workers
pub mod monitor;
/// Represents extra utils needed for a runtime-agnostic approach
pub mod utils;
/// Represents the utils for building workers.
pub mod worker;
#[cfg(feature = "expose")]
#[cfg_attr(docsrs, doc(cfg(feature = "expose")))]
/// Utilities to expose workers and jobs to external tools, e.g. web frameworks and CLI tools
pub mod expose;
#[cfg(feature = "mq")]
#[cfg_attr(docsrs, doc(cfg(feature = "mq")))]
/// Message queuing utilities
pub mod mq;
/// apalis mocking utilities
#[cfg(feature = "tokio-comp")]
pub mod mock {
    use futures::channel::mpsc::{Receiver, Sender};
    use futures::{Stream, StreamExt};
    use tower::Service;

    use crate::{
        job::Job,
        worker::{ready::ReadyWorker, WorkerId},
    };

    fn build_stream<Req: Send + 'static>(mut rx: Receiver<Req>) -> impl Stream<Item = Req> {
        let stream = async_stream::stream! {
            while let Some(item) = rx.next().await {
                yield item;
            }
        };
        stream.boxed()
    }

    /// Useful for mocking a worker, usually for testing purposes.
    ///
    /// # Example
    /// ```rust
    /// #[tokio::test(flavor = "current_thread")]
    /// async fn test_worker() {
    ///     let (handle, mut worker) = mock_worker(job_fn(job2));
    ///     handle.send(TestJob(Utc::now())).await.unwrap();
    ///     let res = worker.consume_next().await;
    /// }
    /// ```
    pub fn mock_worker<S, Req>(service: S) -> (Sender<Req>, ReadyWorker<impl Stream<Item = Req>, S>)
    where
        S: Service<Req>,
        Req: Job + Send + 'static,
    {
        let (tx, rx) = futures::channel::mpsc::channel(10);
        let stream = build_stream(rx);
        (
            tx,
            ReadyWorker {
                service,
                stream,
                id: WorkerId::new("mock-worker"),
                beats: Vec::new(),
            },
        )
    }
}