apalis_core/backend/impls/
memory.rs1use crate::backend::BackendExt;
41use crate::backend::codec::IdentityCodec;
42use crate::features_table;
43use crate::task::extensions::Extensions;
44use crate::{
45 backend::{Backend, TaskStream},
46 task::{
47 Task,
48 task_id::{RandomId, TaskId},
49 },
50 worker::context::WorkerContext,
51};
52use futures_channel::mpsc::{SendError, unbounded};
53use futures_core::ready;
54use futures_sink::Sink;
55use futures_util::lock::Mutex;
56use futures_util::{
57 FutureExt, SinkExt, Stream, StreamExt,
58 stream::{self, BoxStream},
59};
60use std::collections::HashSet;
61use std::{
62 pin::Pin,
63 sync::Arc,
64 task::{Context, Poll},
65};
66use tower_layer::Identity;
67
68pub type BoxedReceiver<Args, Ctx> = Pin<Box<dyn Stream<Item = Task<Args, Ctx, RandomId>> + Send>>;
70
71#[doc = features_table! {
84 setup = r#"
85 # {
86 # use apalis_core::backend::memory::MemoryStorage;
87 # MemoryStorage::new()
88 # };
89 "#,
90 Backend => supported("Basic Backend functionality", true),
91 TaskSink => supported("Ability to push new tasks", true),
92 Serialization => not_supported("Serialization support for arguments"),
93
94 PipeExt => not_implemented("Allow other backends to pipe to this backend"),
95 MakeShared => not_supported("Share the same storage across multiple workers"),
96
97 Update => not_supported("Allow updating a task"),
98 FetchById => not_supported("Allow fetching a task by its ID"),
99 Reschedule => not_supported("Reschedule a task"),
100
101 ResumeById => not_supported("Resume a task by its ID"),
102 ResumeAbandoned => not_supported("Resume abandoned tasks"),
103 Vacuum => not_supported("Vacuum the task storage"),
104
105 Workflow => not_implemented("Flexible enough to support workflows"),
106 WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), RegisterWorker => not_supported("Allow registering a worker with the backend"),
109 ListWorkers => not_supported("List all workers registered with the backend"),
110 ListTasks => not_supported("List all tasks in the backend"),
111}]
112pub struct MemoryStorage<Args, Ctx = Extensions> {
113 pub(super) sender: MemorySink<Args, Ctx>,
114 pub(super) receiver: BoxedReceiver<Args, Ctx>,
115}
116
117impl<Args: Send + 'static> Default for MemoryStorage<Args, Extensions> {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
124 #[must_use]
126 pub fn new() -> Self {
127 let (sender, receiver) = unbounded();
128 let sender = Box::new(sender)
129 as Box<
130 dyn Sink<Task<Args, Extensions, RandomId>, Error = SendError> + Send + Sync + Unpin,
131 >;
132 Self {
133 sender: MemorySink {
134 inner: Arc::new(futures_util::lock::Mutex::new(sender)),
135 idempotency_keys: Default::default(),
136 },
137 receiver: receiver.boxed(),
138 }
139 }
140}
141
142impl<Args: Send + 'static, Ctx> MemoryStorage<Args, Ctx> {
143 #[must_use]
145 pub fn new_with(sender: MemorySink<Args, Ctx>, receiver: BoxedReceiver<Args, Ctx>) -> Self {
146 Self { sender, receiver }
147 }
148}
149
150impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemoryStorage<Args, Ctx> {
151 type Error = SendError;
152
153 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
154 self.as_mut().sender.poll_ready_unpin(cx)
155 }
156
157 fn start_send(
158 mut self: Pin<&mut Self>,
159 item: Task<Args, Ctx, RandomId>,
160 ) -> Result<(), Self::Error> {
161 self.as_mut().sender.start_send_unpin(item)
162 }
163
164 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
165 self.as_mut().sender.poll_flush_unpin(cx)
166 }
167
168 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169 self.as_mut().sender.poll_close_unpin(cx)
170 }
171}
172
173type ArcMemorySink<Args, Ctx = Extensions> = Arc<
174 Mutex<
175 Box<dyn Sink<Task<Args, Ctx, RandomId>, Error = SendError> + Send + Sync + Unpin + 'static>,
176 >,
177>;
178
179type ArcIdempotencySet = Arc<Mutex<HashSet<String>>>;
180
181pub struct MemorySink<Args, Ctx = Extensions> {
183 pub(super) inner: ArcMemorySink<Args, Ctx>,
184 pub(super) idempotency_keys: ArcIdempotencySet,
185}
186
187impl<Args, Ctx> MemorySink<Args, Ctx> {
188 pub fn new(sink: ArcMemorySink<Args, Ctx>) -> Self {
190 Self {
191 inner: sink,
192 idempotency_keys: Arc::new(Mutex::new(HashSet::new())),
193 }
194 }
195}
196
197impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("MemorySink")
200 .field("inner", &"<Sink>")
201 .field("idempotency_keys", &self.idempotency_keys.lock())
202 .finish()
203 }
204}
205
206impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
207 fn clone(&self) -> Self {
208 Self {
209 inner: Arc::clone(&self.inner),
210 idempotency_keys: Arc::clone(&self.idempotency_keys),
211 }
212 }
213}
214
215impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemorySink<Args, Ctx> {
216 type Error = SendError;
217
218 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
219 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
220 Pin::new(&mut *lock).poll_ready_unpin(cx)
221 }
222
223 fn start_send(
224 self: Pin<&mut Self>,
225 mut item: Task<Args, Ctx, RandomId>,
226 ) -> Result<(), Self::Error> {
227 let this = self.get_mut();
228
229 item.parts
231 .task_id
232 .get_or_insert_with(|| TaskId::new(RandomId::default()));
233
234 if let Some(key) = item.parts.idempotency_key.as_ref() {
235 let mut keys = this.idempotency_keys.try_lock().unwrap();
236
237 if keys.contains(key) {
238 return Ok(());
239 }
240
241 keys.insert(key.clone());
242 }
243
244 let mut sink = this.inner.try_lock().unwrap();
245 Pin::new(&mut *sink).start_send_unpin(item)
246 }
247
248 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
249 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
250 Pin::new(&mut *lock).poll_flush_unpin(cx)
251 }
252
253 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
254 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
255 Pin::new(&mut *lock).poll_close_unpin(cx)
256 }
257}
258
259impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 f.debug_struct("MemoryStorage")
262 .field("sender", &self.sender)
263 .field("receiver", &"<Stream>")
264 .finish()
265 }
266}
267
268impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
269 type Item = Task<Args, Ctx, RandomId>;
270
271 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
272 self.receiver.poll_next_unpin(cx)
273 }
274}
275
276impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
278 type Args = Args;
279 type IdType = RandomId;
280
281 type Context = Ctx;
282
283 type Error = SendError;
284 type Stream = TaskStream<Task<Args, Ctx, RandomId>, SendError>;
285 type Layer = Identity;
286 type Beat = BoxStream<'static, Result<(), Self::Error>>;
287
288 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
289 stream::once(async { Ok(()) }).boxed()
290 }
291 fn middleware(&self) -> Self::Layer {
292 Identity::new()
293 }
294
295 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
296 (self.receiver.boxed().map(|r| Ok(Some(r))).boxed()) as _
297 }
298}
299
300impl<Args: Clone + Send + 'static, Ctx: Default + 'static> BackendExt for MemoryStorage<Args, Ctx> {
301 type Codec = IdentityCodec;
302 type Compact = Args;
303 type CompactStream = TaskStream<Task<Args, Self::Context, RandomId>, Self::Error>;
304
305 fn get_queue(&self) -> crate::backend::queue::Queue {
306 std::any::type_name::<Args>().into()
307 }
308
309 fn poll_compact(self, _worker: &WorkerContext) -> Self::CompactStream {
310 (self.receiver.map(|task| Ok(Some(task))).boxed()) as _
311 }
312}