use pg2any_lib::{
destinations::{mysql::MySQLDestination, sqlserver::SqlServerDestination, DestinationFactory},
types::{ChangeEvent, EventType, ReplicaIdentity},
DestinationType,
};
use pg_walstream::{ColumnValue, Lsn, RowData};
use std::sync::Arc;
/// Exercises the handler returned by `DestinationFactory::create` for each
/// destination whose cargo feature is enabled.
///
/// For every enabled destination the test expects:
/// * an empty SQL batch to succeed,
/// * a one-statement INSERT batch to fail (presumably because the handler was
///   never connected to a database — TODO confirm against the handler impl),
/// * `close()` to succeed.
#[tokio::test]
async fn test_destination_handler_interface() {
    // One event of each kind; the values are not used further, but building
    // them runs the fixture constructors.
    let _events = vec![
        create_test_event(),
        create_update_event(),
        create_delete_event(),
    ];

    #[cfg(feature = "mysql")]
    {
        let mut mysql = DestinationFactory::create(&DestinationType::MySQL).unwrap();

        // An empty batch must be accepted.
        assert!(mysql.execute_sql_batch_with_hook(&[], None).await.is_ok());

        // A real statement is expected to be rejected.
        let insert_outcome = mysql
            .execute_sql_batch_with_hook(&["INSERT INTO test (id) VALUES (1);".to_string()], None)
            .await;
        assert!(insert_outcome.is_err());

        assert!(mysql.close().await.is_ok());
    }

    #[cfg(feature = "sqlserver")]
    {
        let mut sqlserver = DestinationFactory::create(&DestinationType::SqlServer).unwrap();

        // An empty batch must be accepted.
        assert!(sqlserver.execute_sql_batch_with_hook(&[], None).await.is_ok());

        // A real statement is expected to be rejected.
        let insert_outcome = sqlserver
            .execute_sql_batch_with_hook(&["INSERT INTO test (id) VALUES (1);".to_string()], None)
            .await;
        assert!(insert_outcome.is_err());

        assert!(sqlserver.close().await.is_ok());
    }
}
/// Verifies that every `DestinationType` variant used here serializes to its
/// expected JSON string and deserializes back to the same variant.
///
/// All three variants (MySQL, SqlServer, SQLite) go through the same
/// serialize-then-deserialize check, so none of them is only half covered.
#[test]
fn test_destination_type_serialization() {
    // (variant, expected JSON representation)
    let cases = [
        (DestinationType::MySQL, "\"MySQL\""),
        (DestinationType::SqlServer, "\"SqlServer\""),
        (DestinationType::SQLite, "\"SQLite\""),
    ];

    for (destination_type, expected_json) in cases {
        let json = serde_json::to_string(&destination_type).unwrap();
        assert_eq!(json, expected_json);

        // Round-trip: the JSON produced above must parse back to the same variant.
        let round_tripped: DestinationType = serde_json::from_str(&json).unwrap();
        assert_eq!(round_tripped, destination_type);
    }
}
/// Checks that `DestinationFactory::create` for `DestinationType::SQLite`
/// succeeds exactly when the `sqlite` cargo feature is enabled and fails
/// otherwise.
#[test]
fn test_unsupported_destination_types() {
    // Compile-time constant: true only when built with `--features sqlite`.
    let sqlite_enabled = cfg!(feature = "sqlite");

    let creation = DestinationFactory::create(&DestinationType::SQLite);
    assert!(creation.is_ok() == sqlite_enabled);
}
/// Builds an insert `ChangeEvent` for `public.test_table` carrying three
/// text columns (`id`, `name`, `active`) at LSN 100.
fn create_test_event() -> ChangeEvent {
    let columns = vec![
        ("id", ColumnValue::text("1")),
        ("name", ColumnValue::text("test")),
        ("active", ColumnValue::text("t")),
    ];
    let row = RowData::from_pairs(columns);

    ChangeEvent::insert("public", "test_table", 456, row, Lsn::from(100))
}
/// Builds an update `ChangeEvent` for `public.test_table` that includes both
/// the previous row (`name = "old_name"`) and the new row
/// (`name = "new_name"`), with `id` as the key column, at LSN 300.
fn create_update_event() -> ChangeEvent {
    let before = RowData::from_pairs(vec![
        ("id", ColumnValue::text("1")),
        ("name", ColumnValue::text("old_name")),
    ]);
    let after = RowData::from_pairs(vec![
        ("id", ColumnValue::text("1")),
        ("name", ColumnValue::text("new_name")),
    ]);
    let key_columns = vec![Arc::from("id")];

    ChangeEvent::update(
        "public",
        "test_table",
        456,
        Some(before),
        after,
        ReplicaIdentity::Default,
        key_columns,
        Lsn::from(300),
    )
}
/// Builds a delete `ChangeEvent` for `public.test_table` whose old row has
/// `id = "1"` and `name = "deleted_name"`, with `id` as the key column,
/// at LSN 200.
fn create_delete_event() -> ChangeEvent {
    let removed_row = RowData::from_pairs(vec![
        ("id", ColumnValue::text("1")),
        ("name", ColumnValue::text("deleted_name")),
    ]);
    let key_columns = vec![Arc::from("id")];

    ChangeEvent::delete(
        "public",
        "test_table",
        456,
        removed_row,
        ReplicaIdentity::Default,
        key_columns,
        Lsn::from(200),
    )
}
/// Builds an update `ChangeEvent` for `public.test_table` that carries only
/// the new row: the old row is `None` and the replica identity is
/// `ReplicaIdentity::Nothing`. `id` is still passed as the key column, and
/// the event is at LSN 300.
fn create_update_event_without_old_data() -> ChangeEvent {
    let after = RowData::from_pairs(vec![
        ("id", ColumnValue::text("1")),
        ("name", ColumnValue::text("new_name")),
    ]);
    let key_columns = vec![Arc::from("id")];

    ChangeEvent::update(
        "public",
        "test_table",
        456,
        None,
        after,
        ReplicaIdentity::Nothing,
        key_columns,
        Lsn::from(300),
    )
}
/// Constructs a `MySQLDestination` and checks that the update fixture with
/// old data exposes both the previous row and the new row with the expected
/// `id` / `name` values.
#[test]
fn test_mysql_destination_update_with_old_data() {
    let _mysql_dest = MySQLDestination::new();
    let event = create_update_event();

    match &event.event_type {
        EventType::Update {
            old_data: previous,
            new_data: current,
            ..
        } => {
            // Shape of the event: old row present, new row non-empty.
            assert!(previous.is_some());
            assert!(!current.is_empty());

            // Contents of the old row.
            let previous = previous.as_ref().unwrap();
            assert!(previous.get("id").is_some());
            assert_eq!(previous.get("id").unwrap(), "1");

            // Contents of the new row.
            assert!(current.get("id").is_some());
            assert!(current.get("name").is_some());
            assert_eq!(current.get("name").unwrap(), "new_name");
        }
        _ => panic!("Expected Update event"),
    }
}
/// Constructs a `MySQLDestination` and checks that the update fixture built
/// without old data has no previous row while the new row still contains
/// `id` and `name`.
#[test]
fn test_mysql_destination_update_without_old_data() {
    let _mysql_dest = MySQLDestination::new();
    let event = create_update_event_without_old_data();

    if let EventType::Update {
        old_data: previous,
        new_data: current,
        ..
    } = &event.event_type
    {
        assert!(previous.is_none());
        assert!(!current.is_empty());
        assert!(current.get("id").is_some());
        assert!(current.get("name").is_some());
    } else {
        panic!("Expected Update event");
    }
}
/// Constructs a `SqlServerDestination` and checks that the update fixture
/// with old data exposes both the previous row and the new row with the
/// expected `id` / `name` values.
#[test]
fn test_sqlserver_destination_update_with_old_data() {
    let _sqlserver_dest = SqlServerDestination::new();
    let event = create_update_event();

    if let EventType::Update {
        old_data: previous,
        new_data: current,
        ..
    } = &event.event_type
    {
        // Shape of the event: old row present, new row non-empty.
        assert!(previous.is_some());
        assert!(!current.is_empty());

        // Contents of the old row.
        let previous = previous.as_ref().unwrap();
        assert!(previous.get("id").is_some());
        assert_eq!(previous.get("id").unwrap(), "1");

        // Contents of the new row.
        assert!(current.get("id").is_some());
        assert!(current.get("name").is_some());
        assert_eq!(current.get("name").unwrap(), "new_name");
    } else {
        panic!("Expected Update event");
    }
}
/// Checks that the delete fixture carries the removed row: both `id` and
/// `name` are present and `name` equals `"deleted_name"`.
#[test]
fn test_delete_event_uses_old_data() {
    let event = create_delete_event();

    if let EventType::Delete {
        old_data: removed, ..
    } = &event.event_type
    {
        assert!(removed.get("id").is_some());
        assert!(removed.get("name").is_some());
        assert_eq!(removed.get("name").unwrap(), "deleted_name");
    } else {
        panic!("Expected Delete event");
    }
}