#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod common;
use chrono::Utc;
use common::*;
use dbpulse::queries::postgres;
use dbpulse::tls::cache::CertCache;
use dbpulse::tls::{TlsConfig, TlsMode};
use std::fs::File;
use std::process::{Child, Command, Stdio};
use tokio::time::Duration;
/// Owns a spawned child process and terminates it when the guard goes out of scope.
struct ChildGuard(Child);

impl Drop for ChildGuard {
    /// Kills the wrapped process and then waits on it.
    ///
    /// Both results are discarded on purpose: cleanup is best-effort and a
    /// failure here (for example, the process already exited) must not panic
    /// inside `drop`.
    fn drop(&mut self) {
        let Self(process) = self;
        let _ = process.kill();
        let _ = process.wait();
    }
}
/// Smoke test: runs one read/write health check against `POSTGRES_DSN` with
/// default TLS settings.
///
/// Passes when `postgres::test_rw_with_table` returns `Ok`, the shared
/// `assert_version_and_uptime` helper accepts the result, and the reported
/// version string contains at least one ASCII digit.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_basic_connection() {
    if skip_if_no_postgres() {
        return;
    }

    let dsn = parse_dsn(POSTGRES_DSN);
    let now = Utc::now();
    let tls = TlsConfig::default();
    let table_name = test_table_name("test_postgres_basic_connection");
    let cert_cache = CertCache::new(std::time::Duration::from_mins(5));

    // Unwrap and report in one step (same pattern as the sibling tests) so the
    // underlying error is what shows up in the panic message.
    let health = postgres::test_rw_with_table(&dsn, now, 100, &tls, &cert_cache, &table_name)
        .await
        .unwrap_or_else(|e| panic!("Failed to connect to PostgreSQL: {e:?}"));

    assert_version_and_uptime("PostgreSQL", &health);
    assert!(
        health.version.chars().any(|c| c.is_ascii_digit()),
        "Should contain version number"
    );
}
/// Runs the read/write health check five times in sequence, each iteration
/// against its own table name, and validates every result with
/// `assert_version_and_uptime`.
///
/// The DSN, timestamp, TLS config and certificate cache are created once and
/// shared by all iterations.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_read_write_operations() {
    if skip_if_no_postgres() {
        return;
    }

    let dsn = parse_dsn(POSTGRES_DSN);
    let started_at = Utc::now();
    let tls_config = TlsConfig::default();
    let certs = CertCache::new(std::time::Duration::from_mins(5));

    for iteration in 0..5 {
        let table = test_table_name(&format!(
            "test_postgres_read_write_operations_{iteration}"
        ));
        let outcome =
            postgres::test_rw_with_table(&dsn, started_at, 100, &tls_config, &certs, &table).await;
        match outcome {
            Ok(health) => assert_version_and_uptime("PostgreSQL", &health),
            Err(e) => panic!("Iteration {iteration} failed: {e:?}"),
        }
    }
}
/// Runs a single read/write health check on a dedicated table and validates
/// the result with `assert_version_and_uptime`.
///
/// NOTE(review): nothing in this test body is rollback-specific; any
/// transaction/rollback coverage would have to come from inside
/// `postgres::test_rw_with_table` — confirm against that function.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_transaction_rollback() {
    if skip_if_no_postgres() {
        return;
    }

    let dsn = parse_dsn(POSTGRES_DSN);
    let started_at = Utc::now();
    let tls_config = TlsConfig::default();
    let table = test_table_name("test_postgres_transaction_rollback");
    let certs = CertCache::new(std::time::Duration::from_mins(5));

    match postgres::test_rw_with_table(&dsn, started_at, 100, &tls_config, &certs, &table).await {
        Ok(health) => assert_version_and_uptime("PostgreSQL", &health),
        Err(e) => panic!("Transaction test failed: {e:?}"),
    }
}
/// Spawns ten tokio tasks that each run the read/write health check on their
/// own table, then awaits them in spawn order and validates every result.
///
/// Each task builds its own DSN, TLS config, timestamp and certificate cache,
/// so nothing is shared between the tasks on the test side.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_concurrent_connections() {
    if skip_if_no_postgres() {
        return;
    }

    // `collect` drives the iterator to completion, so all ten tasks are
    // spawned before the first one is awaited below.
    let tasks: Vec<_> = (0..10)
        .map(|worker| {
            let table = test_table_name(&format!(
                "test_postgres_concurrent_connections_{worker}"
            ));
            tokio::spawn(async move {
                let dsn = parse_dsn(POSTGRES_DSN);
                let tls_config = TlsConfig::default();
                let started_at = Utc::now();
                let certs = CertCache::new(std::time::Duration::from_mins(5));
                postgres::test_rw_with_table(&dsn, started_at, 100, &tls_config, &certs, &table)
                    .await
            })
        })
        .collect();

    for task in tasks {
        let health = task
            .await
            .expect("Task panicked")
            .unwrap_or_else(|e| panic!("Concurrent test failed: {e:?}"));
        assert_version_and_uptime("PostgreSQL", &health);
    }
}
/// Runs the read/write health check once for each value in
/// `[10, 50, 100, 500, 1000]`, passing the value as the third argument of
/// `postgres::test_rw_with_table`, and validates every result.
///
/// Each value gets its own table name; the DSN, timestamp, TLS config and
/// certificate cache are shared across iterations.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_with_different_ranges() {
    if skip_if_no_postgres() {
        return;
    }

    let dsn = parse_dsn(POSTGRES_DSN);
    let started_at = Utc::now();
    let tls_config = TlsConfig::default();
    let certs = CertCache::new(std::time::Duration::from_mins(5));

    for span in [10, 50, 100, 500, 1000] {
        let table = test_table_name(&format!("test_postgres_with_different_ranges_{span}"));
        let outcome =
            postgres::test_rw_with_table(&dsn, started_at, span, &tls_config, &certs, &table).await;
        match outcome {
            Ok(health) => assert_version_and_uptime("PostgreSQL", &health),
            Err(e) => panic!("Range {span} failed: {e:?}"),
        }
    }
}
/// Connects with `TlsMode::Disable` and checks that the health result carries
/// no TLS metadata.
///
/// Also validates the result with `assert_version_and_uptime`.
#[tokio::test]
#[ignore = "requires running PostgreSQL container with TLS"]
async fn test_postgres_tls_disable() {
    if skip_if_no_postgres() {
        return;
    }

    // Unwrap and report in one step so the panic message carries the error
    // itself rather than the whole `Result`.
    let health = test_postgres_with_tls(POSTGRES_DSN, TlsMode::Disable)
        .await
        .unwrap_or_else(|e| panic!("TLS Disable failed: {e:?}"));

    assert_version_and_uptime("PostgreSQL", &health);
    assert!(
        health.tls_metadata.is_none(),
        "TLS metadata should be None when disabled"
    );
}
/// Connects with `TlsMode::Require`.
///
/// A connection error is only logged and the test still passes, so the test
/// can run against a server without TLS. On success the result is validated
/// with `assert_version_and_uptime`; if TLS metadata is present, at least one
/// of its `version` / `cipher` fields must be set.
#[tokio::test]
#[ignore = "requires running PostgreSQL container with TLS enabled"]
async fn test_postgres_tls_require() {
    if skip_if_no_postgres() {
        return;
    }

    let health = match test_postgres_with_tls(POSTGRES_DSN, TlsMode::Require).await {
        Ok(health) => health,
        Err(e) => {
            // Treated as "TLS not configured" rather than as a failure.
            println!("TLS test skipped (no TLS configured): {e}");
            return;
        }
    };

    assert_version_and_uptime("PostgreSQL", &health);
    println!("TLS connection successful");

    if let Some(meta) = &health.tls_metadata {
        println!("TLS Version: {:?}", meta.version);
        println!("TLS Cipher: {:?}", meta.cipher);
        let has_details = meta.version.is_some() || meta.cipher.is_some();
        assert!(has_details, "Should have TLS metadata when TLS is enabled");
    }
}
/// Runs the health check against a DSN that names the `dbpulse_test_db`
/// database and expects it to succeed.
///
/// NOTE(review): the failure message implies the database is created
/// automatically when missing; that behavior lives outside this file — confirm.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_database_creation() {
    if skip_if_no_postgres() {
        return;
    }

    let table = test_table_name("test_postgres_database_creation");
    let outcome = test_postgres_connection_with_table(
        "postgres://postgres:secret@tcp(localhost:5432)/dbpulse_test_db",
        &table,
    )
    .await;

    match outcome {
        Ok(health) => assert_version_and_uptime("PostgreSQL", &health),
        Err(e) => panic!("Database auto-creation failed: {e:?}"),
    }
}
/// Negative test: a DSN with the user/password pair `invalid:invalid` must
/// make `test_postgres_connection` return an error.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_invalid_credentials() {
    if skip_if_no_postgres() {
        return;
    }

    let outcome =
        test_postgres_connection("postgres://invalid:invalid@tcp(localhost:5432)/testdb").await;
    let rejected = outcome.is_err();
    assert!(rejected, "Should fail with invalid credentials");
}
/// Checks the version string reported by the health check: it must be
/// non-empty and contain at least one ASCII digit.
///
/// The version is also printed, and the result is validated with
/// `assert_version_and_uptime`.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_version_info() {
    if skip_if_no_postgres() {
        return;
    }

    let table_name = test_table_name("test_postgres_version_info");
    // Surface the underlying error on failure; a bare `assert!(is_ok())`
    // would only report that the assertion was false.
    let health = test_postgres_connection_with_table(POSTGRES_DSN, &table_name)
        .await
        .unwrap_or_else(|e| panic!("PostgreSQL health check failed: {e:?}"));

    assert_version_and_uptime("PostgreSQL", &health);
    println!("PostgreSQL version: {}", health.version);
    assert!(!health.version.is_empty(), "Version should not be empty");
    assert!(
        health.version.chars().any(|c| c.is_ascii_digit()),
        "Version should contain version number"
    );
}
/// Asserts that the reported version string contains neither
/// `"recovery mode"` nor `"read-only mode enabled"`.
///
/// NOTE(review): this presumes the health check encodes a read-only/recovery
/// state into `version`; that encoding is not visible in this file — confirm.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_read_only_detection() {
    if skip_if_no_postgres() {
        return;
    }

    let table_name = test_table_name("test_postgres_read_only_detection");
    // Surface the underlying error on failure; a bare `assert!(is_ok())`
    // would only report that the assertion was false.
    let health = test_postgres_connection_with_table(POSTGRES_DSN, &table_name)
        .await
        .unwrap_or_else(|e| panic!("PostgreSQL health check failed: {e:?}"));

    assert!(
        !health.version.contains("recovery mode")
            && !health.version.contains("read-only mode enabled"),
        "Database should not be in read-only/recovery mode"
    );
}
/// Asserts that the health result reports a backend host that is present and
/// not blank.
///
/// A missing `db_host` is treated the same as an empty one and fails the test.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_reports_backend_host() {
    if skip_if_no_postgres() {
        return;
    }

    let table_name = test_table_name("test_postgres_reports_backend_host");
    // Surface the underlying error on failure; a bare `assert!(is_ok())`
    // would only report that the assertion was false.
    let health = test_postgres_connection_with_table(POSTGRES_DSN, &table_name)
        .await
        .unwrap_or_else(|e| panic!("PostgreSQL health check failed: {e:?}"));

    let host = health.db_host.unwrap_or_default();
    assert!(
        !host.trim().is_empty(),
        "Expected non-empty PostgreSQL backend host"
    );
}
/// Runs one health check, then encodes `dbpulse::metrics::REGISTRY` in the
/// Prometheus text format and checks the output for expected content.
///
/// Required: the operation-duration, rows-affected and connection-duration
/// metric names, a `database="postgres"` label, and `operation` labels for
/// `connect`, `insert` and `select`. The database-size and table-size metrics
/// are optional and only logged when present.
#[tokio::test]
#[ignore = "requires running PostgreSQL container"]
async fn test_postgres_metrics_collection() {
    if skip_if_no_postgres() {
        return;
    }

    let table_name = test_table_name("test_postgres_metrics_collection");
    // Include the underlying error in the failure message. The health value is
    // kept bound until the end of the test, as in the original.
    let _health = test_postgres_connection_with_table(POSTGRES_DSN, &table_name)
        .await
        .unwrap_or_else(|e| panic!("Connection should succeed: {e:?}"));

    let metric_families = dbpulse::metrics::REGISTRY.gather();
    let mut buffer = Vec::new();
    let encoder = prometheus::TextEncoder::new();
    prometheus::Encoder::encode(&encoder, &metric_families, &mut buffer)
        .expect("Failed to encode metrics");
    let metrics_output = String::from_utf8(buffer).expect("Metrics should be valid UTF-8");

    for metric in [
        "dbpulse_operation_duration_seconds",
        "dbpulse_rows_affected_total",
        "dbpulse_connection_duration_seconds",
    ] {
        assert!(
            metrics_output.contains(metric),
            "{metric} metric should be present"
        );
    }

    assert!(
        metrics_output.contains("database=\"postgres\""),
        "Metrics should be labeled with database='postgres'"
    );

    for operation in ["connect", "insert", "select"] {
        // Accept the label with plain quotes or with backslash-escaped quotes.
        let plain = format!("operation=\"{operation}\"");
        let escaped = format!("operation=\\\"{operation}\\\"");
        assert!(
            metrics_output.contains(plain.as_str()) || metrics_output.contains(escaped.as_str()),
            "Should have {operation} operation metrics"
        );
    }

    // Optional metrics: reported when present, never required.
    if metrics_output.contains("dbpulse_database_size_bytes") {
        println!("✓ Database size metrics are being collected");
    }
    if metrics_output.contains("dbpulse_table_size_bytes") {
        println!("✓ Table size metrics are being collected");
    }

    println!("Metrics verification complete for PostgreSQL");
}
#[tokio::test]
#[ignore = "requires running dbpulse-postgres container and podman/docker access"]
async fn test_postgres_pulse_transition_stop_start() {
if skip_if_no_postgres() {
return;
}
if std::env::var("RUN_FAILOVER_TRANSITION_TESTS").as_deref() != Ok("1") {
println!("Skipping failover transition test (set RUN_FAILOVER_TRANSITION_TESTS=1)");
return;
}
assert!(
wait_for_postgres_ready(POSTGRES_DSN, Duration::from_secs(30)).await,
"PostgreSQL is not reachable with application DSN before failover test"
);
let port = pick_free_port();
let binary = dbpulse_binary_path();
let stdout_log = format!("/tmp/dbpulse-postgres-failover-{port}.stdout.log");
let stderr_log = format!("/tmp/dbpulse-postgres-failover-{port}.stderr.log");
let stdout_file = File::create(&stdout_log).expect("failed to create stdout log file");
let stderr_file = File::create(&stderr_log).expect("failed to create stderr log file");
let child = Command::new(binary)
.args([
"--dsn",
POSTGRES_DSN,
"--interval",
"1",
"--listen",
"127.0.0.1",
"--port",
&port.to_string(),
])
.stdout(Stdio::from(stdout_file))
.stderr(Stdio::from(stderr_file))
.spawn()
.expect("failed to spawn dbpulse");
let mut guard = ChildGuard(child);
assert!(
wait_for_metrics_endpoint(port, Duration::from_secs(10)).await,
"dbpulse metrics endpoint not reachable on port {port}. process status: {:?}\nstdout:\n{}\nstderr:\n{}",
guard.0.try_wait().ok().flatten(),
std::fs::read_to_string(&stdout_log).unwrap_or_default(),
std::fs::read_to_string(&stderr_log).unwrap_or_default()
);
let initial_pulse = wait_for_pulse_value_detailed(port, 1, Duration::from_secs(40)).await;
assert!(
initial_pulse.is_ok(),
"Expected initial pulse=1 before failover simulation: {}. process status: {:?}\nstdout:\n{}\nstderr:\n{}",
initial_pulse.err().unwrap_or_default(),
guard.0.try_wait().ok().flatten(),
std::fs::read_to_string(&stdout_log).unwrap_or_default(),
std::fs::read_to_string(&stderr_log).unwrap_or_default()
);
assert!(
control_container("stop", "dbpulse-postgres"),
"Failed to stop PostgreSQL container (dbpulse-postgres)"
);
assert!(
wait_for_pulse_value(port, 0, Duration::from_secs(30)).await,
"Expected pulse transition to 0 after container stop"
);
assert!(
control_container("start", "dbpulse-postgres"),
"Failed to start PostgreSQL container (dbpulse-postgres)"
);
assert!(
wait_for_pulse_value(port, 1, Duration::from_mins(1)).await,
"Expected pulse transition back to 1 after container start"
);
}