use pg_walstream::RetryConfig;
use crate::error::{CdcError, Result};
use crate::types::DestinationType;
use std::collections::HashMap;
use std::time::Duration;
/// Top-level configuration for a PostgreSQL CDC pipeline.
///
/// `Config::builder()` returns a `ConfigBuilder` whose `build()` validates the settings.
///
/// `Debug` is implemented by hand rather than derived: the source and destination
/// connection strings may embed credentials (e.g. `postgresql://user:password@host/db`),
/// so they are printed as `"<redacted>"` (or `""` when unset) to keep them out of logs.
#[derive(Clone)]
pub struct Config {
    /// Connection string for the PostgreSQL source.
    pub source_connection_string: String,
    /// Kind of destination changes are written to.
    pub destination_type: DestinationType,
    /// Connection string for the destination.
    pub destination_connection_string: String,
    /// Name of the replication slot on the source.
    pub replication_slot_name: String,
    /// Name of the publication on the source.
    pub publication_name: String,
    /// Replication protocol version; `ConfigBuilder::build` only accepts 1 through 4.
    pub protocol_version: u32,
    /// Binary-format flag.
    pub binary_format: bool,
    /// Streaming flag; the builder requires protocol version >= 2 when this is set.
    pub streaming: bool,
    /// Flag controlling whether messages are included.
    pub include_messages: bool,
    /// Two-phase flag; the builder requires protocol version >= 3 when this is set.
    pub two_phase: bool,
    /// Origin filter setting.
    pub origin: OriginFilter,
    /// Connection timeout.
    pub connection_timeout: Duration,
    /// Query timeout.
    pub query_timeout: Duration,
    /// Heartbeat interval.
    pub heartbeat_interval: Duration,
    /// Maximum number of retry attempts.
    pub max_retry_attempts: u32,
    /// Delay before the first retry.
    pub initial_retry_delay: Duration,
    /// Upper bound on the delay between retries.
    pub max_retry_delay: Duration,
    /// Factor applied to the retry delay between attempts.
    pub retry_multiplier: f64,
    /// Upper bound on the total time spent retrying.
    pub max_retry_duration: Duration,
    /// Whether jitter is applied to retry delays.
    pub retry_jitter: bool,
    /// Buffer size; the builder rejects 0.
    pub buffer_size: usize,
    /// Per-table overrides, keyed by the name given to `ConfigBuilder::table_mapping`.
    pub table_mappings: HashMap<String, TableMapping>,
    /// Source schema name -> destination schema name.
    pub schema_mappings: HashMap<String, String>,
    /// Batch size; the builder setter raises 0 to 1.
    pub batch_size: usize,
    /// Base path for transaction files.
    pub transaction_file_base_path: String,
    /// Transaction file segment size, in bytes.
    pub transaction_segment_size_bytes: usize,
    /// Free-form key/value options without a dedicated field.
    pub extra_options: HashMap<String, String>,
}

impl std::fmt::Debug for Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Debug-prints a secret string as `""` when unset and `"<redacted>"` otherwise.
        struct Redacted<'a>(&'a str);

        impl std::fmt::Debug for Redacted<'_> {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                let shown = if self.0.is_empty() { "" } else { "<redacted>" };
                std::fmt::Debug::fmt(shown, f)
            }
        }

        f.debug_struct("Config")
            .field(
                "source_connection_string",
                &Redacted(&self.source_connection_string),
            )
            .field("destination_type", &self.destination_type)
            .field(
                "destination_connection_string",
                &Redacted(&self.destination_connection_string),
            )
            .field("replication_slot_name", &self.replication_slot_name)
            .field("publication_name", &self.publication_name)
            .field("protocol_version", &self.protocol_version)
            .field("binary_format", &self.binary_format)
            .field("streaming", &self.streaming)
            .field("include_messages", &self.include_messages)
            .field("two_phase", &self.two_phase)
            .field("origin", &self.origin)
            .field("connection_timeout", &self.connection_timeout)
            .field("query_timeout", &self.query_timeout)
            .field("heartbeat_interval", &self.heartbeat_interval)
            .field("max_retry_attempts", &self.max_retry_attempts)
            .field("initial_retry_delay", &self.initial_retry_delay)
            .field("max_retry_delay", &self.max_retry_delay)
            .field("retry_multiplier", &self.retry_multiplier)
            .field("max_retry_duration", &self.max_retry_duration)
            .field("retry_jitter", &self.retry_jitter)
            .field("buffer_size", &self.buffer_size)
            .field("table_mappings", &self.table_mappings)
            .field("schema_mappings", &self.schema_mappings)
            .field("batch_size", &self.batch_size)
            .field("transaction_file_base_path", &self.transaction_file_base_path)
            .field(
                "transaction_segment_size_bytes",
                &self.transaction_segment_size_bytes,
            )
            .field("extra_options", &self.extra_options)
            .finish()
    }
}
/// Filter on the replication origin of changes.
///
/// NOTE(review): how each variant is translated into a replication option is not visible
/// in this file — confirm against the code that consumes `Config::origin`.
/// `Config::default()` uses `OriginFilter::None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OriginFilter {
/// Presumably: only changes that carry no origin — confirm.
None,
/// Presumably: changes regardless of origin — confirm.
Any,
/// Presumably: changes from the named origin — confirm.
Origin(String),
}
/// Per-table overrides for how a source table is mapped to the destination.
///
/// Built with the chained setters on `TableMapping`, starting from `TableMapping::new()`;
/// the default mapping is enabled and carries no overrides.
#[derive(Debug, Clone)]
pub struct TableMapping {
/// Destination schema override; `None` presumably means "no override" — confirm.
pub destination_schema: Option<String>,
/// Destination table override; `None` presumably means "no override" — confirm.
pub destination_table: Option<String>,
/// Source column name -> destination column name.
pub column_mappings: HashMap<String, String>,
/// Names of source columns to exclude.
pub excluded_columns: Vec<String>,
/// Column transformations, in the order they were added.
pub transformations: Vec<ColumnTransformation>,
/// Whether this mapping is active (`true` by default).
pub enabled: bool,
}
/// Describes a transformation between one source column and one destination column.
#[derive(Debug, Clone)]
pub struct ColumnTransformation {
/// Column name on the source side.
pub source_column: String,
/// Column name on the destination side.
pub destination_column: String,
/// Which transformation to apply.
pub transformation: TransformationType,
}
/// Kind of value transformation for a column.
///
/// NOTE(review): the code that applies these variants is not in this file; the per-variant
/// notes below are inferred from the names only — confirm against the implementation.
#[derive(Debug, Clone)]
pub enum TransformationType {
/// Presumably passes the value through unchanged.
Identity,
/// Presumably upper-cases a text value.
Uppercase,
/// Presumably lower-cases a text value.
Lowercase,
/// Transformation identified by a string; how the string is interpreted is not visible here.
Function(String),
/// Carries a constant JSON value, presumably written in place of the source value.
Static(serde_json::Value),
/// Presumably writes NULL in place of the source value.
Null,
}
impl Default for Config {
    /// Baseline settings.
    ///
    /// - No connection strings and a MySQL destination.
    /// - Slot `pg2any_slot` and publication `pg2any_pub`.
    /// - Protocol version 1, with binary format, streaming, messages and two-phase all off,
    ///   and `OriginFilter::None`.
    /// - 30 s connect timeout, 60 s query timeout and a 10 s heartbeat.
    /// - 5 retries: 1 s initial delay, 60 s cap, x2 multiplier, 300 s total, jitter on.
    /// - Buffer and batch sizes of 1000.
    /// - No table/schema mappings and no extra options.
    /// - Transaction files under `.` in 64 MiB segments.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        Self {
            // Endpoints: connection strings must be supplied by the caller.
            source_connection_string: String::new(),
            destination_connection_string: String::new(),
            destination_type: DestinationType::MySQL,

            // Replication stream.
            replication_slot_name: String::from("pg2any_slot"),
            publication_name: String::from("pg2any_pub"),
            protocol_version: 1,
            binary_format: false,
            streaming: false,
            include_messages: false,
            two_phase: false,
            origin: OriginFilter::None,

            // Timing.
            connection_timeout: Duration::from_secs(30),
            query_timeout: Duration::from_secs(60),
            heartbeat_interval: Duration::from_secs(10),

            // Retry/backoff.
            max_retry_attempts: 5,
            initial_retry_delay: Duration::from_secs(1),
            max_retry_delay: Duration::from_secs(60),
            retry_multiplier: 2.0,
            max_retry_duration: Duration::from_secs(5 * 60),
            retry_jitter: true,

            // Throughput.
            buffer_size: 1000,
            batch_size: 1000,

            // Mappings and free-form options.
            table_mappings: HashMap::new(),
            schema_mappings: HashMap::new(),
            extra_options: HashMap::new(),

            // Transaction file storage.
            transaction_file_base_path: String::from("."),
            transaction_segment_size_bytes: 64 * MIB,
        }
    }
}
/// Fluent builder for [`Config`].
///
/// Both `ConfigBuilder::new()` and the derived `Default` start from `Config::default()`.
/// `build()` validates the accumulated settings before returning them.
#[derive(Debug, Default)]
pub struct ConfigBuilder {
/// The configuration being assembled.
config: Config,
}
impl ConfigBuilder {
    /// Creates a builder pre-populated with `Config::default()` values.
    pub fn new() -> Self {
        Self {
            config: Config::default(),
        }
    }

    /// Sets the PostgreSQL source connection string (required: `build` rejects an empty one).
    pub fn source_connection_string<S: Into<String>>(mut self, connection_string: S) -> Self {
        self.config.source_connection_string = connection_string.into();
        self
    }

    /// Sets the destination kind.
    pub fn destination_type(mut self, destination_type: DestinationType) -> Self {
        self.config.destination_type = destination_type;
        self
    }

    /// Sets the destination connection string (required: `build` rejects an empty one).
    pub fn destination_connection_string<S: Into<String>>(mut self, connection_string: S) -> Self {
        self.config.destination_connection_string = connection_string.into();
        self
    }

    /// Sets the replication slot name (`build` rejects an empty one).
    pub fn replication_slot_name<S: Into<String>>(mut self, slot_name: S) -> Self {
        self.config.replication_slot_name = slot_name.into();
        self
    }

    /// Sets the publication name (`build` rejects an empty one).
    pub fn publication_name<S: Into<String>>(mut self, pub_name: S) -> Self {
        self.config.publication_name = pub_name.into();
        self
    }

    /// Sets the replication protocol version; `build` accepts 1 through 4.
    pub fn protocol_version(mut self, version: u32) -> Self {
        self.config.protocol_version = version;
        self
    }

    /// Sets the binary-format flag.
    pub fn binary_format(mut self, enabled: bool) -> Self {
        self.config.binary_format = enabled;
        self
    }

    /// Sets the streaming flag; when enabled, `build` requires protocol version >= 2.
    pub fn streaming(mut self, enabled: bool) -> Self {
        self.config.streaming = enabled;
        self
    }

    /// Sets the include-messages flag.
    pub fn include_messages(mut self, enabled: bool) -> Self {
        self.config.include_messages = enabled;
        self
    }

    /// Sets the two-phase flag; when enabled, `build` requires protocol version >= 3.
    pub fn two_phase(mut self, enabled: bool) -> Self {
        self.config.two_phase = enabled;
        self
    }

    /// Sets the origin filter.
    pub fn origin(mut self, origin: OriginFilter) -> Self {
        self.config.origin = origin;
        self
    }

    /// Sets the connection timeout.
    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
        self.config.connection_timeout = timeout;
        self
    }

    /// Sets the query timeout.
    pub fn query_timeout(mut self, timeout: Duration) -> Self {
        self.config.query_timeout = timeout;
        self
    }

    /// Sets the heartbeat interval.
    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
        self.config.heartbeat_interval = interval;
        self
    }

    /// Sets the maximum number of retry attempts.
    pub fn max_retry_attempts(mut self, attempts: u32) -> Self {
        self.config.max_retry_attempts = attempts;
        self
    }

    /// Sets the delay before the first retry.
    pub fn initial_retry_delay(mut self, delay: Duration) -> Self {
        self.config.initial_retry_delay = delay;
        self
    }

    /// Sets the upper bound on the delay between retries.
    pub fn max_retry_delay(mut self, delay: Duration) -> Self {
        self.config.max_retry_delay = delay;
        self
    }

    /// Sets the retry delay multiplier; `build` requires a finite value greater than 0.
    pub fn retry_multiplier(mut self, multiplier: f64) -> Self {
        self.config.retry_multiplier = multiplier;
        self
    }

    /// Sets the upper bound on the total time spent retrying.
    pub fn max_retry_duration(mut self, duration: Duration) -> Self {
        self.config.max_retry_duration = duration;
        self
    }

    /// Enables or disables jitter on retry delays.
    pub fn retry_jitter(mut self, enabled: bool) -> Self {
        self.config.retry_jitter = enabled;
        self
    }

    /// Sets the buffer size; `build` rejects 0.
    pub fn buffer_size(mut self, size: usize) -> Self {
        self.config.buffer_size = size;
        self
    }

    /// Sets the batch size; 0 is raised to 1 so a batch always holds at least one item.
    pub fn batch_size(mut self, size: usize) -> Self {
        self.config.batch_size = size.max(1);
        self
    }

    /// Adds the mapping for `table_name`, replacing any earlier mapping for the same name.
    pub fn table_mapping<S: Into<String>>(mut self, table_name: S, mapping: TableMapping) -> Self {
        self.config
            .table_mappings
            .insert(table_name.into(), mapping);
        self
    }

    /// Adds one source -> destination schema mapping, replacing any earlier one for the
    /// same source schema.
    pub fn schema_mapping<S1, S2>(mut self, source_schema: S1, destination_schema: S2) -> Self
    where
        S1: Into<String>,
        S2: Into<String>,
    {
        self.config
            .schema_mappings
            .insert(source_schema.into(), destination_schema.into());
        self
    }

    /// Replaces all schema mappings with `mappings`, discarding any added earlier.
    pub fn schema_mappings(mut self, mappings: HashMap<String, String>) -> Self {
        self.config.schema_mappings = mappings;
        self
    }

    /// Sets the base path for transaction files.
    pub fn transaction_file_base_path<S: Into<String>>(mut self, path: S) -> Self {
        self.config.transaction_file_base_path = path.into();
        self
    }

    /// Sets the transaction file segment size, in bytes.
    pub fn transaction_segment_size_bytes(mut self, size: usize) -> Self {
        self.config.transaction_segment_size_bytes = size;
        self
    }

    /// Adds a free-form option, replacing any earlier value for the same key.
    pub fn extra_option<K, V>(mut self, key: K, value: V) -> Self
    where
        K: Into<String>,
        V: Into<String>,
    {
        self.config.extra_options.insert(key.into(), value.into());
        self
    }

    /// Validates the accumulated settings and returns the finished [`Config`].
    ///
    /// # Errors
    ///
    /// Returns a configuration error if:
    /// - the source or destination connection string, slot name, or publication name is
    ///   empty;
    /// - the protocol version is outside 1..=4;
    /// - streaming is on with protocol version < 2, or two-phase is on with protocol
    ///   version < 3;
    /// - the buffer size is 0;
    /// - the retry multiplier is not a finite number greater than 0.
    pub fn build(self) -> Result<Config> {
        self.validate()?;
        Ok(self.config)
    }

    /// Checks the invariants listed on [`ConfigBuilder::build`], returning the first failure.
    fn validate(&self) -> Result<()> {
        if self.config.source_connection_string.is_empty() {
            return Err(CdcError::config("Source connection string is required"));
        }
        if self.config.destination_connection_string.is_empty() {
            return Err(CdcError::config(
                "Destination connection string is required",
            ));
        }
        if self.config.replication_slot_name.is_empty() {
            return Err(CdcError::config("Replication slot name is required"));
        }
        if self.config.publication_name.is_empty() {
            return Err(CdcError::config("Publication name is required"));
        }
        if !(1..=4).contains(&self.config.protocol_version) {
            return Err(CdcError::config("Protocol version must be between 1 and 4"));
        }
        if self.config.streaming && self.config.protocol_version < 2 {
            return Err(CdcError::config(
                "Streaming requires protocol version 2 or higher",
            ));
        }
        if self.config.two_phase && self.config.protocol_version < 3 {
            return Err(CdcError::config(
                "Two-phase commit requires protocol version 3 or higher",
            ));
        }
        if self.config.buffer_size == 0 {
            return Err(CdcError::config("Buffer size must be greater than 0"));
        }
        // Reject NaN, infinite, zero or negative factors up front. Backoff arithmetic such as
        // `Duration::mul_f64` panics on negative or non-finite factors, and a zero factor
        // makes the delay collapse to nothing.
        let multiplier = self.config.retry_multiplier;
        if !(multiplier.is_finite() && multiplier > 0.0) {
            return Err(CdcError::config(
                "Retry multiplier must be a finite number greater than 0",
            ));
        }
        Ok(())
    }
}
impl Config {
    /// Returns a fresh [`ConfigBuilder`] seeded with `Config::default()` values.
    ///
    /// Call `build()` on the builder to obtain a validated `Config`.
    pub fn builder() -> ConfigBuilder {
        ConfigBuilder::default()
    }
}
impl Default for TableMapping {
    /// An enabled mapping with no overrides.
    ///
    /// There is no destination schema or table override, and the column-mapping, exclusion
    /// and transformation collections are empty.
    fn default() -> Self {
        Self {
            enabled: true,
            destination_schema: None,
            destination_table: None,
            column_mappings: HashMap::default(),
            excluded_columns: Vec::default(),
            transformations: Vec::default(),
        }
    }
}
impl TableMapping {
    /// Equivalent to `TableMapping::default()`: enabled, with no overrides.
    pub fn new() -> Self {
        Default::default()
    }

    /// Overrides the destination schema.
    pub fn destination_schema<S: Into<String>>(self, schema: S) -> Self {
        Self {
            destination_schema: Some(schema.into()),
            ..self
        }
    }

    /// Overrides the destination table name.
    pub fn destination_table<S: Into<String>>(self, table: S) -> Self {
        Self {
            destination_table: Some(table.into()),
            ..self
        }
    }

    /// Maps source column `source` to destination column `destination`, replacing any
    /// earlier mapping for the same source column.
    pub fn column_mapping<S1, S2>(mut self, source: S1, destination: S2) -> Self
    where
        S1: Into<String>,
        S2: Into<String>,
    {
        let (from, to) = (source.into(), destination.into());
        self.column_mappings.insert(from, to);
        self
    }

    /// Appends `column` to the list of excluded columns.
    pub fn exclude_column<S: Into<String>>(mut self, column: S) -> Self {
        let name: String = column.into();
        self.excluded_columns.push(name);
        self
    }

    /// Appends a column transformation.
    pub fn transformation(mut self, transformation: ColumnTransformation) -> Self {
        let list = &mut self.transformations;
        list.push(transformation);
        self
    }

    /// Sets whether this mapping is active.
    pub fn enabled(self, enabled: bool) -> Self {
        Self { enabled, ..self }
    }
}
impl From<&Config> for pg_walstream::ReplicationStreamConfig {
fn from(config: &Config) -> Self {
let streaming_mode = if config.streaming {
pg_walstream::StreamingMode::On
} else {
pg_walstream::StreamingMode::Off
};
pg_walstream::ReplicationStreamConfig::new(
config.replication_slot_name.clone(),
config.publication_name.clone(),
config.protocol_version,
streaming_mode,
config.heartbeat_interval,
config.connection_timeout,
Duration::from_secs(30), RetryConfig::default(),
)
}
}