posemesh_compute_node/
poller.rs

1use anyhow::Result;
2use tokio::sync::watch;
3use tokio::time::{sleep, Duration};
4
/// Poller backoff configuration.
///
/// The two bounds delimit the range from which the delay between consecutive
/// ticks is drawn. Values are compared field-by-field, so two configurations
/// are equal exactly when both bounds match.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PollerConfig {
    /// Lower bound of the delay between ticks, in milliseconds.
    pub backoff_ms_min: u64,
    /// Upper bound of the delay between ticks, in milliseconds.
    pub backoff_ms_max: u64,
}
11
12/// Compute a jittered delay within [min, max] inclusive.
13pub fn jittered_delay_ms(cfg: PollerConfig) -> u64 {
14    let min = cfg.backoff_ms_min.min(cfg.backoff_ms_max);
15    let max = cfg.backoff_ms_max.max(cfg.backoff_ms_min);
16    if min == max {
17        return min;
18    }
19    let span = max - min + 1;
20    let now = std::time::SystemTime::now()
21        .duration_since(std::time::UNIX_EPOCH)
22        .unwrap_or_else(|_| Duration::from_secs(0));
23    min + ((now.subsec_millis() as u64) % span)
24}
25
/// Sending half of the shutdown signal for the poller loop.
///
/// Wraps a `watch::Sender<bool>`; the carried flag is `true` once shutdown
/// has been requested. The handle derives `Clone`, so it can be duplicated.
#[derive(Clone, Debug)]
pub struct ShutdownTx(watch::Sender<bool>);
/// Receiving half of the shutdown signal, passed by value to [`run_poller`].
///
/// Wraps a `watch::Receiver<bool>` whose value is `true` once shutdown has
/// been requested.
#[derive(Debug)]
pub struct ShutdownRx(watch::Receiver<bool>);
31
32/// Create shutdown channel; set to true to stop loop.
33pub fn shutdown_channel() -> (ShutdownTx, ShutdownRx) {
34    let (tx, rx) = watch::channel(false);
35    (ShutdownTx(tx), ShutdownRx(rx))
36}
37
38impl ShutdownTx {
39    pub fn shutdown(&self) {
40        let _ = self.0.send(true);
41    }
42}
43
44/// Run a cancellable poller loop invoking `on_tick` between backoff periods.
45pub async fn run_poller<F, Fut>(
46    cfg: PollerConfig,
47    mut shutdown_rx: ShutdownRx,
48    mut on_tick: F,
49) -> Result<()>
50where
51    F: FnMut() -> Fut + Send + 'static,
52    Fut: std::future::Future<Output = ()> + Send + 'static,
53{
54    loop {
55        on_tick().await;
56        let delay = jittered_delay_ms(cfg);
57        let mut remain = Duration::from_millis(delay);
58        // Sleep in small chunks to observe cancellation quickly.
59        while remain > Duration::from_millis(0) {
60            let step = remain.min(Duration::from_millis(50));
61            tokio::select! {
62                changed = shutdown_rx.0.changed() => {
63                    if changed.is_err() || *shutdown_rx.0.borrow() { return Ok(()); }
64                }
65                _ = sleep(step) => {
66                    remain -= step;
67                }
68            }
69        }
70    }
71}