apalis_core/backend/poll_strategy/
mod.rs

1//! Polling strategies for backends.
2//!
3//! This module provides abstractions and implementations for different polling strategies
4//! used by backends to determine when to poll for new tasks. Strategies can be
5//! combined, customized, and extended to suit various workload requirements.
6//!
7//! # Features
8//!
9//! - Trait [`PollStrategy`] for defining custom polling strategies.
10//! - Extension trait [`PollStrategyExt`] for ergonomic usage.
11//! - [`PollContext`] struct for passing contextual information to strategies.
12//! - Boxed trait object type [`BoxedPollStrategy`] for dynamic dispatch.
13//! - Built-in strategies and combinators.
14//!
15//! # Usage
16//!
17//! Implement the [`PollStrategy`] trait for your custom strategy, or use the provided
18//! strategies and combinators. Use [`PollContext`] to access worker state and previous
19//! task counts.
20//!
21//! See submodules for available strategies and builder utilities.
22use crate::worker::context::WorkerContext;
23use futures_core::Stream;
24use futures_util::StreamExt;
25use std::{
26    pin::Pin,
27    sync::{Arc, atomic::AtomicUsize},
28};
29
30mod strategies;
31pub use strategies::*;
32mod builder;
33pub use builder::*;
34mod race_next;
35pub use race_next::*;
36
37/// A boxed poll strategy
38pub type BoxedPollStrategy =
39    Box<dyn PollStrategy<Stream = Pin<Box<dyn Stream<Item = ()> + Send>>> + Send + Sync + 'static>;
40
41/// A trait for different polling strategies
42/// All strategies can be combined in a race condition
43pub trait PollStrategy {
44    /// The stream returned by the strategy
45    type Stream: Stream + Send;
46
47    /// Create a stream that completes when the next poll should occur
48    fn poll_strategy(self: Box<Self>, ctx: &PollContext) -> Self::Stream;
49}
50
51impl<T: Sized> PollStrategyExt for T where T: PollStrategy {}
52
53/// Extension trait for PollStrategy
54pub trait PollStrategyExt: PollStrategy + Sized {
55    /// Build a boxed stream from the strategy
56    /// This is a convenience method that boxes the strategy and calls `poll_strategy`
57    fn build_stream(self, ctx: &PollContext) -> Pin<Box<dyn Stream<Item = ()> + Send>>
58    where
59        Self::Stream: 'static,
60    {
61        let this = Box::new(self);
62        this.poll_strategy(ctx).map(|_| ()).boxed()
63    }
64}
65
66/// Context provided to the polling strategies
67/// Includes the worker context and a reference to the previous count of tasks received
68#[derive(Debug, Clone)]
69pub struct PollContext {
70    worker: WorkerContext,
71    prev_count: Arc<AtomicUsize>,
72}
73
74impl PollContext {
75    /// Create a new PollContext
76    pub fn new(worker: WorkerContext, prev_count: Arc<AtomicUsize>) -> Self {
77        Self { worker, prev_count }
78    }
79    /// Get a reference to the worker context
80    #[must_use]
81    pub fn worker(&self) -> &WorkerContext {
82        &self.worker
83    }
84    /// Get a reference to the previous count of tasks received
85    #[must_use]
86    pub fn prev_count(&self) -> &Arc<AtomicUsize> {
87        &self.prev_count
88    }
89}
90
91#[cfg(test)]
92mod tests {
93    use std::{
94        collections::VecDeque,
95        sync::{Arc, atomic::Ordering},
96        time::Duration,
97    };
98
99    use futures_channel::mpsc;
100
101    use futures_util::{
102        FutureExt, SinkExt, StreamExt,
103        lock::Mutex,
104        sink,
105        stream::{self},
106    };
107
108    use crate::{
109        error::BoxDynError,
110        task::Task,
111        worker::{
112            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
113        },
114    };
115
116    use super::*;
117
118    const ITEMS: u32 = 10;
119
120    type InMemoryQueue<T> = Arc<Mutex<VecDeque<Task<T, ()>>>>;
121
122    #[tokio::test]
123    #[cfg(feature = "sleep")]
124    async fn basic_strategy_backend() {
125        use crate::backend::custom::BackendBuilder;
126
127        let memory: InMemoryQueue<u32> = Arc::new(Mutex::new(VecDeque::new()));
128
129        #[derive(Clone)]
130        struct Config {
131            strategy: MultiStrategy,
132            prev_count: Arc<AtomicUsize>,
133        }
134        let strategy = StrategyBuilder::new()
135            .apply(IntervalStrategy::new(Duration::from_millis(100)))
136            .build();
137
138        let config = Config {
139            strategy,
140            prev_count: Arc::new(AtomicUsize::new(1)),
141        };
142
143        let mut backend = BackendBuilder::new_with_cfg(config)
144            .database(memory)
145            .fetcher(|db, config, worker| {
146                let poll_strategy = config.strategy.clone();
147                let poll_ctx = PollContext::new(worker.clone(), config.prev_count.clone());
148                let poller = poll_strategy.build_stream(&poll_ctx);
149                stream::unfold(
150                    (db.clone(), config.clone(), poller, worker.clone()),
151                    |(p, config, mut poller, ctx)| async move {
152                        let _ = poller.next().await;
153                        let mut db = p.lock().await;
154                        let item = db.pop_front();
155                        drop(db);
156                        if let Some(item) = item {
157                            config.prev_count.store(1, Ordering::Relaxed);
158                            Some((Ok::<_, BoxDynError>(Some(item)), (p, config, poller, ctx)))
159                        } else {
160                            config.prev_count.store(0, Ordering::Relaxed);
161                            Some((
162                                Ok::<Option<Task<u32, ()>>, BoxDynError>(None),
163                                (p, config, poller, ctx),
164                            ))
165                        }
166                    },
167                )
168                .boxed()
169            })
170            .sink(|db, _| {
171                sink::unfold(db.clone(), move |p, item| {
172                    async move {
173                        let mut db = p.lock().await;
174                        db.push_back(item);
175                        drop(db);
176                        Ok::<_, BoxDynError>(p)
177                    }
178                    .boxed()
179                })
180            })
181            .build()
182            .unwrap();
183
184        for i in 0..ITEMS {
185            backend.send(Task::new(i)).await.unwrap();
186        }
187
188        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
189            tokio::time::sleep(Duration::from_secs(1)).await;
190            if task == ITEMS - 1 {
191                tokio::time::sleep(Duration::from_secs(5)).await;
192                ctx.stop().unwrap();
193                return Err("Worker stopped!")?;
194            }
195            Ok(())
196        }
197
198        let worker = WorkerBuilder::new("rango-tango")
199            .backend(backend)
200            .on_event(|ctx, ev| {
201                println!("On Event = {ev:?} from {}", ctx.name());
202            })
203            .build(task);
204        worker.run().await.unwrap();
205    }
206
207    #[tokio::test]
208    #[cfg(feature = "sleep")]
209    async fn custom_strategy_backend() {
210        use crate::backend::custom::BackendBuilder;
211
212        let memory: InMemoryQueue<u32> = Arc::new(Mutex::new(VecDeque::new()));
213
214        #[derive(Clone)]
215        struct Config {
216            strategy: MultiStrategy,
217            prev_count: Arc<AtomicUsize>,
218        }
219
220        let backoff = BackoffConfig::new(Duration::from_secs(5))
221            .with_multiplier(1.5)
222            .with_jitter(0.2);
223        let interval = IntervalStrategy::new(Duration::from_millis(200)).with_backoff(backoff);
224
225        let when_i_am_ready = FutureStrategy::new(|_ctx: WorkerContext, _prev: usize| {
226            // println!("Waiting to be ready...");
227            tokio::time::sleep(Duration::from_millis(1500)).map(|_| {
228                // println!("I am ready now!");
229            })
230        });
231
232        let (mut tx, rx) = mpsc::channel(1);
233
234        tokio::spawn(async move {
235            for i in 0..ITEMS {
236                tokio::time::sleep(Duration::from_secs((i) as u64)).await;
237                if tx.send(()).await.is_err() {
238                    break;
239                }
240            }
241        });
242
243        let strategy = StrategyBuilder::new()
244            .apply(when_i_am_ready)
245            .apply(interval)
246            .apply(StreamStrategy::new(rx))
247            .build();
248
249        let config = Config {
250            strategy,
251            prev_count: Arc::new(AtomicUsize::new(1)),
252        };
253
254        let mut backend = BackendBuilder::new_with_cfg(config)
255            .database(memory)
256            .fetcher(|db, config, worker| {
257                let poll_strategy = config.strategy.clone();
258                let poll_ctx = PollContext::new(worker.clone(), config.prev_count.clone());
259                let poller = poll_strategy.build_stream(&poll_ctx);
260                stream::unfold(
261                    (db.clone(), config.clone(), poller, worker.clone()),
262                    |(p, config, mut poller, ctx)| async move {
263                        poller.next().await;
264                        let mut db = p.lock().await;
265                        let item = db.pop_front();
266                        drop(db);
267                        if let Some(item) = item {
268                            config.prev_count.store(1, Ordering::Relaxed);
269                            Some((Ok::<_, BoxDynError>(Some(item)), (p, config, poller, ctx)))
270                        } else {
271                            config.prev_count.store(0, Ordering::Relaxed);
272                            Some((
273                                Ok::<Option<Task<u32, ()>>, BoxDynError>(None),
274                                (p, config, poller, ctx),
275                            ))
276                        }
277                    },
278                )
279                .boxed()
280            })
281            .sink(|db, _| {
282                sink::unfold(db.clone(), move |p, item| {
283                    async move {
284                        let mut db = p.lock().await;
285                        db.push_back(item);
286                        drop(db);
287                        Ok::<_, BoxDynError>(p)
288                    }
289                    .boxed()
290                })
291            })
292            .build()
293            .unwrap();
294
295        for i in 0..ITEMS {
296            backend.send(Task::new(i)).await.unwrap();
297        }
298
299        async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
300            tokio::time::sleep(Duration::from_secs(1)).await;
301            if task == ITEMS - 1 {
302                tokio::time::sleep(Duration::from_secs(10)).await;
303                ctx.stop().unwrap();
304                return Err("Worker stopped!")?;
305            }
306            Ok(())
307        }
308
309        let worker = WorkerBuilder::new("rango-tango")
310            .backend(backend)
311            .on_event(|ctx, ev| {
312                println!("On Event = {ev:?} from {}", ctx.name());
313            })
314            .build(task);
315        worker.run().await.unwrap();
316    }
317}