#![allow(dead_code)]
use std::sync::Arc;
use std::time::Duration;
use crate::reporter::Reporter;
use crate::server::ServingState;
use crate::serving_core::ServingCore;
pub(crate) struct HeartbeatSnapshot {
pub requests: u64,
pub ts_issued: u64,
pub not_leader: u64,
pub transitions: u64,
pub fence_retries: u64,
pub last_transition_unix_ms: Option<u64>,
}
impl HeartbeatSnapshot {
pub(crate) fn sample(r: &Reporter) -> Self {
Self {
requests: r.get_ts_requests.snapshot(),
ts_issued: r.timestamps_issued.snapshot(),
not_leader: r.not_leader.snapshot(),
transitions: r.leader_transitions.snapshot(),
fence_retries: r.fence_transient_retries.snapshot(),
last_transition_unix_ms: r.last_leader_transition.snapshot(),
}
}
}
pub(crate) fn age_secs_from(then_unix_ms: u64) -> u64 {
let now_ms = crate::reporter::now_unix_ms();
now_ms.saturating_sub(then_unix_ms) / 1000
}
pub(crate) async fn run_heartbeat(
interval: Duration,
core: Arc<ServingCore>,
reporter: Arc<Reporter>,
cancel: tokio::sync::oneshot::Receiver<()>,
) {
let mut prev = HeartbeatSnapshot::sample(&reporter);
let started = reporter.started_at;
let mut cancel = cancel;
loop {
tokio::select! {
biased;
_ = &mut cancel => break,
_ = tokio::time::sleep(interval) => {
let curr = HeartbeatSnapshot::sample(&reporter);
let state = core.serving_state();
let epoch = core.current_epoch();
let last_age = curr.last_transition_unix_ms.map(age_secs_from);
tracing::info!(
target: "tsoracle::heartbeat",
uptime_secs = started.elapsed().as_secs(),
serving = matches!(state, ServingState::Serving),
epoch = ?epoch.map(|e| e.0),
requests = curr.requests.wrapping_sub(prev.requests),
requests_total = curr.requests,
ts_issued = curr.ts_issued.wrapping_sub(prev.ts_issued),
not_leader = curr.not_leader.wrapping_sub(prev.not_leader),
transitions = curr.transitions.wrapping_sub(prev.transitions),
fence_retries = curr.fence_retries.wrapping_sub(prev.fence_retries),
last_transition_age_secs = ?last_age,
"heartbeat"
);
prev = curr;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex as StdMutex;
use tracing_subscriber::fmt::MakeWriter;
#[derive(Clone, Default)]
struct BufWriter(std::sync::Arc<StdMutex<Vec<u8>>>);
impl<'a> MakeWriter<'a> for BufWriter {
type Writer = BufWriterHandle;
fn make_writer(&'a self) -> Self::Writer {
BufWriterHandle(self.0.clone())
}
}
struct BufWriterHandle(std::sync::Arc<StdMutex<Vec<u8>>>);
impl std::io::Write for BufWriterHandle {
fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
self.0.lock().unwrap().extend_from_slice(b);
Ok(b.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn emits_after_each_interval() {
let buf = BufWriter::default();
let subscriber = tracing_subscriber::fmt()
.with_writer(buf.clone())
.with_max_level(tracing::Level::INFO)
.with_target(true)
.with_ansi(false)
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let reporter = std::sync::Arc::new(Reporter::new());
let core = std::sync::Arc::new(ServingCore::new(Duration::from_secs(3)));
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let hb_fut = run_heartbeat(
Duration::from_millis(50),
core.clone(),
reporter.clone(),
rx,
);
let ctrl_fut = async move {
for _ in 0..3 {
tokio::time::advance(Duration::from_millis(55)).await;
tokio::task::yield_now().await;
}
drop(tx);
};
tokio::join!(hb_fut, ctrl_fut);
let output = String::from_utf8(buf.0.lock().unwrap().clone()).unwrap();
let lines = output
.lines()
.filter(|l| l.contains("tsoracle::heartbeat"))
.count();
assert!(
lines >= 3,
"expected >= 3 heartbeat lines, got {lines}.\nFull output:\n{output}"
);
}
#[test]
fn snapshot_reflects_current_counter_values() {
let r = Reporter::new();
r.get_ts_requests.increment(3);
r.timestamps_issued.increment(8);
r.leader_transitions.increment(1);
r.last_leader_transition.touch_now();
let s = HeartbeatSnapshot::sample(&r);
assert_eq!(s.requests, 3);
assert_eq!(s.ts_issued, 8);
assert_eq!(s.transitions, 1);
assert_eq!(s.not_leader, 0);
assert!(s.last_transition_unix_ms.is_some());
}
#[test]
fn age_zero_on_future_timestamp() {
let now = crate::reporter::now_unix_ms();
let future = now + 1000;
assert_eq!(age_secs_from(future), 0);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn idle_tick_still_emits_with_zero_deltas() {
let buf = BufWriter::default();
let subscriber = tracing_subscriber::fmt()
.with_writer(buf.clone())
.with_max_level(tracing::Level::INFO)
.with_target(true)
.with_ansi(false)
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let reporter = std::sync::Arc::new(Reporter::new());
let core = std::sync::Arc::new(ServingCore::new(Duration::from_secs(3)));
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let hb_fut = run_heartbeat(
Duration::from_millis(50),
core.clone(),
reporter.clone(),
rx,
);
let ctrl_fut = async move {
tokio::time::advance(Duration::from_millis(55)).await;
tokio::task::yield_now().await;
drop(tx);
};
tokio::join!(hb_fut, ctrl_fut);
let output = String::from_utf8(buf.0.lock().unwrap().clone()).unwrap();
assert!(
output.contains("requests=0"),
"idle tick should report requests=0: {output}"
);
assert!(
output.contains("requests_total=0"),
"expected requests_total=0: {output}"
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn deltas_reset_each_tick() {
let buf = BufWriter::default();
let subscriber = tracing_subscriber::fmt()
.with_writer(buf.clone())
.with_max_level(tracing::Level::INFO)
.with_target(true)
.with_ansi(false)
.finish();
let _guard = tracing::subscriber::set_default(subscriber);
let reporter = std::sync::Arc::new(Reporter::new());
let core = std::sync::Arc::new(ServingCore::new(Duration::from_secs(3)));
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let reporter2 = reporter.clone();
let hb_fut = run_heartbeat(
Duration::from_millis(50),
core.clone(),
reporter.clone(),
rx,
);
let ctrl_fut = async move {
reporter2.get_ts_requests.increment(3);
tokio::time::advance(Duration::from_millis(55)).await;
tokio::task::yield_now().await;
reporter2.get_ts_requests.increment(5);
tokio::time::advance(Duration::from_millis(55)).await;
tokio::task::yield_now().await;
drop(tx);
};
tokio::join!(hb_fut, ctrl_fut);
let output = String::from_utf8(buf.0.lock().unwrap().clone()).unwrap();
assert!(
output.contains("requests=3"),
"tick 1 should report requests=3: {output}"
);
assert!(
output.contains("requests=5"),
"tick 2 should report requests=5: {output}"
);
assert!(
!output.contains("requests=8"),
"delta should reset; got cumulative: {output}"
);
}
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn cancel_terminates_promptly_on_sender_drop() {
let reporter = std::sync::Arc::new(Reporter::new());
let core = std::sync::Arc::new(ServingCore::new(Duration::from_secs(3)));
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
drop(tx);
tokio::time::timeout(
Duration::from_millis(50),
run_heartbeat(Duration::from_secs(60), core.clone(), reporter.clone(), rx),
)
.await
.expect("heartbeat did not terminate after cancel sender dropped");
}
}