use lazy_static::lazy_static;
use prometheus::{
register_counter, register_counter_vec, register_gauge, register_gauge_vec,
register_histogram_vec, Counter, CounterVec, Encoder, Gauge, GaugeVec, HistogramVec, Registry,
};
use tracing::debug;
lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();
pub static ref EVENTS_PROCESSED_TOTAL: Counter = register_counter!(
"pg2any_events_processed_total",
"Total number of CDC events processed"
).expect("metric can be created");
pub static ref EVENTS_BY_TYPE: CounterVec = register_counter_vec!(
"pg2any_events_by_type_total",
"Number of events processed by type",
&["event_type", "table_name"]
).expect("metric can be created");
pub static ref EVENTS_RATE: Gauge = register_gauge!(
"pg2any_events_per_second",
"Current rate of events being processed per second"
).expect("metric can be created");
pub static ref CONSUMER_QUEUE_SIZE: Gauge = register_gauge!(
"pg2any_consumer_queue_length",
"Number of pending events in the consumer thread queue"
).expect("metric can be created");
pub static ref LAST_PROCESSED_LSN: Gauge = register_gauge!(
"pg2any_last_processed_lsn",
"Last processed LSN from PostgreSQL WAL"
).expect("metric can be created");
pub static ref CURRENT_RECEIVED_LSN: Gauge = register_gauge!(
"pg2any_current_received_lsn",
"Current LSN received from PostgreSQL replication stream"
).expect("metric can be created");
pub static ref ERRORS_TOTAL: CounterVec = register_counter_vec!(
"pg2any_errors_total",
"Total number of errors by type",
&["error_type", "component"]
).expect("metric can be created");
pub static ref SOURCE_CONNECTION_STATUS: Gauge = register_gauge!(
"pg2any_source_connection_status",
"Connection status to source PostgreSQL database"
).expect("metric can be created");
pub static ref DESTINATION_CONNECTION_STATUS: GaugeVec = register_gauge_vec!(
"pg2any_destination_connection_status",
"Connection status to destination database",
&["destination_type"]
).expect("metric can be created");
pub static ref EVENT_PROCESSING_DURATION: HistogramVec = register_histogram_vec!(
"pg2any_event_processing_duration_seconds",
"Time taken to process CDC events",
&["event_type", "destination_type"]
).expect("metric can be created");
pub static ref ACTIVE_CONNECTIONS: GaugeVec = register_gauge_vec!(
"pg2any_active_connections",
"Number of active database connections",
&["connection_type"] ).expect("metric can be created");
pub static ref UPTIME_SECONDS: Gauge = register_gauge!(
"pg2any_uptime_seconds",
"Application uptime in seconds"
).expect("metric can be created");
pub static ref BUILD_INFO: GaugeVec = register_gauge_vec!(
"pg2any_build_info",
"Build information",
&["version"]
).expect("metric can be created");
pub static ref TRANSACTIONS_PROCESSED_TOTAL: Counter = register_counter!(
"pg2any_transactions_processed_total",
"Total number of transactions successfully processed"
).expect("metric can be created");
pub static ref FULL_TRANSACTIONS_PROCESSED_TOTAL: Counter = register_counter!(
"pg2any_full_transactions_processed_total",
"Total number of complete transactions (final batches) successfully processed"
).expect("metric can be created");
}
pub fn init_metrics() -> Result<(), Box<dyn std::error::Error>> {
REGISTRY
.register(Box::new(EVENTS_PROCESSED_TOTAL.clone()))
.map_err(|e| format!("Failed to register EVENTS_PROCESSED_TOTAL: {e}"))?;
REGISTRY
.register(Box::new(EVENTS_BY_TYPE.clone()))
.map_err(|e| format!("Failed to register EVENTS_BY_TYPE: {e}"))?;
REGISTRY
.register(Box::new(EVENTS_RATE.clone()))
.map_err(|e| format!("Failed to register EVENTS_RATE: {e}"))?;
REGISTRY
.register(Box::new(CONSUMER_QUEUE_SIZE.clone()))
.map_err(|e| format!("Failed to register CONSUMER_QUEUE_SIZE: {e}"))?;
REGISTRY
.register(Box::new(LAST_PROCESSED_LSN.clone()))
.map_err(|e| format!("Failed to register LAST_PROCESSED_LSN: {e}"))?;
REGISTRY
.register(Box::new(CURRENT_RECEIVED_LSN.clone()))
.map_err(|e| format!("Failed to register CURRENT_RECEIVED_LSN: {e}"))?;
REGISTRY
.register(Box::new(ERRORS_TOTAL.clone()))
.map_err(|e| format!("Failed to register ERRORS_TOTAL: {e}"))?;
REGISTRY
.register(Box::new(SOURCE_CONNECTION_STATUS.clone()))
.map_err(|e| format!("Failed to register SOURCE_CONNECTION_STATUS: {e}"))?;
REGISTRY
.register(Box::new(DESTINATION_CONNECTION_STATUS.clone()))
.map_err(|e| format!("Failed to register DESTINATION_CONNECTION_STATUS: {e}"))?;
REGISTRY
.register(Box::new(EVENT_PROCESSING_DURATION.clone()))
.map_err(|e| format!("Failed to register EVENT_PROCESSING_DURATION: {e}"))?;
REGISTRY
.register(Box::new(ACTIVE_CONNECTIONS.clone()))
.map_err(|e| format!("Failed to register ACTIVE_CONNECTIONS: {e}"))?;
REGISTRY
.register(Box::new(UPTIME_SECONDS.clone()))
.map_err(|e| format!("Failed to register UPTIME_SECONDS: {e}"))?;
REGISTRY
.register(Box::new(BUILD_INFO.clone()))
.map_err(|e| format!("Failed to register BUILD_INFO: {e}"))?;
REGISTRY
.register(Box::new(TRANSACTIONS_PROCESSED_TOTAL.clone()))
.map_err(|e| format!("Failed to register TRANSACTIONS_PROCESSED_TOTAL: {e}"))?;
REGISTRY
.register(Box::new(FULL_TRANSACTIONS_PROCESSED_TOTAL.clone()))
.map_err(|e| format!("Failed to register FULL_TRANSACTIONS_PROCESSED_TOTAL: {e}"))?;
debug!("All metrics registered successfully");
Ok(())
}
pub fn gather_metrics() -> Result<String, Box<dyn std::error::Error>> {
let metric_families = REGISTRY.gather();
let encoder = prometheus::TextEncoder::new();
let mut output = Vec::new();
encoder.encode(&metric_families, &mut output)?;
Ok(String::from_utf8(output)?)
}