use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};
use tempfile::TempDir;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MySqlConformanceConfig {
pub mariadb_versions: Vec<String>,
pub auth_plugins: Vec<String>,
pub ssl_modes: Vec<String>,
pub test_database: String,
pub timeout_secs: u64,
}
impl Default for MySqlConformanceConfig {
fn default() -> Self {
Self {
mariadb_versions: vec!["10.5".to_string(), "10.11".to_string(), "11.2".to_string()],
auth_plugins: vec![
"mysql_native_password".to_string(),
"caching_sha2_password".to_string(),
],
ssl_modes: vec![
"PREFERRED".to_string(),
"REQUIRED".to_string(),
"VERIFY_CA".to_string(),
],
test_database: "asupersync_test".to_string(),
timeout_secs: 300,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ConformanceResult {
pub test_name: String,
pub passed: bool,
pub duration_ms: u64,
pub error: Option<String>,
pub server_version: String,
pub metadata: BTreeMap<String, serde_json::Value>,
}
pub struct MySqlConformanceRunner {
config: MySqlConformanceConfig,
_temp_dir: TempDir,
}
impl MySqlConformanceRunner {
pub fn new(config: MySqlConformanceConfig) -> Result<Self, std::io::Error> {
let temp_dir = TempDir::new()?;
Ok(Self {
config,
_temp_dir: temp_dir,
})
}
pub async fn run_suite(&self) -> Result<Vec<ConformanceResult>, Box<dyn std::error::Error>> {
let mut results = Vec::new();
for version in &self.config.mariadb_versions {
eprintln!("Testing against MariaDB {}", version);
let container = self.start_mariadb_container(version).await?;
results.extend(
self.test_authentication_plugins(&container, version)
.await?,
);
results.extend(self.test_ssl_modes(&container, version).await?);
results.extend(self.test_prepared_statements(&container, version).await?);
results.extend(
self.test_protocol_41_edge_cases(&container, version)
.await?,
);
self.stop_container(&container).await?;
}
Ok(results)
}
async fn start_mariadb_container(
&self,
version: &str,
) -> Result<String, Box<dyn std::error::Error>> {
let container_name = format!("asupersync-mysql-test-{}", version.replace(".", "-"));
let output = Command::new("docker").args(["version"]).output()?;
if !output.status.success() {
return Err("Docker not available for testing".into());
}
let image = format!("mariadb:{version}");
let pull_cmd = Command::new("docker")
.args(["pull", image.as_str()])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()?;
if !pull_cmd.success() {
return Err(format!("Failed to pull {image} image").into());
}
let database_env = format!("MYSQL_DATABASE={}", self.config.test_database);
let start_cmd = Command::new("docker")
.args([
"run",
"-d",
"--name",
container_name.as_str(),
"-e",
"MYSQL_ROOT_PASSWORD=testpass",
"-e",
database_env.as_str(),
"-e",
"MYSQL_USER=testuser",
"-e",
"MYSQL_PASSWORD=testpass",
"-p",
"0:3306", image.as_str(),
"--default-authentication-plugin=mysql_native_password",
])
.output()?;
if !start_cmd.status.success() {
return Err(format!(
"Failed to start MariaDB container: {}",
String::from_utf8_lossy(&start_cmd.stderr)
)
.into());
}
let container_id = String::from_utf8(start_cmd.stdout)?.trim().to_string();
self.wait_for_mariadb_ready(&container_id).await?;
Ok(container_id)
}
async fn wait_for_mariadb_ready(
&self,
container_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let start = Instant::now();
let timeout = Duration::from_secs(self.config.timeout_secs);
while start.elapsed() < timeout {
let health_cmd = Command::new("docker")
.args([
"exec",
container_id,
"mysqladmin",
"ping",
"-h",
"localhost",
"-u",
"root",
"-ptestpass",
])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()?;
if health_cmd.success() {
tokio::time::sleep(Duration::from_secs(2)).await;
return Ok(());
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
Err("MariaDB container failed to become ready".into())
}
async fn test_authentication_plugins(
&self,
container_id: &str,
version: &str,
) -> Result<Vec<ConformanceResult>, Box<dyn std::error::Error>> {
let mut results = Vec::new();
for plugin in &self.config.auth_plugins {
let start = Instant::now();
let mut metadata = BTreeMap::new();
metadata.insert(
"auth_plugin".to_string(),
serde_json::Value::String(plugin.clone()),
);
let result = match plugin.as_str() {
"mysql_native_password" => self.test_native_password_auth(container_id).await,
"caching_sha2_password" => self.test_caching_sha2_auth(container_id).await,
_ => Err("Unknown authentication plugin".into()),
};
results.push(ConformanceResult {
test_name: format!("auth_plugin_{}", plugin),
passed: result.is_ok(),
duration_ms: start.elapsed().as_millis() as u64,
error: result.err().map(|e| e.to_string()),
server_version: version.to_string(),
metadata,
});
}
Ok(results)
}
async fn test_native_password_auth(
&self,
container_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let _port = self.get_container_port(container_id).await?;
let test_cmd = Command::new("docker")
.args([
"exec",
container_id,
"mysql",
"-h",
"localhost",
"-u",
"testuser",
"-ptestpass",
"-e",
"SELECT 1 as test_connection;",
])
.output()?;
if test_cmd.status.success() {
Ok(())
} else {
Err(format!(
"Native password auth test failed: {}",
String::from_utf8_lossy(&test_cmd.stderr)
)
.into())
}
}
async fn test_caching_sha2_auth(
&self,
container_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let create_user_cmd = Command::new("docker")
.args([
"exec",
container_id,
"mysql",
"-h",
"localhost",
"-u",
"root",
"-ptestpass",
"-e",
"CREATE USER 'sha2user'@'%' IDENTIFIED WITH caching_sha2_password BY 'sha2pass'; \
GRANT ALL PRIVILEGES ON *.* TO 'sha2user'@'%'; FLUSH PRIVILEGES;",
])
.output()?;
if !create_user_cmd.status.success() {
return Err("Failed to create caching_sha2_password user".into());
}
let test_cmd = Command::new("docker")
.args([
"exec",
container_id,
"mysql",
"-h",
"localhost",
"-u",
"sha2user",
"-psha2pass",
"-e",
"SELECT 1 as test_sha2_connection;",
])
.output()?;
if test_cmd.status.success() {
Ok(())
} else {
Err(format!(
"Caching SHA2 auth test failed: {}",
String::from_utf8_lossy(&test_cmd.stderr)
)
.into())
}
}
async fn test_ssl_modes(
&self,
container_id: &str,
version: &str,
) -> Result<Vec<ConformanceResult>, Box<dyn std::error::Error>> {
let mut results = Vec::new();
for ssl_mode in &self.config.ssl_modes {
let start = Instant::now();
let mut metadata = BTreeMap::new();
metadata.insert(
"ssl_mode".to_string(),
serde_json::Value::String(ssl_mode.clone()),
);
let result = self.test_ssl_mode(container_id, ssl_mode).await;
results.push(ConformanceResult {
test_name: format!("ssl_mode_{}", ssl_mode.to_lowercase()),
passed: result.is_ok(),
duration_ms: start.elapsed().as_millis() as u64,
error: result.err().map(|e| e.to_string()),
server_version: version.to_string(),
metadata,
});
}
Ok(results)
}
async fn test_ssl_mode(
&self,
container_id: &str,
ssl_mode: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let ssl_mode_arg = format!("--ssl-mode={ssl_mode}");
let ssl_cmd = Command::new("docker")
.args([
"exec",
container_id,
"mysql",
"-h",
"localhost",
"-u",
"testuser",
"-ptestpass",
ssl_mode_arg.as_str(),
"-e",
"SHOW STATUS LIKE 'Ssl_cipher';",
])
.output()?;
if ssl_cmd.status.success() {
let output = String::from_utf8_lossy(&ssl_cmd.stdout);
if output.contains("Ssl_cipher") {
Ok(())
} else {
Err("SSL not properly configured".into())
}
} else {
Err(format!(
"SSL test failed: {}",
String::from_utf8_lossy(&ssl_cmd.stderr)
)
.into())
}
}
async fn test_prepared_statements(
&self,
container_id: &str,
version: &str,
) -> Result<Vec<ConformanceResult>, Box<dyn std::error::Error>> {
let start = Instant::now();
let mut metadata = BTreeMap::new();
metadata.insert(
"test_type".to_string(),
serde_json::Value::String("prepared_statements".to_string()),
);
let result = self.test_prepared_statement_lifecycle(container_id).await;
Ok(vec![ConformanceResult {
test_name: "prepared_statements_lifecycle".to_string(),
passed: result.is_ok(),
duration_ms: start.elapsed().as_millis() as u64,
error: result.err().map(|e| e.to_string()),
server_version: version.to_string(),
metadata,
}])
}
async fn test_prepared_statement_lifecycle(
&self,
container_id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let create_table_cmd = Command::new("docker")
.args([
"exec",
container_id,
"mysql",
"-h",
"localhost",
"-u",
"testuser",
"-ptestpass",
self.config.test_database.as_str(),
"-e",
"CREATE TABLE IF NOT EXISTS test_prep (id INT PRIMARY KEY, name VARCHAR(50));",
])
.output()?;
if !create_table_cmd.status.success() {
return Err("Failed to create test table".into());
}
let prep_cmd = Command::new("docker")
.args([
"exec",
container_id,
"mysql",
"-h",
"localhost",
"-u",
"testuser",
"-ptestpass",
self.config.test_database.as_str(),
"-e",
"PREPARE stmt FROM 'INSERT INTO test_prep (id, name) VALUES (?, ?)'; \
SET @id = 1, @name = 'test'; \
EXECUTE stmt USING @id, @name; \
DEALLOCATE PREPARE stmt;",
])
.output()?;
if prep_cmd.status.success() {
Ok(())
} else {
Err(format!(
"Prepared statement test failed: {}",
String::from_utf8_lossy(&prep_cmd.stderr)
)
.into())
}
}
async fn test_protocol_41_edge_cases(
&self,
container_id: &str,
version: &str,
) -> Result<Vec<ConformanceResult>, Box<dyn std::error::Error>> {
let start = Instant::now();
let mut metadata = BTreeMap::new();
metadata.insert(
"protocol".to_string(),
serde_json::Value::String("CLIENT_PROTOCOL_41".to_string()),
);
let result = self.test_protocol_41(container_id).await;
Ok(vec![ConformanceResult {
test_name: "protocol_41_edge_cases".to_string(),
passed: result.is_ok(),
duration_ms: start.elapsed().as_millis() as u64,
error: result.err().map(|e| e.to_string()),
server_version: version.to_string(),
metadata,
}])
}
async fn test_protocol_41(&self, container_id: &str) -> Result<(), Box<dyn std::error::Error>> {
let version_cmd = Command::new("docker")
.args([
"exec",
container_id,
"mysql",
"-h",
"localhost",
"-u",
"testuser",
"-ptestpass",
"-e",
"SELECT VERSION(), CONNECTION_ID();",
])
.output()?;
if version_cmd.status.success() {
Ok(())
} else {
Err("Protocol 41 test failed".into())
}
}
async fn get_container_port(
&self,
container_id: &str,
) -> Result<u16, Box<dyn std::error::Error>> {
let port_cmd = Command::new("docker")
.args(["port", container_id, "3306"])
.output()?;
if port_cmd.status.success() {
let port_output = String::from_utf8(port_cmd.stdout)?;
let port_str = port_output
.split(':')
.nth(1)
.ok_or("Invalid port output format")?
.trim();
let port: u16 = port_str.parse()?;
Ok(port)
} else {
Err("Failed to get container port".into())
}
}
async fn stop_container(&self, container_id: &str) -> Result<(), Box<dyn std::error::Error>> {
Command::new("docker")
.args(["stop", container_id])
.stdout(Stdio::null())
.status()?;
Command::new("docker")
.args(["rm", container_id])
.stdout(Stdio::null())
.status()?;
Ok(())
}
pub fn generate_report(&self, results: &[ConformanceResult]) -> String {
let mut report = String::new();
report.push_str("# MySQL Wire Protocol Conformance Report\n\n");
let total_tests = results.len();
let passed_tests = results.iter().filter(|r| r.passed).count();
let failed_tests = total_tests - passed_tests;
report.push_str(&format!(
"**Summary**: {}/{} tests passed, {} failed ({:.1}%)\n\n",
passed_tests,
total_tests,
failed_tests,
(passed_tests as f64 / total_tests as f64) * 100.0
));
for result in results {
let status = if result.passed {
"✅ PASS"
} else {
"❌ FAIL"
};
report.push_str(&format!(
"- {} {} ({} ms) - MariaDB {}\n",
status, result.test_name, result.duration_ms, result.server_version
));
if let Some(error) = &result.error {
report.push_str(&format!(" Error: {}\n", error));
}
}
report
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_conformance_config_default() {
let config = MySqlConformanceConfig::default();
assert!(!config.mariadb_versions.is_empty());
assert!(!config.auth_plugins.is_empty());
assert_eq!(config.test_database, "asupersync_test");
}
#[tokio::test]
async fn test_conformance_runner_creation() {
let config = MySqlConformanceConfig::default();
let runner = MySqlConformanceRunner::new(config);
assert!(runner.is_ok());
}
}