use tokio_util::sync::CancellationToken;
use tracing::info;
#[cfg(feature = "metrics")]
use crate::MetricsServer;
use crate::{client::CdcClient, config::Config, types::Lsn, CdcResult};
#[derive(Debug, Clone)]
pub struct CdcAppConfig {
pub cdc_config: Config,
pub metrics_port: Option<u16>,
pub version: String,
}
impl CdcAppConfig {
pub fn new(cdc_config: Config) -> Self {
Self {
cdc_config,
metrics_port: None,
version: "unknown".to_string(),
}
}
pub fn with_metrics_port(&mut self, port: u16) {
self.metrics_port = Some(port);
}
pub fn with_version(&mut self, version: &str) {
self.version = version.to_string();
}
}
pub struct CdcApp {
client: CdcClient,
config: CdcAppConfig,
start_lsn: Option<Lsn>,
}
impl CdcApp {
pub async fn new(config: CdcAppConfig, lsn_file_path: Option<&str>) -> CdcResult<Self> {
info!("Initializing CDC client");
let (client, start_lsn) = CdcClient::new(config.cdc_config.clone(), lsn_file_path).await?;
Ok(Self {
client,
config,
start_lsn,
})
}
pub async fn from_config(cdc_config: Config, lsn_file_path: Option<&str>) -> CdcResult<Self> {
let app_config = CdcAppConfig::new(cdc_config);
Self::new(app_config, lsn_file_path).await
}
pub async fn run(&mut self) -> CdcResult<()> {
self.client.init_build_info(&self.config.version);
info!("Starting CDC replication stream");
#[cfg(feature = "metrics")]
{
if let Some(port) = self.config.metrics_port {
info!("Starting metrics server on port {}", port);
let server = crate::create_metrics_server(port);
return self
.run_with_optional_server(self.start_lsn, Some(server))
.await;
}
}
self.run_with_optional_server(self.start_lsn, None).await
}
async fn run_with_optional_server(
&mut self,
start_lsn: Option<Lsn>,
#[cfg(feature = "metrics")] server: Option<MetricsServer>,
#[cfg(not(feature = "metrics"))] _: Option<()>,
) -> CdcResult<()> {
#[cfg(feature = "metrics")]
if let Some(server) = server {
let _ = tokio::spawn(async move { server.start().await });
} else {
tracing::warn!(
"Metrics server not started (metrics feature enabled but no port configured)"
);
}
let shutdown_handler = setup_shutdown_handler(self.client.cancellation_token());
tokio::select! {
result = self.client.start_replication_from_lsn(start_lsn) => {
match result {
Ok(()) => {
info!("CDC replication completed successfully");
Ok(())
}
Err(e) => {
tracing::error!("CDC replication failed: {}", e);
Err(e)
}
}
}
_ = shutdown_handler => {
info!("Shutdown signal received, stopping CDC replication gracefully");
self.client.stop().await?;
info!("CDC replication stopped successfully");
Ok(())
}
}
}
}
pub async fn run_cdc_app(config: Config, lsn_file_path: Option<&str>) -> CdcResult<()> {
let app_config = CdcAppConfig {
cdc_config: config,
metrics_port: None,
version: "unknown".to_string(),
};
let mut app = CdcApp::new(app_config, lsn_file_path).await?;
app.run().await
}
async fn setup_shutdown_handler(shutdown_token: CancellationToken) {
use tokio::signal;
#[cfg(unix)]
{
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler");
let mut sigint = signal::unix::signal(signal::unix::SignalKind::interrupt())
.expect("Failed to install SIGINT handler");
tokio::select! {
_ = sigterm.recv() => {
info!("Received SIGTERM, initiating graceful shutdown");
shutdown_token.cancel();
}
_ = sigint.recv() => {
info!("Received SIGINT (Ctrl+C), initiating graceful shutdown");
shutdown_token.cancel();
}
}
}
#[cfg(windows)]
{
signal::ctrl_c().await.expect("Failed to listen for ctrl-c");
info!("Received Ctrl+C, initiating graceful shutdown");
shutdown_token.cancel();
}
}