apalis_core/backend/impls/
dequeue.rs1#![cfg(feature = "sleep")]
2use std::{
3 collections::VecDeque,
4 fmt,
5 fmt::Debug,
6 pin::Pin,
7 sync::{
8 Arc,
9 atomic::{AtomicUsize, Ordering},
10 },
11 time::Duration,
12};
13
14use futures_core::stream::BoxStream;
15use futures_sink::Sink;
16use futures_util::{
17 FutureExt, StreamExt, TryStreamExt,
18 lock::Mutex,
19 sink::{self},
20 stream,
21};
22use tower_layer::Identity;
23
24use crate::{
25 backend::{
26 Backend, BackendExt,
27 codec::IdentityCodec,
28 custom::{BackendBuilder, CustomBackend},
29 poll_strategy::{
30 IntervalStrategy, MultiStrategy, PollContext, PollStrategyExt, StrategyBuilder,
31 },
32 },
33 error::BoxDynError,
34 task::{Task, task_id::RandomId},
35 worker::context::WorkerContext,
36};
37
38#[derive(Debug)]
40pub struct InMemoryDb<T> {
41 inner: Arc<Mutex<VecDeque<Task<T, (), RandomId>>>>,
42}
43
44impl<T> Clone for InMemoryDb<T> {
45 fn clone(&self) -> Self {
46 Self {
47 inner: self.inner.clone(),
48 }
49 }
50}
51
52impl<T> InMemoryDb<T> {
53 #[must_use]
55 pub fn new() -> Self {
56 Self {
57 inner: Arc::new(Mutex::new(VecDeque::new())),
58 }
59 }
60
61 #[must_use]
63 pub fn into_inner(self) -> Arc<Mutex<VecDeque<Task<T, (), RandomId>>>> {
64 self.inner
65 }
66
67 #[must_use]
69 pub fn as_arc(&self) -> &Arc<Mutex<VecDeque<Task<T, (), RandomId>>>> {
70 &self.inner
71 }
72}
73
74impl<T> Default for InMemoryDb<T> {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80#[derive(Debug, Clone)]
82pub struct Config {
83 strategy: MultiStrategy,
84 prev_count: Arc<AtomicUsize>,
85}
86pub type BoxSink<'a, T> = Pin<Box<dyn Sink<T, Error = VecDequeError> + Send + Sync + 'a>>;
88pub type InMemorySink<T> = BoxSink<'static, Task<T, (), RandomId>>;
90
91pub struct VecDequeBackend<T>(
93 CustomBackend<
94 T,
95 InMemoryDb<T>,
96 BoxStream<'static, Result<Option<Task<T, (), RandomId>>, VecDequeError>>,
97 InMemorySink<T>,
98 RandomId,
99 Config,
100 >,
101);
102
103impl<T> Debug for VecDequeBackend<T> {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 f.debug_struct("VecDequeBackend<T>").finish()
106 }
107}
108
109impl<T> Clone for VecDequeBackend<T> {
110 fn clone(&self) -> Self {
111 Self(self.0.clone())
112 }
113}
114
115#[derive(Debug, thiserror::Error, Clone)]
117pub enum VecDequeError {
118 #[error("Polling error: {0}")]
120 PollError(Arc<BoxDynError>),
121 #[error("Sending error: {0}")]
123 SendError(Arc<BoxDynError>),
124}
125
126impl<T> Backend for VecDequeBackend<T>
127where
128 T: Send + 'static,
129{
130 type Args = T;
131
132 type IdType = RandomId;
133
134 type Context = ();
135
136 type Stream = BoxStream<'static, Result<Option<Task<T, (), Self::IdType>>, VecDequeError>>;
137
138 type Layer = Identity;
139
140 type Beat = BoxStream<'static, Result<(), VecDequeError>>;
141
142 type Error = VecDequeError;
143
144 fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
145 self.0
146 .heartbeat(worker)
147 .map_err(|e| VecDequeError::PollError(Arc::new(e.into())))
148 .boxed()
149 }
150
151 fn middleware(&self) -> Self::Layer {
152 self.0.middleware()
153 }
154 fn poll(self, worker: &WorkerContext) -> Self::Stream {
155 self.0
156 .poll(worker)
157 .map_err(|e| VecDequeError::PollError(Arc::new(e.into())))
158 .boxed()
159 }
160}
161
162impl<T: Clone> BackendExt for VecDequeBackend<T>
163where
164 T: Send + 'static,
165{
166 type Codec = IdentityCodec;
167 type Compact = T;
168 type CompactStream = Self::Stream;
169
170 fn get_queue(&self) -> crate::backend::queue::Queue {
171 std::any::type_name::<T>().into()
172 }
173
174 fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
175 self.0
176 .poll(worker)
177 .map_err(|e| VecDequeError::PollError(Arc::new(e.into())))
178 .boxed()
179 }
180}
181
182impl<T> Sink<Task<T, (), RandomId>> for VecDequeBackend<T>
183where
184 T: Send + 'static,
185{
186 type Error = VecDequeError;
187
188 fn poll_ready(
189 self: Pin<&mut Self>,
190 cx: &mut std::task::Context<'_>,
191 ) -> std::task::Poll<Result<(), Self::Error>> {
192 Pin::new(&mut self.get_mut().0)
193 .poll_ready(cx)
194 .map_err(|e| VecDequeError::SendError(Arc::new(e.into())))
195 }
196
197 fn start_send(self: Pin<&mut Self>, item: Task<T, (), RandomId>) -> Result<(), Self::Error> {
198 Pin::new(&mut self.get_mut().0)
199 .start_send(item)
200 .map_err(|e| VecDequeError::SendError(Arc::new(e.into())))
201 }
202
203 fn poll_flush(
204 self: Pin<&mut Self>,
205 cx: &mut std::task::Context<'_>,
206 ) -> std::task::Poll<Result<(), Self::Error>> {
207 Pin::new(&mut self.get_mut().0)
208 .poll_flush(cx)
209 .map_err(|e| VecDequeError::SendError(Arc::new(e.into())))
210 }
211
212 fn poll_close(
213 self: Pin<&mut Self>,
214 cx: &mut std::task::Context<'_>,
215 ) -> std::task::Poll<Result<(), Self::Error>> {
216 Pin::new(&mut self.get_mut().0)
217 .poll_close(cx)
218 .map_err(|e| VecDequeError::SendError(Arc::new(e.into())))
219 }
220}
221
222#[must_use]
224pub fn backend<T>(poll_interval: Duration) -> VecDequeBackend<T>
225where
226 T: Send + 'static,
227{
228 let memory = InMemoryDb::new();
229
230 let strategy = StrategyBuilder::new()
231 .apply(IntervalStrategy::new(poll_interval))
232 .build();
233
234 let config = Config {
235 strategy,
236 prev_count: Arc::new(AtomicUsize::new(1)),
237 };
238
239 let backend = BackendBuilder::new_with_cfg(config)
240 .database(memory)
241 .fetcher(
242 |db: &mut InMemoryDb<T>, config: &Config, worker: &WorkerContext| {
243 let poll_strategy = config.strategy.clone();
244 let poll_ctx = PollContext::new(worker.clone(), config.prev_count.clone());
245 let poller = poll_strategy.build_stream(&poll_ctx);
246 stream::unfold(
247 (db.clone(), config.clone(), poller, worker.clone()),
248 |(p, config, mut poller, ctx)| async move {
249 poller.next().await;
250 let Some(mut db) = p.inner.try_lock() else {
251 return Some((
252 Err::<Option<Task<T, (), RandomId>>, VecDequeError>(
253 VecDequeError::PollError(Arc::new(
254 "Failed to acquire lock".into(),
255 )),
256 ),
257 (p, config, poller, ctx),
258 ));
259 };
260 let item = db.pop_front();
261 drop(db);
262 if let Some(item) = item {
263 config.prev_count.store(1, Ordering::Relaxed);
264 Some((Ok::<_, VecDequeError>(Some(item)), (p, config, poller, ctx)))
265 } else {
266 config.prev_count.store(0, Ordering::Relaxed);
267 Some((
268 Ok::<Option<Task<T, (), RandomId>>, VecDequeError>(None),
269 (p, config, poller, ctx),
270 ))
271 }
272 },
273 )
274 .boxed()
275 },
276 )
277 .sink(|db, _| {
278 Box::pin(sink::unfold(db.clone(), move |p, item| {
279 async move {
280 let Some(mut db) = p.inner.try_lock() else {
281 return Err(VecDequeError::PollError(Arc::new(
282 "Failed to acquire lock".into(),
283 )));
284 };
285 db.push_back(item);
286 drop(db);
287 Ok::<_, VecDequeError>(p)
288 }
289 .boxed()
290 .shared()
291 })) as _
292 })
293 .build()
294 .unwrap();
295
296 VecDequeBackend(backend)
297}