Skip to main content

apalis_core/backend/impls/
memory.rs

1//! # In-memory backend based on channels
2//!
3//! An in-memory backend suitable for testing, prototyping, or lightweight task processing scenarios where persistence is not required.
4//!
5//! ## Features
6//! - Generic in-memory queue for any task type.
7//! - Implements [`Backend`] for integration with workers.
8//! - Sink support: Ability to push new tasks.
9//!
10//! A detailed feature list can be found in the [capabilities](crate::backend::memory::MemoryStorage#capabilities) section.
11//!
12//! ## Example
13//!
14//! ```rust
15//! # use apalis_core::backend::memory::MemoryStorage;
16//! # use apalis_core::worker::context::WorkerContext;
17//! # use apalis_core::worker::builder::WorkerBuilder;
18//! # use apalis_core::backend::TaskSink;
19//! # async fn task(_: u32, ctx: WorkerContext) { ctx.stop().unwrap();}
20//! #[tokio::main]
21//! async fn main() {
22//!     let mut store = MemoryStorage::new();
23//!     store.push(42).await.unwrap();
24//!
25//!     let worker = WorkerBuilder::new("int-worker")
26//!         .backend(store)
27//!         .build(task);
28//!
29//!     worker.run().await.unwrap();
30//! }
31//! ```
32//!
33//! ## Note
34//! This backend is not persistent and is intended for use cases where durability is not required.
35//! For production workloads, consider using a persistent backend such as PostgreSQL or Redis.
36//!
37//! ## See Also
38//! - [`Backend`]
39//! - [`WorkerContext`]
40use crate::backend::BackendExt;
41use crate::backend::codec::IdentityCodec;
42use crate::features_table;
43use crate::task::extensions::Extensions;
44use crate::{
45    backend::{Backend, TaskStream},
46    task::{
47        Task,
48        task_id::{RandomId, TaskId},
49    },
50    worker::context::WorkerContext,
51};
52use futures_channel::mpsc::{SendError, unbounded};
53use futures_core::ready;
54use futures_sink::Sink;
55use futures_util::lock::Mutex;
56use futures_util::{
57    FutureExt, SinkExt, Stream, StreamExt,
58    stream::{self, BoxStream},
59};
60use std::collections::HashSet;
61use std::{
62    pin::Pin,
63    sync::Arc,
64    task::{Context, Poll},
65};
66use tower_layer::Identity;
67
68/// A boxed in-memory task receiver stream
69pub type BoxedReceiver<Args, Ctx> = Pin<Box<dyn Stream<Item = Task<Args, Ctx, RandomId>> + Send>>;
70
/// In-memory queue that is based on channels
///
/// The storage pairs a sending half ([`MemorySink`]) with a boxed receiving
/// stream ([`BoxedReceiver`]). Tasks pushed through the sink are yielded by the
/// receiver; nothing is persisted.
///
/// ## Example
/// ```rust
/// # use apalis_core::backend::memory::MemoryStorage;
/// # fn setup() -> MemoryStorage<u32> {
/// let mut backend = MemoryStorage::new();
/// # backend
/// # }
/// ```
///
#[doc = features_table! {
    setup = r#"
        # {
        #   use apalis_core::backend::memory::MemoryStorage;
        #   MemoryStorage::new()
        # };
    "#,
    Backend => supported("Basic Backend functionality", true),
    TaskSink => supported("Ability to push new tasks", true),
    Serialization => not_supported("Serialization support for arguments"),

    PipeExt => not_implemented("Allow other backends to pipe to this backend"),
    MakeShared => not_supported("Share the same storage across multiple workers"),

    Update => not_supported("Allow updating a task"),
    FetchById => not_supported("Allow fetching a task by its ID"),
    Reschedule => not_supported("Reschedule a task"),

    ResumeById => not_supported("Resume a task by its ID"),
    ResumeAbandoned => not_supported("Resume abandoned tasks"),
    Vacuum => not_supported("Vacuum the task storage"),

    Workflow => not_implemented("Flexible enough to support workflows"),
    WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), // Requires Clone

    RegisterWorker => not_supported("Allow registering a worker with the backend"),
    ListWorkers => not_supported("List all workers registered with the backend"),
    ListTasks => not_supported("List all tasks in the backend"),
}]
pub struct MemoryStorage<Args, Ctx = Extensions> {
    /// Sending half: every `Sink` call on the storage is forwarded to this sink.
    pub(super) sender: MemorySink<Args, Ctx>,
    /// Receiving half: the stream of tasks polled by the `Stream`/`Backend` impls.
    pub(super) receiver: BoxedReceiver<Args, Ctx>,
}
116
117impl<Args: Send + 'static> Default for MemoryStorage<Args, Extensions> {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
124    /// Create a new in-memory storage
125    #[must_use]
126    pub fn new() -> Self {
127        let (sender, receiver) = unbounded();
128        let sender = Box::new(sender)
129            as Box<
130                dyn Sink<Task<Args, Extensions, RandomId>, Error = SendError> + Send + Sync + Unpin,
131            >;
132        Self {
133            sender: MemorySink {
134                inner: Arc::new(futures_util::lock::Mutex::new(sender)),
135                idempotency_keys: Default::default(),
136            },
137            receiver: receiver.boxed(),
138        }
139    }
140}
141
142impl<Args: Send + 'static, Ctx> MemoryStorage<Args, Ctx> {
143    /// Create a storage given a sender and receiver
144    #[must_use]
145    pub fn new_with(sender: MemorySink<Args, Ctx>, receiver: BoxedReceiver<Args, Ctx>) -> Self {
146        Self { sender, receiver }
147    }
148}
149
150impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemoryStorage<Args, Ctx> {
151    type Error = SendError;
152
153    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
154        self.as_mut().sender.poll_ready_unpin(cx)
155    }
156
157    fn start_send(
158        mut self: Pin<&mut Self>,
159        item: Task<Args, Ctx, RandomId>,
160    ) -> Result<(), Self::Error> {
161        self.as_mut().sender.start_send_unpin(item)
162    }
163
164    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
165        self.as_mut().sender.poll_flush_unpin(cx)
166    }
167
168    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169        self.as_mut().sender.poll_close_unpin(cx)
170    }
171}
172
173type ArcMemorySink<Args, Ctx = Extensions> = Arc<
174    Mutex<
175        Box<dyn Sink<Task<Args, Ctx, RandomId>, Error = SendError> + Send + Sync + Unpin + 'static>,
176    >,
177>;
178
/// Shared, mutex-guarded set of idempotency keys; `MemorySink::start_send`
/// consults it to skip tasks whose key is already present.
type ArcIdempotencySet = Arc<Mutex<HashSet<String>>>;
180
/// Memory sink for sending tasks to the in-memory backend
///
/// Both fields are reference-counted, so clones of a sink share the same
/// underlying sink and the same set of idempotency keys.
pub struct MemorySink<Args, Ctx = Extensions> {
    /// The type-erased sink that tasks are forwarded to, guarded by an async mutex.
    pub(super) inner: ArcMemorySink<Args, Ctx>,
    /// Idempotency keys recorded by `start_send`; a task carrying a recorded key is dropped.
    pub(super) idempotency_keys: ArcIdempotencySet,
}
186
187impl<Args, Ctx> MemorySink<Args, Ctx> {
188    /// Build a new memory sink given a sink
189    pub fn new(sink: ArcMemorySink<Args, Ctx>) -> Self {
190        Self {
191            inner: sink,
192            idempotency_keys: Arc::new(Mutex::new(HashSet::new())),
193        }
194    }
195}
196
197impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        f.debug_struct("MemorySink")
200            .field("inner", &"<Sink>")
201            .field("idempotency_keys", &self.idempotency_keys.lock())
202            .finish()
203    }
204}
205
206impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
207    fn clone(&self) -> Self {
208        Self {
209            inner: Arc::clone(&self.inner),
210            idempotency_keys: Arc::clone(&self.idempotency_keys),
211        }
212    }
213}
214
215impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemorySink<Args, Ctx> {
216    type Error = SendError;
217
218    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
219        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
220        Pin::new(&mut *lock).poll_ready_unpin(cx)
221    }
222
223    fn start_send(
224        self: Pin<&mut Self>,
225        mut item: Task<Args, Ctx, RandomId>,
226    ) -> Result<(), Self::Error> {
227        let this = self.get_mut();
228
229        // Ensure task id exists
230        item.parts
231            .task_id
232            .get_or_insert_with(|| TaskId::new(RandomId::default()));
233
234        if let Some(key) = item.parts.idempotency_key.as_ref() {
235            let mut keys = this.idempotency_keys.try_lock().unwrap();
236
237            if keys.contains(key) {
238                return Ok(());
239            }
240
241            keys.insert(key.clone());
242        }
243
244        let mut sink = this.inner.try_lock().unwrap();
245        Pin::new(&mut *sink).start_send_unpin(item)
246    }
247
248    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
249        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
250        Pin::new(&mut *lock).poll_flush_unpin(cx)
251    }
252
253    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
254        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
255        Pin::new(&mut *lock).poll_close_unpin(cx)
256    }
257}
258
259impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        f.debug_struct("MemoryStorage")
262            .field("sender", &self.sender)
263            .field("receiver", &"<Stream>")
264            .finish()
265    }
266}
267
268impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
269    type Item = Task<Args, Ctx, RandomId>;
270
271    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
272        self.receiver.poll_next_unpin(cx)
273    }
274}
275
276// MemoryStorage as a Backend
277impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
278    type Args = Args;
279    type IdType = RandomId;
280
281    type Context = Ctx;
282
283    type Error = SendError;
284    type Stream = TaskStream<Task<Args, Ctx, RandomId>, SendError>;
285    type Layer = Identity;
286    type Beat = BoxStream<'static, Result<(), Self::Error>>;
287
288    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
289        stream::once(async { Ok(()) }).boxed()
290    }
291    fn middleware(&self) -> Self::Layer {
292        Identity::new()
293    }
294
295    fn poll(self, _worker: &WorkerContext) -> Self::Stream {
296        (self.receiver.boxed().map(|r| Ok(Some(r))).boxed()) as _
297    }
298}
299
300impl<Args: Clone + Send + 'static, Ctx: Default + 'static> BackendExt for MemoryStorage<Args, Ctx> {
301    type Codec = IdentityCodec;
302    type Compact = Args;
303    type CompactStream = TaskStream<Task<Args, Self::Context, RandomId>, Self::Error>;
304
305    fn get_queue(&self) -> crate::backend::queue::Queue {
306        std::any::type_name::<Args>().into()
307    }
308
309    fn poll_compact(self, _worker: &WorkerContext) -> Self::CompactStream {
310        (self.receiver.map(|task| Ok(Some(task))).boxed()) as _
311    }
312}