#![cfg(feature = "mariadb")]
use rustcdc::TransportConfig;
use rustcdc::{MariaDbConnection, MariaDbSourceConfig, MysqlSourceConfig};
use testcontainers::{
core::{IntoContainerPort, WaitFor},
runners::AsyncRunner,
GenericImage, ImageExt,
};
use tokio::time::{sleep, Duration};
const READY_TIMEOUT: Duration = Duration::from_secs(90);
const CONNECT_TIMEOUT_SECS: u64 = 5;
const CONNECT_RETRY_BUDGET: Duration = Duration::from_secs(75);
struct MariadbTestTarget {
_container: testcontainers::ContainerAsync<GenericImage>,
config: MariaDbSourceConfig,
}
fn skip_mariadb_connection_case(case_label: &str) -> bool {
if std::env::var("CDC_RS_RUN_DOCKER_TESTS").as_deref() == Ok("1") {
return false;
}
eprintln!("skipping {case_label} (set CDC_RS_RUN_DOCKER_TESTS=1)",);
true
}
async fn start_mariadb_container(
version: &str,
) -> rustcdc::Result<testcontainers::ContainerAsync<GenericImage>> {
GenericImage::new("mariadb", version)
.with_exposed_port(3306.tcp())
.with_wait_for(WaitFor::message_on_stderr("ready for connections"))
.with_cmd(vec![
"--log-bin=mysql-bin",
"--binlog-format=ROW",
"--server-id=1",
])
.with_env_var("MYSQL_ROOT_PASSWORD", "rootpass")
.with_env_var("MYSQL_DATABASE", "cdc")
.start()
.await
.map_err(|error| rustcdc::Error::SourceError(error.to_string()))
}
async fn wait_for_mariadb_admin_ready(host: &str, port: u16) -> rustcdc::Result<()> {
let dsn = format!("mysql://root:rootpass@{host}:{port}/cdc");
let deadline = std::time::Instant::now() + READY_TIMEOUT;
let mut backoff = Duration::from_millis(250);
let mut last_error = None;
while std::time::Instant::now() < deadline {
match sqlx::mysql::MySqlPoolOptions::new()
.max_connections(1)
.acquire_timeout(Duration::from_secs(2))
.connect(&dsn)
.await
{
Ok(pool) => {
match sqlx::query("SELECT 1").execute(&pool).await {
Ok(_) => {
pool.close().await;
return Ok(());
}
Err(error) => {
last_error = Some(error.to_string());
}
}
pool.close().await;
}
Err(error) => {
last_error = Some(error.to_string());
}
}
sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(2));
}
Err(rustcdc::Error::SourceError(format!(
"mariadb admin readiness probe timed out: {}",
last_error.unwrap_or_else(|| "unknown error".to_string())
)))
}
async fn mariadb_base_config(version: &str, server_id: u32) -> rustcdc::Result<MariadbTestTarget> {
let container = start_mariadb_container(version).await?;
let host = container
.get_host()
.await
.map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
let port = container
.get_host_port_ipv4(3306.tcp())
.await
.map_err(|error| rustcdc::Error::SourceError(error.to_string()))?;
let host_string = host.to_string();
wait_for_mariadb_admin_ready(&host_string, port).await?;
Ok(MariadbTestTarget {
_container: container,
config: MariaDbSourceConfig::new(MysqlSourceConfig {
host: host_string,
port,
user: "root".to_string(),
password: "rootpass".to_string().into(),
database: "cdc".to_string(),
server_id,
gtid_mode_enabled: false,
binlog_format_check: true,
transport: TransportConfig::plaintext(),
conn_timeout_secs: CONNECT_TIMEOUT_SECS,
stream_poll_interval_ms: 50,
max_events_per_poll: 1_000,
..Default::default()
}),
})
}
async fn run_mariadb_connection_lifecycle(version: &str, server_id: u32) -> rustcdc::Result<()> {
let target = mariadb_base_config(version, server_id).await?;
let connection = MariaDbConnection::new(target.config.into_inner());
connect_with_retry(&connection).await?;
assert!(connection.is_connected().await);
connection.close().await;
Ok(())
}
macro_rules! mariadb_connection_test {
($name:ident, $version:literal, $server_id:expr, $label:literal) => {
#[tokio::test]
async fn $name() -> rustcdc::Result<()> {
if skip_mariadb_connection_case($label) {
return Ok(());
}
run_mariadb_connection_lifecycle($version, $server_id).await
}
};
}
async fn connect_with_retry(connection: &MariaDbConnection) -> rustcdc::Result<()> {
let deadline = std::time::Instant::now() + CONNECT_RETRY_BUDGET;
let mut backoff = Duration::from_millis(250);
let mut last_error = None;
while std::time::Instant::now() < deadline {
match connection.connect().await {
Ok(()) => return Ok(()),
Err(error) => {
last_error = Some(error);
sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(2));
}
}
}
Err(last_error.unwrap_or_else(|| {
rustcdc::Error::SourceError("mariadb connection did not become ready in time".into())
}))
}
mariadb_connection_test!(
mariadb_connection_10_5,
"10.5",
100,
"mariadb 10.5 connection integration test"
);
mariadb_connection_test!(
mariadb_connection_10_6,
"10.6",
101,
"mariadb 10.6 connection integration test"
);
#[tokio::test]
async fn mariadb_gtid_mode_support() -> rustcdc::Result<()> {
if skip_mariadb_connection_case("mariadb gtid mode support test") {
return Ok(());
}
let target = mariadb_base_config("10.6", 102).await?;
let connection = MariaDbConnection::new(target.config.into_inner());
connect_with_retry(&connection).await?;
assert!(connection.is_connected().await);
connection.close().await;
Ok(())
}