apalis_core/backend/poll_strategy/
mod.rs1use crate::worker::context::WorkerContext;
23use futures_core::Stream;
24use futures_util::StreamExt;
25use std::{
26 pin::Pin,
27 sync::{Arc, atomic::AtomicUsize},
28};
29
30mod strategies;
31pub use strategies::*;
32mod builder;
33pub use builder::*;
34mod race_next;
35pub use race_next::*;
36
37pub type BoxedPollStrategy =
39 Box<dyn PollStrategy<Stream = Pin<Box<dyn Stream<Item = ()> + Send>>> + Send + Sync + 'static>;
40
41pub trait PollStrategy {
44 type Stream: Stream + Send;
46
47 fn poll_strategy(self: Box<Self>, ctx: &PollContext) -> Self::Stream;
49}
50
51impl<T: Sized> PollStrategyExt for T where T: PollStrategy {}
52
53pub trait PollStrategyExt: PollStrategy + Sized {
55 fn build_stream(self, ctx: &PollContext) -> Pin<Box<dyn Stream<Item = ()> + Send>>
58 where
59 Self::Stream: 'static,
60 {
61 let this = Box::new(self);
62 this.poll_strategy(ctx).map(|_| ()).boxed()
63 }
64}
65
66#[derive(Debug, Clone)]
69pub struct PollContext {
70 worker: WorkerContext,
71 prev_count: Arc<AtomicUsize>,
72}
73
74impl PollContext {
75 pub fn new(worker: WorkerContext, prev_count: Arc<AtomicUsize>) -> Self {
77 Self { worker, prev_count }
78 }
79 #[must_use]
81 pub fn worker(&self) -> &WorkerContext {
82 &self.worker
83 }
84 #[must_use]
86 pub fn prev_count(&self) -> &Arc<AtomicUsize> {
87 &self.prev_count
88 }
89}
90
91#[cfg(test)]
92mod tests {
93 use std::{
94 collections::VecDeque,
95 sync::{Arc, atomic::Ordering},
96 time::Duration,
97 };
98
99 use futures_channel::mpsc;
100
101 use futures_util::{
102 FutureExt, SinkExt, StreamExt,
103 lock::Mutex,
104 sink,
105 stream::{self},
106 };
107
108 use crate::{
109 error::BoxDynError,
110 task::Task,
111 worker::{
112 builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
113 },
114 };
115
116 use super::*;
117
118 const ITEMS: u32 = 10;
119
120 type InMemoryQueue<T> = Arc<Mutex<VecDeque<Task<T, ()>>>>;
121
122 #[tokio::test]
123 #[cfg(feature = "sleep")]
124 async fn basic_strategy_backend() {
125 use crate::backend::custom::BackendBuilder;
126
127 let memory: InMemoryQueue<u32> = Arc::new(Mutex::new(VecDeque::new()));
128
129 #[derive(Clone)]
130 struct Config {
131 strategy: MultiStrategy,
132 prev_count: Arc<AtomicUsize>,
133 }
134 let strategy = StrategyBuilder::new()
135 .apply(IntervalStrategy::new(Duration::from_millis(100)))
136 .build();
137
138 let config = Config {
139 strategy,
140 prev_count: Arc::new(AtomicUsize::new(1)),
141 };
142
143 let mut backend = BackendBuilder::new_with_cfg(config)
144 .database(memory)
145 .fetcher(|db, config, worker| {
146 let poll_strategy = config.strategy.clone();
147 let poll_ctx = PollContext::new(worker.clone(), config.prev_count.clone());
148 let poller = poll_strategy.build_stream(&poll_ctx);
149 stream::unfold(
150 (db.clone(), config.clone(), poller, worker.clone()),
151 |(p, config, mut poller, ctx)| async move {
152 let _ = poller.next().await;
153 let mut db = p.lock().await;
154 let item = db.pop_front();
155 drop(db);
156 if let Some(item) = item {
157 config.prev_count.store(1, Ordering::Relaxed);
158 Some((Ok::<_, BoxDynError>(Some(item)), (p, config, poller, ctx)))
159 } else {
160 config.prev_count.store(0, Ordering::Relaxed);
161 Some((
162 Ok::<Option<Task<u32, ()>>, BoxDynError>(None),
163 (p, config, poller, ctx),
164 ))
165 }
166 },
167 )
168 .boxed()
169 })
170 .sink(|db, _| {
171 sink::unfold(db.clone(), move |p, item| {
172 async move {
173 let mut db = p.lock().await;
174 db.push_back(item);
175 drop(db);
176 Ok::<_, BoxDynError>(p)
177 }
178 .boxed()
179 })
180 })
181 .build()
182 .unwrap();
183
184 for i in 0..ITEMS {
185 backend.send(Task::new(i)).await.unwrap();
186 }
187
188 async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
189 tokio::time::sleep(Duration::from_secs(1)).await;
190 if task == ITEMS - 1 {
191 tokio::time::sleep(Duration::from_secs(5)).await;
192 ctx.stop().unwrap();
193 return Err("Worker stopped!")?;
194 }
195 Ok(())
196 }
197
198 let worker = WorkerBuilder::new("rango-tango")
199 .backend(backend)
200 .on_event(|ctx, ev| {
201 println!("On Event = {ev:?} from {}", ctx.name());
202 })
203 .build(task);
204 worker.run().await.unwrap();
205 }
206
207 #[tokio::test]
208 #[cfg(feature = "sleep")]
209 async fn custom_strategy_backend() {
210 use crate::backend::custom::BackendBuilder;
211
212 let memory: InMemoryQueue<u32> = Arc::new(Mutex::new(VecDeque::new()));
213
214 #[derive(Clone)]
215 struct Config {
216 strategy: MultiStrategy,
217 prev_count: Arc<AtomicUsize>,
218 }
219
220 let backoff = BackoffConfig::new(Duration::from_secs(5))
221 .with_multiplier(1.5)
222 .with_jitter(0.2);
223 let interval = IntervalStrategy::new(Duration::from_millis(200)).with_backoff(backoff);
224
225 let when_i_am_ready = FutureStrategy::new(|_ctx: WorkerContext, _prev: usize| {
226 tokio::time::sleep(Duration::from_millis(1500)).map(|_| {
228 })
230 });
231
232 let (mut tx, rx) = mpsc::channel(1);
233
234 tokio::spawn(async move {
235 for i in 0..ITEMS {
236 tokio::time::sleep(Duration::from_secs((i) as u64)).await;
237 if tx.send(()).await.is_err() {
238 break;
239 }
240 }
241 });
242
243 let strategy = StrategyBuilder::new()
244 .apply(when_i_am_ready)
245 .apply(interval)
246 .apply(StreamStrategy::new(rx))
247 .build();
248
249 let config = Config {
250 strategy,
251 prev_count: Arc::new(AtomicUsize::new(1)),
252 };
253
254 let mut backend = BackendBuilder::new_with_cfg(config)
255 .database(memory)
256 .fetcher(|db, config, worker| {
257 let poll_strategy = config.strategy.clone();
258 let poll_ctx = PollContext::new(worker.clone(), config.prev_count.clone());
259 let poller = poll_strategy.build_stream(&poll_ctx);
260 stream::unfold(
261 (db.clone(), config.clone(), poller, worker.clone()),
262 |(p, config, mut poller, ctx)| async move {
263 poller.next().await;
264 let mut db = p.lock().await;
265 let item = db.pop_front();
266 drop(db);
267 if let Some(item) = item {
268 config.prev_count.store(1, Ordering::Relaxed);
269 Some((Ok::<_, BoxDynError>(Some(item)), (p, config, poller, ctx)))
270 } else {
271 config.prev_count.store(0, Ordering::Relaxed);
272 Some((
273 Ok::<Option<Task<u32, ()>>, BoxDynError>(None),
274 (p, config, poller, ctx),
275 ))
276 }
277 },
278 )
279 .boxed()
280 })
281 .sink(|db, _| {
282 sink::unfold(db.clone(), move |p, item| {
283 async move {
284 let mut db = p.lock().await;
285 db.push_back(item);
286 drop(db);
287 Ok::<_, BoxDynError>(p)
288 }
289 .boxed()
290 })
291 })
292 .build()
293 .unwrap();
294
295 for i in 0..ITEMS {
296 backend.send(Task::new(i)).await.unwrap();
297 }
298
299 async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
300 tokio::time::sleep(Duration::from_secs(1)).await;
301 if task == ITEMS - 1 {
302 tokio::time::sleep(Duration::from_secs(10)).await;
303 ctx.stop().unwrap();
304 return Err("Worker stopped!")?;
305 }
306 Ok(())
307 }
308
309 let worker = WorkerBuilder::new("rango-tango")
310 .backend(backend)
311 .on_event(|ctx, ev| {
312 println!("On Event = {ev:?} from {}", ctx.name());
313 })
314 .build(task);
315 worker.run().await.unwrap();
316 }
317}