apalis_core/backend/impls/
memory.rs1use crate::backend::codec::IdentityCodec;
41use crate::features_table;
42use crate::task::extensions::Extensions;
43use crate::{
44 backend::{Backend, TaskStream},
45 task::{
46 Task,
47 task_id::{RandomId, TaskId},
48 },
49 worker::context::WorkerContext,
50};
51use futures_channel::mpsc::{SendError, unbounded};
52use futures_core::ready;
53use futures_sink::Sink;
54use futures_util::{
55 FutureExt, SinkExt, Stream, StreamExt,
56 stream::{self, BoxStream},
57};
58use std::{
59 pin::Pin,
60 sync::Arc,
61 task::{Context, Poll},
62};
63use tower_layer::Identity;
64
65#[doc = features_table! {
78 setup = r#"
79 # {
80 # use apalis_core::backend::memory::MemoryStorage;
81 # MemoryStorage::new()
82 # };
83 "#,
84 Backend => supported("Basic Backend functionality", true),
85 TaskSink => supported("Ability to push new tasks", true),
86 Serialization => not_supported("Serialization support for arguments"),
87
88 PipeExt => not_implemented("Allow other backends to pipe to this backend"),
89 MakeShared => not_supported("Share the same storage across multiple workers"),
90
91 Update => not_supported("Allow updating a task"),
92 FetchById => not_supported("Allow fetching a task by its ID"),
93 Reschedule => not_supported("Reschedule a task"),
94
95 ResumeById => not_supported("Resume a task by its ID"),
96 ResumeAbandoned => not_supported("Resume abandoned tasks"),
97 Vacuum => not_supported("Vacuum the task storage"),
98
99 Workflow => not_implemented("Flexible enough to support workflows"),
100 WaitForCompletion => not_implemented("Wait for tasks to complete without blocking"), RegisterWorker => not_supported("Allow registering a worker with the backend"),
103 ListWorkers => not_supported("List all workers registered with the backend"),
104 ListTasks => not_supported("List all tasks in the backend"),
105}]
106pub struct MemoryStorage<Args, Ctx = Extensions> {
107 pub(super) sender: MemorySink<Args, Ctx>,
108 pub(super) receiver: Pin<Box<dyn Stream<Item = Task<Args, Ctx>> + Send>>,
109}
110
111impl<Args: Send + 'static> MemoryStorage<Args, Extensions> {
112 pub fn new() -> Self {
114 let (sender, receiver) = unbounded();
115 let sender = Box::new(sender)
116 as Box<dyn Sink<Task<Args, Extensions>, Error = SendError> + Send + Sync + Unpin>;
117 MemoryStorage {
118 sender: MemorySink {
119 inner: Arc::new(futures_util::lock::Mutex::new(sender)),
120 },
121 receiver: receiver.boxed(),
122 }
123 }
124}
125
126impl<Args, Ctx> Sink<Task<Args, Ctx>> for MemoryStorage<Args, Ctx> {
127 type Error = SendError;
128
129 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
130 self.as_mut().sender.poll_ready_unpin(cx)
131 }
132
133 fn start_send(mut self: Pin<&mut Self>, item: Task<Args, Ctx>) -> Result<(), Self::Error> {
134 self.as_mut().sender.start_send_unpin(item)
135 }
136
137 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 self.as_mut().sender.poll_flush_unpin(cx)
139 }
140
141 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
142 self.as_mut().sender.poll_close_unpin(cx)
143 }
144}
145
146pub struct MemorySink<Args, Ctx = Extensions> {
148 pub(super) inner: Arc<
149 futures_util::lock::Mutex<
150 Box<dyn Sink<Task<Args, Ctx>, Error = SendError> + Send + Sync + Unpin + 'static>,
151 >,
152 >,
153}
154
155impl<Args, Ctx> std::fmt::Debug for MemorySink<Args, Ctx> {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("MemorySink")
158 .field("inner", &"<Sink>")
159 .finish()
160 }
161}
162
163impl<Args, Ctx> Clone for MemorySink<Args, Ctx> {
164 fn clone(&self) -> Self {
165 Self {
166 inner: Arc::clone(&self.inner),
167 }
168 }
169}
170
171impl<Args, Ctx> Sink<Task<Args, Ctx>> for MemorySink<Args, Ctx> {
172 type Error = SendError;
173
174 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
175 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
176 Pin::new(&mut *lock).poll_ready_unpin(cx)
177 }
178
179 fn start_send(self: Pin<&mut Self>, mut item: Task<Args, Ctx>) -> Result<(), Self::Error> {
180 let mut lock = self.inner.try_lock().unwrap();
181 item.parts
183 .task_id
184 .get_or_insert_with(|| TaskId::new(RandomId::default()));
185 Pin::new(&mut *lock).start_send_unpin(item)
186 }
187
188 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
189 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
190 Pin::new(&mut *lock).poll_flush_unpin(cx)
191 }
192
193 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194 let mut lock = ready!(self.inner.lock().poll_unpin(cx));
195 Pin::new(&mut *lock).poll_close_unpin(cx)
196 }
197}
198
199impl<Args, Ctx> std::fmt::Debug for MemoryStorage<Args, Ctx> {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 f.debug_struct("MemoryStorage")
202 .field("sender", &self.sender)
203 .field("receiver", &"<Stream>")
204 .finish()
205 }
206}
207
208impl<Args, Ctx> Stream for MemoryStorage<Args, Ctx> {
209 type Item = Task<Args, Ctx>;
210
211 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
212 self.receiver.poll_next_unpin(cx)
213 }
214}
215
216impl<Args: 'static + Clone + Send, Ctx: 'static + Default> Backend for MemoryStorage<Args, Ctx> {
218 type Args = Args;
219 type IdType = RandomId;
220
221 type Context = Ctx;
222
223 type Error = SendError;
224 type Stream = TaskStream<Task<Args, Ctx>, SendError>;
225 type Layer = Identity;
226 type Beat = BoxStream<'static, Result<(), Self::Error>>;
227
228 type Codec = IdentityCodec;
229 type Compact = Args;
230
231 fn heartbeat(&self, _: &WorkerContext) -> Self::Beat {
232 stream::once(async { Ok(()) }).boxed()
233 }
234 fn middleware(&self) -> Self::Layer {
235 Identity::new()
236 }
237
238 fn poll(self, _worker: &WorkerContext) -> Self::Stream {
239 let stream = self.receiver.boxed().map(|r| Ok(Some(r))).boxed();
240 stream
241 }
242}