apalis_core/backend/poll_strategy/
builder.rs

1use futures_core::Stream;
2use futures_util::StreamExt;
3use std::{pin::Pin, sync::Arc};
4
5use crate::backend::poll_strategy::{
6    BoxedPollStrategy, PollContext, PollStrategy, RaceNext, WrapperStrategy,
7};
8
9/// Builder for composing multiple polling strategies
10pub struct StrategyBuilder {
11    strategies: Vec<BoxedPollStrategy>,
12}
13
14impl std::fmt::Debug for StrategyBuilder {
15    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16        f.debug_struct("StrategyBuilder")
17            .field("strategies", &self.strategies.len())
18            .finish()
19    }
20}
21
22impl Default for StrategyBuilder {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl StrategyBuilder {
29    /// Create a new StrategyBuilder
30    #[must_use]
31    pub fn new() -> Self {
32        Self {
33            strategies: Vec::new(),
34        }
35    }
36
37    /// Apply a polling strategy to the builder
38    /// Strategies are executed in the order they are added, with the first strategy having the highest priority
39    /// In case of multiple strategies being ready at the same time, the first one added will be chosen
40    #[must_use]
41    pub fn apply<S, Stm>(mut self, strategy: S) -> Self
42    where
43        S: PollStrategy<Stream = Stm> + 'static + Sync + Send,
44        Stm: Stream<Item = ()> + Send + 'static,
45    {
46        self.strategies
47            .push(Box::new(WrapperStrategy::new(strategy)));
48        self
49    }
50
51    /// Build the MultiStrategy from the builder
52    /// Consumes the builder and returns a MultiStrategy
53    /// The MultiStrategy will contain all the strategies added to the builder
54    #[must_use]
55    pub fn build(self) -> MultiStrategy {
56        MultiStrategy {
57            strategies: Arc::new(std::sync::Mutex::new(self.strategies)),
58        }
59    }
60}
61
62/// A polling strategy that combines multiple strategies
63/// The strategies are executed in the order they were added to the builder
64/// In case of multiple strategies being ready at the same time, the first one added will be chosen
65#[derive(Clone)]
66pub struct MultiStrategy {
67    strategies: Arc<std::sync::Mutex<Vec<BoxedPollStrategy>>>,
68}
69
70impl std::fmt::Debug for MultiStrategy {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("MultiStrategy")
73            .field("strategies", &self.strategies.lock().unwrap().len())
74            .finish()
75    }
76}
77
78impl PollStrategy for MultiStrategy {
79    type Stream = Pin<Box<dyn Stream<Item = ()> + Send>>;
80
81    fn poll_strategy(self: Box<Self>, ctx: &PollContext) -> Self::Stream {
82        let ctx = ctx.clone();
83        let mut streams = self
84            .strategies
85            .lock()
86            .unwrap()
87            .drain(..)
88            .map(move |s| {
89                let ctx = ctx.clone();
90                s.poll_strategy(&ctx)
91            })
92            .collect::<Vec<_>>();
93        // Reverse to give priority to strategies in the order they were added
94        streams.reverse();
95        RaceNext::new(streams).map(|(_idx, _)| ()).boxed()
96    }
97}