apalis_core/backend/impls/
memory.rs

1//! # In-memory backend based on channels
2//!
3//! An in-memory backend suitable for testing, prototyping, or lightweight task processing scenarios where persistence is not required.
4//!
5//! ## Features
6//! - Generic in-memory queue for any task type.
7//! - Implements [`Backend`] for integration with workers.
8//! - Sink support: Ability to push new tasks.
9//!
10//! A detailed feature list can be found in the [capabilities](crate::backend::memory::MemoryStorage#capabilities) section.
11//!
12//! ## Example
13//!
14//! ```rust
15//! # use apalis_core::backend::memory::MemoryStorage;
16//! # use apalis_core::worker::context::WorkerContext;
17//! # use apalis_core::worker::builder::WorkerBuilder;
18//! # use apalis_core::backend::TaskSink;
19//! # async fn task(_: u32, ctx: WorkerContext) { ctx.stop().unwrap();}
20//! #[tokio::main]
21//! async fn main() {
22//!     let mut store = MemoryStorage::new();
23//!     store.push(42).await.unwrap();
24//!
25//!     let worker = WorkerBuilder::new("int-worker")
26//!         .backend(store)
27//!         .build(task);
28//!
29//!     worker.run().await.unwrap();
30//! }
31//! ```
32//!
33//! ## Note
34//! This backend is not persistent and is intended for use cases where durability is not required.
35//! For production workloads, consider using a persistent backend such as PostgreSQL or Redis.
36//!
37//! ## See Also
38//! - [`Backend`]
39//! - [`WorkerContext`]
40use crate::backend::BackendExt;
41use crate::backend::codec::IdentityCodec;
42use crate::features_table;
43use crate::task::extensions::Extensions;
44use crate::{
45    backend::{Backend, TaskStream},
46    task::{
47        Task,
48        task_id::{RandomId, TaskId},
49    },
50    worker::context::WorkerContext,
51};
52use futures_channel::mpsc::{SendError, unbounded};
53use futures_core::ready;
54use futures_sink::Sink;
55use futures_util::{
56    FutureExt, SinkExt, Stream, StreamExt,
57    stream::{self, BoxStream},
58};
59use std::{
60    pin::Pin,
61    sync::Arc,
62    task::{Context, Poll},
63};
64use tower_layer::Identity;
65
/// A boxed in-memory task receiver stream
///
/// This is a pinned, heap-allocated, `Send` stream whose items are
/// [`Task`]s identified by a [`RandomId`]. Erasing the concrete stream type
/// lets [`MemoryStorage::new_with`] accept any compatible task source, not only
/// the receiving half of a channel.
pub type BoxedReceiver<Args, Ctx> = Pin<Box<dyn Stream<Item = Task<Args, Ctx, RandomId>> + Send>>;
68
/// In-memory queue that is based on channels
///
/// The storage bundles the two halves of the queue: a cloneable
/// [`MemorySink`] through which tasks are pushed, and a [`BoxedReceiver`]
/// stream from which they are read back. `MemoryStorage` itself implements
/// both `Sink` (forwarding to the sender) and `Stream` (forwarding to the
/// receiver).
///
/// ## Example
/// ```rust
/// # use apalis_core::backend::memory::MemoryStorage;
/// # fn setup() -> MemoryStorage<u32> {
/// let mut backend = MemoryStorage::new();
/// # backend
/// # }
/// ```
///
#[doc = features_table! {
    setup = r#"
        # {
        #   use apalis_core::backend::memory::MemoryStorage;
        #   MemoryStorage::new()
        # };
    "#,
    Backend => supported("Basic Backend functionality", true),
    TaskSink => supported("Ability to push new tasks", true),
    Serialization => not_supported("Serialization support for arguments"),

    PipeExt => not_implemented("Allow other backends to pipe to this backend"),
    MakeShared => not_supported("Share the same storage across multiple workers"),

    Update => not_supported("Allow updating a task"),
    FetchById => not_supported("Allow fetching a task by its ID"),
    Reschedule => not_supported("Reschedule a task"),

    ResumeById => not_supported("Resume a task by its ID"),
    ResumeAbandoned => not_supported("Resume abandoned tasks"),
    Vacuum => not_supported("Vacuum the task storage"),

    Workflow => not_implemented("Flexible enough to support workflows"),
    WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), // Requires Clone

    RegisterWorker => not_supported("Allow registering a worker with the backend"),
    ListWorkers => not_supported("List all workers registered with the backend"),
    ListTasks => not_supported("List all tasks in the backend"),
}]
pub struct MemoryStorage<Args, Ctx = Extensions> {
    /// Sending half: every `Sink` operation on the storage is forwarded here.
    pub(super) sender: MemorySink<Args, Ctx>,
    /// Receiving half: the stream that is polled for queued tasks.
    pub(super) receiver: BoxedReceiver<Args, Ctx>,
}
114
115impl<Args: Send + 'static> Default for MemoryStorage<Args, Extensions> {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
122    /// Create a new in-memory storage
123    #[must_use]
124    pub fn new() -> Self {
125        let (sender, receiver) = unbounded();
126        let sender = Box::new(sender)
127            as Box<
128                dyn Sink<Task<Args, Extensions, RandomId>, Error = SendError> + Send + Sync + Unpin,
129            >;
130        Self {
131            sender: MemorySink {
132                inner: Arc::new(futures_util::lock::Mutex::new(sender)),
133            },
134            receiver: receiver.boxed(),
135        }
136    }
137}
138
139impl<Args: Send + 'static, Ctx> MemoryStorage<Args, Ctx> {
140    /// Create a storage given a sender and receiver
141    #[must_use]
142    pub fn new_with(sender: MemorySink<Args, Ctx>, receiver: BoxedReceiver<Args, Ctx>) -> Self {
143        Self { sender, receiver }
144    }
145}
146
147impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemoryStorage<Args, Ctx> {
148    type Error = SendError;
149
150    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
151        self.as_mut().sender.poll_ready_unpin(cx)
152    }
153
154    fn start_send(
155        mut self: Pin<&mut Self>,
156        item: Task<Args, Ctx, RandomId>,
157    ) -> Result<(), Self::Error> {
158        self.as_mut().sender.start_send_unpin(item)
159    }
160
161    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162        self.as_mut().sender.poll_flush_unpin(cx)
163    }
164
165    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
166        self.as_mut().sender.poll_close_unpin(cx)
167    }
168}
169
/// Shared, lock-protected handle to a type-erased task sink.
///
/// The `Arc` lets every clone of a [`MemorySink`] point at the same underlying
/// sink, and the `futures_util` mutex guards mutable access to it.
type ArcMemorySink<Args, Ctx = Extensions> = Arc<
    futures_util::lock::Mutex<
        Box<dyn Sink<Task<Args, Ctx, RandomId>, Error = SendError> + Send + Sync + Unpin + 'static>,
    >,
>;
175
/// Memory sink for sending tasks to the in-memory backend
///
/// Cloning a `MemorySink` is cheap: clones share the same underlying sink
/// through a reference-counted pointer, so tasks pushed through any clone end
/// up in the same place.
pub struct MemorySink<Args, Ctx = Extensions> {
    /// The shared, mutex-guarded sink that all clones write to.
    pub(super) inner: ArcMemorySink<Args, Ctx>,
}
180
181impl<Args, Ctx> MemorySink<Args, Ctx> {
182    /// Build a new memory sink given a sink
183    pub fn new(sink: ArcMemorySink<Args, Ctx>) -> Self {
184        Self { inner: sink }
185    }
186}
187
188impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        f.debug_struct("MemorySink")
191            .field("inner", &"<Sink>")
192            .finish()
193    }
194}
195
196impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
197    fn clone(&self) -> Self {
198        Self {
199            inner: Arc::clone(&self.inner),
200        }
201    }
202}
203
204impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemorySink<Args, Ctx> {
205    type Error = SendError;
206
207    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
208        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
209        Pin::new(&mut *lock).poll_ready_unpin(cx)
210    }
211
212    fn start_send(
213        self: Pin<&mut Self>,
214        mut item: Task<Args, Ctx, RandomId>,
215    ) -> Result<(), Self::Error> {
216        let mut lock = self.inner.try_lock().unwrap();
217        // Ensure task has id
218        item.parts
219            .task_id
220            .get_or_insert_with(|| TaskId::new(RandomId::default()));
221        Pin::new(&mut *lock).start_send_unpin(item)
222    }
223
224    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
225        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
226        Pin::new(&mut *lock).poll_flush_unpin(cx)
227    }
228
229    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
230        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
231        Pin::new(&mut *lock).poll_close_unpin(cx)
232    }
233}
234
235impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237        f.debug_struct("MemoryStorage")
238            .field("sender", &self.sender)
239            .field("receiver", &"<Stream>")
240            .finish()
241    }
242}
243
244impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
245    type Item = Task<Args, Ctx, RandomId>;
246
247    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
248        self.receiver.poll_next_unpin(cx)
249    }
250}
251
252// MemoryStorage as a Backend
253impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
254    type Args = Args;
255    type IdType = RandomId;
256
257    type Context = Ctx;
258
259    type Error = SendError;
260    type Stream = TaskStream<Task<Args, Ctx, RandomId>, SendError>;
261    type Layer = Identity;
262    type Beat = BoxStream<'static, Result<(), Self::Error>>;
263
264    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
265        stream::once(async { Ok(()) }).boxed()
266    }
267    fn middleware(&self) -> Self::Layer {
268        Identity::new()
269    }
270
271    fn poll(self, _worker: &WorkerContext) -> Self::Stream {
272        (self.receiver.boxed().map(|r| Ok(Some(r))).boxed()) as _
273    }
274}
275
276impl<Args: Clone + Send + 'static, Ctx: Default + 'static> BackendExt for MemoryStorage<Args, Ctx> {
277    type Codec = IdentityCodec;
278    type Compact = Args;
279    type CompactStream = TaskStream<Task<Args, Self::Context, RandomId>, Self::Error>;
280
281    fn get_queue(&self) -> crate::backend::queue::Queue {
282        std::any::type_name::<Args>().into()
283    }
284
285    fn poll_compact(self, _worker: &WorkerContext) -> Self::CompactStream {
286        (self.receiver.map(|task| Ok(Some(task))).boxed()) as _
287    }
288}