use celers_core::lock::DistributedLockBackend;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
/// Role a beat instance holds in leader election.
///
/// The value is `Copy`, comparable, serializable with serde, and defaults to
/// [`BeatRole::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum BeatRole {
/// The instance acquired the leader lock.
Leader,
/// The instance failed to acquire or renew the leader lock.
Standby,
/// No role determined yet; also the state an instance is reset to on shutdown.
#[default]
Unknown,
}
/// Renders the role as its lowercase name: `leader`, `standby` or `unknown`.
impl std::fmt::Display for BeatRole {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the static label first, then emit it with a single write.
        let label = match *self {
            BeatRole::Leader => "leader",
            BeatRole::Standby => "standby",
            BeatRole::Unknown => "unknown",
        };
        f.write_str(label)
    }
}
/// Snapshot of one beat instance's identity and liveness data.
///
/// Cloneable and serializable with serde so it can be handed out or published
/// as a unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatInfo {
/// Identifier of the instance this record describes.
pub instance_id: String,
/// Role the instance held when the record was last written.
pub role: BeatRole,
/// UTC time of the most recent heartbeat update.
pub last_heartbeat_at: DateTime<Utc>,
/// Schedule count supplied by the caller on the last update (0 initially).
pub schedule_count: usize,
/// Identifier of the next due task as supplied by the caller, if any.
pub next_due_task: Option<String>,
/// UTC time at which this record was created.
pub started_at: DateTime<Utc>,
/// Crate version, captured from `CARGO_PKG_VERSION` at compile time.
pub version: String,
/// Host name read from the environment, or `"unknown"` when unavailable.
pub hostname: String,
}
impl HeartbeatInfo {
    /// Creates a record for `instance_id` with role [`BeatRole::Unknown`],
    /// no schedules, no next due task, the compile-time crate version and the
    /// host name resolved from the environment.
    ///
    /// `started_at` and `last_heartbeat_at` are set to the same instant.
    pub fn new(instance_id: String) -> Self {
        // Sample the clock once: two separate `Utc::now()` calls would leave
        // `last_heartbeat_at` marginally *earlier* than `started_at`.
        let now = Utc::now();
        Self {
            instance_id,
            role: BeatRole::Unknown,
            last_heartbeat_at: now,
            schedule_count: 0,
            next_due_task: None,
            started_at: now,
            version: env!("CARGO_PKG_VERSION").to_string(),
            hostname: hostname(),
        }
    }

    /// Returns the record with its role replaced by `role`.
    pub fn with_role(mut self, role: BeatRole) -> Self {
        self.role = role;
        self
    }

    /// Returns the record with its schedule count replaced by `count`.
    pub fn with_schedule_count(mut self, count: usize) -> Self {
        self.schedule_count = count;
        self
    }

    /// Returns the record with its next due task replaced by `task`
    /// (`None` clears it).
    pub fn with_next_due_task(mut self, task: Option<String>) -> Self {
        self.next_due_task = task;
        self
    }
}
/// Best-effort host name lookup from the process environment.
///
/// Checks `HOSTNAME` first and `COMPUTERNAME` second, returning the first
/// value that is set, valid Unicode and not blank. Falls back to the literal
/// `"unknown"` when neither variable qualifies.
fn hostname() -> String {
    ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        // Unset or non-Unicode variables are skipped.
        .filter_map(|name| std::env::var(name).ok())
        // A variable that is set but blank must not shadow the next candidate
        // or be reported as the host name.
        .find(|value| !value.trim().is_empty())
        .unwrap_or_else(|| "unknown".to_string())
}
/// Timing and key settings for heartbeats and leader election.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    /// Interval between heartbeats (default 5s).
    pub heartbeat_interval: Duration,
    /// Lifetime of a heartbeat; must exceed `heartbeat_interval` (default 15s).
    pub heartbeat_ttl: Duration,
    /// Lifetime of the leader lease; must exceed `leader_renewal_interval`
    /// (default 30s).
    pub leader_lease_ttl: Duration,
    /// Interval between leader lease renewals (default 10s).
    pub leader_renewal_interval: Duration,
    /// Failover timeout; must exceed `leader_lease_ttl` (default 45s).
    pub failover_timeout: Duration,
    /// Lock key used for the leader lock (default `"beat_leader"`).
    pub leader_lock_key: String,
    /// Prefix for heartbeat keys (default `"beat_heartbeat:"`).
    pub heartbeat_key_prefix: String,
}

impl Default for HeartbeatConfig {
    /// Returns the default timings and keys listed on each field.
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(5),
            heartbeat_ttl: Duration::from_secs(15),
            leader_lease_ttl: Duration::from_secs(30),
            leader_renewal_interval: Duration::from_secs(10),
            failover_timeout: Duration::from_secs(45),
            leader_lock_key: "beat_leader".to_string(),
            heartbeat_key_prefix: "beat_heartbeat:".to_string(),
        }
    }
}

impl HeartbeatConfig {
    /// Creates a configuration with the default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the interval between heartbeats.
    pub fn with_heartbeat_interval(mut self, interval: Duration) -> Self {
        self.heartbeat_interval = interval;
        self
    }

    /// Sets the heartbeat lifetime.
    pub fn with_heartbeat_ttl(mut self, ttl: Duration) -> Self {
        self.heartbeat_ttl = ttl;
        self
    }

    /// Sets the leader lease lifetime.
    pub fn with_leader_lease_ttl(mut self, ttl: Duration) -> Self {
        self.leader_lease_ttl = ttl;
        self
    }

    /// Sets the interval between leader lease renewals.
    pub fn with_leader_renewal_interval(mut self, interval: Duration) -> Self {
        self.leader_renewal_interval = interval;
        self
    }

    /// Sets the failover timeout.
    pub fn with_failover_timeout(mut self, timeout: Duration) -> Self {
        self.failover_timeout = timeout;
        self
    }

    /// Sets the lock key used for the leader lock.
    pub fn with_leader_lock_key(mut self, key: impl Into<String>) -> Self {
        self.leader_lock_key = key.into();
        self
    }

    /// Sets the prefix for heartbeat keys.
    pub fn with_heartbeat_key_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.heartbeat_key_prefix = prefix.into();
        self
    }

    /// Checks that the timings are mutually consistent.
    ///
    /// # Errors
    ///
    /// Returns a description of the first violated rule:
    /// * `heartbeat_ttl` must be greater than `heartbeat_interval`;
    /// * `leader_lease_ttl` must be greater than `leader_renewal_interval`,
    ///   both as configured and after truncation to whole seconds;
    /// * `failover_timeout` must be greater than `leader_lease_ttl`.
    ///
    /// Configurations whose lease is a whole number of seconds are judged
    /// exactly as before; only fractional leases can newly fail.
    pub fn validate(&self) -> Result<(), String> {
        if self.heartbeat_ttl <= self.heartbeat_interval {
            return Err("heartbeat_ttl must be greater than heartbeat_interval".to_string());
        }
        if self.leader_lease_ttl <= self.leader_renewal_interval {
            return Err(
                "leader_lease_ttl must be greater than leader_renewal_interval".to_string(),
            );
        }
        // The lease is handed to the lock backend via `Duration::as_secs`,
        // which drops the fractional part. Re-check with the value actually
        // used so a 1.9s lease (effective 1s) cannot pass against a 1.5s
        // renewal interval, and a sub-second lease (effective 0s) is rejected.
        let effective_lease = Duration::from_secs(self.leader_lease_ttl.as_secs());
        if effective_lease <= self.leader_renewal_interval {
            return Err(
                "leader_lease_ttl is truncated to whole seconds for the lock backend and must \
                 still be greater than leader_renewal_interval"
                    .to_string(),
            );
        }
        if self.failover_timeout <= self.leader_lease_ttl {
            return Err("failover_timeout must be greater than leader_lease_ttl".to_string());
        }
        Ok(())
    }
}
/// Monotonic event counters for heartbeat and leader-election activity.
///
/// Every counter is an `AtomicU64` accessed with `Ordering::Relaxed`, so the
/// struct can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct HeartbeatStats {
    heartbeats_sent: AtomicU64,
    leader_elections_won: AtomicU64,
    leader_elections_lost: AtomicU64,
    lease_renewals: AtomicU64,
    lease_renewal_failures: AtomicU64,
    failovers_detected: AtomicU64,
}

impl HeartbeatStats {
    /// Creates a statistics block with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Reads the current value of `counter`.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Adds one to `counter`.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Number of heartbeats recorded.
    pub fn heartbeats_sent(&self) -> u64 {
        Self::read(&self.heartbeats_sent)
    }

    /// Number of elections recorded as won.
    pub fn leader_elections_won(&self) -> u64 {
        Self::read(&self.leader_elections_won)
    }

    /// Number of elections recorded as lost.
    pub fn leader_elections_lost(&self) -> u64 {
        Self::read(&self.leader_elections_lost)
    }

    /// Number of successful lease renewals recorded.
    pub fn lease_renewals(&self) -> u64 {
        Self::read(&self.lease_renewals)
    }

    /// Number of failed lease renewals recorded.
    pub fn lease_renewal_failures(&self) -> u64 {
        Self::read(&self.lease_renewal_failures)
    }

    /// Number of failovers recorded.
    pub fn failovers_detected(&self) -> u64 {
        Self::read(&self.failovers_detected)
    }

    /// Counts one heartbeat.
    pub fn record_heartbeat(&self) {
        Self::bump(&self.heartbeats_sent);
    }

    /// Counts one won election.
    pub fn record_election_won(&self) {
        Self::bump(&self.leader_elections_won);
    }

    /// Counts one lost election.
    pub fn record_election_lost(&self) {
        Self::bump(&self.leader_elections_lost);
    }

    /// Counts one successful lease renewal.
    pub fn record_lease_renewal(&self) {
        Self::bump(&self.lease_renewals);
    }

    /// Counts one failed lease renewal.
    pub fn record_lease_renewal_failure(&self) {
        Self::bump(&self.lease_renewal_failures);
    }

    /// Counts one detected failover.
    pub fn record_failover(&self) {
        Self::bump(&self.failovers_detected);
    }
}
/// Heartbeat and leader-election state for a single beat instance.
///
/// Leadership is decided through a [`DistributedLockBackend`] lock keyed by
/// `config.leader_lock_key`, with `instance_id` as the lock owner.
pub struct BeatHeartbeat {
// Timing and key settings supplied at construction.
config: HeartbeatConfig,
// Identifier of this instance; passed to the lock backend as the owner.
instance_id: String,
// Locally cached role of this instance.
role: Arc<RwLock<BeatRole>>,
// Backend used to acquire, renew, query and release the leader lock.
lock_backend: Arc<dyn DistributedLockBackend>,
// Latest heartbeat snapshot; its `role` field mirrors `role` above.
heartbeat_info: Arc<RwLock<HeartbeatInfo>>,
// Event counters.
stats: Arc<HeartbeatStats>,
// `true` from construction until `shutdown` is called.
running: Arc<AtomicBool>,
}
impl BeatHeartbeat {
pub fn new(
instance_id: String,
lock_backend: Arc<dyn DistributedLockBackend>,
config: HeartbeatConfig,
) -> Self {
let info = HeartbeatInfo::new(instance_id.clone());
Self {
config,
instance_id: instance_id.clone(),
role: Arc::new(RwLock::new(BeatRole::Unknown)),
lock_backend,
heartbeat_info: Arc::new(RwLock::new(info)),
stats: Arc::new(HeartbeatStats::new()),
running: Arc::new(AtomicBool::new(true)),
}
}
pub async fn try_become_leader(&self) -> celers_core::error::Result<bool> {
let acquired = self
.lock_backend
.try_acquire(
&self.config.leader_lock_key,
&self.instance_id,
self.config.leader_lease_ttl.as_secs(),
)
.await?;
if acquired {
let mut role = self.role.write().await;
*role = BeatRole::Leader;
let mut info = self.heartbeat_info.write().await;
info.role = BeatRole::Leader;
self.stats.record_election_won();
Ok(true)
} else {
let mut role = self.role.write().await;
if *role != BeatRole::Leader {
*role = BeatRole::Standby;
}
let mut info = self.heartbeat_info.write().await;
info.role = *role;
self.stats.record_election_lost();
Ok(false)
}
}
pub async fn renew_lease(&self) -> celers_core::error::Result<bool> {
let renewed = self
.lock_backend
.renew(
&self.config.leader_lock_key,
&self.instance_id,
self.config.leader_lease_ttl.as_secs(),
)
.await?;
if renewed {
self.stats.record_lease_renewal();
} else {
self.stats.record_lease_renewal_failure();
let mut role = self.role.write().await;
*role = BeatRole::Standby;
let mut info = self.heartbeat_info.write().await;
info.role = BeatRole::Standby;
}
Ok(renewed)
}
pub async fn check_leader_health(&self) -> celers_core::error::Result<bool> {
self.lock_backend
.is_locked(&self.config.leader_lock_key)
.await
}
pub async fn update_info(&self, schedule_count: usize, next_due_task: Option<String>) {
let current_role = *self.role.read().await;
let mut info = self.heartbeat_info.write().await;
info.last_heartbeat_at = Utc::now();
info.schedule_count = schedule_count;
info.next_due_task = next_due_task;
info.role = current_role;
self.stats.record_heartbeat();
}
pub async fn shutdown(&self) -> celers_core::error::Result<()> {
self.running.store(false, Ordering::SeqCst);
let role = *self.role.read().await;
if role == BeatRole::Leader {
self.lock_backend
.release(&self.config.leader_lock_key, &self.instance_id)
.await?;
}
let mut r = self.role.write().await;
*r = BeatRole::Unknown;
let mut info = self.heartbeat_info.write().await;
info.role = BeatRole::Unknown;
Ok(())
}
pub async fn role(&self) -> BeatRole {
*self.role.read().await
}
pub async fn is_leader(&self) -> bool {
*self.role.read().await == BeatRole::Leader
}
pub async fn is_standby(&self) -> bool {
*self.role.read().await == BeatRole::Standby
}
pub async fn info(&self) -> HeartbeatInfo {
self.heartbeat_info.read().await.clone()
}
pub fn config(&self) -> &HeartbeatConfig {
&self.config
}
pub fn instance_id(&self) -> &str {
&self.instance_id
}
pub fn stats(&self) -> &HeartbeatStats {
&self.stats
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
pub async fn tick(&self) -> celers_core::error::Result<()> {
let role = self.role().await;
match role {
BeatRole::Leader => {
if !self.renew_lease().await? {
self.try_become_leader().await?;
}
}
BeatRole::Standby | BeatRole::Unknown => {
let leader_alive = self.check_leader_health().await?;
if !leader_alive {
self.stats.record_failover();
self.try_become_leader().await?;
}
}
}
Ok(())
}
}
// Unit tests for the heartbeat configuration, info record, statistics and
// leader-election flow, run against the in-memory lock backend.
#[cfg(test)]
mod tests {
use super::*;
use crate::lock::InMemoryLockBackend;
// Builds a fresh in-memory lock backend; clone the `Arc` to share it between instances.
fn make_backend() -> Arc<InMemoryLockBackend> {
Arc::new(InMemoryLockBackend::new())
}
// Builds a heartbeat manager with the default configuration on `backend`.
fn make_heartbeat(instance_id: &str, backend: Arc<InMemoryLockBackend>) -> BeatHeartbeat {
BeatHeartbeat::new(instance_id.to_string(), backend, HeartbeatConfig::new())
}
// Default configuration carries the documented values and validates.
#[test]
fn test_heartbeat_config_defaults() {
let config = HeartbeatConfig::new();
assert_eq!(config.heartbeat_interval, Duration::from_secs(5));
assert_eq!(config.heartbeat_ttl, Duration::from_secs(15));
assert_eq!(config.leader_lease_ttl, Duration::from_secs(30));
assert_eq!(config.leader_renewal_interval, Duration::from_secs(10));
assert_eq!(config.failover_timeout, Duration::from_secs(45));
assert_eq!(config.leader_lock_key, "beat_leader");
assert_eq!(config.heartbeat_key_prefix, "beat_heartbeat:");
assert!(config.validate().is_ok());
}
// Each of the three ordering rules in `validate` is violated in turn.
#[test]
fn test_heartbeat_config_validation() {
// heartbeat_ttl (3s) below the default heartbeat_interval (5s).
let config = HeartbeatConfig::new().with_heartbeat_ttl(Duration::from_secs(3));
assert!(config.validate().is_err());
// leader_lease_ttl (5s) below leader_renewal_interval (10s).
let config = HeartbeatConfig::new()
.with_leader_lease_ttl(Duration::from_secs(5))
.with_leader_renewal_interval(Duration::from_secs(10));
assert!(config.validate().is_err());
// failover_timeout (20s) below the default leader_lease_ttl (30s).
let config = HeartbeatConfig::new().with_failover_timeout(Duration::from_secs(20));
assert!(config.validate().is_err());
}
// `Display` renders lowercase role names.
#[test]
fn test_beat_role_display() {
assert_eq!(BeatRole::Leader.to_string(), "leader");
assert_eq!(BeatRole::Standby.to_string(), "standby");
assert_eq!(BeatRole::Unknown.to_string(), "unknown");
}
// The default role is `Unknown`.
#[test]
fn test_beat_role_default() {
assert_eq!(BeatRole::default(), BeatRole::Unknown);
}
// A new info record starts with neutral values and the crate version.
#[test]
fn test_heartbeat_info_creation() {
let info = HeartbeatInfo::new("test-instance".to_string());
assert_eq!(info.instance_id, "test-instance");
assert_eq!(info.role, BeatRole::Unknown);
assert_eq!(info.schedule_count, 0);
assert!(info.next_due_task.is_none());
assert_eq!(info.version, env!("CARGO_PKG_VERSION"));
}
// The `with_*` builders override role, schedule count and next due task.
#[test]
fn test_heartbeat_info_builder() {
let info = HeartbeatInfo::new("inst".to_string())
.with_role(BeatRole::Leader)
.with_schedule_count(5)
.with_next_due_task(Some("my_task".to_string()));
assert_eq!(info.role, BeatRole::Leader);
assert_eq!(info.schedule_count, 5);
assert_eq!(info.next_due_task.as_deref(), Some("my_task"));
}
// With no existing leader, the first instance acquires leadership.
#[tokio::test]
async fn test_try_become_leader_success() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend);
let result = hb.try_become_leader().await;
assert!(result.is_ok());
assert!(result.is_ok_and(|v| v));
assert!(hb.is_leader().await);
assert_eq!(hb.stats().leader_elections_won(), 1);
}
// A second instance on the same backend loses the election and becomes standby.
#[tokio::test]
async fn test_try_become_leader_already_held() {
let backend = make_backend();
let hb1 = make_heartbeat("instance-1", backend.clone());
let hb2 = make_heartbeat("instance-2", backend);
let r1 = hb1.try_become_leader().await;
assert!(r1.is_ok_and(|v| v));
let r2 = hb2.try_become_leader().await;
assert!(r2.is_ok_and(|v| !v));
assert!(hb2.is_standby().await);
assert_eq!(hb2.stats().leader_elections_lost(), 1);
}
// The current leader can renew its lease and stays leader.
#[tokio::test]
async fn test_renew_lease() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend);
let _ = hb.try_become_leader().await;
assert!(hb.is_leader().await);
let renewed = hb.renew_lease().await;
assert!(renewed.is_ok_and(|v| v));
assert_eq!(hb.stats().lease_renewals(), 1);
assert!(hb.is_leader().await);
}
// After the leader shuts down, the standby sees no leader and can take over.
#[tokio::test]
async fn test_failover_on_leader_loss() {
let backend = make_backend();
let hb1 = make_heartbeat("instance-1", backend.clone());
let hb2 = make_heartbeat("instance-2", backend.clone());
let _ = hb1.try_become_leader().await;
assert!(hb1.is_leader().await);
let _ = hb2.try_become_leader().await;
assert!(hb2.is_standby().await);
let _ = hb1.shutdown().await;
let leader_alive = hb2.check_leader_health().await;
assert!(leader_alive.is_ok_and(|v| !v));
let promoted = hb2.try_become_leader().await;
assert!(promoted.is_ok_and(|v| v));
assert!(hb2.is_leader().await);
}
// Shutdown resets the role, clears the running flag and releases the leader lock.
#[tokio::test]
async fn test_graceful_shutdown() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend.clone());
let _ = hb.try_become_leader().await;
assert!(hb.is_leader().await);
let result = hb.shutdown().await;
assert!(result.is_ok());
assert_eq!(hb.role().await, BeatRole::Unknown);
assert!(!hb.is_running());
let locked = backend.is_locked("beat_leader").await;
assert!(locked.is_ok_and(|v| !v));
}
// A tick on the leader renews the lease exactly once.
#[tokio::test]
async fn test_tick_leader_renews() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend);
let _ = hb.try_become_leader().await;
assert!(hb.is_leader().await);
let result = hb.tick().await;
assert!(result.is_ok());
assert!(hb.is_leader().await);
assert_eq!(hb.stats().lease_renewals(), 1);
}
// A tick from the initial `Unknown` role with no leader counts a failover and wins.
#[tokio::test]
async fn test_tick_standby_promotes() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend);
assert_eq!(hb.role().await, BeatRole::Unknown);
let result = hb.tick().await;
assert!(result.is_ok());
assert!(hb.is_leader().await);
assert_eq!(hb.stats().failovers_detected(), 1);
assert_eq!(hb.stats().leader_elections_won(), 1);
}
// Counters start at zero and advance with election, renewal and heartbeat calls.
#[tokio::test]
async fn test_stats_tracking() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend);
assert_eq!(hb.stats().heartbeats_sent(), 0);
assert_eq!(hb.stats().leader_elections_won(), 0);
assert_eq!(hb.stats().leader_elections_lost(), 0);
assert_eq!(hb.stats().lease_renewals(), 0);
assert_eq!(hb.stats().lease_renewal_failures(), 0);
assert_eq!(hb.stats().failovers_detected(), 0);
let _ = hb.try_become_leader().await;
assert_eq!(hb.stats().leader_elections_won(), 1);
let _ = hb.renew_lease().await;
assert_eq!(hb.stats().lease_renewals(), 1);
hb.update_info(3, Some("task_a".to_string())).await;
assert_eq!(hb.stats().heartbeats_sent(), 1);
}
// `update_info` stores the supplied values and mirrors the current role.
#[tokio::test]
async fn test_update_info() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend);
let _ = hb.try_become_leader().await;
hb.update_info(10, Some("daily_report".to_string())).await;
let info = hb.info().await;
assert_eq!(info.schedule_count, 10);
assert_eq!(info.next_due_task.as_deref(), Some("daily_report"));
assert_eq!(info.role, BeatRole::Leader);
assert_eq!(info.instance_id, "instance-1");
}
// With nobody holding the lock, the leader health check reports false.
#[tokio::test]
async fn test_check_leader_health_no_leader() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend);
let alive = hb.check_leader_health().await;
assert!(alive.is_ok_and(|v| !v));
}
// Once another instance holds the lock, the health check reports true.
#[tokio::test]
async fn test_check_leader_health_with_leader() {
let backend = make_backend();
let hb1 = make_heartbeat("instance-1", backend.clone());
let hb2 = make_heartbeat("instance-2", backend);
let _ = hb1.try_become_leader().await;
let alive = hb2.check_leader_health().await;
assert!(alive.is_ok_and(|v| v));
}
// All builder methods chain and the resulting configuration validates.
#[tokio::test]
async fn test_config_builder_chain() {
let config = HeartbeatConfig::new()
.with_heartbeat_interval(Duration::from_secs(2))
.with_heartbeat_ttl(Duration::from_secs(10))
.with_leader_lease_ttl(Duration::from_secs(20))
.with_leader_renewal_interval(Duration::from_secs(8))
.with_failover_timeout(Duration::from_secs(30))
.with_leader_lock_key("custom_leader")
.with_heartbeat_key_prefix("custom:");
assert_eq!(config.heartbeat_interval, Duration::from_secs(2));
assert_eq!(config.heartbeat_ttl, Duration::from_secs(10));
assert_eq!(config.leader_lease_ttl, Duration::from_secs(20));
assert_eq!(config.leader_renewal_interval, Duration::from_secs(8));
assert_eq!(config.failover_timeout, Duration::from_secs(30));
assert_eq!(config.leader_lock_key, "custom_leader");
assert_eq!(config.heartbeat_key_prefix, "custom:");
assert!(config.validate().is_ok());
}
// Renewing without ever holding the lock fails and is counted as a renewal failure.
#[tokio::test]
async fn test_renew_lease_not_leader() {
let backend = make_backend();
let hb = make_heartbeat("instance-1", backend);
let renewed = hb.renew_lease().await;
assert!(renewed.is_ok_and(|v| !v));
assert_eq!(hb.stats().lease_renewal_failures(), 1);
}
}