apalis_core/backend/impls/
memory.rs1use crate::backend::codec::IdentityCodec;
41use crate::backend::queue::Queue;
42use crate::features_table;
43use crate::task::extensions::Extensions;
44use crate::{
45 backend::{Backend, TaskStream},
46 task::{
47 Task,
48 task_id::{RandomId, TaskId},
49 },
50 worker::context::WorkerContext,
51};
52use futures_channel::mpsc::{SendError, unbounded};
53use futures_core::ready;
54use futures_sink::Sink;
55use futures_util::{
56 FutureExt, SinkExt, Stream, StreamExt,
57 stream::{self, BoxStream},
58};
59use std::{
60 pin::Pin,
61 sync::Arc,
62 task::{Context, Poll},
63};
64use tower_layer::Identity;
65
66#[doc = features_table! {
69 setup = {
70 use apalis_core::backend::memory::MemoryStorage;
71 MemoryStorage::new()
73 };,
74
75 Backend => supported("Basic Backend functionality", true),
76 TaskSink => supported("Ability to push new tasks", true),
77 Serialization => not_supported("Serialization support for arguments"),
78
79 PipeExt => not_implemented("Allow other backends to pipe to this backend"),
80 MakeShared => not_supported("Share the same JSON storage across multiple workers"),
81
82 Update => not_supported("Allow updating a task"),
83 FetchById => not_supported("Allow fetching a task by its ID"),
84 Reschedule => not_supported("Reschedule a task"),
85
86 ResumeById => not_supported("Resume a task by its ID"),
87 ResumeAbandoned => not_supported("Resume abandoned tasks"),
88 Vacuum => not_supported("Vacuum the task storage"),
89
90 Workflow => not_implemented("Flexible enough to support workflows"),
91 WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), RegisterWorker => not_supported("Allow registering a worker with the backend"),
94 ListWorkers => not_supported("List all workers registered with the backend"),
95 ListTasks => not_supported("List all tasks in the backend"),
96}]
97pub struct MemoryStorage<Args, Ctx = Extensions> {
98 pub(super) sender: MemorySink<Args, Ctx>,
99 pub(super) receiver: Pin<Box<dyn Stream<Item = Task<Args, Ctx>> + Send>>,
100}
101
102impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
103 pub fn new() -> Self {
105 let (sender, receiver) = unbounded();
106 let sender = Box::new(sender)
107 as Box<
108 dyn Sink<Task<Args, Extensions>, Error = SendError>
109 + Send
110 + Sync
111 + Unpin,
112 >;
113 MemoryStorage {
114 sender: MemorySink {
115 inner: Arc::new(futures_util::lock::Mutex::new(sender)),
116 },
117 receiver: receiver.boxed(),
118 }
119 }
120}
121
122impl<Args, Ctx> Sink<Task<Args, Ctx>> for MemoryStorage<Args, Ctx> {
123 type Error = SendError;
124
125 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
126 self.as_mut().sender.poll_ready_unpin(cx)
127 }
128
129 fn start_send(mut self: Pin<&mut Self>, item: Task<Args, Ctx>) -> Result<(), Self::Error> {
130 self.as_mut().sender.start_send_unpin(item)
131 }
132
133 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
134 self.as_mut().sender.poll_flush_unpin(cx)
135 }
136
137 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 self.as_mut().sender.poll_close_unpin(cx)
139 }
140}
141
142pub struct MemorySink<Args, Ctx = Extensions> {
144 pub(super) inner: Arc<
145 futures_util::lock::Mutex<
146 Box<
147 dyn Sink<Task<Args, Ctx>, Error = SendError>
148 + Send
149 + Sync
150 + Unpin
151 + 'static,
152 >,
153 >,
154 >,
155}
156
157impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct("MemorySink")
160 .field("inner", &"<Sink>")
161 .finish()
162 }
163}
164
165impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
166 fn clone(&self) -> Self {
167 Self {
168 inner: Arc::clone(&self.inner),
169 }
170 }
171}
172
173impl<Args, Ctx> Sink<Task<Args, Ctx>> for MemorySink<Args, Ctx> {
174 type Error = SendError;
175
176 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
177 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
178 Pin::new(&mut *lock).poll_ready_unpin(cx)
179 }
180
181 fn start_send(self: Pin<&mut Self>, mut item: Task<Args, Ctx>) -> Result<(), Self::Error> {
182 let mut lock = self.inner.try_lock().unwrap();
183 item.parts
185 .task_id
186 .get_or_insert_with(|| TaskId::new(RandomId::default()));
187 item.parts
188 .queue
189 .get_or_insert_with(|| Queue::from(std::any::type_name::<Args>()));
190 Pin::new(&mut *lock).start_send_unpin(item)
191 }
192
193 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
195 Pin::new(&mut *lock).poll_flush_unpin(cx)
196 }
197
198 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
200 Pin::new(&mut *lock).poll_close_unpin(cx)
201 }
202}
203
204impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
205 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 f.debug_struct("MemoryStorage")
207 .field("sender", &self.sender)
208 .field("receiver", &"<Stream>")
209 .finish()
210 }
211}
212
213impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
214 type Item = Task<Args, Ctx>;
215
216 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
217 self.receiver.poll_next_unpin(cx)
218 }
219}
220
221impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
223 type Args = Args;
224 type IdType = RandomId;
225
226 type Context = Ctx;
227
228 type Error = SendError;
229 type Stream = TaskStream<Task<Args, Ctx>, SendError>;
230 type Layer = Identity;
231 type Beat = BoxStream<'static, Result<(), Self::Error>>;
232
233 type Codec = IdentityCodec;
234 type Compact = Args;
235
236 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
237 stream::once(async { Ok(()) }).boxed()
238 }
239 fn middleware(&self) -> Self::Layer {
240 Identity::new()
241 }
242
243 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
244 let stream = self.receiver.boxed().map(|r| Ok(Some(r))).boxed();
245 stream
246 }
247}