// posemesh_compute_node/heartbeat.rs
use anyhow::Result;
use serde_json::Value;
use tokio::sync::watch;
use tokio::time::{sleep, Duration};
5
6#[derive(Clone, Debug, Default)]
8pub struct HeartbeatData {
9 pub progress: Value,
10 pub events: Value,
11}
12
13#[derive(Clone, Debug)]
15pub struct ProgressSender(watch::Sender<Option<HeartbeatData>>);
16
17#[derive(Debug)]
19pub struct ProgressReceiver(watch::Receiver<Option<HeartbeatData>>);
20
21pub fn progress_channel() -> (ProgressSender, ProgressReceiver) {
23 let (tx, rx) = watch::channel::<Option<HeartbeatData>>(None);
24 (ProgressSender(tx), ProgressReceiver(rx))
25}
26
27impl ProgressSender {
28 pub fn update(&self, progress: Value, events: Value) {
30 let _ = self
31 .0
32 .send_replace(Some(HeartbeatData { progress, events }));
33 }
34}
35
36impl ProgressReceiver {
37 pub(crate) async fn recv(&mut self) -> Option<HeartbeatData> {
38 if self.0.changed().await.is_err() {
39 return None;
40 }
41 self.0.borrow().clone()
42 }
43}
44
45#[derive(Clone, Debug)]
47pub struct ShutdownTx(watch::Sender<bool>);
48#[derive(Debug)]
49pub struct ShutdownRx(watch::Receiver<bool>);
50
51pub fn shutdown_channel() -> (ShutdownTx, ShutdownRx) {
53 let (tx, rx) = watch::channel(false);
54 (ShutdownTx(tx), ShutdownRx(rx))
55}
56
57impl ShutdownTx {
58 pub fn shutdown(&self) {
59 let _ = self.0.send(true);
60 }
61}
62
63pub async fn run_scheduler<F, Fut>(
70 mut progress_rx: ProgressReceiver,
71 mut shutdown_rx: ShutdownRx,
72 heartbeat_jitter_ms: u64,
73 mut on_heartbeat: F,
74) -> Result<()>
75where
76 F: FnMut(HeartbeatData) -> Fut + Send + 'static,
77 Fut: std::future::Future<Output = ()> + Send + 'static,
78{
79 loop {
80 tokio::select! {
81 changed = shutdown_rx.0.changed() => {
82 if changed.is_err() || *shutdown_rx.0.borrow() { break; }
83 }
84 changed = progress_rx.0.changed() => {
85 if changed.is_err() { break; }
86 {
88 let _ = progress_rx.0.borrow_and_update();
89 }
90 let jitter = jitter_delay_ms(heartbeat_jitter_ms);
91 if jitter > 0 { sleep(Duration::from_millis(jitter)).await; }
92 let latest: Option<HeartbeatData> = {
93 progress_rx.0.borrow_and_update().clone()
94 };
95 if let Some(data) = latest {
96 on_heartbeat(data).await;
97 }
98 }
99 }
100 }
101 Ok(())
102}
103
/// Picks a pseudo-random delay in milliseconds from the wall clock.
///
/// Returns `0` when `max_ms` is `0`. Otherwise the result lies in the
/// inclusive range `[max(1, max_ms / 2), max_ms]`.
///
/// The offset within that range is derived from the current time since the
/// Unix epoch in nanoseconds, so the whole range is reachable even when it is
/// wider than one second. If the system clock reads earlier than the epoch,
/// the lower bound of the range is returned.
fn jitter_delay_ms(max_ms: u64) -> u64 {
    if max_ms == 0 {
        return 0;
    }
    let min = std::cmp::max(1, max_ms / 2);
    // Number of distinct values in [min, max_ms]; cannot overflow because
    // min >= 1 whenever max_ms >= 1.
    let span = max_ms - min + 1;
    let entropy: u128 = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    // The remainder is strictly less than `span`, which is a u64, so the
    // narrowing cast cannot truncate.
    let offset = (entropy % u128::from(span)) as u64;
    min + offset
}