apalis_core/backend/impls/
memory.rs

1//! # In-memory backend based on channels
2//!
3//! An in-memory backend suitable for testing, prototyping, or lightweight task processing scenarios where persistence is not required.
4//!
5//! ## Features
6//! - Generic in-memory queue for any task type.
7//! - Implements [`Backend`] for integration with workers.
8//! - Sink support: Ability to push new tasks.
9//!
10//! A detailed feature list can be found in the [capabilities](crate::backend::memory::MemoryStorage#capabilities) section.
11//!
12//! ## Example
13//!
14//! ```rust
15//! # use apalis_core::backend::memory::MemoryStorage;
16//! # use apalis_core::worker::context::WorkerContext;
17//! # use apalis_core::worker::builder::WorkerBuilder;
18//! # use apalis_core::backend::TaskSink;
19//! # async fn task(_: u32, ctx: WorkerContext) { ctx.stop().unwrap();}
20//! #[tokio::main]
21//! async fn main() {
22//!     let mut store = MemoryStorage::new();
23//!     store.push(42).await.unwrap();
24//!
25//!     let worker = WorkerBuilder::new("int-worker")
26//!         .backend(store)
27//!         .build(task);
28//!
29//!     worker.run().await.unwrap();
30//! }
31//! ```
32//!
33//! ## Note
34//! This backend is not persistent and is intended for use cases where durability is not required.
35//! For production workloads, consider using a persistent backend such as PostgreSQL or Redis.
36//!
37//! ## See Also
38//! - [`Backend`]
39//! - [`WorkerContext`](crate::worker::context::WorkerContext)
40use crate::backend::codec::IdentityCodec;
41use crate::backend::queue::Queue;
42use crate::features_table;
43use crate::task::extensions::Extensions;
44use crate::{
45    backend::{Backend, TaskStream},
46    task::{
47        Task,
48        task_id::{RandomId, TaskId},
49    },
50    worker::context::WorkerContext,
51};
52use futures_channel::mpsc::{SendError, unbounded};
53use futures_core::ready;
54use futures_sink::Sink;
55use futures_util::{
56    FutureExt, SinkExt, Stream, StreamExt,
57    stream::{self, BoxStream},
58};
59use std::{
60    pin::Pin,
61    sync::Arc,
62    task::{Context, Poll},
63};
64use tower_layer::Identity;
65
66/// In-memory queue that is based on channels
67///
68#[doc = features_table! {
69    setup = {
70        use apalis_core::backend::memory::MemoryStorage;
71        // No migrations
72        MemoryStorage::new()
73    };,
74
75    Backend => supported("Basic Backend functionality", true),
76    TaskSink => supported("Ability to push new tasks", true),
77    Serialization => not_supported("Serialization support for arguments"),
78
79    PipeExt => not_implemented("Allow other backends to pipe to this backend"),
80    MakeShared => not_supported("Share the same JSON storage across multiple workers"),
81
82    Update => not_supported("Allow updating a task"),
83    FetchById => not_supported("Allow fetching a task by its ID"),
84    Reschedule => not_supported("Reschedule a task"),
85
86    ResumeById => not_supported("Resume a task by its ID"),
87    ResumeAbandoned => not_supported("Resume abandoned tasks"),
88    Vacuum => not_supported("Vacuum the task storage"),
89
90    Workflow => not_implemented("Flexible enough to support workflows"),
91    WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), // Requires Clone
92
93    RegisterWorker => not_supported("Allow registering a worker with the backend"),
94    ListWorkers => not_supported("List all workers registered with the backend"),
95    ListTasks => not_supported("List all tasks in the backend"),
96}]
97pub struct MemoryStorage<Args, Ctx = Extensions> {
98    pub(super) sender: MemorySink<Args, Ctx>,
99    pub(super) receiver: Pin<Box<dyn Stream<Item = Task<Args, Ctx>> + Send>>,
100}
101
102impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
103    /// Create a new in-memory storage
104    pub fn new() -> Self {
105        let (sender, receiver) = unbounded();
106        let sender = Box::new(sender)
107            as Box<
108                dyn Sink<Task<Args, Extensions>, Error = SendError>
109                    + Send
110                    + Sync
111                    + Unpin,
112            >;
113        MemoryStorage {
114            sender: MemorySink {
115                inner: Arc::new(futures_util::lock::Mutex::new(sender)),
116            },
117            receiver: receiver.boxed(),
118        }
119    }
120}
121
122impl<Args, Ctx> Sink<Task<Args, Ctx>> for MemoryStorage<Args, Ctx> {
123    type Error = SendError;
124
125    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
126        self.as_mut().sender.poll_ready_unpin(cx)
127    }
128
129    fn start_send(mut self: Pin<&mut Self>, item: Task<Args, Ctx>) -> Result<(), Self::Error> {
130        self.as_mut().sender.start_send_unpin(item)
131    }
132
133    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
134        self.as_mut().sender.poll_flush_unpin(cx)
135    }
136
137    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138        self.as_mut().sender.poll_close_unpin(cx)
139    }
140}
141
/// Memory sink for sending tasks to the in-memory backend
///
/// The wrapped sink is type-erased and stored behind an `Arc` and an async
/// mutex, so the handle can be shared between several owners that all push
/// into the same underlying sink.
pub struct MemorySink<Args, Ctx = Extensions> {
    /// Shared, mutex-guarded trait object that actually receives the tasks.
    /// Its error type is fixed to the channel's `SendError`.
    pub(super) inner: Arc<
        futures_util::lock::Mutex<
            Box<
                dyn Sink<Task<Args, Ctx>, Error = SendError>
                    + Send
                    + Sync
                    + Unpin
                    + 'static,
            >,
        >,
    >,
}
156
157impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("MemorySink")
160            .field("inner", &"<Sink>")
161            .finish()
162    }
163}
164
165impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
166    fn clone(&self) -> Self {
167        Self {
168            inner: Arc::clone(&self.inner),
169        }
170    }
171}
172
173impl<Args, Ctx> Sink<Task<Args, Ctx>> for MemorySink<Args, Ctx> {
174    type Error = SendError;
175
176    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
177        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
178        Pin::new(&mut *lock).poll_ready_unpin(cx)
179    }
180
181    fn start_send(self: Pin<&mut Self>, mut item: Task<Args, Ctx>) -> Result<(), Self::Error> {
182        let mut lock = self.inner.try_lock().unwrap();
183        // Ensure task has id
184        item.parts
185            .task_id
186            .get_or_insert_with(|| TaskId::new(RandomId::default()));
187        item.parts
188            .queue
189            .get_or_insert_with(|| Queue::from(std::any::type_name::<Args>()));
190        Pin::new(&mut *lock).start_send_unpin(item)
191    }
192
193    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
195        Pin::new(&mut *lock).poll_flush_unpin(cx)
196    }
197
198    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199        let mut lock = ready!(self.inner.lock().poll_unpin(cx));
200        Pin::new(&mut *lock).poll_close_unpin(cx)
201    }
202}
203
204impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
205    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206        f.debug_struct("MemoryStorage")
207            .field("sender", &self.sender)
208            .field("receiver", &"<Stream>")
209            .finish()
210    }
211}
212
213impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
214    type Item = Task<Args, Ctx>;
215
216    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
217        self.receiver.poll_next_unpin(cx)
218    }
219}
220
221// MemoryStorage as a Backend
222impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
223    type Args = Args;
224    type IdType = RandomId;
225
226    type Context = Ctx;
227
228    type Error = SendError;
229    type Stream = TaskStream<Task<Args, Ctx>, SendError>;
230    type Layer = Identity;
231    type Beat = BoxStream<'static, Result<(), Self::Error>>;
232
233    type Codec = IdentityCodec;
234    type Compact = Args;
235
236    fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
237        stream::once(async { Ok(()) }).boxed()
238    }
239    fn middleware(&self) -> Self::Layer {
240        Identity::new()
241    }
242
243    fn poll(self, _worker: &WorkerContext) -> Self::Stream {
244        let stream = self.receiver.boxed().map(|r| Ok(Some(r))).boxed();
245        stream
246    }
247}