apalis_core/backend/impls/
memory.rs1use crate::backend::BackendExt;
41use crate::backend::codec::IdentityCodec;
42use crate::features_table;
43use crate::task::extensions::Extensions;
44use crate::{
45 backend::{Backend, TaskStream},
46 task::{
47 Task,
48 task_id::{RandomId, TaskId},
49 },
50 worker::context::WorkerContext,
51};
52use futures_channel::mpsc::{SendError, unbounded};
53use futures_core::ready;
54use futures_sink::Sink;
55use futures_util::{
56 FutureExt, SinkExt, Stream, StreamExt,
57 stream::{self, BoxStream},
58};
59use std::{
60 pin::Pin,
61 sync::Arc,
62 task::{Context, Poll},
63};
64use tower_layer::Identity;
65
66pub type BoxedReceiver<Args, Ctx> = Pin<Box<dyn Stream<Item = Task<Args, Ctx, RandomId>> + Send>>;
68
69#[doc = features_table! {
82 setup = r#"
83 # {
84 # use apalis_core::backend::memory::MemoryStorage;
85 # MemoryStorage::new()
86 # };
87 "#,
88 Backend => supported("Basic Backend functionality", true),
89 TaskSink => supported("Ability to push new tasks", true),
90 Serialization => not_supported("Serialization support for arguments"),
91
92 PipeExt => not_implemented("Allow other backends to pipe to this backend"),
93 MakeShared => not_supported("Share the same storage across multiple workers"),
94
95 Update => not_supported("Allow updating a task"),
96 FetchById => not_supported("Allow fetching a task by its ID"),
97 Reschedule => not_supported("Reschedule a task"),
98
99 ResumeById => not_supported("Resume a task by its ID"),
100 ResumeAbandoned => not_supported("Resume abandoned tasks"),
101 Vacuum => not_supported("Vacuum the task storage"),
102
103 Workflow => not_implemented("Flexible enough to support workflows"),
104 WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), RegisterWorker => not_supported("Allow registering a worker with the backend"),
107 ListWorkers => not_supported("List all workers registered with the backend"),
108 ListTasks => not_supported("List all tasks in the backend"),
109}]
110pub struct MemoryStorage<Args, Ctx = Extensions> {
111 pub(super) sender: MemorySink<Args, Ctx>,
112 pub(super) receiver: BoxedReceiver<Args, Ctx>,
113}
114
115impl<Args: Send + 'static> Default for MemoryStorage<Args, Extensions> {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
122 #[must_use]
124 pub fn new() -> Self {
125 let (sender, receiver) = unbounded();
126 let sender = Box::new(sender)
127 as Box<
128 dyn Sink<Task<Args, Extensions, RandomId>, Error = SendError> + Send + Sync + Unpin,
129 >;
130 Self {
131 sender: MemorySink {
132 inner: Arc::new(futures_util::lock::Mutex::new(sender)),
133 },
134 receiver: receiver.boxed(),
135 }
136 }
137}
138
139impl<Args: Send + 'static, Ctx> MemoryStorage<Args, Ctx> {
140 #[must_use]
142 pub fn new_with(sender: MemorySink<Args, Ctx>, receiver: BoxedReceiver<Args, Ctx>) -> Self {
143 Self { sender, receiver }
144 }
145}
146
147impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemoryStorage<Args, Ctx> {
148 type Error = SendError;
149
150 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
151 self.as_mut().sender.poll_ready_unpin(cx)
152 }
153
154 fn start_send(
155 mut self: Pin<&mut Self>,
156 item: Task<Args, Ctx, RandomId>,
157 ) -> Result<(), Self::Error> {
158 self.as_mut().sender.start_send_unpin(item)
159 }
160
161 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162 self.as_mut().sender.poll_flush_unpin(cx)
163 }
164
165 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
166 self.as_mut().sender.poll_close_unpin(cx)
167 }
168}
169
170type ArcMemorySink<Args, Ctx = Extensions> = Arc<
171 futures_util::lock::Mutex<
172 Box<dyn Sink<Task<Args, Ctx, RandomId>, Error = SendError> + Send + Sync + Unpin + 'static>,
173 >,
174>;
175
176pub struct MemorySink<Args, Ctx = Extensions> {
178 pub(super) inner: ArcMemorySink<Args, Ctx>,
179}
180
181impl<Args, Ctx> MemorySink<Args, Ctx> {
182 pub fn new(sink: ArcMemorySink<Args, Ctx>) -> Self {
184 Self { inner: sink }
185 }
186}
187
188impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 f.debug_struct("MemorySink")
191 .field("inner", &"<Sink>")
192 .finish()
193 }
194}
195
196impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
197 fn clone(&self) -> Self {
198 Self {
199 inner: Arc::clone(&self.inner),
200 }
201 }
202}
203
204impl<Args, Ctx> Sink<Task<Args, Ctx, RandomId>> for MemorySink<Args, Ctx> {
205 type Error = SendError;
206
207 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
208 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
209 Pin::new(&mut *lock).poll_ready_unpin(cx)
210 }
211
212 fn start_send(
213 self: Pin<&mut Self>,
214 mut item: Task<Args, Ctx, RandomId>,
215 ) -> Result<(), Self::Error> {
216 let mut lock = self.inner.try_lock().unwrap();
217 item.parts
219 .task_id
220 .get_or_insert_with(|| TaskId::new(RandomId::default()));
221 Pin::new(&mut *lock).start_send_unpin(item)
222 }
223
224 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
225 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
226 Pin::new(&mut *lock).poll_flush_unpin(cx)
227 }
228
229 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
230 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
231 Pin::new(&mut *lock).poll_close_unpin(cx)
232 }
233}
234
235impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
236 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 f.debug_struct("MemoryStorage")
238 .field("sender", &self.sender)
239 .field("receiver", &"<Stream>")
240 .finish()
241 }
242}
243
244impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
245 type Item = Task<Args, Ctx, RandomId>;
246
247 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
248 self.receiver.poll_next_unpin(cx)
249 }
250}
251
252impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
254 type Args = Args;
255 type IdType = RandomId;
256
257 type Context = Ctx;
258
259 type Error = SendError;
260 type Stream = TaskStream<Task<Args, Ctx, RandomId>, SendError>;
261 type Layer = Identity;
262 type Beat = BoxStream<'static, Result<(), Self::Error>>;
263
264 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
265 stream::once(async { Ok(()) }).boxed()
266 }
267 fn middleware(&self) -> Self::Layer {
268 Identity::new()
269 }
270
271 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
272 (self.receiver.boxed().map(|r| Ok(Some(r))).boxed()) as _
273 }
274}
275
276impl<Args: Clone + Send + 'static, Ctx: Default + 'static> BackendExt for MemoryStorage<Args, Ctx> {
277 type Codec = IdentityCodec;
278 type Compact = Args;
279 type CompactStream = TaskStream<Task<Args, Self::Context, RandomId>, Self::Error>;
280
281 fn get_queue(&self) -> crate::backend::queue::Queue {
282 std::any::type_name::<Args>().into()
283 }
284
285 fn poll_compact(self, _worker: &WorkerContext) -> Self::CompactStream {
286 (self.receiver.map(|task| Ok(Some(task))).boxed()) as _
287 }
288}