posemesh_compute_node/
poller.rs1use anyhow::Result;
2use tokio::sync::watch;
3use tokio::time::{sleep, Duration};
4
/// Back-off bounds, in milliseconds, used to pace the poller between ticks.
///
/// `jittered_delay_ms` normalises the two bounds before use, so a config
/// whose minimum exceeds its maximum behaves as if the fields were swapped.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PollerConfig {
    /// One end of the inclusive delay range, in milliseconds (nominally the lower bound).
    pub backoff_ms_min: u64,
    /// The other end of the inclusive delay range, in milliseconds (nominally the upper bound).
    pub backoff_ms_max: u64,
}
11
12pub fn jittered_delay_ms(cfg: PollerConfig) -> u64 {
14 let min = cfg.backoff_ms_min.min(cfg.backoff_ms_max);
15 let max = cfg.backoff_ms_max.max(cfg.backoff_ms_min);
16 if min == max {
17 return min;
18 }
19 let span = max - min + 1;
20 let now = std::time::SystemTime::now()
21 .duration_since(std::time::UNIX_EPOCH)
22 .unwrap_or_else(|_| Duration::from_secs(0));
23 min + ((now.subsec_millis() as u64) % span)
24}
25
26#[derive(Clone, Debug)]
28pub struct ShutdownTx(watch::Sender<bool>);
29#[derive(Debug)]
30pub struct ShutdownRx(watch::Receiver<bool>);
31
32pub fn shutdown_channel() -> (ShutdownTx, ShutdownRx) {
34 let (tx, rx) = watch::channel(false);
35 (ShutdownTx(tx), ShutdownRx(rx))
36}
37
38impl ShutdownTx {
39 pub fn shutdown(&self) {
40 let _ = self.0.send(true);
41 }
42}
43
44pub async fn run_poller<F, Fut>(
46 cfg: PollerConfig,
47 mut shutdown_rx: ShutdownRx,
48 mut on_tick: F,
49) -> Result<()>
50where
51 F: FnMut() -> Fut + Send + 'static,
52 Fut: std::future::Future<Output = ()> + Send + 'static,
53{
54 loop {
55 on_tick().await;
56 let delay = jittered_delay_ms(cfg);
57 let mut remain = Duration::from_millis(delay);
58 while remain > Duration::from_millis(0) {
60 let step = remain.min(Duration::from_millis(50));
61 tokio::select! {
62 changed = shutdown_rx.0.changed() => {
63 if changed.is_err() || *shutdown_rx.0.borrow() { return Ok(()); }
64 }
65 _ = sleep(step) => {
66 remain -= step;
67 }
68 }
69 }
70 }
71}