posemesh_compute_node/
heartbeat.rs

1use anyhow::Result;
2use serde_json::Value;
3use tokio::sync::watch;
4use tokio::time::{sleep, Duration};
5
6/// Payload carried by heartbeats: last progress and small event map.
7#[derive(Clone, Debug, Default)]
8pub struct HeartbeatData {
9    pub progress: Value,
10    pub events: Value,
11}
12
13/// Sender side of the progress channel.
14#[derive(Clone, Debug)]
15pub struct ProgressSender(watch::Sender<Option<HeartbeatData>>);
16
17/// Receiver side of the progress channel.
18#[derive(Debug)]
19pub struct ProgressReceiver(watch::Receiver<Option<HeartbeatData>>);
20
21/// Create a new progress channel. Only the latest value is relevant (watch).
22pub fn progress_channel() -> (ProgressSender, ProgressReceiver) {
23    let (tx, rx) = watch::channel::<Option<HeartbeatData>>(None);
24    (ProgressSender(tx), ProgressReceiver(rx))
25}
26
27impl ProgressSender {
28    /// Replace the current progress/events state.
29    pub fn update(&self, progress: Value, events: Value) {
30        let _ = self
31            .0
32            .send_replace(Some(HeartbeatData { progress, events }));
33    }
34}
35
36impl ProgressReceiver {
37    pub(crate) async fn recv(&mut self) -> Option<HeartbeatData> {
38        if self.0.changed().await.is_err() {
39            return None;
40        }
41        self.0.borrow().clone()
42    }
43}
44
45/// Shutdown signal for the heartbeat loop.
46#[derive(Clone, Debug)]
47pub struct ShutdownTx(watch::Sender<bool>);
48#[derive(Debug)]
49pub struct ShutdownRx(watch::Receiver<bool>);
50
51/// Create a shutdown channel (false by default). Set to true to stop the loop.
52pub fn shutdown_channel() -> (ShutdownTx, ShutdownRx) {
53    let (tx, rx) = watch::channel(false);
54    (ShutdownTx(tx), ShutdownRx(rx))
55}
56
57impl ShutdownTx {
58    pub fn shutdown(&self) {
59        let _ = self.0.send(true);
60    }
61}
62
63/// Debounced heartbeat scheduler.
64///
65/// - Listens for progress updates on a watch channel.
66/// - On change, waits a small jitter (0..=heartbeat_jitter_ms) and invokes `on_heartbeat`
67///   with the latest progress/events, coalescing multiple rapid updates into a single call.
68/// - Stops when `shutdown_rx` becomes true or all senders are dropped.
69pub async fn run_scheduler<F, Fut>(
70    mut progress_rx: ProgressReceiver,
71    mut shutdown_rx: ShutdownRx,
72    heartbeat_jitter_ms: u64,
73    mut on_heartbeat: F,
74) -> Result<()>
75where
76    F: FnMut(HeartbeatData) -> Fut + Send + 'static,
77    Fut: std::future::Future<Output = ()> + Send + 'static,
78{
79    loop {
80        tokio::select! {
81            changed = shutdown_rx.0.changed() => {
82                if changed.is_err() || *shutdown_rx.0.borrow() { break; }
83            }
84            changed = progress_rx.0.changed() => {
85                if changed.is_err() { break; }
86                // Mark current as observed to coalesce rapid updates during jitter.
87                {
88                    let _ = progress_rx.0.borrow_and_update();
89                }
90                let jitter = jitter_delay_ms(heartbeat_jitter_ms);
91                if jitter > 0 { sleep(Duration::from_millis(jitter)).await; }
92                let latest: Option<HeartbeatData> = {
93                    progress_rx.0.borrow_and_update().clone()
94                };
95                if let Some(data) = latest {
96                    on_heartbeat(data).await;
97                }
98            }
99        }
100    }
101    Ok(())
102}
103
/// Pick a pseudo-random heartbeat delay in milliseconds.
///
/// Returns 0 when `max_ms` is 0; otherwise a value in `max(1, max_ms / 2)..=max_ms`.
/// Entropy comes from the sub-second part of the wall clock. This is cheap and adequate
/// for spreading heartbeats, but it is not a real random source.
fn jitter_delay_ms(max_ms: u64) -> u64 {
    if max_ms == 0 {
        return 0;
    }
    // Nanoseconds (0..1e9) rather than milliseconds (0..1000). With milliseconds, any
    // span wider than 1000 could only ever yield its lowest 1000 values.
    let entropy = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| u64::from(since_epoch.subsec_nanos()))
        .unwrap_or(0);
    let min = std::cmp::max(1, max_ms / 2);
    // `min <= max_ms` always holds here, so this cannot underflow or overflow.
    let span = max_ms - min + 1;
    min + entropy % span
}