use thiserror::Error;
#[derive(Error, Debug)]
pub enum CdcError {
#[error("Replication error: {0}")]
Replication(#[from] pg_walstream::ReplicationError),
#[cfg(feature = "sqlserver")]
#[error("SQL Server connection error: {0}")]
SqlServer(#[from] tiberius::error::Error),
#[cfg(feature = "mysql")]
#[error("MySQL connection error: {0}")]
MySQL(#[from] sqlx::Error),
#[error("Configuration error: {0}")]
Config(String),
#[error("Replication slot error: {0}")]
ReplicationSlot(String),
#[error("Publication error: {0}")]
Publication(String),
#[error("Protocol parsing error: {0}")]
Protocol(String),
#[error("Buffer error: {0}")]
Buffer(String),
#[error("Message processing error: {0}")]
MessageProcessing(String),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("String conversion error: {0}")]
StringConversion(#[from] std::ffi::NulError),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("CDC error: {0}")]
Generic(String),
#[error("Operation timed out: {0}")]
Timeout(String),
#[error("Authentication failed: {0}")]
Authentication(String),
#[error("Unsupported operation: {0}")]
Unsupported(String),
#[error("Transient connection error: {0}")]
TransientConnection(String),
#[error("Permanent connection error: {0}")]
PermanentConnection(String),
#[error("Replication connection error: {0}")]
ReplicationConnection(String),
#[error("Operation was cancelled: {0}")]
Cancelled(String),
}
impl CdcError {
pub fn config<S: Into<String>>(msg: S) -> Self {
CdcError::Config(msg.into())
}
pub fn replication_slot<S: Into<String>>(msg: S) -> Self {
CdcError::ReplicationSlot(msg.into())
}
pub fn publication<S: Into<String>>(msg: S) -> Self {
CdcError::Publication(msg.into())
}
pub fn protocol<S: Into<String>>(msg: S) -> Self {
CdcError::Protocol(msg.into())
}
pub fn buffer<S: Into<String>>(msg: S) -> Self {
CdcError::Buffer(msg.into())
}
pub fn connection<S: Into<String>>(msg: S) -> Self {
CdcError::Generic(format!("Connection error: {}", msg.into()))
}
pub fn message_processing<S: Into<String>>(msg: S) -> Self {
CdcError::MessageProcessing(msg.into())
}
pub fn generic<S: Into<String>>(msg: S) -> Self {
CdcError::Generic(msg.into())
}
pub fn timeout<S: Into<String>>(msg: S) -> Self {
CdcError::Timeout(msg.into())
}
pub fn authentication<S: Into<String>>(msg: S) -> Self {
CdcError::Authentication(msg.into())
}
pub fn unsupported<S: Into<String>>(msg: S) -> Self {
CdcError::Unsupported(msg.into())
}
pub fn transient_connection<S: Into<String>>(msg: S) -> Self {
CdcError::TransientConnection(msg.into())
}
pub fn permanent_connection<S: Into<String>>(msg: S) -> Self {
CdcError::PermanentConnection(msg.into())
}
pub fn replication_connection<S: Into<String>>(msg: S) -> Self {
CdcError::ReplicationConnection(msg.into())
}
pub fn cancelled<S: Into<String>>(msg: S) -> Self {
CdcError::Cancelled(msg.into())
}
pub fn is_transient(&self) -> bool {
matches!(
self,
CdcError::TransientConnection(_)
| CdcError::Timeout(_)
| CdcError::Io(_)
| CdcError::ReplicationConnection(_)
)
}
pub fn is_permanent(&self) -> bool {
matches!(
self,
CdcError::PermanentConnection(_)
| CdcError::Authentication(_)
| CdcError::Config(_)
| CdcError::Unsupported(_)
)
}
pub fn is_cancelled(&self) -> bool {
matches!(self, CdcError::Cancelled(_))
}
}
pub type Result<T> = std::result::Result<T, CdcError>;