apalis_core/backend/impls/
memory.rs1use crate::backend::BackendExt;
41use crate::backend::codec::IdentityCodec;
42use crate::features_table;
43use crate::task::extensions::Extensions;
44use crate::{
45 backend::{Backend, TaskStream},
46 task::{
47 Task,
48 task_id::{RandomId, TaskId},
49 },
50 worker::context::WorkerContext,
51};
52use futures_channel::mpsc::{SendError, unbounded};
53use futures_core::ready;
54use futures_sink::Sink;
55use futures_util::{
56 FutureExt, SinkExt, Stream, StreamExt,
57 stream::{self, BoxStream},
58};
59use std::{
60 pin::Pin,
61 sync::Arc,
62 task::{Context, Poll},
63};
64use tower_layer::Identity;
65
66#[doc = features_table! {
79 setup = r#"
80 # {
81 # use apalis_core::backend::memory::MemoryStorage;
82 # MemoryStorage::new()
83 # };
84 "#,
85 Backend => supported("Basic Backend functionality", true),
86 TaskSink => supported("Ability to push new tasks", true),
87 Serialization => not_supported("Serialization support for arguments"),
88
89 PipeExt => not_implemented("Allow other backends to pipe to this backend"),
90 MakeShared => not_supported("Share the same storage across multiple workers"),
91
92 Update => not_supported("Allow updating a task"),
93 FetchById => not_supported("Allow fetching a task by its ID"),
94 Reschedule => not_supported("Reschedule a task"),
95
96 ResumeById => not_supported("Resume a task by its ID"),
97 ResumeAbandoned => not_supported("Resume abandoned tasks"),
98 Vacuum => not_supported("Vacuum the task storage"),
99
100 Workflow => not_implemented("Flexible enough to support workflows"),
101 WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), RegisterWorker => not_supported("Allow registering a worker with the backend"),
104 ListWorkers => not_supported("List all workers registered with the backend"),
105 ListTasks => not_supported("List all tasks in the backend"),
106}]
107pub struct MemoryStorage<Args, Ctx = Extensions> {
108 pub(super) sender: MemorySink<Args, Ctx>,
109 pub(super) receiver: Pin<Box<dyn Stream<Item = Task<Args, Ctx, RandomId>> + Send>>,
110}
111
112impl<Args: Send + 'static> Default for MemoryStorage<Args, Extensions> {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
119 #[must_use]
121 pub fn new() -> Self {
122 let (sender, receiver) = unbounded();
123 let sender = Box::new(sender)
124 as Box<
125 dyn Sink<Task<Args, Extensions, RandomId>, Error = SendError> + Send + Sync + Unpin,
126 >;
127 Self {
128 sender: MemorySink {
129 inner: Arc::new(futures_util::lock::Mutex::new(sender)),
130 },
131 receiver: receiver.boxed(),
132 }
133 }
134}
135
136impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemoryStorage<Args, Ctx> {
137 type Error = SendError;
138
139 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
140 self.as_mut().sender.poll_ready_unpin(cx)
141 }
142
143 fn start_send(
144 mut self: Pin<&mut Self>,
145 item: Task<Args, Ctx, RandomId>,
146 ) -> Result<(), Self::Error> {
147 self.as_mut().sender.start_send_unpin(item)
148 }
149
150 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
151 self.as_mut().sender.poll_flush_unpin(cx)
152 }
153
154 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155 self.as_mut().sender.poll_close_unpin(cx)
156 }
157}
158
/// Shared, lockable, type-erased sink used as the storage of a [`MemorySink`].
///
/// Layers, outermost first:
/// - `Arc`: lets several handles point at the same sink.
/// - `futures_util::lock::Mutex`: serializes access between those handles.
/// - `Box<dyn Sink<..>>`: hides the concrete sink type; it accepts
///   `Task<Args, Ctx, RandomId>` items and reports failures as [`SendError`].
type MemorySinkInner<Args, Ctx = Extensions> = Arc<
    futures_util::lock::Mutex<
        Box<dyn Sink<Task<Args, Ctx, RandomId>, Error = SendError> + Send + Sync + Unpin + 'static>,
    >,
>;
164
/// Cloneable handle for pushing tasks into an in-memory backend.
///
/// Clones share one underlying sink: cloning only bumps the reference count
/// of `inner`, so every clone feeds the same destination.
pub struct MemorySink<Args, Ctx = Extensions> {
    /// The shared sink; locked for the duration of each individual `Sink` call.
    pub(super) inner: MemorySinkInner<Args, Ctx>,
}
169
170impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 f.debug_struct("MemorySink")
173 .field("inner", &"<Sink>")
174 .finish()
175 }
176}
177
178impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
179 fn clone(&self) -> Self {
180 Self {
181 inner: Arc::clone(&self.inner),
182 }
183 }
184}
185
186impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemorySink<Args, Ctx> {
187 type Error = SendError;
188
189 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
190 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
191 Pin::new(&mut *lock).poll_ready_unpin(cx)
192 }
193
194 fn start_send(
195 self: Pin<&mut Self>,
196 mut item: Task<Args, Ctx, RandomId>,
197 ) -> Result<(), Self::Error> {
198 let mut lock = self.inner.try_lock().unwrap();
199 item.parts
201 .task_id
202 .get_or_insert_with(|| TaskId::new(RandomId::default()));
203 Pin::new(&mut *lock).start_send_unpin(item)
204 }
205
206 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
208 Pin::new(&mut *lock).poll_flush_unpin(cx)
209 }
210
211 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
212 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
213 Pin::new(&mut *lock).poll_close_unpin(cx)
214 }
215}
216
217impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 f.debug_struct("MemoryStorage")
220 .field("sender", &self.sender)
221 .field("receiver", &"<Stream>")
222 .finish()
223 }
224}
225
226impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
227 type Item = Task<Args, Ctx, RandomId>;
228
229 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
230 self.receiver.poll_next_unpin(cx)
231 }
232}
233
234impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
236 type Args = Args;
237 type IdType = RandomId;
238
239 type Context = Ctx;
240
241 type Error = SendError;
242 type Stream = TaskStream<Task<Args, Ctx, RandomId>, SendError>;
243 type Layer = Identity;
244 type Beat = BoxStream<'static, Result<(), Self::Error>>;
245
246 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
247 stream::once(async { Ok(()) }).boxed()
248 }
249 fn middleware(&self) -> Self::Layer {
250 Identity::new()
251 }
252
253 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
254 (self.receiver.boxed().map(|r| Ok(Some(r))).boxed()) as _
255 }
256}
257
258impl<Args: Clone + Send + 'static, Ctx: Default + 'static> BackendExt for MemoryStorage<Args, Ctx> {
259 type Codec = IdentityCodec;
260 type Compact = Args;
261 type CompactStream = TaskStream<Task<Args, Self::Context, RandomId>, Self::Error>;
262
263 fn poll_compact(self, _worker: &WorkerContext) -> Self::CompactStream {
264 (self.receiver.map(|task| Ok(Some(task))).boxed()) as _
265 }
266}