apalis_core/backend/impls/
dequeue.rs

1#![cfg(feature = "sleep")]
2use std::{
3    collections::VecDeque,
4    fmt,
5    fmt::Debug,
6    pin::Pin,
7    sync::{
8        Arc,
9        atomic::{AtomicUsize, Ordering},
10    },
11    time::Duration,
12};
13
14use futures_core::stream::BoxStream;
15use futures_sink::Sink;
16use futures_util::{
17    FutureExt, StreamExt, TryStreamExt,
18    lock::Mutex,
19    sink::{self},
20    stream,
21};
22use tower_layer::Identity;
23
24use crate::{
25    backend::{
26        Backend, BackendExt,
27        codec::IdentityCodec,
28        custom::{BackendBuilder, CustomBackend},
29        poll_strategy::{
30            IntervalStrategy, MultiStrategy, PollContext, PollStrategyExt, StrategyBuilder,
31        },
32    },
33    error::BoxDynError,
34    task::{Task, task_id::RandomId},
35    worker::context::WorkerContext,
36};
37
/// Wrapper type for the shared in-memory task queue.
///
/// Holds an `Arc<Mutex<VecDeque<...>>>` of tasks, so a clone of this handle
/// refers to the same underlying queue rather than to a copy of it.
#[derive(Debug)]
pub struct InMemoryDb<T> {
    // Reference-counted, mutex-guarded queue of pending tasks.
    inner: Arc<Mutex<VecDeque<Task<T, (), RandomId>>>>,
}
43
44impl<T> Clone for InMemoryDb<T> {
45    fn clone(&self) -> Self {
46        Self {
47            inner: self.inner.clone(),
48        }
49    }
50}
51
52impl<T> InMemoryDb<T> {
53    /// Create a new InMemoryDb instance
54    #[must_use]
55    pub fn new() -> Self {
56        Self {
57            inner: Arc::new(Mutex::new(VecDeque::new())),
58        }
59    }
60
61    /// Consume the InMemoryDb and return the inner Arc<Mutex<...>>
62    #[must_use]
63    pub fn into_inner(self) -> Arc<Mutex<VecDeque<Task<T, (), RandomId>>>> {
64        self.inner
65    }
66
67    /// Get a reference to the inner Arc<Mutex<...>>
68    #[must_use]
69    pub fn as_arc(&self) -> &Arc<Mutex<VecDeque<Task<T, (), RandomId>>>> {
70        &self.inner
71    }
72}
73
74impl<T> Default for InMemoryDb<T> {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
/// Configuration for the in-memory VecDeque backend
#[derive(Debug, Clone)]
pub struct Config {
    /// Polling strategy; the fetcher builds its pacing stream from it and
    /// awaits one item of that stream before every dequeue attempt.
    strategy: MultiStrategy,
    /// Shared counter handed to the `PollContext`. The fetcher stores `1`
    /// after dequeuing a task and `0` after finding the queue empty.
    prev_count: Arc<AtomicUsize>,
}
/// Type alias for the boxed sink type
///
/// A pinned, heap-allocated `dyn Sink` accepting items of type `T` and failing
/// with [`VecDequeError`]; the trait object is `Send + Sync` and bounded by `'a`.
pub type BoxSink<'a, T> = Pin<Box<dyn Sink<T, Error = VecDequeError> + Send + Sync + 'a>>;
/// Type alias for the sink type
///
/// A `'static` [`BoxSink`] whose items are tasks carrying `T` with a `()`
/// context and a `RandomId` identifier.
pub type InMemorySink<T> = BoxSink<'static, Task<T, (), RandomId>>;
90
/// The complete in-memory backend.
///
/// A newtype (not a type alias) around a [`CustomBackend`] that is
/// parameterised with the in-memory database, the boxed polling stream, the
/// boxed sink, `RandomId` task ids and [`Config`].
pub struct VecDequeBackend<T>(
    CustomBackend<
        T,
        InMemoryDb<T>,
        BoxStream<'static, Result<Option<Task<T, (), RandomId>>, VecDequeError>>,
        InMemorySink<T>,
        RandomId,
        Config,
    >,
);
102
103impl<T> Debug for VecDequeBackend<T> {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        f.debug_struct("VecDequeBackend<T>").finish()
106    }
107}
108
109impl<T> Clone for VecDequeBackend<T> {
110    fn clone(&self) -> Self {
111        Self(self.0.clone())
112    }
113}
114
/// Errors encountered while using the `VecDequeBackend`
///
/// Each variant keeps its source error behind an `Arc`, and the enum derives
/// `Clone`.
#[derive(Debug, thiserror::Error, Clone)]
pub enum VecDequeError {
    /// Error occurred during polling
    ///
    /// Used to wrap errors coming from the heartbeat and poll streams.
    #[error("Polling error: {0}")]
    PollError(Arc<BoxDynError>),
    /// Error occurred during sending
    ///
    /// Used to wrap errors coming from the `Sink` methods of the backend.
    #[error("Sending error: {0}")]
    SendError(Arc<BoxDynError>),
}
125
126impl<T> Backend for VecDequeBackend<T>
127where
128    T: Send + 'static,
129{
130    type Args = T;
131
132    type IdType = RandomId;
133
134    type Context = ();
135
136    type Stream = BoxStream<'static, Result<Option<Task<T, (), Self::IdType>>, VecDequeError>>;
137
138    type Layer = Identity;
139
140    type Beat = BoxStream<'static, Result<(), VecDequeError>>;
141
142    type Error = VecDequeError;
143
144    fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
145        self.0
146            .heartbeat(worker)
147            .map_err(|e| VecDequeError::PollError(Arc::new(e.into())))
148            .boxed()
149    }
150
151    fn middleware(&self) -> Self::Layer {
152        self.0.middleware()
153    }
154    fn poll(self, worker: &WorkerContext) -> Self::Stream {
155        self.0
156            .poll(worker)
157            .map_err(|e| VecDequeError::PollError(Arc::new(e.into())))
158            .boxed()
159    }
160}
161
162impl<T: Clone> BackendExt for VecDequeBackend<T>
163where
164    T: Send + 'static,
165{
166    type Codec = IdentityCodec;
167    type Compact = T;
168    type CompactStream = Self::Stream;
169
170    fn get_queue(&self) -> crate::backend::queue::Queue {
171        std::any::type_name::<T>().into()
172    }
173
174    fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
175        self.0
176            .poll(worker)
177            .map_err(|e| VecDequeError::PollError(Arc::new(e.into())))
178            .boxed()
179    }
180}
181
182impl<T> Sink<Task<T, (), RandomId>> for VecDequeBackend<T>
183where
184    T: Send + 'static,
185{
186    type Error = VecDequeError;
187
188    fn poll_ready(
189        self: Pin<&mut Self>,
190        cx: &mut std::task::Context<'_>,
191    ) -> std::task::Poll<Result<(), Self::Error>> {
192        Pin::new(&mut self.get_mut().0)
193            .poll_ready(cx)
194            .map_err(|e| VecDequeError::SendError(Arc::new(e.into())))
195    }
196
197    fn start_send(self: Pin<&mut Self>, item: Task<T, (), RandomId>) -> Result<(), Self::Error> {
198        Pin::new(&mut self.get_mut().0)
199            .start_send(item)
200            .map_err(|e| VecDequeError::SendError(Arc::new(e.into())))
201    }
202
203    fn poll_flush(
204        self: Pin<&mut Self>,
205        cx: &mut std::task::Context<'_>,
206    ) -> std::task::Poll<Result<(), Self::Error>> {
207        Pin::new(&mut self.get_mut().0)
208            .poll_flush(cx)
209            .map_err(|e| VecDequeError::SendError(Arc::new(e.into())))
210    }
211
212    fn poll_close(
213        self: Pin<&mut Self>,
214        cx: &mut std::task::Context<'_>,
215    ) -> std::task::Poll<Result<(), Self::Error>> {
216        Pin::new(&mut self.get_mut().0)
217            .poll_close(cx)
218            .map_err(|e| VecDequeError::SendError(Arc::new(e.into())))
219    }
220}
221
222/// Create an in-memory `VecDeque` backend with polling strategy for tasks of type T
223#[must_use]
224pub fn backend<T>(poll_interval: Duration) -> VecDequeBackend<T>
225where
226    T: Send + 'static,
227{
228    let memory = InMemoryDb::new();
229
230    let strategy = StrategyBuilder::new()
231        .apply(IntervalStrategy::new(poll_interval))
232        .build();
233
234    let config = Config {
235        strategy,
236        prev_count: Arc::new(AtomicUsize::new(1)),
237    };
238
239    let backend = BackendBuilder::new_with_cfg(config)
240        .database(memory)
241        .fetcher(
242            |db: &mut InMemoryDb<T>, config: &Config, worker: &WorkerContext| {
243                let poll_strategy = config.strategy.clone();
244                let poll_ctx = PollContext::new(worker.clone(), config.prev_count.clone());
245                let poller = poll_strategy.build_stream(&poll_ctx);
246                stream::unfold(
247                    (db.clone(), config.clone(), poller, worker.clone()),
248                    |(p, config, mut poller, ctx)| async move {
249                        poller.next().await;
250                        let Some(mut db) = p.inner.try_lock() else {
251                            return Some((
252                                Err::<Option<Task<T, (), RandomId>>, VecDequeError>(
253                                    VecDequeError::PollError(Arc::new(
254                                        "Failed to acquire lock".into(),
255                                    )),
256                                ),
257                                (p, config, poller, ctx),
258                            ));
259                        };
260                        let item = db.pop_front();
261                        drop(db);
262                        if let Some(item) = item {
263                            config.prev_count.store(1, Ordering::Relaxed);
264                            Some((Ok::<_, VecDequeError>(Some(item)), (p, config, poller, ctx)))
265                        } else {
266                            config.prev_count.store(0, Ordering::Relaxed);
267                            Some((
268                                Ok::<Option<Task<T, (), RandomId>>, VecDequeError>(None),
269                                (p, config, poller, ctx),
270                            ))
271                        }
272                    },
273                )
274                .boxed()
275            },
276        )
277        .sink(|db, _| {
278            Box::pin(sink::unfold(db.clone(), move |p, item| {
279                async move {
280                    let Some(mut db) = p.inner.try_lock() else {
281                        return Err(VecDequeError::PollError(Arc::new(
282                            "Failed to acquire lock".into(),
283                        )));
284                    };
285                    db.push_back(item);
286                    drop(db);
287                    Ok::<_, VecDequeError>(p)
288                }
289                .boxed()
290                .shared()
291            })) as _
292        })
293        .build()
294        .unwrap();
295
296    VecDequeBackend(backend)
297}