use crate::{CdcError, Config, DestinationType};
use std::collections::HashMap;
use std::time::Duration;
pub fn load_config_from_env() -> Result<Config, CdcError> {
tracing::info!("Loading configuration from environment variables");
let source_connection_string = std::env::var("CDC_SOURCE_CONNECTION_STRING").expect(
"CDC_SOURCE_CONNECTION_STRING environment variable is required. Example: postgresql://user:password@host:port/dbname?replication=database",
);
let dest_type_str = std::env::var("CDC_DEST_TYPE").unwrap_or_else(|_| "MySQL".to_string());
let dest_type = match dest_type_str.as_str() {
"MySQL" | "mysql" => DestinationType::MySQL,
"SqlServer" | "sqlserver" => DestinationType::SqlServer,
"SQLite" | "sqlite" => DestinationType::SQLite,
_ => {
tracing::warn!(
"Unknown destination type '{}', defaulting to MySQL",
dest_type_str
);
DestinationType::MySQL
}
};
let destination_connection_string = std::env::var("CDC_DEST_URI").expect(
"CDC_DEST_URI environment variable is required. Example for MySQL mysql://replicator:pass.123@127.0.0.1:3306/publif or ./cdc_target.db for SQLite ..etc",
);
let schema_mappings = parse_schema_mapping_env("CDC_SCHEMA_MAPPING")?;
if !schema_mappings.is_empty() {
tracing::info!("Schema mappings configured: {:?}", schema_mappings);
}
let replication_slot =
std::env::var("CDC_REPLICATION_SLOT").unwrap_or_else(|_| "cdc_slot".to_string());
let publication = std::env::var("CDC_PUBLICATION").unwrap_or_else(|_| "cdc_pub".to_string());
let protocol_version = parse_u32_env("CDC_PROTOCOL_VERSION", 1)?;
let binary_format = parse_bool_env("CDC_BINARY_FORMAT", false)?;
let streaming = parse_bool_env("CDC_STREAMING", true)?;
let connection_timeout_secs = parse_u64_env("CDC_CONNECTION_TIMEOUT", 30)?;
let query_timeout_secs = parse_u64_env("CDC_QUERY_TIMEOUT", 10)?;
let buffer_size = parse_usize_env("CDC_BUFFER_SIZE", 1000)?;
let batch_size = parse_usize_env("CDC_COMMIT_BATCH_SIZE", 1000)?;
let segment_size_mb = parse_usize_env("CDC_TRANSACTION_SEGMENT_SIZE_MB", 64)?;
let segment_size_bytes = segment_size_mb.saturating_mul(1024 * 1024);
let transaction_file_base_path =
std::env::var("CDC_TRANSACTION_FILE_BASE_PATH").unwrap_or_else(|_| ".".to_string());
tracing::info!(
"CDC Config - Slot: {}, Publication: {}, Protocol: {}, Streaming: {}, Binary: {}",
replication_slot,
publication,
protocol_version,
streaming,
binary_format
);
tracing::info!(
"Timeouts - Connection: {}s, Query: {}s",
connection_timeout_secs,
query_timeout_secs
);
tracing::info!(
"Performance - Transaction Channel Size: {}, Batch Size: {}",
buffer_size,
batch_size
);
tracing::info!(
"Transaction file segment size: {} MB ({} bytes)",
segment_size_mb,
segment_size_bytes
);
tracing::info!(
"Transaction file persistence at: {}",
transaction_file_base_path
);
let config = Config::builder()
.source_connection_string(source_connection_string)
.destination_type(dest_type)
.destination_connection_string(destination_connection_string)
.replication_slot_name(replication_slot)
.publication_name(publication)
.protocol_version(protocol_version)
.binary_format(binary_format)
.streaming(streaming)
.connection_timeout(Duration::from_secs(connection_timeout_secs))
.query_timeout(Duration::from_secs(query_timeout_secs))
.schema_mappings(schema_mappings)
.buffer_size(buffer_size)
.batch_size(batch_size)
.transaction_file_base_path(transaction_file_base_path)
.transaction_segment_size_bytes(segment_size_bytes)
.build()?;
tracing::info!("Configuration loaded successfully");
Ok(config)
}
/// Parses the environment variable `key` as a comma-separated list of
/// `source:dest` schema mappings.
///
/// An unreadable or empty variable yields an empty map. Entries that are
/// blank after trimming are skipped. Each remaining entry is split at its
/// first `:`, and both halves are trimmed before being stored. If the same
/// source appears more than once, the last entry wins.
///
/// # Errors
///
/// Returns a configuration error when an entry has no `:` separator or when
/// either side of the separator is empty after trimming.
fn parse_schema_mapping_env(key: &str) -> Result<HashMap<String, String>, CdcError> {
    let mut table = HashMap::new();

    let value = match std::env::var(key) {
        Ok(value) if !value.is_empty() => value,
        _ => return Ok(table),
    };

    let entries = value
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty());

    for pair in entries {
        // Only the first ':' separates the two halves; later ones stay in `dest`.
        let (source, dest) = match pair.split_once(':') {
            Some((lhs, rhs)) => (lhs.trim(), rhs.trim()),
            None => {
                return Err(CdcError::config(format!(
                    "Invalid schema mapping format '{pair}'. Expected 'source:dest' format."
                )));
            }
        };

        if source.is_empty() || dest.is_empty() {
            return Err(CdcError::config(format!(
                "Invalid schema mapping '{pair}'. Both source and destination must be non-empty."
            )));
        }

        table.insert(source.to_string(), dest.to_string());
    }

    Ok(table)
}
/// Reads the environment variable `key` as a boolean.
///
/// Returns `default` when the variable cannot be read. Otherwise the raw
/// value is handed to `str::parse::<bool>` without any trimming or case
/// folding.
///
/// # Errors
///
/// Returns a configuration error naming the key, the offending value and the
/// parse failure when the value is not a valid boolean literal.
fn parse_bool_env(key: &str, default: bool) -> Result<bool, CdcError> {
    let value = match std::env::var(key) {
        Ok(value) => value,
        Err(_) => return Ok(default),
    };

    match value.parse::<bool>() {
        Ok(flag) => Ok(flag),
        Err(e) => Err(CdcError::config(format!(
            "Invalid boolean value for {key}: {value} ({e})"
        ))),
    }
}
/// Reads the environment variable `key` as a `u32`.
///
/// Returns `default` when the variable cannot be read; otherwise the raw
/// value is parsed as-is.
///
/// # Errors
///
/// Returns a configuration error naming the key, the offending value and the
/// parse failure when the value is not a valid `u32`.
fn parse_u32_env(key: &str, default: u32) -> Result<u32, CdcError> {
    std::env::var(key).map_or(Ok(default), |value| {
        value.parse::<u32>().map_err(|e| {
            CdcError::config(format!("Invalid u32 value for {key}: {value} ({e})"))
        })
    })
}
/// Reads the environment variable `key` as a `u64`.
///
/// Returns `default` when the variable cannot be read; otherwise the raw
/// value is parsed as-is.
///
/// # Errors
///
/// Returns a configuration error naming the key, the offending value and the
/// parse failure when the value is not a valid `u64`.
fn parse_u64_env(key: &str, default: u64) -> Result<u64, CdcError> {
    if let Ok(value) = std::env::var(key) {
        return match value.parse::<u64>() {
            Ok(number) => Ok(number),
            Err(e) => Err(CdcError::config(format!(
                "Invalid u64 value for {key}: {value} ({e})"
            ))),
        };
    }

    // Variable could not be read: fall back to the caller's default.
    Ok(default)
}
/// Reads the environment variable `key` as a `usize`.
///
/// Returns `default` when the variable cannot be read; otherwise the raw
/// value is parsed as-is.
///
/// # Errors
///
/// Returns a configuration error naming the key, the offending value and the
/// parse failure when the value is not a valid `usize`.
fn parse_usize_env(key: &str, default: usize) -> Result<usize, CdcError> {
    let value = match std::env::var(key).ok() {
        Some(value) => value,
        None => return Ok(default),
    };

    let parsed: Result<usize, _> = value.parse();
    parsed.map_err(|e| CdcError::config(format!("Invalid usize value for {key}: {value} ({e})")))
}