use crate::CdcResult;
pub trait MetricsCollectorTrait: Send + Sync {
fn new() -> Self
where
Self: Sized;
fn record_event(&self, event: &crate::types::ChangeEvent);
fn record_processing_duration(
&self,
duration: std::time::Duration,
event_type: &str,
destination_type: &str,
);
fn record_error(&self, error_type: &str, component: &str);
fn update_source_connection_status(&self, connected: bool);
fn update_destination_connection_status(&self, destination_type: &str, connected: bool);
fn update_active_connections(&self, count: usize, connection_type: &str);
fn update_consumer_queue_length(&self, length: usize);
fn update_uptime(&self);
fn update_events_rate(&self);
fn record_received_lsn(&self, lsn: u64);
fn get_metrics(&self) -> CdcResult<String>;
fn init_build_info(&self, version: &str);
fn record_transaction_processed(
&self,
transaction: &crate::types::Transaction,
destination_type: &str,
);
fn record_full_transaction_processed(
&self,
transaction: &crate::types::Transaction,
destination_type: &str,
);
}
pub trait ProcessingTimerTrait {
fn start(event_type: &str, destination_type: &str) -> Self
where
Self: Sized;
fn finish(self, collector: &dyn MetricsCollectorTrait);
}
#[cfg(feature = "metrics")]
pub use real_metrics::*;
#[cfg(feature = "metrics")]
mod real_metrics {
use super::*;
use crate::monitoring::metrics::*; use crate::types::EventType;
use std::borrow::Cow;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tracing::{debug, warn};
#[derive(Debug)]
pub struct MetricsCollector {
start_time: Instant,
last_event_time_nanos: AtomicU64, events_in_window: AtomicU64,
window_start_nanos: AtomicU64,
window_duration: Duration,
}
impl MetricsCollector {
#[inline]
fn instant_to_nanos(&self, instant: Instant) -> u64 {
instant.duration_since(self.start_time).as_nanos() as u64
}
#[inline]
fn now_nanos(&self) -> u64 {
self.instant_to_nanos(Instant::now())
}
}
impl MetricsCollectorTrait for MetricsCollector {
fn new() -> Self {
let now = Instant::now();
Self {
start_time: now,
last_event_time_nanos: AtomicU64::new(0), events_in_window: AtomicU64::new(0),
window_start_nanos: AtomicU64::new(0), window_duration: Duration::from_secs(60), }
}
fn record_event(&self, event: &crate::types::ChangeEvent) {
let now = Instant::now();
let now_nanos = self.instant_to_nanos(now);
EVENTS_PROCESSED_TOTAL.inc();
let event_type = event.event_type_str();
let table_name: Cow<'_, str> = match &event.event_type {
EventType::Insert { table, .. }
| EventType::Update { table, .. }
| EventType::Delete { table, .. } => Cow::Borrowed(table.as_ref()),
EventType::Truncate(tables) => Cow::Owned(
tables
.iter()
.map(|t| t.as_ref())
.collect::<Vec<&str>>()
.join(","),
),
_ => Cow::Borrowed("unknown"),
};
EVENTS_BY_TYPE
.with_label_values(&[event_type, &table_name])
.inc();
LAST_PROCESSED_LSN.set(event.lsn.value() as f64);
self.events_in_window.fetch_add(1, Ordering::Relaxed);
self.last_event_time_nanos
.store(now_nanos, Ordering::Relaxed);
}
fn record_processing_duration(
&self,
duration: std::time::Duration,
event_type: &str,
destination_type: &str,
) {
EVENT_PROCESSING_DURATION
.with_label_values(&[event_type, destination_type])
.observe(duration.as_secs_f64());
}
fn record_error(&self, error_type: &str, component: &str) {
ERRORS_TOTAL
.with_label_values(&[error_type, component])
.inc();
warn!(
"Error recorded: type={}, component={}",
error_type, component
);
}
fn update_source_connection_status(&self, connected: bool) {
SOURCE_CONNECTION_STATUS.set(if connected { 1.0 } else { 0.0 });
}
fn update_destination_connection_status(&self, destination_type: &str, connected: bool) {
DESTINATION_CONNECTION_STATUS
.with_label_values(&[destination_type])
.set(if connected { 1.0 } else { 0.0 });
}
fn update_active_connections(&self, count: usize, connection_type: &str) {
ACTIVE_CONNECTIONS
.with_label_values(&[connection_type])
.set(count as f64);
}
fn update_consumer_queue_length(&self, length: usize) {
CONSUMER_QUEUE_SIZE.set(length as f64);
debug!("Updated consumer queue length: {}", length);
}
fn update_uptime(&self) {
let uptime = self.start_time.elapsed().as_secs() as f64;
UPTIME_SECONDS.set(uptime);
}
fn update_events_rate(&self) {
let now_nanos = self.now_nanos();
let window_start = self.window_start_nanos.load(Ordering::Relaxed);
if window_start == 0 {
self.window_start_nanos.store(now_nanos, Ordering::Relaxed);
return;
}
let window_duration_nanos = self.window_duration.as_nanos() as u64;
if now_nanos.saturating_sub(window_start) >= window_duration_nanos {
let events = self.events_in_window.swap(0, Ordering::Relaxed);
let rate = events as f64 / self.window_duration.as_secs() as f64;
EVENTS_RATE.set(rate);
self.window_start_nanos.store(now_nanos, Ordering::Relaxed);
debug!("Updated events rate: {} events/sec", rate);
}
}
fn record_received_lsn(&self, lsn: u64) {
CURRENT_RECEIVED_LSN.set(lsn as f64);
}
fn get_metrics(&self) -> CdcResult<String> {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();
let metric_families = prometheus::default_registry().gather();
let mut buffer = Vec::new();
encoder
.encode(&metric_families, &mut buffer)
.map_err(|e| crate::CdcError::generic(e.to_string()))?;
String::from_utf8(buffer).map_err(|e| crate::CdcError::generic(e.to_string()))
}
fn init_build_info(&self, version: &str) {
BUILD_INFO.with_label_values(&[version]).set(1.0);
}
fn record_transaction_processed(
&self,
transaction: &crate::types::Transaction,
destination_type: &str,
) {
TRANSACTIONS_PROCESSED_TOTAL.inc();
let event_count = transaction.event_count();
self.events_in_window
.fetch_add(event_count as u64, Ordering::Relaxed);
let now_nanos = self.now_nanos();
self.last_event_time_nanos
.store(now_nanos, Ordering::Relaxed);
if let Some(lsn) = transaction.commit_lsn {
LAST_PROCESSED_LSN.set(lsn.0 as f64);
}
debug!(
"Recorded transaction processed: transaction_id={:?}, events={}, destination={}",
transaction.transaction_id, event_count, destination_type
);
}
fn record_full_transaction_processed(
&self,
transaction: &crate::types::Transaction,
destination_type: &str,
) {
FULL_TRANSACTIONS_PROCESSED_TOTAL.inc();
debug!(
"Recorded full transaction processed: transaction_id={:?}, events={}, destination={}",
transaction.transaction_id, transaction.event_count(), destination_type
);
}
}
pub struct ProcessingTimer {
start_time: std::time::Instant,
event_type: String,
destination_type: String,
}
impl ProcessingTimerTrait for ProcessingTimer {
fn start(event_type: &str, destination_type: &str) -> Self {
Self {
start_time: std::time::Instant::now(),
event_type: event_type.to_string(),
destination_type: destination_type.to_string(),
}
}
fn finish(self, collector: &dyn MetricsCollectorTrait) {
let duration = self.start_time.elapsed();
collector.record_processing_duration(
duration,
&self.event_type,
&self.destination_type,
);
}
}
}
#[cfg(not(feature = "metrics"))]
pub use noop_metrics::*;
#[cfg(not(feature = "metrics"))]
mod noop_metrics {
use super::*;
#[derive(Debug)]
pub struct MetricsCollector;
impl MetricsCollectorTrait for MetricsCollector {
fn new() -> Self {
Self
}
fn record_event(&self, _event: &crate::types::ChangeEvent) {}
fn record_processing_duration(
&self,
_duration: std::time::Duration,
_event_type: &str,
_destination_type: &str,
) {
}
fn record_error(&self, _error_type: &str, _component: &str) {}
fn update_source_connection_status(&self, _connected: bool) {}
fn update_destination_connection_status(&self, _destination_type: &str, _connected: bool) {}
fn update_active_connections(&self, _count: usize, _connection_type: &str) {}
fn update_consumer_queue_length(&self, _size: usize) {}
fn update_uptime(&self) {}
fn update_events_rate(&self) {}
fn record_received_lsn(&self, _lsn: u64) {}
fn get_metrics(&self) -> CdcResult<String> {
Ok("# Metrics not available - metrics feature disabled\n".to_string())
}
fn init_build_info(&self, _version: &str) {}
fn record_transaction_processed(
&self,
_transaction: &crate::types::Transaction,
_destination_type: &str,
) {
}
fn record_full_transaction_processed(
&self,
_transaction: &crate::types::Transaction,
_destination_type: &str,
) {
}
}
pub struct ProcessingTimer {
_phantom: std::marker::PhantomData<()>,
}
impl ProcessingTimerTrait for ProcessingTimer {
fn start(_event_type: &str, _destination_type: &str) -> Self {
Self {
_phantom: std::marker::PhantomData,
}
}
fn finish(self, _collector: &dyn MetricsCollectorTrait) {}
}
}
#[cfg(feature = "metrics")]
pub fn init_metrics() -> CdcResult<()> {
crate::monitoring::metrics::init_metrics().map_err(|e| crate::CdcError::generic(e.to_string()))
}
#[cfg(not(feature = "metrics"))]
pub fn init_metrics() -> CdcResult<()> {
tracing::debug!("Metrics feature disabled - skipping metrics initialization");
Ok(())
}
#[cfg(feature = "metrics")]
pub fn gather_metrics() -> CdcResult<String> {
crate::monitoring::metrics::gather_metrics()
.map_err(|e| crate::CdcError::generic(e.to_string()))
}
#[cfg(not(feature = "metrics"))]
pub fn gather_metrics() -> CdcResult<String> {
Ok("# Metrics not available - metrics feature disabled\n".to_string())
}