apalis_core/backend/poll_strategy/
builder.rs1use futures_core::Stream;
2use futures_util::StreamExt;
3use std::{pin::Pin, sync::Arc};
4
5use crate::backend::poll_strategy::{
6 BoxedPollStrategy, PollContext, PollStrategy, RaceNext, WrapperStrategy,
7};
8
9pub struct StrategyBuilder {
11 strategies: Vec<BoxedPollStrategy>,
12}
13
14impl std::fmt::Debug for StrategyBuilder {
15 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16 f.debug_struct("StrategyBuilder")
17 .field("strategies", &self.strategies.len())
18 .finish()
19 }
20}
21
22impl Default for StrategyBuilder {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl StrategyBuilder {
29 #[must_use]
31 pub fn new() -> Self {
32 Self {
33 strategies: Vec::new(),
34 }
35 }
36
37 #[must_use]
41 pub fn apply<S, Stm>(mut self, strategy: S) -> Self
42 where
43 S: PollStrategy<Stream = Stm> + 'static + Sync + Send,
44 Stm: Stream<Item = ()> + Send + 'static,
45 {
46 self.strategies
47 .push(Box::new(WrapperStrategy::new(strategy)));
48 self
49 }
50
51 #[must_use]
55 pub fn build(self) -> MultiStrategy {
56 MultiStrategy {
57 strategies: Arc::new(std::sync::Mutex::new(self.strategies)),
58 }
59 }
60}
61
62#[derive(Clone)]
66pub struct MultiStrategy {
67 strategies: Arc<std::sync::Mutex<Vec<BoxedPollStrategy>>>,
68}
69
70impl std::fmt::Debug for MultiStrategy {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("MultiStrategy")
73 .field("strategies", &self.strategies.lock().unwrap().len())
74 .finish()
75 }
76}
77
78impl PollStrategy for MultiStrategy {
79 type Stream = Pin<Box<dyn Stream<Item = ()> + Send>>;
80
81 fn poll_strategy(self: Box<Self>, ctx: &PollContext) -> Self::Stream {
82 let ctx = ctx.clone();
83 let mut streams = self
84 .strategies
85 .lock()
86 .unwrap()
87 .drain(..)
88 .map(move |s| {
89 let ctx = ctx.clone();
90 s.poll_strategy(&ctx)
91 })
92 .collect::<Vec<_>>();
93 streams.reverse();
95 RaceNext::new(streams).map(|(_idx, _)| ()).boxed()
96 }
97}