apalis_core/backend/impls/
memory.rs1use crate::backend::BackendExt;
41use crate::backend::codec::IdentityCodec;
42use crate::features_table;
43use crate::task::extensions::Extensions;
44use crate::{
45 backend::{Backend, TaskStream},
46 task::{
47 Task,
48 task_id::{RandomId, TaskId},
49 },
50 worker::context::WorkerContext,
51};
52use futures_channel::mpsc::{SendError, unbounded};
53use futures_core::ready;
54use futures_sink::Sink;
55use futures_util::{
56 FutureExt, SinkExt, Stream, StreamExt,
57 stream::{self, BoxStream},
58};
59use std::{
60 pin::Pin,
61 sync::Arc,
62 task::{Context, Poll},
63};
64use tower_layer::Identity;
65
66#[doc = features_table! {
79 setup = r#"
80 # {
81 # use apalis_core::backend::memory::MemoryStorage;
82 # MemoryStorage::new()
83 # };
84 "#,
85 Backend => supported("Basic Backend functionality", true),
86 TaskSink => supported("Ability to push new tasks", true),
87 Serialization => not_supported("Serialization support for arguments"),
88
89 PipeExt => not_implemented("Allow other backends to pipe to this backend"),
90 MakeShared => not_supported("Share the same storage across multiple workers"),
91
92 Update => not_supported("Allow updating a task"),
93 FetchById => not_supported("Allow fetching a task by its ID"),
94 Reschedule => not_supported("Reschedule a task"),
95
96 ResumeById => not_supported("Resume a task by its ID"),
97 ResumeAbandoned => not_supported("Resume abandoned tasks"),
98 Vacuum => not_supported("Vacuum the task storage"),
99
100 Workflow => not_implemented("Flexible enough to support workflows"),
101 WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), RegisterWorker => not_supported("Allow registering a worker with the backend"),
104 ListWorkers => not_supported("List all workers registered with the backend"),
105 ListTasks => not_supported("List all tasks in the backend"),
106}]
107pub struct MemoryStorage<Args, Ctx = Extensions> {
108 pub(super) sender: MemorySink<Args, Ctx>,
109 pub(super) receiver: Pin<Box<dyn Stream<Item = Task<Args, Ctx>> + Send>>,
110}
111
112impl<Args: Send + 'static> Default for MemoryStorage<Args, Extensions> {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
118impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
119 #[must_use]
121 pub fn new() -> Self {
122 let (sender, receiver) = unbounded();
123 let sender = Box::new(sender)
124 as Box<dyn Sink<Task<Args, Extensions>, Error = SendError> + Send + Sync + Unpin>;
125 Self {
126 sender: MemorySink {
127 inner: Arc::new(futures_util::lock::Mutex::new(sender)),
128 },
129 receiver: receiver.boxed(),
130 }
131 }
132}
133
134impl<Args, Ctx> Sink<Task<Args, Ctx>> for MemoryStorage<Args, Ctx> {
135 type Error = SendError;
136
137 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 self.as_mut().sender.poll_ready_unpin(cx)
139 }
140
141 fn start_send(mut self: Pin<&mut Self>, item: Task<Args, Ctx>) -> Result<(), Self::Error> {
142 self.as_mut().sender.start_send_unpin(item)
143 }
144
145 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
146 self.as_mut().sender.poll_flush_unpin(cx)
147 }
148
149 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150 self.as_mut().sender.poll_close_unpin(cx)
151 }
152}
153
154type MemorySinkInner<Args, Ctx = Extensions> = Arc<
155 futures_util::lock::Mutex<
156 Box<dyn Sink<Task<Args, Ctx>, Error = SendError> + Send + Sync + Unpin + 'static>,
157 >,
158>;
159
160pub struct MemorySink<Args, Ctx = Extensions> {
162 pub(super) inner: MemorySinkInner<Args, Ctx>,
163}
164
165impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 f.debug_struct("MemorySink")
168 .field("inner", &"<Sink>")
169 .finish()
170 }
171}
172
173impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
174 fn clone(&self) -> Self {
175 Self {
176 inner: Arc::clone(&self.inner),
177 }
178 }
179}
180
181impl<Args, Ctx> Sink<Task<Args, Ctx>> for MemorySink<Args, Ctx> {
182 type Error = SendError;
183
184 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
185 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
186 Pin::new(&mut *lock).poll_ready_unpin(cx)
187 }
188
189 fn start_send(self: Pin<&mut Self>, mut item: Task<Args, Ctx>) -> Result<(), Self::Error> {
190 let mut lock = self.inner.try_lock().unwrap();
191 item.parts
193 .task_id
194 .get_or_insert_with(|| TaskId::new(RandomId::default()));
195 Pin::new(&mut *lock).start_send_unpin(item)
196 }
197
198 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
200 Pin::new(&mut *lock).poll_flush_unpin(cx)
201 }
202
203 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
204 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
205 Pin::new(&mut *lock).poll_close_unpin(cx)
206 }
207}
208
209impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("MemoryStorage")
212 .field("sender", &self.sender)
213 .field("receiver", &"<Stream>")
214 .finish()
215 }
216}
217
218impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
219 type Item = Task<Args, Ctx>;
220
221 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
222 self.receiver.poll_next_unpin(cx)
223 }
224}
225
226impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
228 type Args = Args;
229 type IdType = RandomId;
230
231 type Context = Ctx;
232
233 type Error = SendError;
234 type Stream = TaskStream<Task<Args, Ctx>, SendError>;
235 type Layer = Identity;
236 type Beat = BoxStream<'static, Result<(), Self::Error>>;
237
238 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
239 stream::once(async { Ok(()) }).boxed()
240 }
241 fn middleware(&self) -> Self::Layer {
242 Identity::new()
243 }
244
245 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
246 (self.receiver.boxed().map(|r| Ok(Some(r))).boxed()) as _
247 }
248}
249
250impl<Args: Clone + Send + 'static, Ctx: Default + 'static> BackendExt for MemoryStorage<Args, Ctx> {
251 type Codec = IdentityCodec;
252 type Compact = Args;
253 type CompactStream = TaskStream<Task<Args, Self::Context>, Self::Error>;
254
255 fn poll_compact(self, _worker: &WorkerContext) -> Self::CompactStream {
256 (self.receiver.map(|task| Ok(Some(task))).boxed()) as _
257 }
258}