apalis_core/backend/
mod.rs

1//! Core traits for interacting with backends
2//!
//! This module defines the core traits and types for backends, which are responsible for providing sources of tasks, handling their lifecycle, and exposing middleware for internal processing.
4//! The traits here abstract over different backend implementations, allowing for extensibility and interoperability.
5//!
6//! # Overview
7//! - [`Backend`]: The primary trait representing a task source, defining methods for polling tasks, heartbeats, and middleware.
8//! - [`TaskSink`]: An extension trait for backends that support pushing tasks.
9//! - [`FetchById`], [`Update`], [`Reschedule`]: Additional traits for managing tasks.
10//! - [`Vacuum`], [`ResumeById`], [`ResumeAbandoned`]: Traits for backend maintenance and task recovery.
11//! - [`RegisterWorker`], [`ListWorkers`], [`ListTasks`]: Traits for worker management and task listing.
12//! - [`WaitForCompletion`]: A trait for waiting on task completion and checking their status.
13//!
14//!
15//! ## Default Implementations
16//!
17//! The module includes several default backend implementations, such as:
18//! - [`MemoryStorage`](memory::MemoryStorage): An in-memory backend for testing and lightweight use cases
19//! - [`Pipe`](pipe::Pipe): A simple pipe-based backend for inter-thread communication
20//! - [`CustomBackend`](custom::CustomBackend): A flexible backend allowing custom functions for task management
21use std::{future::Future, time::Duration};
22
23use futures_sink::Sink;
24use futures_util::{
25    stream::{self, BoxStream},
26    Stream,
27};
28
29use crate::{
30    error::BoxDynError,
31    task::{status::Status, task_id::TaskId, Task},
32    worker::context::WorkerContext,
33};
34
35pub mod codec;
36pub mod custom;
37pub mod pipe;
38pub mod shared;
39
40mod impls;
41
42pub use impls::guide;
43
44/// In-memory backend based on channels
45pub mod memory {
46    pub use crate::backend::impls::memory::*;
47}
48
/// File-based backend using JSON.
///
/// Only compiled when the `json` cargo feature is enabled. It re-exports
/// everything public from the private `impls::json` module.
#[cfg(feature = "json")]
pub mod json {
    pub use super::impls::json::*;
}
54
55/// The `Backend` trait defines how workers get and manage tasks from a backend.
56///
57/// In other languages, this might be called a "Queue", "Broker", etc.
58pub trait Backend<Args> {
59    /// The type used to uniquely identify tasks.
60    type IdType: Clone;
61    /// Context associated with each task.
62    type Context: Default;
63    /// The error type returned by backend operations
64    type Error;
65    /// The codec used for serialization/deserialization of tasks.
66    type Codec;
67
68    /// A stream of tasks provided by the backend.
69    type Stream: Stream<Item = Result<Option<Task<Args, Self::Context, Self::IdType>>, Self::Error>>;
70    /// A stream representing heartbeat signals.
71    type Beat: Stream<Item = Result<(), Self::Error>>;
72    /// The type representing backend middleware layer.
73    type Layer;
74
75    /// Returns a heartbeat stream for the given worker.
76    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat;
77    /// Returns the backend's middleware layer.
78    fn middleware(&self) -> Self::Layer;
79    /// Polls the backend for tasks for the given worker.
80    fn poll(self, worker: &WorkerContext) -> Self::Stream;
81}
82
83/// Represents a stream for T.
84pub type TaskStream<T, E = BoxDynError> = BoxStream<'static, Result<Option<T>, E>>;
85
86/// Extends Backend to allow pushing tasks into the backend
87pub trait TaskSink<Args>: Backend<Args> {
88    /// Allows pushing a single task into the backend
89    fn push(&mut self, task: Args) -> impl Future<Output = Result<(), Self::Error>> + Send;
90
91    /// Allows pushing multiple tasks into the backend in bulk
92    fn push_bulk(
93        &mut self,
94        tasks: Vec<Args>,
95    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
96
97    /// Allows pushing tasks from a stream into the backend
98    fn push_stream(
99        &mut self,
100        tasks: impl Stream<Item = Args> + Unpin + Send,
101    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
102
103    /// Allows pushing a fully constructed task into the backend
104    fn push_task(
105        &mut self,
106        task: Task<Args, Self::Context, Self::IdType>,
107    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
108}
109
110impl<Args, S, E> TaskSink<Args> for S
111where
112    S: Sink<Task<Args, Self::Context, Self::IdType>, Error = E>
113        + Unpin
114        + Backend<Args, Error = E>
115        + Send,
116    Args: Send,
117    S::Context: Send + Default,
118    S::IdType: Send + 'static,
119    E: Send,
120{
121    async fn push(&mut self, task: Args) -> Result<(), Self::Error> {
122        use futures_util::SinkExt;
123        self.send(Task::new(task)).await
124    }
125
126    async fn push_bulk(&mut self, tasks: Vec<Args>) -> Result<(), Self::Error> {
127        use futures_util::SinkExt;
128        self.send_all(&mut stream::iter(
129            tasks
130                .into_iter()
131                .map(Task::new)
132                .map(Result::Ok)
133                .collect::<Vec<_>>(),
134        ))
135        .await
136    }
137
138    async fn push_stream(
139        &mut self,
140        tasks: impl Stream<Item = Args> + Unpin + Send,
141    ) -> Result<(), Self::Error> {
142        use futures_util::SinkExt;
143        use futures_util::StreamExt;
144        self.send_all(&mut tasks.map(Task::new).map(Result::Ok))
145            .await
146    }
147
148    async fn push_task(
149        &mut self,
150        task: Task<Args, Self::Context, Self::IdType>,
151    ) -> Result<(), Self::Error> {
152        use futures_util::SinkExt;
153        self.send(task).await
154    }
155}
156
157/// Allows fetching a task by its ID
158pub trait FetchById<Args>: Backend<Args> {
159    /// Fetch a task by its unique identifier
160    fn fetch_by_id(
161        &mut self,
162        task_id: &TaskId<Self::IdType>,
163    ) -> impl Future<Output = Result<Option<Task<Args, Self::Context>>, Self::Error>> + Send;
164}
165
166/// Allows updating an existing task
167pub trait Update<Args>: Backend<Args> {
168    /// Update the given task
169    fn update(
170        &mut self,
171        task: Task<Args, Self::Context, Self::IdType>,
172    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
173}
174
175/// Allows rescheduling a task for later execution
176pub trait Reschedule<Args>: Backend<Args> {
177    /// Reschedule the task after a specified duration
178    fn reschedule(
179        &mut self,
180        task: Task<Args, Self::Context, Self::IdType>,
181        wait: Duration,
182    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
183}
184
/// Maintenance hook for cleaning up resources held by a backend.
///
/// Unlike most traits in this module it does not require [`Backend`] and
/// declares its own error type.
pub trait Vacuum {
    /// Error returned when vacuuming fails.
    type Error;
    /// Cleans up resources; on success the future yields the number of items
    /// that were vacuumed.
    fn vacuum(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
}
192
193/// Allows resuming a task by its ID
194pub trait ResumeById<Args>: Backend<Args> {
195    /// Resume a task by its ID
196    fn resume_by_id(
197        &mut self,
198        id: TaskId<Self::IdType>,
199    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
200}
201
202/// Allows fetching multiple tasks by their IDs
203pub trait ResumeAbandoned<Args>: Backend<Args> {
204    /// Resume all abandoned tasks
205    fn resume_abandoned(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
206}
207
208/// Allows registering a worker with the backend
209pub trait RegisterWorker<Args>: Backend<Args> {
210    /// Registers a worker
211    fn register_worker(
212        &mut self,
213        worker_id: String,
214    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
215}
216
/// Hook for collecting metrics from a backend.
///
/// The trait is generic over the metric value `Output`. It does not require
/// [`Backend`] and declares its own error type.
pub trait Metric<Output> {
    /// Error returned when collecting the metric fails.
    type Error;
    /// Collects the metric; on success the future yields an `Output`.
    fn metric(&mut self) -> impl Future<Output = Result<Output, Self::Error>> + Send;
}
224
225/// Allows listing all workers registered with the backend
226pub trait ListWorkers<Args>: Backend<Args> {
227    /// The type representing a worker
228    type Worker;
229    /// List all registered workers
230    fn list_workers(&self) -> impl Future<Output = Result<Vec<Self::Worker>, Self::Error>> + Send;
231}
232
233/// Allows listing tasks with optional filtering
234pub trait ListTasks<Args>: Backend<Args> {
235    /// The type representing a filter for tasks
236    type Filter;
237
238    /// List tasks matching the given filter
239    fn list_tasks(
240        &self,
241        filter: &Self::Filter,
242    ) -> impl Future<Output = Result<Vec<Task<Args, Self::Context, Self::IdType>>, Self::Error>> + Send;
243}
244
245/// Represents the result of a task execution
246#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
247#[derive(Debug, Clone)]
248pub struct TaskResult<T> {
249    task_id: TaskId,
250    status: Status,
251    result: Result<T, String>,
252}
253
254impl<T> TaskResult<T> {
255    /// Create a new TaskResult
256    pub fn new(task_id: TaskId, status: Status, result: Result<T, String>) -> Self {
257        Self {
258            task_id,
259            status,
260            result,
261        }
262    }
263    /// Get the ID of the task
264    pub fn task_id(&self) -> &TaskId {
265        &self.task_id
266    }
267
268    /// Get the status of the task
269    pub fn status(&self) -> &Status {
270        &self.status
271    }
272
273    /// Get the result of the task
274    pub fn result(&self) -> &Result<T, String> {
275        &self.result
276    }
277
278    /// Take the result of the task
279    pub fn take(self) -> Result<T, String> {
280        self.result
281    }
282}
283
284/// Allows waiting for tasks to complete and checking their status
285pub trait WaitForCompletion<T, Args>: Backend<Args> {
286    /// The result stream type yielding task results
287    type ResultStream: Stream<Item = Result<TaskResult<T>, Self::Error>> + Send + 'static;
288
289    /// Wait for multiple tasks to complete, yielding results as they become available
290    fn wait_for(
291        &self,
292        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
293    ) -> Self::ResultStream;
294
295    /// Wait for a single task to complete, yielding its result
296    fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream {
297        self.wait_for(std::iter::once(task_id))
298    }
299
300    /// Check current status of tasks without waiting
301    fn check_status(
302        &self,
303        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
304    ) -> impl Future<Output = Result<Vec<TaskResult<T>>, Self::Error>> + Send;
305}