#![crate_name = "apalis_core"]
#![warn(
    missing_debug_implementations,
    missing_docs,
    rust_2018_idioms,
    unreachable_pub,
    bad_style,
    dead_code,
    improper_ctypes,
    non_shorthand_field_patterns,
    no_mangle_generic_items,
    overflowing_literals,
    path_statements,
    patterns_in_fns_without_body,
    unconditional_recursion,
    unused,
    unused_allocation,
    unused_comparisons,
    unused_parens,
    while_true
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
//! A high-performance, type-safe task processing framework for Rust.
//!
//! `apalis-core` provides the fundamental abstractions and runtime components for building
//! scalable background task systems with middleware support, graceful shutdown, and monitoring capabilities.
//!
//! This is advanced documentation; guide-level documentation can be found on the [website](https://apalis.dev).
//!
//! # Core Concepts
//!
//! `apalis-core` is built around four primary abstractions that provide a flexible and extensible task processing system:
//!
//! - **[`Tasks`](#tasks)**: Type-safe task data structures with processing metadata
//! - **[`Backends`](#backends)**: Pluggable task storage and streaming implementations
//! - **[`Workers`](#workers)**: Task processing engines with lifecycle management
//! - **[`Monitor`](#monitor)**: Multi-worker coordination and observability
//!
//! The framework leverages the `tower` service abstraction to provide a rich middleware
//! ecosystem covering error handling, timeouts, rate limiting,
//! and observability.
//!
//! ## Tasks
//!
//! The task struct provides type-safe components for task data and metadata:
//! - [`Args`](crate::task_fn::guide) - The primary structure for the task
//! - [`Parts`](crate::task::Parts) - Wrapper type for task execution information, including context, status, attempts, task ID, and metadata
//! - [`Context`](crate::backend::Backend#required-associated-types) - Contextual information attached to the task by the backend
//! - [`Status`](crate::task::status::Status) - Represents the current state of a task
//! - [`TaskId`](crate::task::task_id::TaskId) - Unique identifier for task tracking
//! - [`Attempt`](crate::task::attempt::Attempt) - Retry tracking and attempt information
//! - [`Extensions`](crate::task::data) - Type-safe storage for additional task data
//! - [`Metadata`](crate::task::metadata) - Metadata associated with the task
//!
//! ### Example: Using `TaskBuilder`
//!
//! ```ignore
//! let task: Task<String, ()> = TaskBuilder::new("my-task".to_string())
//!     .id("task-123".into())
//!     .attempts(3)
//!     .timeout(Duration::from_secs(30))
//!     .run_in_minutes(10)
//!     .build();
//! ```
//! Specific documentation for tasks can be found in the [`task`] and [`task::builder`] modules.
//!
//! #### Relevant Guides:
//! - [**Defining Task arguments**](crate::task_fn::guide) - Creating effective task arguments that are scalable and type-safe
//!
//! ## Backends
//!
//! The [`Backend`](crate::backend::Backend) trait serves as the core abstraction for all task sources.
//! It defines task polling mechanisms, streaming interfaces, and middleware integration points.
//!
//! <details>
//! <summary>Associated Types:</summary>
//!
//! - `Stream` - Defines the task stream type for polling operations
//! - `Layer` - Specifies the middleware layer stack for the backend
//! - `Codec` - Determines serialization format for task data persistence
//! - `Beat` - Heartbeat stream for worker liveness checks
//! - `IdType` - Type used for unique task identifiers
//! - `Ctx` - Context associated with tasks
//! - `Error` - Error type for backend operations
//!
//! </details>
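//!
//! To make the roles of a few of these associated types more concrete, here is a
//! deliberately simplified, synchronous stand-in. It is only a sketch: `MiniBackend` and
//! `VecBackend` are hypothetical names, an `Iterator` takes the place of the real
//! asynchronous stream, the task arguments are fixed to `u32`, and `Layer`, `Codec`, and
//! `Beat` are left out. Refer to [`Backend`] for the actual trait.
//!
//! ```rust
//! use std::convert::Infallible;
//!
//! /// Hypothetical, simplified stand-in; not the real `Backend` trait.
//! trait MiniBackend {
//!     type IdType;
//!     type Ctx;
//!     type Error;
//!     /// Yields `(id, context, args)` triples, or an error from the source.
//!     type Stream: Iterator<Item = Result<(Self::IdType, Self::Ctx, u32), Self::Error>>;
//!
//!     fn poll(self) -> Self::Stream;
//! }
//!
//! /// Hypothetical backend that serves tasks from a `Vec`.
//! struct VecBackend(Vec<u32>);
//!
//! impl MiniBackend for VecBackend {
//!     type IdType = usize;
//!     type Ctx = ();
//!     type Error = Infallible;
//!     type Stream = std::vec::IntoIter<Result<(usize, (), u32), Infallible>>;
//!
//!     fn poll(self) -> Self::Stream {
//!         self.0
//!             .into_iter()
//!             .enumerate()
//!             // The position in the `Vec` doubles as the task id.
//!             .map(|(id, args)| Ok::<_, Infallible>((id, (), args)))
//!             .collect::<Vec<_>>()
//!             .into_iter()
//!     }
//! }
//!
//! fn main() {
//!     let tasks: Vec<_> = VecBackend(vec![10, 20]).poll().map(Result::unwrap).collect();
//!     assert_eq!(tasks, vec![(0, (), 10), (1, (), 20)]);
//! }
//! ```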
//!
//! ### Inbuilt Implementations
//! - [`MemoryStorage`](crate::backend::memory::MemoryStorage): In-memory storage based on channels
//! - [`Pipe`](crate::backend::pipe): Pipe-based backend for a stream-to-backend pipeline
//! - [`CustomBackend`](crate::backend::custom): Flexible backend composition allowing custom functions for task management
//!
//! Backends handle task persistence, distribution, and reliability concerns while providing
//! a uniform interface for worker consumption.
//!
//! ## Workers
//!
//! The [`Worker`](crate::worker) is the core runtime component responsible for task polling, execution, and lifecycle management.
//!
//! ### Worker Lifecycle
//!
//! - Workers are responsible for task polling, processing, and lifecycle management.
//! - Workers can be run as a future or as a stream of events.
//! - Worker readiness is conditioned on the backend and the service (including its middleware) being ready.
//! - This means any blocking middleware (e.g. a concurrency limit) will block the worker from polling tasks.
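//!
//! The readiness rule above can be illustrated with a small, synchronous sketch. It does
//! not use the real worker or `tower` types: `LimitedService` and `tick` are hypothetical
//! names, and a `VecDeque` stands in for the backend.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! /// Hypothetical stand-in for a service wrapped in a concurrency-limiting middleware.
//! struct LimitedService {
//!     in_flight: usize,
//!     max_in_flight: usize,
//! }
//!
//! impl LimitedService {
//!     /// Ready only while there is spare capacity.
//!     fn is_ready(&self) -> bool {
//!         self.in_flight < self.max_in_flight
//!     }
//!
//!     /// Accept a task; it stays in flight until `complete` is called.
//!     fn call(&mut self, _task: u32) {
//!         self.in_flight += 1;
//!     }
//!
//!     /// Mark one in-flight task as finished.
//!     fn complete(&mut self) {
//!         self.in_flight = self.in_flight.saturating_sub(1);
//!     }
//! }
//!
//! /// One worker tick: the backend queue is polled only when the service is ready.
//! fn tick(queue: &mut VecDeque<u32>, service: &mut LimitedService) -> Option<u32> {
//!     if !service.is_ready() {
//!         return None; // back-pressure: the task stays in the backend
//!     }
//!     let task = queue.pop_front()?;
//!     service.call(task);
//!     Some(task)
//! }
//!
//! fn main() {
//!     let mut queue = VecDeque::from([1, 2]);
//!     let mut service = LimitedService { in_flight: 0, max_in_flight: 1 };
//!
//!     assert_eq!(tick(&mut queue, &mut service), Some(1));
//!     // The limit is reached, so the worker does not poll and task 2 stays queued.
//!     assert_eq!(tick(&mut queue, &mut service), None);
//!     assert_eq!(queue.len(), 1);
//!
//!     service.complete();
//!     assert_eq!(tick(&mut queue, &mut service), Some(2));
//! }
//! ```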
//!
//! ### Worker Components
//!
//! The following are the main components of the worker module:
//!
//! - [`WorkerBuilder`] - Fluent builder for configuring and constructing workers
//! - [`Worker`] - Actual worker implementation that processes tasks
//! - [`WorkerContext`] - Runtime state including task counts and execution status
//! - [`Event`] - Worker event enumeration (`Start`, `Engage`, `Idle`, `Error`, `Stop`)
//! - [`Ext`](crate::worker::ext) - Extension traits and middleware for adding functionality to workers
//!
//! ### Example: Building and Running a Worker
//!
//! ```rust
//! # use apalis_core::worker::{builder::WorkerBuilder, context::WorkerContext};
//! # use apalis_core::backend::memory::MemoryStorage;
//! # use apalis_core::error::BoxDynError;
//! # use std::time::Duration;
//! # use crate::apalis_core::worker::ext::event_listener::EventListenerExt;
//! # use crate::apalis_core::backend::TaskSink;
//! #[tokio::main]
//! async fn main() {
//!     let mut in_memory = MemoryStorage::new();
//!     in_memory.push(1u32).await.unwrap();
//!
//!     async fn task(
//!         task: u32,
//!         worker: WorkerContext,
//!     ) -> Result<(), BoxDynError> {
//!         // Do some work, then stop the worker so the example terminates
//!         tokio::time::sleep(Duration::from_secs(1)).await;
//!         worker.stop().unwrap();
//!         Ok(())
//!     }
//!
//!     let worker = WorkerBuilder::new("rango-tango")
//!         .backend(in_memory)
//!         .on_event(|ctx, ev| {
//!             println!("On Event = {:?}, {:?}", ev, ctx.name());
//!         })
//!         .build(task);
//!     worker.run().await.unwrap();
//! }
//! ```
//!
//! Learn more about workers in the [`worker`](crate::worker) and [`worker::builder`](crate::worker::builder) modules.
//!
//! #### Relevant Tutorials:
//! - [**Creating task handlers**](crate::task_fn::guide) - Defining task processing functions using the [`TaskFn`] trait
//! - [**Testing task handlers with `TestWorker`**](crate::worker::test_worker) - Specialized worker implementation for unit and integration testing
//!
//! ## Monitor
//!
//! The [`Monitor`](crate::monitor::Monitor) helps manage and coordinate multiple workers:
//!
//! **Main Features:**
//! - **Worker Registry** - Keeps track of active workers
//! - **Event Handling** - Handles and processes worker events
//! - **Graceful Shutdown** - Stops all workers together safely
//! - **Health Monitoring** - Restarts and manages worker health
//!
//! ### Example: Using `Monitor` with a Worker
//!
//! ```rust
//! # use apalis_core::monitor::Monitor;
//! # use apalis_core::worker::builder::WorkerBuilder;
//! # use apalis_core::backend::json::JsonStorage;
//! # use apalis_core::task::Task;
//! # use apalis_core::backend::TaskSink;
//! # use tower::service_fn;
//! # use std::time::Duration;
//! # use apalis_core::worker::context::WorkerContext;
//! #[tokio::main]
//! async fn main() {
//!     let mut storage = JsonStorage::new_temp().unwrap();
//!     storage.push(1u32).await.unwrap();
//!
//!     let monitor = Monitor::new()
//!         .on_event(|ctx, event| println!("{}: {:?}", ctx.name(), event))
//!         .register(move |_| {
//!             WorkerBuilder::new("demo-worker")
//!                 .backend(storage.clone())
//!                 .build(|req: u32, ctx: WorkerContext| async move {
//!                     println!("Processing task: {:?}", req);
//! #                    ctx.stop().unwrap();
//!                     Ok::<_, std::io::Error>(req)
//!                 })
//!         });
//!
//!     // Start monitor and run all registered workers
//!     monitor.run().await.unwrap();
//! }
//! ```
//!
//! Learn more about the monitor in the [`monitor` module](crate::monitor).
//!
//! ## Middleware
//!
//! Built on the `tower` ecosystem, `apalis-core` provides extensive middleware support for concerns such as error handling, timeouts, rate limiting, and observability.
//!
//! ### Core Middleware
//!
//! The following middleware layers are included with their worker extensions:
//! - [`AcknowledgmentLayer`] - Task acknowledgment after processing
//! - [`EventListenerLayer`] - Worker event emission and handling
//! - [`CircuitBreakerLayer`] - Circuit breaker pattern for failure handling
//! - [`LongRunningLayer`] - Support for tracking long-running tasks
//!
//! ### Extending with Middleware
//!
//! You can write your own middleware to run code before or after a task is processed.
//!
//! <details>
//! <summary>Creating Custom Middleware</summary>
//!
//! Here's a simple example of a logging middleware layer:
//!
//! ```rust
//! use apalis_core::task::Task;
//! use tower::{Layer, Service};
//! use std::task::{Context, Poll};
//!
//! // Define a logging service that wraps another service
//! pub struct LoggingService<S> {
//!     inner: S,
//! }
//!
//! impl<S, Req, Res, Err> Service<Task<Req, ()>> for LoggingService<S>
//! where
//!     S: Service<Task<Req, ()>, Response = Res, Error = Err>,
//!     Req: std::fmt::Debug,
//! {
//!     type Response = Res;
//!     type Error = Err;
//!     type Future = S::Future;
//!
//!     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
//!         self.inner.poll_ready(cx)
//!     }
//!
//!     fn call(&mut self, req: Task<Req, ()>) -> Self::Future {
//!         println!("Processing task: {:?}", req.args);
//!         self.inner.call(req)
//!     }
//! }
//!
//! // Define a layer that wraps services with LoggingService
//! pub struct LoggingLayer;
//!
//! impl<S> Layer<S> for LoggingLayer {
//!     type Service = LoggingService<S>;
//!
//!     fn layer(&self, service: S) -> Self::Service {
//!         LoggingService { inner: service }
//!     }
//! }
//! ```
//! </details>
//!
//! If you want your middleware to do more than just intercept requests and responses, you can use extension traits. See the [`worker::ext`](crate::worker::ext) module for examples.
//!
//! ## Error Handling
//!
//! `apalis-core` defines a comprehensive error taxonomy for robust error handling:
//!
//! - [`AbortError`] - Non-retryable fatal errors requiring immediate termination
//! - [`RetryAfterError`] - Retryable execution errors triggering retry mechanisms after a delay
//! - [`DeferredError`] - Retryable execution errors triggering immediate retry
//!
//! This error classification enables precise error handling strategies and
//! appropriate retry behavior for different failure scenarios.
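//!
//! As a rough illustration of how such a classification can drive retry decisions, the
//! sketch below maps each category to a next step. It does not use the crate's error
//! types: `Failure`, `Action`, and `next_action` are hypothetical names, and the
//! `max_attempts` budget is an assumption added for the example.
//!
//! ```rust
//! use std::time::Duration;
//!
//! /// Hypothetical classification mirroring the three categories above.
//! #[derive(Debug)]
//! enum Failure {
//!     Abort,
//!     RetryAfter(Duration),
//!     Deferred,
//! }
//!
//! /// What a retry policy could decide to do next.
//! #[derive(Debug, PartialEq)]
//! enum Action {
//!     GiveUp,
//!     RetryIn(Duration),
//! }
//!
//! fn next_action(failure: &Failure, attempt: u32, max_attempts: u32) -> Action {
//!     match failure {
//!         // Fatal errors are never retried.
//!         Failure::Abort => Action::GiveUp,
//!         // Assumed attempt budget: retryable errors stop once it is used up.
//!         _ if attempt >= max_attempts => Action::GiveUp,
//!         // Retry once the requested delay has passed.
//!         Failure::RetryAfter(delay) => Action::RetryIn(*delay),
//!         // Retry immediately.
//!         Failure::Deferred => Action::RetryIn(Duration::ZERO),
//!     }
//! }
//!
//! fn main() {
//!     let delay = Duration::from_secs(5);
//!     assert_eq!(next_action(&Failure::Abort, 1, 3), Action::GiveUp);
//!     assert_eq!(next_action(&Failure::RetryAfter(delay), 1, 3), Action::RetryIn(delay));
//!     assert_eq!(next_action(&Failure::Deferred, 1, 3), Action::RetryIn(Duration::ZERO));
//!     assert_eq!(next_action(&Failure::Deferred, 3, 3), Action::GiveUp);
//! }
//! ```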
//!
//! ## Graceful Shutdown
//!
//! `apalis-core` has a reliable graceful shutdown system that makes sure
//! workers stop safely and all tasks finish before shutting down:
//!
//! **Key Features:**
//! - Task tracking: Workers keep track of how many tasks are running.
//! - Shutdown control: The system waits until all tasks are finished before shutting down.
//! - Monitor coordination: A shared [`Shutdown`] token helps all workers stop together.
//! - Timeout: You can set a time limit for shutdown using [`with_terminator`](crate::monitor::Monitor::with_terminator).
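//!
//! The idea behind task tracking with a bounded wait can be sketched with the standard
//! library alone. This is not how the crate implements shutdown: `wait_for_idle` is a
//! hypothetical helper, and a plain atomic counter plus threads stand in for the
//! [`Shutdown`] token and async tasks.
//!
//! ```rust
//! use std::sync::atomic::{AtomicUsize, Ordering};
//! use std::sync::Arc;
//! use std::thread;
//! use std::time::{Duration, Instant};
//!
//! /// Wait until no tasks are running or the timeout elapses.
//! /// Returns `true` when all tasks finished in time.
//! fn wait_for_idle(running: &AtomicUsize, timeout: Duration) -> bool {
//!     let deadline = Instant::now() + timeout;
//!     while running.load(Ordering::SeqCst) > 0 {
//!         if Instant::now() >= deadline {
//!             return false;
//!         }
//!         thread::sleep(Duration::from_millis(5));
//!     }
//!     true
//! }
//!
//! fn main() {
//!     let running = Arc::new(AtomicUsize::new(0));
//!
//!     // Track a task: increment before it starts, decrement when it finishes.
//!     running.fetch_add(1, Ordering::SeqCst);
//!     let counter = Arc::clone(&running);
//!     let task = thread::spawn(move || {
//!         thread::sleep(Duration::from_millis(20));
//!         counter.fetch_sub(1, Ordering::SeqCst);
//!     });
//!
//!     // Shutdown waits for the in-flight task, bounded by a timeout.
//!     assert!(wait_for_idle(&running, Duration::from_secs(2)));
//!     task.join().unwrap();
//!
//!     // A task that never finishes makes the bounded wait give up.
//!     running.fetch_add(1, Ordering::SeqCst);
//!     assert!(!wait_for_idle(&running, Duration::from_millis(30)));
//! }
//! ```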
//!
//! Learn more about the graceful shutdown process in the [`monitor`](crate::monitor#graceful-shutdown-with-timeout) module.
//!
//! # Feature flags
#![cfg_attr(
    feature = "docsrs",
    cfg_attr(doc, doc = ::document_features::document_features!())
)]
//!
//! # Development
//! `apalis-core` provides comprehensive extensibility mechanisms such as middleware and extension traits.
//! Beyond these, you may want to dive deeper into the following topics:
//!
//! - [**Using CustomBackend**](crate::backend::custom) - Using a custom backend to integrate with existing systems
//! - [**Implementing Backends**](crate::backend::guide) - Implementing the [`Backend`] trait from scratch
//! - [**Extending Workers using extension traits**](crate::worker::ext#creating-a-custom-worker-extension-trait) - Implementing custom worker functionality via extension traits
//!
//! # Observability
//! You can track tasks using [apalis-board](https://github.com/apalis-dev/apalis-board).
//! ![Task](https://github.com/apalis-dev/apalis-board/raw/master/screenshots/task.png)
//!
//! [`Backend`]: crate::backend::Backend
//! [`TaskFn`]: crate::task_fn::TaskFn
//! [`Service`]: tower_service::Service
//! [`Task`]: crate::task
//! [`WorkerBuilder`]: crate::worker::builder
//! [`Worker`]: crate::worker
//! [`Monitor`]: crate::monitor
//! [`AcknowledgmentLayer`]: crate::worker::ext::ack
//! [`EventListenerLayer`]: crate::worker::ext::event_listener
//! [`CircuitBreakerLayer`]: crate::worker::ext::circuit_breaker
//! [`LongRunningLayer`]: crate::worker::ext::long_running
//! [`AbortError`]: crate::error::AbortError
//! [`RetryAfterError`]: crate::error::RetryAfterError
//! [`DeferredError`]: crate::error::DeferredError
//! [`WorkerContext`]: crate::worker::context::WorkerContext
//! [`Event`]: crate::worker::event::Event
//! [`Parts`]: crate::task::Parts
//! [`Status`]: crate::task::status::Status
//! [`TaskId`]: crate::task::task_id::TaskId
//! [`Attempt`]: crate::task::attempt::Attempt
//! [`FromRequest`]: crate::task_fn::FromRequest
//! [`TestWorker`]: crate::worker::test_worker::TestWorker
//! [`Shutdown`]: crate::monitor::shutdown::Shutdown

pub mod backend;
/// Includes internal error types.
pub mod error;
#[macro_use]
pub(crate) mod macros;
/// Actively manage and observe workers
pub mod monitor;
pub mod task;
pub mod task_fn;
pub mod worker;

/// Layers for building middleware stacks
pub mod layers {
    pub use tower_layer::*;
    pub use tower_service::Service;
}
/// Timing and delaying utilities
#[cfg(feature = "sleep")]
pub mod timer {
    pub use futures_timer::Delay;
    /// Runtime agnostic sleep function based on [Delay]
    pub async fn sleep(duration: std::time::Duration) {
        futures_timer::Delay::new(duration).await;
    }
}
354        futures_timer::Delay::new(duration).await;
355    }
356}