use std::sync::Arc;
use chrono::{Duration, Utc};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::error::{QmlError, Result};
use crate::storage::Storage;
/// Default heartbeat interval: 10 seconds.
///
/// Intended as the `heartbeat_interval` argument of `HeartbeatWorker::new`;
/// nothing in this file applies it automatically.
pub const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::seconds(10);
/// Default dead-server timeout: 60 seconds.
///
/// Intended as the `dead_server_timeout` argument of `HeartbeatWorker::new`;
/// nothing in this file applies it automatically.
pub const DEFAULT_DEAD_SERVER_TIMEOUT: Duration = Duration::seconds(60);
/// Periodic worker that keeps this server's heartbeat fresh in storage and
/// reclaims jobs from peers whose heartbeat has gone stale.
pub struct HeartbeatWorker {
/// Storage backend used to bump the heartbeat, list dead servers, reclaim
/// their jobs and deregister them.
storage: Arc<dyn Storage>,
/// Identifier of the server this worker heartbeats for; peers with this id
/// are skipped when reclaiming.
server_id: String,
/// Period between two heartbeat/scan passes in `run_until_cancelled`.
heartbeat_interval: Duration,
/// Subtracted from "now" to compute the cutoff passed to
/// `Storage::list_dead_servers`.
dead_server_timeout: Duration,
}
impl HeartbeatWorker {
    /// Creates a heartbeat worker for `server_id` backed by `storage`.
    ///
    /// No validation happens here; the durations are checked when they are
    /// first used (`run_until_cancelled` / `bump_and_scan`).
    pub fn new(
        storage: Arc<dyn Storage>,
        server_id: impl Into<String>,
        heartbeat_interval: Duration,
        dead_server_timeout: Duration,
    ) -> Self {
        Self {
            storage,
            server_id: server_id.into(),
            heartbeat_interval,
            dead_server_timeout,
        }
    }

    /// Runs [`Self::bump_and_scan`] once per `heartbeat_interval` until
    /// `cancel` is triggered.
    ///
    /// Failures of an individual pass are logged and do not stop the loop.
    ///
    /// # Errors
    ///
    /// Returns `QmlError::ConfigurationError` when `heartbeat_interval` is
    /// negative, zero, or otherwise not representable as a
    /// `std::time::Duration`.
    pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
        info!(
            "Starting heartbeat worker for server '{}' (interval: {:?}, dead_after: {:?})",
            self.server_id, self.heartbeat_interval, self.dead_server_timeout
        );

        let period = self
            .heartbeat_interval
            .to_std()
            .map_err(|e| QmlError::ConfigurationError {
                message: format!("Invalid heartbeat interval: {}", e),
            })?;
        // `tokio::time::interval` panics on a zero period; report it as a
        // configuration error instead of crashing the task.
        if period.is_zero() {
            return Err(QmlError::ConfigurationError {
                message: "Invalid heartbeat interval: must be greater than zero".to_string(),
            });
        }
        // A timeout that does not exceed the interval makes healthy peers look
        // dead between two of their heartbeats. Only warn: the caller may have
        // chosen this deliberately (e.g. in tests).
        if self.dead_server_timeout <= self.heartbeat_interval {
            warn!(
                "Dead-server timeout ({:?}) is not greater than the heartbeat interval ({:?}); healthy peers may be treated as dead",
                self.dead_server_timeout, self.heartbeat_interval
            );
        }

        let mut tick = interval(period);
        // The default `Burst` behaviour would fire a rapid series of catch-up
        // ticks after a slow storage call; space them out from the last
        // completed pass instead.
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                // Check cancellation first so shutdown wins over a ready tick.
                biased;
                _ = cancel.cancelled() => {
                    debug!("Heartbeat worker exiting on cancellation");
                    return Ok(());
                }
                _ = tick.tick() => {}
            }
            if let Err(e) = self.bump_and_scan().await {
                error!("Heartbeat tick failed: {}", e);
            }
        }
    }

    /// Performs one heartbeat pass: bumps this server's heartbeat, then
    /// reclaims jobs from, and deregisters, every other server that storage
    /// reports as dead.
    ///
    /// A peer whose job reclamation fails is left registered so a later pass
    /// can retry; per-peer failures are logged and do not fail the pass.
    ///
    /// # Errors
    ///
    /// * `QmlError::StorageError` when the heartbeat bump or the dead-server
    ///   listing fails.
    /// * `QmlError::ConfigurationError` when `dead_server_timeout` is so large
    ///   that the staleness cutoff cannot be represented.
    pub async fn bump_and_scan(&self) -> Result<()> {
        let now = Utc::now();
        match self.storage.heartbeat_server(&self.server_id, now).await {
            Ok(true) => {}
            Ok(false) => warn!(
                "Heartbeat row missing for server '{}' — peer may have reclaimed us",
                self.server_id
            ),
            Err(e) => {
                return Err(QmlError::StorageError {
                    message: format!("Heartbeat bump failed: {}", e),
                });
            }
        }

        // `now - timeout` panics on overflow; use the checked form so an
        // out-of-range timeout surfaces as an error.
        let stale_before = now
            .checked_sub_signed(self.dead_server_timeout)
            .ok_or_else(|| QmlError::ConfigurationError {
                message: format!(
                    "Invalid dead server timeout: {:?} is out of range",
                    self.dead_server_timeout
                ),
            })?;

        let dead = self
            .storage
            .list_dead_servers(stale_before)
            .await
            .map_err(|e| QmlError::StorageError {
                message: format!("list_dead_servers failed: {}", e),
            })?;

        for peer in dead {
            // Never reclaim our own jobs, even if storage lists us as dead.
            if peer.server_id == self.server_id {
                continue;
            }
            match self.storage.reclaim_jobs_from_server(&peer.server_id).await {
                Ok(0) => {}
                Ok(n) => info!("Reclaimed {} job(s) from dead peer '{}'", n, peer.server_id),
                Err(e) => {
                    warn!(
                        "Failed to reclaim jobs from peer '{}': {}",
                        peer.server_id, e
                    );
                    // Keep the peer registered so the next pass retries.
                    continue;
                }
            }
            if let Err(e) = self.storage.deregister_server(&peer.server_id).await {
                warn!("Failed to deregister dead peer '{}': {}", peer.server_id, e);
            }
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{Job, JobState, ServerInfo};
    use crate::storage::MemoryStorage;

    /// Builds a `noop` job that is already `Processing` on `server_name`.
    fn processing_job(
        server_name: &str,
        worker_id: &str,
        started_at: chrono::DateTime<Utc>,
    ) -> Job {
        let mut job = Job::new("noop", serde_json::Value::Null);
        job.state = JobState::Processing {
            started_at,
            worker_id: worker_id.to_string(),
            server_name: server_name.to_string(),
        };
        job
    }

    /// A peer with a ten-minute-old heartbeat must have its job re-enqueued
    /// and be removed from the server list after one pass.
    #[tokio::test]
    async fn bump_and_scan_reclaims_dead_peer_jobs() {
        let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());

        let live_peer = ServerInfo::new("alive#1", "alive", 1, vec!["default".to_string()]);
        storage.register_server(&live_peer).await.unwrap();

        let mut stale_peer = ServerInfo::new("dead#1", "dead", 1, vec!["default".to_string()]);
        stale_peer.last_heartbeat = Utc::now() - Duration::minutes(10);
        storage.register_server(&stale_peer).await.unwrap();

        let orphan = processing_job("dead#1", "dead-worker", Utc::now() - Duration::minutes(5));
        let orphan_id = orphan.id.clone();
        storage.enqueue(&orphan).await.unwrap();

        let heartbeat = HeartbeatWorker::new(
            Arc::clone(&storage),
            "alive#1",
            Duration::seconds(10),
            Duration::seconds(30),
        );
        heartbeat.bump_and_scan().await.unwrap();

        // A cutoff one hour ahead is later than every heartbeat recorded
        // above, so a still-registered "dead#1" would show up here.
        let cutoff = Utc::now() + Duration::hours(1);
        let remaining = storage.list_dead_servers(cutoff).await.unwrap();
        let still_listed = remaining.iter().any(|s| s.server_id == "dead#1");
        assert!(!still_listed, "dead peer should have been deregistered");

        let recovered = storage.get(&orphan_id).await.unwrap().unwrap();
        assert!(
            matches!(recovered.state, JobState::Enqueued { .. }),
            "dead peer's job should be Enqueued, got {:?}",
            recovered.state
        );
    }

    /// A job owned by the worker's own server must stay `Processing`, even
    /// though that server was registered with an hour-old heartbeat.
    // NOTE(review): the pass bumps our heartbeat before scanning, so this may
    // not reach the explicit self-skip branch — confirm against MemoryStorage.
    #[tokio::test]
    async fn bump_and_scan_never_reclaims_self() {
        let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());

        let mut own_server = ServerInfo::new("me#1", "me", 1, vec!["default".to_string()]);
        own_server.last_heartbeat = Utc::now() - Duration::hours(1);
        storage.register_server(&own_server).await.unwrap();

        let own_job = processing_job("me#1", "me-worker", Utc::now());
        let own_job_id = own_job.id.clone();
        storage.enqueue(&own_job).await.unwrap();

        let heartbeat = HeartbeatWorker::new(
            Arc::clone(&storage),
            "me#1",
            Duration::seconds(10),
            Duration::seconds(30),
        );
        heartbeat.bump_and_scan().await.unwrap();

        let untouched = storage.get(&own_job_id).await.unwrap().unwrap();
        assert!(
            matches!(untouched.state, JobState::Processing { .. }),
            "self-owned job must not be reclaimed, got {:?}",
            untouched.state
        );
    }
}