apalis_core/backend/
mod.rs

//! Core traits for interacting with backends
//!
//! This module defines the core traits and types for backends, which are responsible for
//! providing sources of tasks, handling their lifecycle, and exposing middleware for internal
//! processing. The traits abstract over different backend implementations, allowing for
//! extensibility and interoperability.
//!
//! # Overview
//! - [`Backend`]: The primary trait representing a task source, defining methods for polling tasks, heartbeats, and middleware.
//! - [`TaskSink`]: An extension trait for backends that support pushing tasks.
//! - [`FetchById`], [`Update`], [`Reschedule`]: Additional traits for managing tasks.
//! - [`Vacuum`], [`ResumeById`], [`ResumeAbandoned`]: Traits for backend maintenance and task recovery.
//! - [`RegisterWorker`], [`ListWorkers`], [`ListTasks`]: Traits for worker management and task listing.
//! - [`WaitForCompletion`]: A trait for waiting on task completion and checking task status.
//!
//! ## Default Implementations
//!
//! The module includes several default backend implementations, such as:
//! - [`MemoryStorage`](memory::MemoryStorage): An in-memory backend for testing and lightweight use cases
//! - [`Pipe`](pipe::Pipe): A simple pipe-based backend for inter-thread communication
//! - [`CustomBackend`](custom::CustomBackend): A flexible backend allowing custom functions for task management
21use std::{future::Future, time::Duration};
22
23use futures_util::{Stream, stream::BoxStream};
24
25use crate::{
26    backend::codec::Codec,
27    error::BoxDynError,
28    task::{Task, status::Status, task_id::TaskId},
29    worker::context::WorkerContext,
30};
31
32pub mod codec;
33pub mod custom;
34pub mod pipe;
35pub mod poll_strategy;
36pub mod queue;
37pub mod shared;
38
39mod config;
40mod expose;
41mod impls;
42mod sink;
43
44pub use expose::*;
45pub use sink::*;
46
47pub use impls::guide;
48
49pub use config::ConfigExt;
50
51/// In-memory backend based on channels
52pub mod memory {
53    pub use crate::backend::impls::memory::*;
54}
55
/// JSON file-based backend.
///
/// Only compiled when the `json` feature is enabled. Public facade over the
/// private `impls::json` module: everything that module exports is
/// re-exported here unchanged.
#[cfg(feature = "json")]
pub mod json {
    pub use super::impls::json::*;
}
61
62/// The `Backend` trait defines how workers get and manage tasks from a backend.
63///
64/// In other languages, this might be called a "Queue", "Broker", etc.
65pub trait Backend {
66    /// The type of arguments the backend handles.
67    type Args;
68    /// The type used to uniquely identify tasks.
69    type IdType: Clone;
70    /// Context associated with each task.
71    type Context: Default;
72    /// The error type returned by backend operations
73    type Error;
74    /// The codec used for serialization/deserialization of tasks.
75    type Codec: Codec<Self::Args, Compact = Self::Compact>;
76
77    /// The compact representation of task arguments.
78    type Compact;
79
80    /// A stream of tasks provided by the backend.
81    type Stream: Stream<
82        Item = Result<Option<Task<Self::Args, Self::Context, Self::IdType>>, Self::Error>,
83    >;
84    /// A stream representing heartbeat signals.
85    type Beat: Stream<Item = Result<(), Self::Error>>;
86    /// The type representing backend middleware layer.
87    type Layer;
88
89    /// Returns a heartbeat stream for the given worker.
90    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat;
91    /// Returns the backend's middleware layer.
92    fn middleware(&self) -> Self::Layer;
93    /// Polls the backend for tasks for the given worker.
94    fn poll(self, worker: &WorkerContext) -> Self::Stream;
95}
96
/// A boxed, `'static` stream whose items are `Result<Option<T>, E>`.
///
/// The error type `E` defaults to [`BoxDynError`].
pub type TaskStream<T, E = BoxDynError> = BoxStream<'static, Result<Option<T>, E>>;
99/// Allows fetching a task by its ID
100pub trait FetchById<Args>: Backend {
101    /// Fetch a task by its unique identifier
102    fn fetch_by_id(
103        &mut self,
104        task_id: &TaskId<Self::IdType>,
105    ) -> impl Future<Output = Result<Option<Task<Args, Self::Context, Self::IdType>>, Self::Error>> + Send;
106}
107
108/// Allows updating an existing task
109pub trait Update: Backend {
110    /// Update the given task
111    fn update(
112        &mut self,
113        task: Task<Self::Args, Self::Context, Self::IdType>,
114    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
115}
116
117/// Allows rescheduling a task for later execution
118pub trait Reschedule: Backend {
119    /// Reschedule the task after a specified duration
120    fn reschedule(
121        &mut self,
122        task: Task<Self::Args, Self::Context, Self::IdType>,
123        wait: Duration,
124    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
125}
126
127/// Allows cleaning up resources in the backend
128pub trait Vacuum: Backend {
129    /// Cleans up resources and returns the number of items vacuumed
130    fn vacuum(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
131}
132
133/// Allows resuming a task by its ID
134pub trait ResumeById: Backend {
135    /// Resume a task by its ID
136    fn resume_by_id(
137        &mut self,
138        id: TaskId<Self::IdType>,
139    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
140}
141
142/// Allows fetching multiple tasks by their IDs
143pub trait ResumeAbandoned: Backend {
144    /// Resume all abandoned tasks
145    fn resume_abandoned(&mut self) -> impl Future<Output = Result<usize, Self::Error>> + Send;
146}
147
148/// Allows registering a worker with the backend
149pub trait RegisterWorker: Backend {
150    /// Registers a worker
151    fn register_worker(
152        &mut self,
153        worker_id: String,
154    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
155}
156
157/// Represents the result of a task execution
158#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
159#[derive(Debug, Clone)]
160pub struct TaskResult<T> {
161    task_id: TaskId,
162    status: Status,
163    result: Result<T, String>,
164}
165
166impl<T> TaskResult<T> {
167    /// Create a new TaskResult
168    pub fn new(task_id: TaskId, status: Status, result: Result<T, String>) -> Self {
169        Self {
170            task_id,
171            status,
172            result,
173        }
174    }
175    /// Get the ID of the task
176    pub fn task_id(&self) -> &TaskId {
177        &self.task_id
178    }
179
180    /// Get the status of the task
181    pub fn status(&self) -> &Status {
182        &self.status
183    }
184
185    /// Get the result of the task
186    pub fn result(&self) -> &Result<T, String> {
187        &self.result
188    }
189
190    /// Take the result of the task
191    pub fn take(self) -> Result<T, String> {
192        self.result
193    }
194}
195
196/// Allows waiting for tasks to complete and checking their status
197pub trait WaitForCompletion<T>: Backend {
198    /// The result stream type yielding task results
199    type ResultStream: Stream<Item = Result<TaskResult<T>, Self::Error>> + Send + 'static;
200
201    /// Wait for multiple tasks to complete, yielding results as they become available
202    fn wait_for(
203        &self,
204        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>>,
205    ) -> Self::ResultStream;
206
207    /// Wait for a single task to complete, yielding its result
208    fn wait_for_single(&self, task_id: TaskId<Self::IdType>) -> Self::ResultStream {
209        self.wait_for(std::iter::once(task_id))
210    }
211
212    /// Check current status of tasks without waiting
213    fn check_status(
214        &self,
215        task_ids: impl IntoIterator<Item = TaskId<Self::IdType>> + Send,
216    ) -> impl Future<Output = Result<Vec<TaskResult<T>>, Self::Error>> + Send;
217}