apalis_core/poller/
mod.rs

1use futures::{future::BoxFuture, Future, FutureExt};
2use std::fmt::{self, Debug};
3use tower::layer::util::Identity;
4
5/// Util for controlling pollers
6pub mod controller;
7/// Util for controlled stream
8pub mod stream;
9
10/// A poller type that allows fetching from a stream and a heartbeat future that can be used to do periodic tasks
11pub struct Poller<S, L = Identity> {
12    /// The stream of jobs
13    pub stream: S,
14    /// The heartbeat for the backend
15    pub heartbeat: BoxFuture<'static, ()>,
16    /// The tower middleware provided by the backend
17    pub layer: L,
18    pub(crate) _priv: (),
19}
20
21impl<S> Poller<S, Identity> {
22    /// Build a new poller
23    pub fn new(stream: S, heartbeat: impl Future<Output = ()> + Send + 'static) -> Self {
24        Self::new_with_layer(stream, heartbeat, Identity::new())
25    }
26
27    /// Build a poller with layer
28    pub fn new_with_layer<L>(
29        stream: S,
30        heartbeat: impl Future<Output = ()> + Send + 'static,
31        layer: L,
32    ) -> Poller<S, L> {
33        Poller {
34            stream,
35            heartbeat: heartbeat.boxed(),
36            layer,
37            _priv: (),
38        }
39    }
40}
41
42impl<S, L> Debug for Poller<S, L>
43where
44    S: Debug,
45    L: Debug,
46{
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        f.debug_struct("Poller")
49            .field("stream", &self.stream)
50            .field("heartbeat", &"...")
51            .field("layer", &self.layer)
52            .finish()
53    }
54}
55
// Integer state codes, listed in ascending order.
// NOTE(review): presumably consumed by the `controller` module to track a
// poller's state — confirm against its usage; nothing in view reads them.
const UNPLUGGED: usize = 0;
const PLUGGED: usize = 1;
const STOPPED: usize = 2;