use crate::{CdcError, Config, DestinationType};
use std::collections::HashMap;
use std::time::Duration;
pub fn load_config_from_env() -> Result<Config, CdcError> {
tracing::info!("Loading configuration from environment variables");
let source_connection_string = std::env::var("CDC_SOURCE_CONNECTION_STRING").expect(
"CDC_SOURCE_CONNECTION_STRING environment variable is required. Example: postgresql://user:password@host:port/dbname?replication=database",
);
let dest_type_str = std::env::var("CDC_DEST_TYPE").unwrap_or_else(|_| "MySQL".to_string());
let dest_type = match dest_type_str.as_str() {
"MySQL" | "mysql" => DestinationType::MySQL,
"SqlServer" | "sqlserver" => DestinationType::SqlServer,
"SQLite" | "sqlite" => DestinationType::SQLite,
"Kafka" | "kafka" => DestinationType::Kafka,
_ => {
tracing::warn!(
"Unknown destination type '{}', defaulting to MySQL",
dest_type_str
);
DestinationType::MySQL
}
};
let destination_connection_string = std::env::var("CDC_DEST_URI").expect(
"CDC_DEST_URI environment variable is required. Example for MySQL mysql://replicator:pass.123@127.0.0.1:3306/publif or ./cdc_target.db for SQLite ..etc",
);
let schema_mappings = parse_schema_mapping_env("CDC_SCHEMA_MAPPING")?;
if !schema_mappings.is_empty() {
tracing::info!("Schema mappings configured: {:?}", schema_mappings);
}
let replication_slot =
std::env::var("CDC_REPLICATION_SLOT").unwrap_or_else(|_| "cdc_slot".to_string());
let publication = std::env::var("CDC_PUBLICATION").unwrap_or_else(|_| "cdc_pub".to_string());
let protocol_version = parse_u32_env("CDC_PROTOCOL_VERSION", 1)?;
let binary_format = parse_bool_env("CDC_BINARY_FORMAT", false)?;
let streaming = parse_bool_env("CDC_STREAMING", true)?;
let connection_timeout_secs = parse_u64_env("CDC_CONNECTION_TIMEOUT", 30)?;
let query_timeout_secs = parse_u64_env("CDC_QUERY_TIMEOUT", 10)?;
let buffer_size =
parse_usize_env_with_fallback("CDC_CHANNEL_CAPACITY", "CDC_BUFFER_SIZE", 1000)?;
let batch_size =
parse_usize_env_with_fallback("CDC_BATCH_SIZE", "CDC_COMMIT_BATCH_SIZE", 1000)?;
let segment_size_mb = parse_usize_env("CDC_TRANSACTION_SEGMENT_SIZE_MB", 64)?;
let segment_size_bytes = segment_size_mb.saturating_mul(1024 * 1024);
let bulk_insert_threshold = parse_usize_env("CDC_BULK_INSERT_THRESHOLD", 500)?;
let max_rows_per_insert = parse_usize_env("CDC_MAX_ROWS_PER_INSERT", 0)?;
let transaction_file_base_path =
std::env::var("CDC_TRANSACTION_FILE_BASE_PATH").unwrap_or_else(|_| ".".to_string());
tracing::info!(
"CDC Config - Slot: {}, Publication: {}, Protocol: {}, Streaming: {}, Binary: {}",
replication_slot,
publication,
protocol_version,
streaming,
binary_format
);
tracing::info!(
"Timeouts - Connection: {}s, Query: {}s",
connection_timeout_secs,
query_timeout_secs
);
tracing::info!(
"Performance - Transaction Channel Size: {}, Batch Size: {}",
buffer_size,
batch_size
);
tracing::info!(
"Transaction file segment size: {} MB ({} bytes)",
segment_size_mb,
segment_size_bytes
);
tracing::info!(
"Transaction file persistence at: {}",
transaction_file_base_path
);
let config = Config::builder()
.source_connection_string(source_connection_string)
.destination_type(dest_type)
.destination_connection_string(destination_connection_string)
.replication_slot_name(replication_slot)
.publication_name(publication)
.protocol_version(protocol_version)
.binary_format(binary_format)
.streaming(streaming)
.connection_timeout(Duration::from_secs(connection_timeout_secs))
.query_timeout(Duration::from_secs(query_timeout_secs))
.schema_mappings(schema_mappings)
.buffer_size(buffer_size)
.batch_size(batch_size)
.transaction_file_base_path(transaction_file_base_path)
.transaction_segment_size_bytes(segment_size_bytes)
.bulk_insert_threshold(bulk_insert_threshold)
.max_rows_per_insert(max_rows_per_insert)
.build()?;
tracing::info!("Configuration loaded successfully");
Ok(config)
}
/// Parses a comma-separated list of `source:dest` schema mappings from env var `key`.
///
/// Whitespace around each entry and around each side of the `:` is ignored. Empty entries
/// (e.g. a trailing comma) are skipped. An unset or empty variable yields an empty map.
/// Only the first `:` splits an entry, so the destination may itself contain `:`.
/// Repeating an identical `source:dest` pair is harmless.
///
/// # Errors
///
/// Returns a configuration error in any of these cases:
/// - the variable is not valid Unicode;
/// - an entry has no `:`;
/// - either side of an entry is empty;
/// - one source schema is mapped to two different destinations.
fn parse_schema_mapping_env(key: &str) -> Result<HashMap<String, String>, CdcError> {
    let value = match std::env::var(key) {
        Ok(value) => value,
        Err(std::env::VarError::NotPresent) => return Ok(HashMap::new()),
        // Previously this silently meant "no mappings"; surface it instead.
        Err(std::env::VarError::NotUnicode(_)) => {
            return Err(CdcError::config(format!(
                "Invalid schema mapping for {key}: value is not valid Unicode."
            )))
        }
    };

    let mut mappings = HashMap::new();
    for pair in value.split(',').map(str::trim).filter(|p| !p.is_empty()) {
        let (source, dest) = match pair.split_once(':') {
            Some((source, dest)) => (source.trim(), dest.trim()),
            None => {
                return Err(CdcError::config(format!(
                    "Invalid schema mapping format '{pair}'. Expected 'source:dest' format."
                )))
            }
        };
        if source.is_empty() || dest.is_empty() {
            return Err(CdcError::config(format!(
                "Invalid schema mapping '{pair}'. Both source and destination must be non-empty."
            )));
        }
        // A source mapped to two different destinations is ambiguous; reject it rather
        // than letting the last entry silently win.
        if let Some(previous) = mappings.insert(source.to_string(), dest.to_string()) {
            if previous != dest {
                return Err(CdcError::config(format!(
                    "Conflicting schema mappings for source '{source}': '{previous}' and '{dest}'."
                )));
            }
        }
    }
    Ok(mappings)
}
/// Reads an optional boolean from env var `key`, returning `default` when it is unset.
///
/// Accepted values ignore case and surrounding whitespace:
/// - `true`, `1`, `yes`, `on` mean true;
/// - `false`, `0`, `no`, `off` mean false.
///
/// This is a superset of what `bool::from_str` accepts.
///
/// # Errors
///
/// Returns a configuration error if the variable holds any other value (including an
/// empty one) or is not valid Unicode.
fn parse_bool_env(key: &str, default: bool) -> Result<bool, CdcError> {
    let raw = match std::env::var(key) {
        Ok(raw) => raw,
        Err(std::env::VarError::NotPresent) => return Ok(default),
        // Don't silently fall back to the default for a value we couldn't read.
        Err(std::env::VarError::NotUnicode(_)) => {
            return Err(CdcError::config(format!(
                "Invalid boolean value for {key}: not valid Unicode"
            )))
        }
    };
    match raw.trim().to_ascii_lowercase().as_str() {
        "true" | "1" | "yes" | "on" => Ok(true),
        "false" | "0" | "no" | "off" => Ok(false),
        _ => Err(CdcError::config(format!(
            "Invalid boolean value for {key}: {raw} (expected true/false, 1/0, yes/no or on/off)"
        ))),
    }
}
/// Reads an optional `u32` from env var `key`, returning `default` when it is unset.
///
/// Surrounding whitespace in the value is ignored.
///
/// # Errors
///
/// Returns a configuration error if the value is not a valid `u32` (including an empty
/// value) or is not valid Unicode.
fn parse_u32_env(key: &str, default: u32) -> Result<u32, CdcError> {
    match std::env::var(key) {
        Ok(value) => value
            .trim()
            .parse::<u32>()
            .map_err(|e| CdcError::config(format!("Invalid u32 value for {key}: {value} ({e})"))),
        Err(std::env::VarError::NotPresent) => Ok(default),
        // A set-but-unreadable value must not silently become the default.
        Err(std::env::VarError::NotUnicode(_)) => Err(CdcError::config(format!(
            "Invalid u32 value for {key}: not valid Unicode"
        ))),
    }
}
/// Reads an optional `u64` from env var `key`, returning `default` when it is unset.
///
/// Surrounding whitespace in the value is ignored.
///
/// # Errors
///
/// Returns a configuration error if the value is not a valid `u64` (including an empty
/// value) or is not valid Unicode.
fn parse_u64_env(key: &str, default: u64) -> Result<u64, CdcError> {
    match std::env::var(key) {
        Ok(value) => value
            .trim()
            .parse::<u64>()
            .map_err(|e| CdcError::config(format!("Invalid u64 value for {key}: {value} ({e})"))),
        Err(std::env::VarError::NotPresent) => Ok(default),
        // A set-but-unreadable value must not silently become the default.
        Err(std::env::VarError::NotUnicode(_)) => Err(CdcError::config(format!(
            "Invalid u64 value for {key}: not valid Unicode"
        ))),
    }
}
/// Reads an optional `usize` from env var `key`, returning `default` when it is unset.
///
/// Surrounding whitespace in the value is ignored.
///
/// # Errors
///
/// Returns a configuration error if the value is not a valid `usize` (including an empty
/// value) or is not valid Unicode.
fn parse_usize_env(key: &str, default: usize) -> Result<usize, CdcError> {
    match std::env::var(key) {
        Ok(value) => value
            .trim()
            .parse::<usize>()
            .map_err(|e| CdcError::config(format!("Invalid usize value for {key}: {value} ({e})"))),
        Err(std::env::VarError::NotPresent) => Ok(default),
        // A set-but-unreadable value must not silently become the default.
        Err(std::env::VarError::NotUnicode(_)) => Err(CdcError::config(format!(
            "Invalid usize value for {key}: not valid Unicode"
        ))),
    }
}
/// Reads an optional `usize` from `new_key`, falling back to the deprecated `old_key`, and
/// finally to `default` when neither is set.
///
/// Behaviour by case:
/// - Only `old_key` is set: it is used and a deprecation warning is logged.
/// - Both keys are set: `new_key` wins and a warning notes that `old_key` is ignored.
///
/// Surrounding whitespace in the value is ignored.
///
/// # Errors
///
/// Returns a configuration error if the variable actually consulted is not valid Unicode
/// or does not parse as a `usize`.
fn parse_usize_env_with_fallback(
    new_key: &str,
    old_key: &str,
    default: usize,
) -> Result<usize, CdcError> {
    // Distinguish "unset" from "set but unreadable". The latter must not silently fall
    // through to the deprecated key or the default.
    let read = |key: &str| -> Result<Option<String>, CdcError> {
        match std::env::var(key) {
            Ok(value) => Ok(Some(value)),
            Err(std::env::VarError::NotPresent) => Ok(None),
            Err(std::env::VarError::NotUnicode(_)) => Err(CdcError::config(format!(
                "Invalid usize value for {key}: not valid Unicode"
            ))),
        }
    };
    let parse = |key: &str, value: String| -> Result<usize, CdcError> {
        value.trim().parse::<usize>().map_err(|e| {
            CdcError::config(format!("Invalid usize value for {key}: {value} ({e})"))
        })
    };

    if let Some(value) = read(new_key)? {
        if std::env::var_os(old_key).is_some() {
            tracing::warn!(
                "Both {new_key} and deprecated {old_key} are set; ignoring {old_key}"
            );
        }
        return parse(new_key, value);
    }
    if let Some(value) = read(old_key)? {
        tracing::warn!("Environment variable {old_key} is deprecated, use {new_key} instead");
        return parse(old_key, value);
    }
    Ok(default)
}