// apalis_core/backend/mod.rs

//! Core traits for interacting with backends
//!
//! The core traits and types for backends, responsible for providing sources of tasks, handling their lifecycle, and exposing middleware for internal processing.
//! The traits here abstract over different backend implementations, allowing for extensibility and interoperability.
//!
//! # Overview
//! - [`Backend`]: The primary trait representing a task source, defining methods for polling tasks, heartbeats, and middleware.
//! - [`TaskSink`]: An extension trait for backends that support pushing tasks.
//! - [`FetchById`], [`Update`], [`Reschedule`]: Additional traits for managing tasks.
//! - [`Vacuum`], [`ResumeById`], [`ResumeAbandoned`]: Traits for backend maintenance and task recovery.
//! - [`RegisterWorker`], [`ListWorkers`], [`ListTasks`]: Traits for worker management and task listing.
//! - [`WaitForCompletion`]: A trait for waiting on task completion and checking their status.
//!
//!
//! ## Default Implementations
//!
//! The module includes several default backend implementations, such as:
//! - [`MemoryStorage`](memory::MemoryStorage): An in-memory backend for testing and lightweight use cases.
//! - [`Pipe`](pipe::Pipe): A simple pipe-based backend for inter-thread communication.
//! - [`CustomBackend`](custom::CustomBackend): A flexible backend allowing custom functions for task management.
use std::{future::Future, time::Duration};

use futures_util::{Stream, stream::BoxStream};

use crate::{
    backend::{codec::Codec, queue::Queue},
    error::BoxDynError,
    task::{Task, status::Status, task_id::TaskId},
    worker::context::WorkerContext,
};

32pub mod codec;
33pub mod custom;
34pub mod pipe;
35pub mod poll_strategy;
36pub mod queue;
37pub mod shared;
38
39mod expose;
40mod impls;
41mod sink;
42
43pub use expose::*;
44pub use sink::*;
45
46pub use impls::guide;
47
48/// In-memory backend based on channels
49pub mod memory {
50    pub use crate::backend::impls::memory::*;
51}
52
/// In-memory dequeue backend
///
/// Only compiled when the `sleep` feature is enabled. The module glob
/// re-exports every public item of the private `impls::dequeue` module.
#[cfg(feature = "sleep")]
pub mod dequeue {
    pub use crate::backend::impls::dequeue::*;
}

59/// The `Backend` trait defines how workers get and manage tasks from a backend.
60///
61/// In other languages, this might be called a "Queue", "Broker", etc.
62pub trait Backend {
63    /// The type of arguments the backend handles.
64    type Args;
65    /// The type used to uniquely identify tasks.
66    type IdType: Clone;
67    /// Context associated with each task.
68    type Context: Default;
69    /// The error type returned by backend operations
70    type Error;
71    /// A stream of tasks provided by the backend.
72    type Stream: Stream<
73        Item = Result<Option<Task<Self::Args, Self::Context, Self::IdType>>, Self::Error>,
74    >;
75    /// A stream representing heartbeat signals.
76    type Beat: Stream<Item = Result<(), Self::Error>>;
77    /// The type representing backend middleware layer.
78    type Layer;
79
80    /// Returns a heartbeat stream for the given worker.
81    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat;
82    /// Returns the backend's middleware layer.
83    fn middleware(&self) -> Self::Layer;
84    /// Polls the backend for tasks for the given worker.
85    fn poll(self, worker: &WorkerContext) -> Self::Stream;
86}
87
88/// Defines the encoding/serialization aspects of a backend.
89pub trait BackendExt: Backend {
90    /// The codec used for serialization/deserialization of tasks.
91    type Codec: Codec<Self::Args, Compact = Self::Compact>;
92    /// The compact representation of task arguments.
93    type Compact;
94    /// A stream of encoded tasks provided by the backend.
95    type CompactStream: Stream<
96        Item = Result<Option<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
97    >;
98
99    /// Returns the queue associated with the backend.
100    fn get_queue(&self) -> Queue;
101
102    /// Polls the backend for encoded tasks for the given worker.
103    fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream;
104}
105
106/// Represents a stream for T.
107pub type TaskStream<T, E = BoxDynError> = BoxStream<'static, Result<Option<T>, E>>;
108/// Allows fetching a task by its ID
109pub trait FetchById<Args>: Backend {
110    /// Fetch a task by its unique identifier
111    #[allow(clippy::type_complexity)]
112    fn fetch_by_id(
113        &mut self,
114        task_id: &TaskId<Self::IdType>,
115    ) -> impl Future<Output = Result<Option<Task<Args, Self::Context, Self::IdType>>, Self::Error>> + Send;
116}
117
118/// Allows updating an existing task
119pub trait Update: Backend {
120    /// Update the given task
121    fn update(
122        &mut self,
123        task: Task<Self::Args, Self::Context, Self::IdType>,
124    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
125}
126
127/// Allows rescheduling a task for later execution
128pub trait Reschedule: Backend {
129    /// Reschedule the task after a specified duration
130    fn reschedule(
131        &mut self,
132        task: Task<Self::Args, Self::Context, Self::IdType>,
133        wait: Duration,
134    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
135}
136
137/// Allows cleaning up resources in the backend
138pub trait Vacuum: Backend {
139    /// Cleans up resources and returns the number of items vacuumed
140    fn vacuum(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
141}
142
143/// Allows resuming a task by its ID
144pub trait ResumeById: Backend {
145    /// Resume a task by its ID
146    fn resume_by_id(
147        &mut self,
148        id: TaskId<Self::IdType>,
149    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
150}
151
152/// Allows fetching multiple tasks by their IDs
153pub trait ResumeAbandoned: Backend {
154    /// Resume all abandoned tasks
155    fn resume_abandoned(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
156}
157
158/// Allows registering a worker with the backend
159pub trait RegisterWorker: Backend {
160    /// Registers a worker
161    fn register_worker(
162        &mut self,
163        worker_id: String,
164    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
165}
166
167/// Represents the result of a task execution
168#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
169#[derive(Debug, Clone)]
170pub struct TaskResult<T, IdType> {
171    /// The unique identifier of the task
172    pub task_id: TaskId<IdType>,
173    /// The status of the task
174    pub status: Status,
175    /// The result of the task execution
176    pub result: Result<T, String>,
177}
178
179impl<T, IdType> TaskResult<T, IdType> {
180    /// Create a new TaskResult
181    pub fn new(task_id: TaskId<IdType>, status: Status, result: Result<T, String>) -> Self {
182        Self {
183            task_id,
184            status,
185            result,
186        }
187    }
188    /// Get the ID of the task
189    pub fn task_id(&self) -> &TaskId<IdType> {
190        &self.task_id
191    }
192
193    /// Get the status of the task
194    pub fn status(&self) -> &Status {
195        &self.status
196    }
197
198    /// Get the result of the task
199    pub fn result(&self) -> &Result<T, String> {
200        &self.result
201    }
202
203    /// Take the result of the task
204    pub fn take(self) -> Result<T, String> {
205        self.result
206    }
207}
208
209/// Allows waiting for tasks to complete and checking their status
210pub trait WaitForCompletion<T>: Backend {
211    /// The result stream type yielding task results
212    type ResultStream: Stream<Item = Result<TaskResult<T, Self::IdType>, Self::Error>>
213        + Send
214        + 'static;
215
216    /// Wait for multiple tasks to complete, yielding results as they become available
217    fn wait_for(
218        &self,
219        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
220    ) -> Self::ResultStream;
221
222    /// Wait for a single task to complete, yielding its result
223    fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream {
224        self.wait_for(std::iter::once(task_id))
225    }
226
227    /// Check current status of tasks without waiting
228    fn check_status(
229        &self,
230        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
231    ) -> impl Future<Output = Result<Vec<TaskResult<T, Self::IdType>>, Self::Error>> + Send;
232}