use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
/// Settings that control heartbeat emission, lag monitoring and the optional action query.
#[derive(Debug, Clone)]
pub struct HeartbeatConfig {
    /// Period between two heartbeats; health checks use three times this value as threshold.
    pub interval: Duration,
    /// Optional topic name exposed through `Heartbeat::topic`.
    pub topic: Option<String>,
    /// Lag at or above which a position update marks the heartbeat unhealthy.
    pub max_lag: Duration,
    /// Whether the periodic task pushes events into its channel.
    pub emit_events: bool,
    /// Prefix string for heartbeat actions.
    // NOTE(review): not read anywhere in this file — confirm where it is consumed.
    pub action_prefix: String,
    /// Query handed to the action executor, if any.
    pub action_query: Option<String>,
    /// Extra databases the action query is additionally executed against.
    pub action_query_databases: Vec<String>,
}

impl Default for HeartbeatConfig {
    /// Builds the stock configuration: 10 s interval, 5 min maximum lag, no topic,
    /// event emission off, and no action query.
    fn default() -> Self {
        let interval = Duration::from_secs(10);
        let max_lag = Duration::from_secs(5 * 60);
        let action_prefix = String::from("__debezium-heartbeat");
        HeartbeatConfig {
            interval,
            topic: None,
            max_lag,
            emit_events: false,
            action_prefix,
            action_query: None,
            action_query_databases: vec![],
        }
    }
}
impl HeartbeatConfig {
pub fn builder() -> HeartbeatConfigBuilder {
HeartbeatConfigBuilder::default()
}
}
#[derive(Default)]
pub struct HeartbeatConfigBuilder {
config: HeartbeatConfig,
}
impl HeartbeatConfigBuilder {
pub fn interval(mut self, interval: Duration) -> Self {
self.config.interval = interval;
self
}
pub fn topic(mut self, topic: impl Into<String>) -> Self {
self.config.topic = Some(topic.into());
self
}
pub fn max_lag(mut self, max_lag: Duration) -> Self {
self.config.max_lag = max_lag;
self
}
pub fn emit_events(mut self, enabled: bool) -> Self {
self.config.emit_events = enabled;
self
}
pub fn action_prefix(mut self, prefix: impl Into<String>) -> Self {
self.config.action_prefix = prefix.into();
self
}
pub fn action_query(mut self, query: impl Into<String>) -> Self {
self.config.action_query = Some(query.into());
self
}
pub fn action_query_databases<I, S>(mut self, databases: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.config.action_query_databases = databases.into_iter().map(|s| s.into()).collect();
self
}
pub fn build(self) -> HeartbeatConfig {
self.config
}
}
/// A single heartbeat record, serializable with serde.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HeartbeatEvent {
    /// Creation time, taken from `chrono::Utc::now().timestamp_millis()`.
    pub timestamp: i64,
    /// Identifier of the source the heartbeat refers to.
    pub source: String,
    /// Source position at the time of the heartbeat.
    pub position: String,
    /// Name of the connector that produced the heartbeat.
    pub connector: String,
    /// Sequence number supplied by the caller.
    pub sequence: u64,
}

impl HeartbeatEvent {
    /// Creates an event stamped with the current UTC time in milliseconds.
    ///
    /// The string arguments are copied into owned `String`s.
    pub fn new(source: &str, position: &str, connector: &str, sequence: u64) -> Self {
        let timestamp = chrono::Utc::now().timestamp_millis();
        HeartbeatEvent {
            timestamp,
            source: source.to_owned(),
            position: position.to_owned(),
            connector: connector.to_owned(),
            sequence,
        }
    }
}
/// Lock-free counters and gauges describing heartbeat activity.
///
/// All fields are atomics, so the struct can be updated through a shared reference.
/// `Default` starts every counter at zero and the health flag at `false`.
#[derive(Debug, Default)]
pub struct HeartbeatStats {
    /// Number of heartbeats recorded.
    heartbeats_sent: AtomicU64,
    /// Millisecond timestamp of the most recent heartbeat (0 = none yet).
    last_heartbeat_ts: AtomicI64,
    /// Millisecond timestamp of the most recent position update.
    last_position_ts: AtomicI64,
    /// Lag value reported with the most recent position update, in milliseconds.
    current_lag_ms: AtomicI64,
    /// Number of missed heartbeats recorded.
    missed_heartbeats: AtomicU64,
    /// Current health flag.
    is_healthy: AtomicBool,
    /// Number of successful action queries.
    action_queries_success: AtomicU64,
    /// Number of failed action queries.
    action_queries_failed: AtomicU64,
    /// Execution time of the most recent successful action query, in milliseconds.
    last_action_query_ms: AtomicU64,
}
impl HeartbeatStats {
    /// Counts one heartbeat and stamps it with the current UTC time in milliseconds.
    pub fn record_heartbeat(&self) {
        self.heartbeats_sent.fetch_add(1, Ordering::Relaxed);
        let now = chrono::Utc::now().timestamp_millis();
        self.last_heartbeat_ts.store(now, Ordering::Relaxed);
    }

    /// Stamps the position-update time with "now" and stores `lag_ms` as the current lag.
    pub fn record_position_update(&self, lag_ms: i64) {
        let now = chrono::Utc::now().timestamp_millis();
        self.last_position_ts.store(now, Ordering::Relaxed);
        self.current_lag_ms.store(lag_ms, Ordering::Relaxed);
    }

    /// Counts one missed heartbeat.
    pub fn record_missed(&self) {
        self.missed_heartbeats.fetch_add(1, Ordering::Relaxed);
    }

    /// Overwrites the health flag.
    pub fn set_healthy(&self, healthy: bool) {
        self.is_healthy.store(healthy, Ordering::Relaxed);
    }

    /// Counts one successful action query and remembers how long it took.
    pub fn record_action_query_success(&self, execution_time_ms: u64) {
        self.action_queries_success.fetch_add(1, Ordering::Relaxed);
        self.last_action_query_ms
            .store(execution_time_ms, Ordering::Relaxed);
    }

    /// Counts one failed action query.
    pub fn record_action_query_failure(&self) {
        self.action_queries_failed.fetch_add(1, Ordering::Relaxed);
    }

    /// Total heartbeats recorded so far.
    pub fn heartbeats_sent(&self) -> u64 {
        let sent = &self.heartbeats_sent;
        sent.load(Ordering::Relaxed)
    }

    /// Millisecond timestamp of the last heartbeat, or 0 if none was recorded.
    pub fn last_heartbeat_ts(&self) -> i64 {
        let ts = &self.last_heartbeat_ts;
        ts.load(Ordering::Relaxed)
    }

    /// Lag stored by the most recent position update, in milliseconds.
    pub fn current_lag_ms(&self) -> i64 {
        let lag = &self.current_lag_ms;
        lag.load(Ordering::Relaxed)
    }

    /// Total missed heartbeats recorded so far.
    pub fn missed_heartbeats(&self) -> u64 {
        let missed = &self.missed_heartbeats;
        missed.load(Ordering::Relaxed)
    }

    /// Current value of the health flag.
    pub fn is_healthy(&self) -> bool {
        let flag = &self.is_healthy;
        flag.load(Ordering::Relaxed)
    }

    /// Total successful action queries recorded so far.
    pub fn action_queries_success(&self) -> u64 {
        let ok = &self.action_queries_success;
        ok.load(Ordering::Relaxed)
    }

    /// Total failed action queries recorded so far.
    pub fn action_queries_failed(&self) -> u64 {
        let failed = &self.action_queries_failed;
        failed.load(Ordering::Relaxed)
    }

    /// Execution time of the last successful action query, in milliseconds.
    pub fn last_action_query_ms(&self) -> u64 {
        let last = &self.last_action_query_ms;
        last.load(Ordering::Relaxed)
    }
}
/// Snapshot of the most recently reported source position.
///
/// The default value has empty strings and a zero timestamp.
#[derive(Debug, Clone, Default)]
pub struct PositionInfo {
    /// Position string as supplied by the caller.
    pub position: String,
    /// Identifier of the server the position belongs to.
    pub server_id: String,
    /// Millisecond timestamp of when the position was recorded (0 = never).
    pub timestamp: i64,
}
/// Outcome of running the action query against one database.
#[derive(Debug, Clone)]
pub struct ActionQueryResult {
    /// Database name the query was executed against, as passed to the executor.
    pub database: String,
    /// `true` when the query succeeded.
    pub success: bool,
    /// Execution time reported by the executor, in milliseconds.
    pub execution_time_ms: u64,
    /// Error description when the query failed, if the executor provided one.
    pub error: Option<String>,
}
/// Abstraction over whatever actually runs the heartbeat action query.
///
/// Implementors must be `Send + Sync`; the async method is provided through `async_trait`.
#[async_trait::async_trait]
pub trait ActionQueryExecutor: Send + Sync {
/// Executes `query` against `database` and reports the outcome as an [`ActionQueryResult`].
///
/// Failures are reported through the result's `success`/`error` fields rather than a
/// `Result`. Callers in this file pass an empty `database` string for one of the calls —
/// presumably meaning the connection's default database; confirm with implementors.
async fn execute_action_query(&self, query: &str, database: &str) -> ActionQueryResult;
}
/// Executor that performs no work and always reports success.
#[derive(Debug, Default, Clone)]
pub struct NoOpActionExecutor;

#[async_trait::async_trait]
impl ActionQueryExecutor for NoOpActionExecutor {
    /// Ignores the query and returns a successful result for `database` with a zero
    /// execution time and no error.
    async fn execute_action_query(&self, _query: &str, database: &str) -> ActionQueryResult {
        let database = database.to_owned();
        ActionQueryResult {
            database,
            success: true,
            execution_time_ms: 0,
            error: None,
        }
    }
}
pub struct Heartbeat {
config: HeartbeatConfig,
stats: Arc<HeartbeatStats>,
position: RwLock<PositionInfo>,
connector_name: String,
running: AtomicBool,
sequence: AtomicU64,
started_at: RwLock<Option<Instant>>,
action_executor: Option<Arc<dyn ActionQueryExecutor>>,
}
impl Heartbeat {
pub fn new(config: HeartbeatConfig, connector_name: impl Into<String>) -> Self {
Self {
config,
stats: Arc::new(HeartbeatStats::default()),
position: RwLock::new(PositionInfo::default()),
connector_name: connector_name.into(),
running: AtomicBool::new(false),
sequence: AtomicU64::new(0),
started_at: RwLock::new(None),
action_executor: None,
}
}
pub fn with_executor<E>(
config: HeartbeatConfig,
connector_name: impl Into<String>,
executor: E,
) -> Self
where
E: ActionQueryExecutor + 'static,
{
Self {
config,
stats: Arc::new(HeartbeatStats::default()),
position: RwLock::new(PositionInfo::default()),
connector_name: connector_name.into(),
running: AtomicBool::new(false),
sequence: AtomicU64::new(0),
started_at: RwLock::new(None),
action_executor: Some(Arc::new(executor)),
}
}
pub fn set_action_executor<E>(&mut self, executor: E)
where
E: ActionQueryExecutor + 'static,
{
self.action_executor = Some(Arc::new(executor));
}
pub fn action_query(&self) -> Option<&str> {
self.config.action_query.as_deref()
}
pub fn action_query_databases(&self) -> &[String] {
&self.config.action_query_databases
}
pub async fn execute_action_query(&self) -> Vec<ActionQueryResult> {
let query = match &self.config.action_query {
Some(q) => q,
None => return Vec::new(),
};
let executor = match &self.action_executor {
Some(e) => e,
None => {
debug!("Action query configured but no executor set, skipping");
return Vec::new();
}
};
let mut results = Vec::new();
let result = executor.execute_action_query(query, "").await;
if result.success {
self.stats
.record_action_query_success(result.execution_time_ms);
debug!(
"Action query executed successfully in {}ms",
result.execution_time_ms
);
} else {
self.stats.record_action_query_failure();
warn!(
"Action query failed: {}",
result.error.as_deref().unwrap_or("unknown error")
);
}
results.push(result);
for db in &self.config.action_query_databases {
let result = executor.execute_action_query(query, db).await;
if result.success {
self.stats
.record_action_query_success(result.execution_time_ms);
debug!(
"Action query executed on '{}' in {}ms",
db, result.execution_time_ms
);
} else {
self.stats.record_action_query_failure();
warn!(
"Action query failed on '{}': {}",
db,
result.error.as_deref().unwrap_or("unknown error")
);
}
results.push(result);
}
results
}
pub fn stats(&self) -> &Arc<HeartbeatStats> {
&self.stats
}
pub fn is_healthy(&self) -> bool {
self.stats.is_healthy()
}
pub fn lag(&self) -> Duration {
Duration::from_millis(self.stats.current_lag_ms() as u64)
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::Relaxed)
}
pub async fn update_position(&self, position: &str, server_id: &str) {
let now = chrono::Utc::now().timestamp_millis();
let mut pos = self.position.write().await;
let lag_ms = if pos.timestamp > 0 {
now - pos.timestamp
} else {
0
};
pos.position = position.to_string();
pos.server_id = server_id.to_string();
pos.timestamp = now;
drop(pos);
self.stats.record_position_update(lag_ms);
let healthy = lag_ms < self.config.max_lag.as_millis() as i64;
self.stats.set_healthy(healthy);
if !healthy {
warn!(
"CDC lag exceeds threshold: {}ms > {}ms",
lag_ms,
self.config.max_lag.as_millis()
);
}
debug!("Position updated: {}, lag: {}ms", position, lag_ms);
}
pub async fn beat(&self) -> HeartbeatEvent {
let pos = self.position.read().await;
let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
self.stats.record_heartbeat();
self.stats.set_healthy(true);
let event = HeartbeatEvent::new(&pos.server_id, &pos.position, &self.connector_name, seq);
info!(
"Heartbeat #{}: position={}, connector={}",
seq, pos.position, self.connector_name
);
event
}
pub async fn start(&self) -> tokio::sync::mpsc::Receiver<HeartbeatEvent> {
let (tx, rx) = tokio::sync::mpsc::channel(16);
self.running.store(true, Ordering::Relaxed);
*self.started_at.write().await = Some(Instant::now());
self.stats.set_healthy(true);
let interval = self.config.interval;
let emit_events = self.config.emit_events;
let stats = self.stats.clone();
let connector_name = self.connector_name.clone();
let position = PositionInfo::default();
let position_arc = Arc::new(RwLock::new(position));
let stats_clone = stats.clone();
let position_clone = position_arc.clone();
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
let sequence = AtomicU64::new(0);
loop {
interval_timer.tick().await;
if !running_clone.load(Ordering::Relaxed) {
break;
}
let pos = position_clone.read().await;
let seq = sequence.fetch_add(1, Ordering::Relaxed);
stats_clone.record_heartbeat();
let event =
HeartbeatEvent::new(&pos.server_id, &pos.position, &connector_name, seq);
drop(pos);
if emit_events && tx.send(event).await.is_err() {
break;
}
}
});
rx
}
pub fn stop(&self) {
self.running.store(false, Ordering::Relaxed);
info!("Heartbeat stopped for connector: {}", self.connector_name);
}
pub fn topic(&self) -> Option<&str> {
self.config.topic.as_deref()
}
pub async fn uptime(&self) -> Option<Duration> {
self.started_at.read().await.map(|s| s.elapsed())
}
pub fn check_health(&self) -> bool {
let last_ts = self.stats.last_heartbeat_ts();
if last_ts == 0 {
return true; }
let now = chrono::Utc::now().timestamp_millis();
let since_last = Duration::from_millis((now - last_ts) as u64);
let healthy = since_last < self.config.interval * 3;
self.stats.set_healthy(healthy);
if !healthy {
self.stats.record_missed();
warn!(
"Heartbeat missed: last was {:?} ago (threshold: {:?})",
since_last,
self.config.interval * 3
);
}
healthy
}
}
/// Callback interface with two asynchronous operations, expressed as methods returning
/// `Send` futures. Implementors must be `Send + Sync`.
// NOTE(review): nothing in this file implements or calls this trait — confirm the intended
// caller before relying on the method descriptions below.
pub trait HeartbeatCallback: Send + Sync {
/// Sends a heartbeat; the future resolves to `Ok(())` or an error message.
fn send_heartbeat(&self) -> impl std::future::Future<Output = Result<(), String>> + Send;
/// Resolves to a position string (presumably the current source position).
fn get_position(&self) -> impl std::future::Future<Output = String> + Send;
}
// Unit tests for the heartbeat configuration, statistics, events and `Heartbeat` itself.
#[cfg(test)]
mod tests {
use super::*;
// The default configuration has a 10s interval, a 300s max lag, no topic and no emission.
#[test]
fn test_heartbeat_config_default() {
let config = HeartbeatConfig::default();
assert_eq!(config.interval, Duration::from_secs(10));
assert_eq!(config.max_lag, Duration::from_secs(300));
assert!(config.topic.is_none());
assert!(!config.emit_events);
}
// Every builder setter used here must end up in the built configuration.
#[test]
fn test_heartbeat_config_builder() {
let config = HeartbeatConfig::builder()
.interval(Duration::from_secs(5))
.topic("heartbeat-topic")
.max_lag(Duration::from_secs(60))
.emit_events(true)
.action_prefix("custom-prefix")
.build();
assert_eq!(config.interval, Duration::from_secs(5));
assert_eq!(config.topic, Some("heartbeat-topic".to_string()));
assert_eq!(config.max_lag, Duration::from_secs(60));
assert!(config.emit_events);
assert_eq!(config.action_prefix, "custom-prefix");
}
// `HeartbeatEvent::new` copies its arguments and stamps a positive timestamp.
#[test]
fn test_heartbeat_event() {
let event = HeartbeatEvent::new("pg-server", "0/16B3748", "my-connector", 1);
assert_eq!(event.source, "pg-server");
assert_eq!(event.position, "0/16B3748");
assert_eq!(event.connector, "my-connector");
assert_eq!(event.sequence, 1);
assert!(event.timestamp > 0);
}
// Stats start zeroed and unhealthy; each recorder updates its own counter or gauge.
#[test]
fn test_heartbeat_stats() {
let stats = HeartbeatStats::default();
assert_eq!(stats.heartbeats_sent(), 0);
assert!(!stats.is_healthy());
stats.set_healthy(true);
assert!(stats.is_healthy());
stats.record_heartbeat();
assert_eq!(stats.heartbeats_sent(), 1);
assert!(stats.last_heartbeat_ts() > 0);
stats.record_position_update(100);
assert_eq!(stats.current_lag_ms(), 100);
stats.record_missed();
assert_eq!(stats.missed_heartbeats(), 1);
}
// A freshly constructed heartbeat is not running, not healthy, and reports zero lag.
#[tokio::test]
async fn test_heartbeat_creation() {
let config = HeartbeatConfig::default();
let heartbeat = Heartbeat::new(config, "test-connector");
assert!(!heartbeat.is_running());
assert!(!heartbeat.is_healthy());
assert_eq!(heartbeat.lag(), Duration::ZERO);
}
// The first position update stores the position and marks the heartbeat healthy.
#[tokio::test]
async fn test_heartbeat_position_update() {
let config = HeartbeatConfig::builder()
.max_lag(Duration::from_secs(60))
.build();
let heartbeat = Heartbeat::new(config, "test-connector");
heartbeat.update_position("0/16B3748", "pg-server-1").await;
assert!(heartbeat.is_healthy());
let pos = heartbeat.position.read().await;
assert_eq!(pos.position, "0/16B3748");
assert_eq!(pos.server_id, "pg-server-1");
}
// `beat` reports the stored position, starts sequences at 0 and counts the heartbeat.
#[tokio::test]
async fn test_heartbeat_beat() {
let config = HeartbeatConfig::default();
let heartbeat = Heartbeat::new(config, "test-connector");
heartbeat.update_position("0/1234", "server-1").await;
let event = heartbeat.beat().await;
assert_eq!(event.connector, "test-connector");
assert_eq!(event.position, "0/1234");
assert_eq!(event.source, "server-1");
assert_eq!(event.sequence, 0);
assert_eq!(heartbeat.stats().heartbeats_sent(), 1);
}
// `check_health` passes before any heartbeat and right after one.
#[tokio::test]
async fn test_heartbeat_health_check() {
let config = HeartbeatConfig::builder()
.interval(Duration::from_millis(10))
.build();
let heartbeat = Heartbeat::new(config, "test-connector");
assert!(heartbeat.check_health());
heartbeat.beat().await;
assert!(heartbeat.check_health());
assert!(heartbeat.is_healthy());
}
// Two position updates 10ms apart record a positive lag below 100ms.
#[tokio::test]
async fn test_heartbeat_lag_detection() {
let config = HeartbeatConfig::builder()
.max_lag(Duration::from_millis(50))
.build();
let heartbeat = Heartbeat::new(config, "test-connector");
heartbeat.update_position("0/1000", "server").await;
tokio::time::sleep(Duration::from_millis(10)).await;
heartbeat.update_position("0/2000", "server").await;
let lag = heartbeat.stats().current_lag_ms();
assert!(lag > 0);
assert!(lag < 100); }
// `stop` clears the running flag (set by hand here, without spawning the task).
#[tokio::test]
async fn test_heartbeat_stop() {
let config = HeartbeatConfig::default();
let heartbeat = Heartbeat::new(config, "test-connector");
heartbeat.running.store(true, Ordering::Relaxed);
assert!(heartbeat.is_running());
heartbeat.stop();
assert!(!heartbeat.is_running());
}
// `topic` returns the configured topic, or `None` for the default configuration.
#[tokio::test]
async fn test_heartbeat_topic() {
let config = HeartbeatConfig::builder()
.topic("my-heartbeat-topic")
.build();
let heartbeat = Heartbeat::new(config, "connector");
assert_eq!(heartbeat.topic(), Some("my-heartbeat-topic"));
let config_no_topic = HeartbeatConfig::default();
let heartbeat_no_topic = Heartbeat::new(config_no_topic, "connector");
assert_eq!(heartbeat_no_topic.topic(), None);
}
// Events round-trip through JSON with the expected field names and values.
#[tokio::test]
async fn test_heartbeat_event_serialization() {
let event = HeartbeatEvent::new("pg", "0/ABCD", "conn", 42);
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains("\"source\":\"pg\""));
assert!(json.contains("\"position\":\"0/ABCD\""));
assert!(json.contains("\"connector\":\"conn\""));
assert!(json.contains("\"sequence\":42"));
let parsed: HeartbeatEvent = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.source, "pg");
assert_eq!(parsed.sequence, 42);
}
// After several updates the stored position is the last one written.
#[tokio::test]
async fn test_multiple_position_updates() {
let config = HeartbeatConfig::default();
let heartbeat = Heartbeat::new(config, "test");
for i in 0..5 {
heartbeat
.update_position(&format!("0/{}", i * 1000), "server")
.await;
tokio::time::sleep(Duration::from_millis(5)).await;
}
let pos = heartbeat.position.read().await;
assert_eq!(pos.position, "0/4000");
}
// The default `PositionInfo` is empty with a zero timestamp.
#[test]
fn test_position_info_default() {
let pos = PositionInfo::default();
assert!(pos.position.is_empty());
assert!(pos.server_id.is_empty());
assert_eq!(pos.timestamp, 0);
}
// The builder stores the action query and its database list.
#[test]
fn test_action_query_config() {
let config = HeartbeatConfig::builder()
.action_query("INSERT INTO heartbeat (ts) VALUES (now())")
.action_query_databases(["db1", "db2"])
.build();
assert_eq!(
config.action_query,
Some("INSERT INTO heartbeat (ts) VALUES (now())".to_string())
);
assert_eq!(config.action_query_databases, vec!["db1", "db2"]);
}
// Success and failure counters are independent; success also records the execution time.
#[test]
fn test_action_query_stats() {
let stats = HeartbeatStats::default();
assert_eq!(stats.action_queries_success(), 0);
assert_eq!(stats.action_queries_failed(), 0);
assert_eq!(stats.last_action_query_ms(), 0);
stats.record_action_query_success(50);
assert_eq!(stats.action_queries_success(), 1);
assert_eq!(stats.last_action_query_ms(), 50);
stats.record_action_query_failure();
assert_eq!(stats.action_queries_failed(), 1);
assert_eq!(stats.action_queries_success(), 1);
}
// With a query but no executor, nothing is executed and no results are returned.
#[tokio::test]
async fn test_action_query_no_executor() {
let config = HeartbeatConfig::builder().action_query("SELECT 1").build();
let heartbeat = Heartbeat::new(config, "test-connector");
let results = heartbeat.execute_action_query().await;
assert!(results.is_empty());
}
// Without a configured query, no results are returned.
#[tokio::test]
async fn test_action_query_no_query_configured() {
let config = HeartbeatConfig::default(); let heartbeat = Heartbeat::new(config, "test-connector");
let results = heartbeat.execute_action_query().await;
assert!(results.is_empty());
}
// Test executor that counts calls and either always succeeds or always fails.
struct MockActionExecutor {
execution_count: AtomicU64,
should_fail: bool,
}
impl MockActionExecutor {
// Creates a mock that fails every query when `should_fail` is true.
fn new(should_fail: bool) -> Self {
Self {
execution_count: AtomicU64::new(0),
should_fail,
}
}
}
#[async_trait::async_trait]
impl ActionQueryExecutor for MockActionExecutor {
// Counts the call and returns a fixed failure (5ms) or success (10ms) result.
async fn execute_action_query(&self, _query: &str, database: &str) -> ActionQueryResult {
self.execution_count.fetch_add(1, Ordering::Relaxed);
if self.should_fail {
ActionQueryResult {
database: database.to_string(),
success: false,
execution_time_ms: 5,
error: Some("Mock failure".to_string()),
}
} else {
ActionQueryResult {
database: database.to_string(),
success: true,
execution_time_ms: 10,
error: None,
}
}
}
}
// With no extra databases the query runs exactly once and is counted as a success.
#[tokio::test]
async fn test_action_query_with_executor() {
let config = HeartbeatConfig::builder().action_query("SELECT 1").build();
let executor = MockActionExecutor::new(false);
let heartbeat = Heartbeat::with_executor(config, "test-connector", executor);
let results = heartbeat.execute_action_query().await;
assert_eq!(results.len(), 1); assert!(results[0].success);
assert_eq!(results[0].execution_time_ms, 10);
assert!(results[0].error.is_none());
assert_eq!(heartbeat.stats().action_queries_success(), 1);
assert_eq!(heartbeat.stats().action_queries_failed(), 0);
}
// With two extra databases the query runs three times: empty name first, then each database.
#[tokio::test]
async fn test_action_query_multiple_databases() {
let config = HeartbeatConfig::builder()
.action_query("SELECT 1")
.action_query_databases(["inventory", "analytics"])
.build();
let executor = MockActionExecutor::new(false);
let heartbeat = Heartbeat::with_executor(config, "test-connector", executor);
let results = heartbeat.execute_action_query().await;
assert_eq!(results.len(), 3);
assert!(results.iter().all(|r| r.success));
assert_eq!(results[0].database, ""); assert_eq!(results[1].database, "inventory");
assert_eq!(results[2].database, "analytics");
assert_eq!(heartbeat.stats().action_queries_success(), 3);
}
// A failing executor yields a failed result and increments only the failure counter.
#[tokio::test]
async fn test_action_query_failure_tracking() {
let config = HeartbeatConfig::builder().action_query("SELECT 1").build();
let executor = MockActionExecutor::new(true); let heartbeat = Heartbeat::with_executor(config, "test-connector", executor);
let results = heartbeat.execute_action_query().await;
assert_eq!(results.len(), 1);
assert!(!results[0].success);
assert_eq!(results[0].error, Some("Mock failure".to_string()));
assert_eq!(heartbeat.stats().action_queries_success(), 0);
assert_eq!(heartbeat.stats().action_queries_failed(), 1);
}
// The derived `Debug` output of `ActionQueryResult` includes its field values.
#[test]
fn test_action_query_result_debug() {
let result = ActionQueryResult {
database: "mydb".to_string(),
success: true,
execution_time_ms: 42,
error: None,
};
let debug = format!("{:?}", result);
assert!(debug.contains("mydb"));
assert!(debug.contains("42"));
}
// The no-op executor always succeeds, echoing the database name with zero execution time.
#[tokio::test]
async fn test_noop_action_executor() {
let executor = NoOpActionExecutor;
let result = executor.execute_action_query("SELECT 1", "test_db").await;
assert!(result.success);
assert_eq!(result.database, "test_db");
assert_eq!(result.execution_time_ms, 0);
assert!(result.error.is_none());
}
// The `Heartbeat` accessors expose the configured action query and database list.
#[test]
fn test_heartbeat_action_query_accessors() {
let config = HeartbeatConfig::builder()
.action_query("SELECT 1")
.action_query_databases(["db1", "db2"])
.build();
let heartbeat = Heartbeat::new(config, "test");
assert_eq!(heartbeat.action_query(), Some("SELECT 1"));
assert_eq!(heartbeat.action_query_databases(), &["db1", "db2"]);
let config2 = HeartbeatConfig::default();
let heartbeat2 = Heartbeat::new(config2, "test2");
assert!(heartbeat2.action_query().is_none());
assert!(heartbeat2.action_query_databases().is_empty());
}
}