use crate::{
error::{CdcError, Result},
types::DestinationType,
};
use async_trait::async_trait;
use std::{collections::HashMap, future::Future, pin::Pin};
/// One-shot asynchronous callback handed to
/// `DestinationHandler::execute_sql_batch_with_hook`.
///
/// The alias is a boxed `FnOnce` closure that, when invoked, yields a pinned,
/// boxed future resolving to `Result<()>`. Both the closure and the future it
/// returns are `Send`, so the hook can be moved to and awaited on another thread.
///
/// NOTE(review): the name suggests the hook is meant to run just before a
/// commit; the exact invocation point is decided by each handler implementation.
pub type PreCommitHook =
Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send>;
#[cfg(feature = "mysql")]
use super::mysql::MySQLDestination;
#[cfg(feature = "sqlserver")]
use super::sqlserver::SqlServerDestination;
#[cfg(feature = "sqlite")]
use super::sqlite::SQLiteDestination;
/// Common interface for a destination that SQL commands are applied to.
///
/// Implementors must be `Send + Sync`. The async methods are made available on
/// trait objects through the `async_trait` attribute.
#[async_trait]
pub trait DestinationHandler: Send + Sync {
/// Connects the handler using `connection_string`.
///
/// The format of the connection string is defined by the implementor.
async fn connect(&mut self, connection_string: &str) -> Result<()>;
/// Stores a string-to-string schema mapping on the handler.
///
/// NOTE(review): presumably maps source schema names to destination schema
/// names — confirm against the implementations.
fn set_schema_mappings(&mut self, mappings: HashMap<String, String>);
/// Executes `commands` as one batch, with an optional [`PreCommitHook`].
///
/// NOTE(review): judging by the names, the hook is expected to run after the
/// commands and before the batch is committed; whether a hook error aborts the
/// batch depends on the implementor — verify there.
async fn execute_sql_batch_with_hook(
&mut self,
commands: &[String],
pre_commit_hook: Option<PreCommitHook>,
) -> Result<()>;
/// Closes the handler.
async fn close(&mut self) -> Result<()>;
}
/// Stateless factory producing a boxed [`DestinationHandler`] for a
/// [`DestinationType`].
pub struct DestinationFactory;

impl DestinationFactory {
    /// Builds the handler that corresponds to `destination_type`.
    ///
    /// Each backend arm only exists when its Cargo feature (`mysql`,
    /// `sqlserver`, `sqlite`) is enabled at compile time.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `CdcError::unsupported` when no
    /// compiled-in arm matches the requested type.
    pub fn create(destination_type: &DestinationType) -> Result<Box<dyn DestinationHandler>> {
        match destination_type {
            #[cfg(feature = "sqlite")]
            DestinationType::SQLite => Ok(Box::new(SQLiteDestination::new())),
            #[cfg(feature = "sqlserver")]
            DestinationType::SqlServer => Ok(Box::new(SqlServerDestination::new())),
            #[cfg(feature = "mysql")]
            DestinationType::MySQL => Ok(Box::new(MySQLDestination::new())),
            // The fallback can become unreachable when the enabled features
            // already cover every variant, hence the lint allowance.
            #[allow(unreachable_patterns)]
            _ => {
                let reason = format!(
                    "Destination type {destination_type:?} is not supported or not enabled"
                );
                Err(CdcError::unsupported(reason))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Every destination compiled into this build must be constructible
    // through the factory.
    #[test]
    fn test_destination_factory_create() {
        #[cfg(feature = "mysql")]
        assert!(DestinationFactory::create(&DestinationType::MySQL).is_ok());

        #[cfg(feature = "sqlserver")]
        assert!(DestinationFactory::create(&DestinationType::SqlServer).is_ok());

        #[cfg(feature = "sqlite")]
        assert!(DestinationFactory::create(&DestinationType::SQLite).is_ok());
    }

    // `DestinationType` must serialize to its expected JSON string and
    // deserialize back to the same variant.
    #[test]
    fn test_destination_types_serialization() {
        // Serializes `variant`, checks the JSON text, then parses it back.
        fn assert_round_trip(variant: DestinationType, expected_json: &str) {
            let encoded = serde_json::to_string(&variant).unwrap();
            assert_eq!(encoded, expected_json);

            let decoded: DestinationType = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, variant);
        }

        assert_round_trip(DestinationType::MySQL, "\"MySQL\"");
        assert_round_trip(DestinationType::SQLite, "\"SQLite\"");
    }
}