apalis_core/backend/
mod.rs

1//! Core traits for interacting with backends
2//!
3//! The core traits and types for backends, responsible for providing sources of tasks, handling their lifecycle, and exposing middleware for internal processing.
4//! The traits here abstract over different backend implementations, allowing for extensibility and interoperability.
5//!
6//! # Overview
7//! - [`Backend`]: The primary trait representing a task source, defining methods for polling tasks, heartbeats, and middleware.
8//! - [`TaskSink`]: An extension trait for backends that support pushing tasks.
9//! - [`FetchById`], [`Update`], [`Reschedule`]: Additional traits for managing tasks.
10//! - [`Vacuum`], [`ResumeById`], [`ResumeAbandoned`]: Traits for backend maintenance and task recovery.
11//! - [`RegisterWorker`], [`ListWorkers`], [`ListTasks`]: Traits for worker management and task listing.
12//! - [`WaitForCompletion`]: A trait for waiting on task completion and checking their status.
13//!
14//!
15//! ## Default Implementations
16//!
17//! The module includes several default backend implementations, such as:
18//! - [`MemoryStorage`](memory::MemoryStorage): An in-memory backend for testing and lightweight use cases
19//! - [`Pipe`](pipe::Pipe): A simple pipe-based backend for inter-thread communication
20//! - [`CustomBackend`](custom::CustomBackend): A flexible backend allowing custom functions for task management
21use std::{future::Future, time::Duration};
22
23use futures_util::{
24    Stream,
25    stream::{BoxStream},
26};
27
28use crate::{
29    backend::codec::Codec,
30    error::BoxDynError,
31    task::{Task, status::Status, task_id::TaskId},
32    worker::context::WorkerContext,
33};
34
35pub mod codec;
36pub mod custom;
37pub mod pipe;
38pub mod poll_strategy;
39pub mod queue;
40pub mod shared;
41
42mod config;
43mod expose;
44mod impls;
45mod sink;
46
47pub use sink::*;
48pub use expose::*;
49
50pub use impls::guide;
51
52pub use config::ConfigExt;
53
54/// In-memory backend based on channels
55pub mod memory {
56    pub use crate::backend::impls::memory::*;
57}
58
59/// File based Backend using JSON
60#[cfg(feature = "json")]
61pub mod json {
62    pub use crate::backend::impls::json::*;
63}
64
65/// The `Backend` trait defines how workers get and manage tasks from a backend.
66///
67/// In other languages, this might be called a "Queue", "Broker", etc.
68pub trait Backend {
69    /// The type of arguments the backend handles.
70    type Args;
71    /// The type used to uniquely identify tasks.
72    type IdType: Clone;
73    /// Context associated with each task.
74    type Context: Default;
75    /// The error type returned by backend operations
76    type Error;
77    /// The codec used for serialization/deserialization of tasks.
78    type Codec: Codec<Self::Args, Compact = Self::Compact>;
79
80    /// The compact representation of task arguments.
81    type Compact;
82
83    /// A stream of tasks provided by the backend.
84    type Stream: Stream<
85        Item = Result<Option<Task<Self::Args, Self::Context, Self::IdType>>, Self::Error>,
86    >;
87    /// A stream representing heartbeat signals.
88    type Beat: Stream<Item = Result<(), Self::Error>>;
89    /// The type representing backend middleware layer.
90    type Layer;
91
92    /// Returns a heartbeat stream for the given worker.
93    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat;
94    /// Returns the backend's middleware layer.
95    fn middleware(&self) -> Self::Layer;
96    /// Polls the backend for tasks for the given worker.
97    fn poll(self, worker: &WorkerContext) -> Self::Stream;
98}
99
100/// Represents a stream for T.
101pub type TaskStream<T, E = BoxDynError> = BoxStream<'static, Result<Option<T>, E>>;
102/// Allows fetching a task by its ID
103pub trait FetchById<Args>: Backend {
104    /// Fetch a task by its unique identifier
105    fn fetch_by_id(
106        &mut self,
107        task_id: &TaskId<Self::IdType>,
108    ) -> impl Future<Output = Result<Option<Task<Args, Self::Context, Self::IdType>>, Self::Error>> + Send;
109}
110
111/// Allows updating an existing task
112pub trait Update: Backend {
113    /// Update the given task
114    fn update(
115        &mut self,
116        task: Task<Self::Args, Self::Context, Self::IdType>,
117    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
118}
119
120/// Allows rescheduling a task for later execution
121pub trait Reschedule: Backend {
122    /// Reschedule the task after a specified duration
123    fn reschedule(
124        &mut self,
125        task: Task<Self::Args, Self::Context, Self::IdType>,
126        wait: Duration,
127    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
128}
129
130/// Allows cleaning up resources in the backend
131pub trait Vacuum: Backend {
132    /// Cleans up resources and returns the number of items vacuumed
133    fn vacuum(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
134}
135
136/// Allows resuming a task by its ID
137pub trait ResumeById: Backend {
138    /// Resume a task by its ID
139    fn resume_by_id(
140        &mut self,
141        id: TaskId<Self::IdType>,
142    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
143}
144
145/// Allows fetching multiple tasks by their IDs
146pub trait ResumeAbandoned: Backend {
147    /// Resume all abandoned tasks
148    fn resume_abandoned(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
149}
150
151/// Allows registering a worker with the backend
152pub trait RegisterWorker: Backend {
153    /// Registers a worker
154    fn register_worker(
155        &mut self,
156        worker_id: String,
157    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
158}
159
160/// Represents the result of a task execution
161#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
162#[derive(Debug, Clone)]
163pub struct TaskResult<T> {
164    task_id: TaskId,
165    status: Status,
166    result: Result<T, String>,
167}
168
169impl<T> TaskResult<T> {
170    /// Create a new TaskResult
171    pub fn new(task_id: TaskId, status: Status, result: Result<T, String>) -> Self {
172        Self {
173            task_id,
174            status,
175            result,
176        }
177    }
178    /// Get the ID of the task
179    pub fn task_id(&self) -> &TaskId {
180        &self.task_id
181    }
182
183    /// Get the status of the task
184    pub fn status(&self) -> &Status {
185        &self.status
186    }
187
188    /// Get the result of the task
189    pub fn result(&self) -> &Result<T, String> {
190        &self.result
191    }
192
193    /// Take the result of the task
194    pub fn take(self) -> Result<T, String> {
195        self.result
196    }
197}
198
199/// Allows waiting for tasks to complete and checking their status
200pub trait WaitForCompletion<T>: Backend {
201    /// The result stream type yielding task results
202    type ResultStream: Stream<Item = Result<TaskResult<T>, Self::Error>> + Send + 'static;
203
204    /// Wait for multiple tasks to complete, yielding results as they become available
205    fn wait_for(
206        &self,
207        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
208    ) -> Self::ResultStream;
209
210    /// Wait for a single task to complete, yielding its result
211    fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream {
212        self.wait_for(std::iter::once(task_id))
213    }
214
215    /// Check current status of tasks without waiting
216    fn check_status(
217        &self,
218        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
219    ) -> impl Future<Output = Result<Vec<TaskResult<T>>, Self::Error>> + Send;
220}