use std::env;
use std::process::Command;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use defer::defer;
use log::info;
use sqlx::postgres::PgPoolOptions;
use tokio::sync::Mutex;
use crate::config::Config;
use crate::config::inmem::InMemoryConfig;
use crate::consumer::NilCA;
use crate::database::Database;
use crate::database::postgresql::PostgresDatabase;
#[cfg(test)]
use crate::transform::TransformRequest;
use crate::transform::{NilTA, NilTR};
use crate::worker::worker_manager::WorkerManagerResult;
/// Starts a throwaway PostgreSQL container, constructs a `PostgresDatabase` against it and
/// checks that every expected table can be queried afterwards.
///
/// Requires a working `docker` CLI on the host running the tests.
#[tokio::test]
async fn test_postgresql_migrations() {
    // Single source of truth for the name used by `docker run`, `docker exec` and `docker stop`.
    const CONTAINER_NAME: &str = "orch-test-postgres-test-postgresql-migrations";
    // Upper bound on the one-second readiness polls, so a container that never comes up
    // fails the test instead of hanging it forever.
    const MAX_READINESS_CHECKS: u32 = 60;
    // (query, message when the query fails, message when the count assertion fails)
    const TABLE_CHECKS: [(&str, &str, &str); 7] = [
        (
            "SELECT COUNT(*) FROM transform_requests",
            "Failed to query transform_requests table",
            "Transform requests table should exist and be queryable",
        ),
        (
            "SELECT COUNT(*) FROM transform_attempts",
            "Failed to query transform_attempts table",
            "Transform attempts table should exist and be queryable",
        ),
        (
            "SELECT COUNT(*) FROM consume_attempts",
            "Failed to query consume_attempts table",
            "Consume attempts table should exist and be queryable",
        ),
        (
            "SELECT COUNT(*) FROM dynamic_configs",
            "Failed to query dynamic_configs table",
            "Dynamic configs table should exist and be queryable",
        ),
        (
            "SELECT COUNT(*) FROM archive_transform_requests",
            "Failed to query archive_transform_requests table",
            "Archive transform requests table should exist and be queryable",
        ),
        (
            "SELECT COUNT(*) FROM archive_transform_attempts",
            "Failed to query archive_transform_attempts table",
            "Archive transform attempts table should exist and be queryable",
        ),
        (
            "SELECT COUNT(*) FROM archive_consume_attempts",
            "Failed to query archive_consume_attempts table",
            "Archive consume attempts table should exist and be queryable",
        ),
    ];

    let _ = env_logger::try_init();

    // Host directory mounted as the container's data volume; the nanosecond timestamp keeps
    // it distinct between runs.
    let unique_folder = env::temp_dir().join(format!(
        "shepherd_pgsql_testmount_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0)
    ));
    info!("Using unique folder: {:?}", unique_folder);

    // Random host port in 10000..20000 mapped onto the container's 5432.
    let host_port = 10000 + rand::random::<u16>() % 10000;
    let volume_arg = format!(
        "{}:/var/lib/postgresql/data",
        unique_folder.as_os_str().to_str().unwrap()
    );
    let port_arg = format!("{}:5432", host_port);
    let start_command = Command::new("docker")
        .args([
            "run",
            "--rm",
            "--name",
            CONTAINER_NAME,
            "-e",
            "POSTGRES_PASSWORD=postgres",
            "-e",
            "POSTGRES_DB=test_db",
            "-e",
            "PGDATA=/var/lib/postgresql/data/pgdata",
            "-v",
            volume_arg.as_str(),
            "-p",
            port_arg.as_str(),
            "-d",
            "postgres:latest",
        ])
        .output()
        .expect("Failed to start PostgreSQL Docker container");
    assert!(
        start_command.status.success(),
        "Failed to start PostgreSQL container, stdout: {}, stderr: {}",
        String::from_utf8_lossy(&start_command.stdout),
        String::from_utf8_lossy(&start_command.stderr)
    );

    // Stop the container when the test scope ends, on success and on failure alike.
    defer!({
        let stop_result = Command::new("docker")
            .args(["stop", CONTAINER_NAME])
            .output();
        if std::thread::panicking() {
            // A second panic while the test is already unwinding would abort the whole test
            // binary, so a failed cleanup is only reported in that case.
            if !matches!(&stop_result, Ok(output) if output.status.success()) {
                eprintln!("Failed to stop PostgreSQL container {}", CONTAINER_NAME);
            }
        } else {
            let stop_command = stop_result.expect("Failed to stop PostgreSQL Docker container");
            assert!(
                stop_command.status.success(),
                "Failed to stop PostgreSQL container"
            );
        }
    });

    // Poll `pg_isready` inside the container once per second, up to the bound above.
    let mut ready = false;
    for _ in 0..MAX_READINESS_CHECKS {
        let check_command = Command::new("docker")
            .args(["exec", CONTAINER_NAME, "pg_isready", "-U", "postgres"])
            .output()
            .expect("Failed to check PostgreSQL readiness");
        if check_command.status.success() {
            ready = true;
            break;
        }
        sleep(Duration::from_secs(1));
    }
    assert!(
        ready,
        "PostgreSQL container {} was not ready after {} readiness checks",
        CONTAINER_NAME, MAX_READINESS_CHECKS
    );

    let inmem_config = Arc::new(Mutex::new(
        InMemoryConfig::new(())
            .await
            .expect("Failed to create InMemoryConfig"),
    ));

    // The connection string is stored under `db.conn_str` as the JSON encoding of a TOML
    // string value.
    let conn_str = toml::Value::String(format!(
        "postgres://postgres:postgres@localhost:{}/test_db",
        host_port
    ));
    let conn_str_bytes = serde_json::to_vec(&conn_str).unwrap();
    inmem_config
        .lock()
        .await
        .set("db.conn_str".to_string(), conn_str_bytes)
        .await
        .expect("Failed to set connection string in config");

    // Constructing the database is the step under test; presumably this is where the schema
    // gets created (the constructor is not visible here).
    let _pgdb: PostgresDatabase<NilTR, NilTA, NilCA, InMemoryConfig<String, Vec<u8>>> =
        PostgresDatabase::new(inmem_config.clone())
            .await
            .expect("Failed to create PostgreSQL instance");

    // Read the connection string back out of the config and open an independent pool with it.
    let stored_conn_str = inmem_config
        .lock()
        .await
        .get("db.conn_str".to_string())
        .await
        .unwrap();
    let conn_str: toml::Value = serde_json::from_slice(&stored_conn_str).unwrap();
    let conn_str = conn_str.as_str().unwrap().to_owned();
    let pool = PgPoolOptions::new()
        .max_connections(20)
        .acquire_timeout(Duration::from_secs(5))
        .connect(&conn_str)
        .await
        .expect("Failed to create database connection pool");

    // COUNT(*) is never negative, so the meaningful check is that each query succeeds at all.
    for (query, query_failure, not_queryable) in TABLE_CHECKS {
        let (count,): (i64,) = sqlx::query_as(query)
            .fetch_one(&pool)
            .await
            .unwrap_or_else(|err| panic!("{}: {:?}", query_failure, err));
        assert!(count >= 0, "{}", not_queryable);
    }
}
/// Starts a throwaway PostgreSQL container, registers one `NilTR` transform request through
/// `PostgresDatabase` and checks that `transform_requests` then holds exactly one row.
///
/// Requires a working `docker` CLI on the host running the tests.
#[tokio::test]
async fn test_insert_transform_request() {
    // Single source of truth for the name used by `docker run`, `docker exec` and `docker stop`.
    const CONTAINER_NAME: &str = "orch-test-postgres-test-insert-transform-request";
    // Upper bound on the one-second readiness polls, so a container that never comes up
    // fails the test instead of hanging it forever.
    const MAX_READINESS_CHECKS: u32 = 60;

    let _ = env_logger::try_init();

    // Host directory mounted as the container's data volume; the nanosecond timestamp keeps
    // it distinct between runs.
    let unique_folder = env::temp_dir().join(format!(
        "shepherd_pgsql_testmount_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0)
    ));
    info!("Using unique folder: {:?}", unique_folder);

    // Random host port in 10000..20000 mapped onto the container's 5432.
    let host_port = 10000 + rand::random::<u16>() % 10000;
    let volume_arg = format!(
        "{}:/var/lib/postgresql/data",
        unique_folder.as_os_str().to_str().unwrap()
    );
    let port_arg = format!("{}:5432", host_port);
    let start_command = Command::new("docker")
        .args([
            "run",
            "--rm",
            "--name",
            CONTAINER_NAME,
            "-e",
            "POSTGRES_PASSWORD=postgres",
            "-e",
            "POSTGRES_DB=test_db",
            "-e",
            "PGDATA=/var/lib/postgresql/data/pgdata",
            "-v",
            volume_arg.as_str(),
            "-p",
            port_arg.as_str(),
            "-d",
            "postgres:latest",
        ])
        .output()
        .expect("Failed to start PostgreSQL Docker container");
    assert!(
        start_command.status.success(),
        "Failed to start PostgreSQL container, stdout: {}, stderr: {}",
        String::from_utf8_lossy(&start_command.stdout),
        String::from_utf8_lossy(&start_command.stderr)
    );

    // Stop the container when the test scope ends, on success and on failure alike.
    defer!({
        let stop_result = Command::new("docker")
            .args(["stop", CONTAINER_NAME])
            .output();
        if std::thread::panicking() {
            // A second panic while the test is already unwinding would abort the whole test
            // binary, so a failed cleanup is only reported in that case.
            if !matches!(&stop_result, Ok(output) if output.status.success()) {
                eprintln!("Failed to stop PostgreSQL container {}", CONTAINER_NAME);
            }
        } else {
            let stop_command = stop_result.expect("Failed to stop PostgreSQL Docker container");
            assert!(
                stop_command.status.success(),
                "Failed to stop PostgreSQL container"
            );
        }
    });

    // Poll `pg_isready` inside the container once per second, up to the bound above.
    let mut ready = false;
    for _ in 0..MAX_READINESS_CHECKS {
        let check_command = Command::new("docker")
            .args(["exec", CONTAINER_NAME, "pg_isready", "-U", "postgres"])
            .output()
            .expect("Failed to check PostgreSQL readiness");
        if check_command.status.success() {
            ready = true;
            break;
        }
        sleep(Duration::from_secs(1));
    }
    assert!(
        ready,
        "PostgreSQL container {} was not ready after {} readiness checks",
        CONTAINER_NAME, MAX_READINESS_CHECKS
    );

    let inmem_config = Arc::new(Mutex::new(
        InMemoryConfig::new(())
            .await
            .expect("Failed to create InMemoryConfig"),
    ));

    // The connection string is stored under `db.conn_str` as the JSON encoding of a TOML
    // string value.
    let conn_str = toml::Value::String(format!(
        "postgres://postgres:postgres@localhost:{}/test_db",
        host_port
    ));
    let conn_str_bytes = serde_json::to_vec(&conn_str).unwrap();
    inmem_config
        .lock()
        .await
        .set("db.conn_str".to_string(), conn_str_bytes)
        .await
        .expect("Failed to set connection string in config");

    let mut pgdb: PostgresDatabase<NilTR, NilTA, NilCA, InMemoryConfig<String, Vec<u8>>> =
        PostgresDatabase::new(inmem_config.clone())
            .await
            .expect("Failed to create PostgreSQL instance");

    // Operation under test.
    let mock_request = NilTR;
    pgdb.register_transform_request(&mock_request)
        .await
        .expect("Failed to insert transform request");

    // Read the connection string back out of the config and verify the row through an
    // independent pool rather than through `pgdb`.
    let stored_conn_str = inmem_config
        .lock()
        .await
        .get("db.conn_str".to_string())
        .await
        .unwrap();
    let conn_str: toml::Value = serde_json::from_slice(&stored_conn_str).unwrap();
    let conn_str = conn_str.as_str().unwrap().to_owned();
    let pool = PgPoolOptions::new()
        .max_connections(20)
        .acquire_timeout(Duration::from_secs(5))
        .connect(&conn_str)
        .await
        .expect("Failed to create database connection pool");
    let (request_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM transform_requests")
        .fetch_one(&pool)
        .await
        .expect("Failed to query transform_requests table");
    assert_eq!(
        request_count, 1,
        "Transform request should be inserted successfully"
    );
}
/// Starts a throwaway PostgreSQL container and registers a `NilTA` transform attempt without
/// registering any transform request first. The test is marked `#[should_panic]`, so it only
/// passes if something in the body panics.
///
/// Requires a working `docker` CLI on the host running the tests.
///
/// NOTE(review): a bare `#[should_panic]` is satisfied by *any* panic, including a failed
/// container start or a readiness timeout. Consider
/// `#[should_panic(expected = "Failed to insert transform attempt")]` once it is confirmed
/// that the `expect` on `register_transform_attempt` is where the panic originates.
#[tokio::test]
#[should_panic]
async fn test_insert_transform_attempt_without_transform_request() {
    // Single source of truth for the name used by `docker run`, `docker exec` and `docker stop`.
    const CONTAINER_NAME: &str = "orch-test-postgres-test-insert-transform-attempt-wo-request";
    // Upper bound on the one-second readiness polls, so a container that never comes up
    // fails the test instead of hanging it forever.
    const MAX_READINESS_CHECKS: u32 = 60;

    let _ = env_logger::try_init();

    // Host directory mounted as the container's data volume; the nanosecond timestamp keeps
    // it distinct between runs.
    let unique_folder = env::temp_dir().join(format!(
        "shepherd_pgsql_testmount_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0)
    ));
    info!("Using unique folder: {:?}", unique_folder);

    // Random host port in 10000..20000 mapped onto the container's 5432.
    let host_port = 10000 + rand::random::<u16>() % 10000;
    let volume_arg = format!(
        "{}:/var/lib/postgresql/data",
        unique_folder.as_os_str().to_str().unwrap()
    );
    let port_arg = format!("{}:5432", host_port);
    let start_command = Command::new("docker")
        .args([
            "run",
            "--rm",
            "--name",
            CONTAINER_NAME,
            "-e",
            "POSTGRES_PASSWORD=postgres",
            "-e",
            "POSTGRES_DB=test_db",
            "-e",
            "PGDATA=/var/lib/postgresql/data/pgdata",
            "-v",
            volume_arg.as_str(),
            "-p",
            port_arg.as_str(),
            "-d",
            "postgres:latest",
        ])
        .output()
        .expect("Failed to start PostgreSQL Docker container");
    assert!(
        start_command.status.success(),
        "Failed to start PostgreSQL container, stdout: {}, stderr: {}",
        String::from_utf8_lossy(&start_command.stdout),
        String::from_utf8_lossy(&start_command.stderr)
    );

    // Stop the container when the test scope ends. Because this test is expected to panic,
    // the cleanup normally runs while the stack is unwinding.
    defer!({
        let stop_result = Command::new("docker")
            .args(["stop", CONTAINER_NAME])
            .output();
        if std::thread::panicking() {
            // A second panic while the test is already unwinding would abort the whole test
            // binary, so a failed cleanup is only reported in that case.
            if !matches!(&stop_result, Ok(output) if output.status.success()) {
                eprintln!("Failed to stop PostgreSQL container {}", CONTAINER_NAME);
            }
        } else {
            let stop_command = stop_result.expect("Failed to stop PostgreSQL Docker container");
            assert!(
                stop_command.status.success(),
                "Failed to stop PostgreSQL container"
            );
        }
    });

    // Poll `pg_isready` inside the container once per second, up to the bound above.
    let mut ready = false;
    for _ in 0..MAX_READINESS_CHECKS {
        let check_command = Command::new("docker")
            .args(["exec", CONTAINER_NAME, "pg_isready", "-U", "postgres"])
            .output()
            .expect("Failed to check PostgreSQL readiness");
        if check_command.status.success() {
            ready = true;
            break;
        }
        sleep(Duration::from_secs(1));
    }
    assert!(
        ready,
        "PostgreSQL container {} was not ready after {} readiness checks",
        CONTAINER_NAME, MAX_READINESS_CHECKS
    );

    let inmem_config = Arc::new(Mutex::new(
        InMemoryConfig::new(())
            .await
            .expect("Failed to create InMemoryConfig"),
    ));

    // The connection string is stored under `db.conn_str` as the JSON encoding of a TOML
    // string value.
    let conn_str = toml::Value::String(format!(
        "postgres://postgres:postgres@localhost:{}/test_db",
        host_port
    ));
    let conn_str_bytes = serde_json::to_vec(&conn_str).unwrap();
    inmem_config
        .lock()
        .await
        .set("db.conn_str".to_string(), conn_str_bytes)
        .await
        .expect("Failed to set connection string in config");

    let mut pgdb: PostgresDatabase<NilTR, NilTA, NilCA, InMemoryConfig<String, Vec<u8>>> =
        PostgresDatabase::new(inmem_config.clone())
            .await
            .expect("Failed to create PostgreSQL instance");

    // Read the connection string back out of the config for the verification pool below.
    let stored_conn_str = inmem_config
        .lock()
        .await
        .get("db.conn_str".to_string())
        .await
        .unwrap();
    let conn_str: toml::Value = serde_json::from_slice(&stored_conn_str).unwrap();
    let conn_str = conn_str.as_str().unwrap().to_owned();

    // Operation under test: no transform request has been registered at this point.
    // Presumably this returns an error that the `expect` turns into the anticipated panic.
    let mock_attempt = NilTA;
    pgdb.register_transform_attempt(&mock_attempt)
        .await
        .expect("Failed to insert transform attempt");

    // Only reached if the registration above unexpectedly succeeds.
    let pool = PgPoolOptions::new()
        .max_connections(20)
        .acquire_timeout(Duration::from_secs(5))
        .connect(&conn_str)
        .await
        .expect("Failed to create database connection pool");
    let (attempt_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM transform_attempts")
        .fetch_one(&pool)
        .await
        .expect("Failed to query transform_attempts table");
    assert_eq!(
        attempt_count, 1,
        "Transform attempt should be inserted successfully"
    );
}
/// Starts a throwaway PostgreSQL container, registers a `NilTR` transform request followed by
/// a `NilTA` transform attempt, and checks that `transform_attempts` then holds exactly one
/// row.
///
/// Requires a working `docker` CLI on the host running the tests.
#[tokio::test]
async fn test_insert_transform_attempt_with_transform_request() {
    // Single source of truth for the name used by `docker run`, `docker exec` and `docker stop`.
    const CONTAINER_NAME: &str = "orch-test-postgres-test-insert-transform-attempt-with-request";
    // Upper bound on the one-second readiness polls, so a container that never comes up
    // fails the test instead of hanging it forever.
    const MAX_READINESS_CHECKS: u32 = 60;

    let _ = env_logger::try_init();

    // Host directory mounted as the container's data volume; the nanosecond timestamp keeps
    // it distinct between runs.
    let unique_folder = env::temp_dir().join(format!(
        "shepherd_pgsql_testmount_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0)
    ));
    info!("Using unique folder: {:?}", unique_folder);

    // Random host port in 10000..20000 mapped onto the container's 5432.
    let host_port = 10000 + rand::random::<u16>() % 10000;
    let volume_arg = format!(
        "{}:/var/lib/postgresql/data",
        unique_folder.as_os_str().to_str().unwrap()
    );
    let port_arg = format!("{}:5432", host_port);
    let start_command = Command::new("docker")
        .args([
            "run",
            "--rm",
            "--name",
            CONTAINER_NAME,
            "-e",
            "POSTGRES_PASSWORD=postgres",
            "-e",
            "POSTGRES_DB=test_db",
            "-e",
            "PGDATA=/var/lib/postgresql/data/pgdata",
            "-v",
            volume_arg.as_str(),
            "-p",
            port_arg.as_str(),
            "-d",
            "postgres:latest",
        ])
        .output()
        .expect("Failed to start PostgreSQL Docker container");
    assert!(
        start_command.status.success(),
        "Failed to start PostgreSQL container, stdout: {}, stderr: {}",
        String::from_utf8_lossy(&start_command.stdout),
        String::from_utf8_lossy(&start_command.stderr)
    );

    // Stop the container when the test scope ends, on success and on failure alike.
    defer!({
        let stop_result = Command::new("docker")
            .args(["stop", CONTAINER_NAME])
            .output();
        if std::thread::panicking() {
            // A second panic while the test is already unwinding would abort the whole test
            // binary, so a failed cleanup is only reported in that case.
            if !matches!(&stop_result, Ok(output) if output.status.success()) {
                eprintln!("Failed to stop PostgreSQL container {}", CONTAINER_NAME);
            }
        } else {
            let stop_command = stop_result.expect("Failed to stop PostgreSQL Docker container");
            assert!(
                stop_command.status.success(),
                "Failed to stop PostgreSQL container"
            );
        }
    });

    // Poll `pg_isready` inside the container once per second, up to the bound above.
    let mut ready = false;
    for _ in 0..MAX_READINESS_CHECKS {
        let check_command = Command::new("docker")
            .args(["exec", CONTAINER_NAME, "pg_isready", "-U", "postgres"])
            .output()
            .expect("Failed to check PostgreSQL readiness");
        if check_command.status.success() {
            ready = true;
            break;
        }
        sleep(Duration::from_secs(1));
    }
    assert!(
        ready,
        "PostgreSQL container {} was not ready after {} readiness checks",
        CONTAINER_NAME, MAX_READINESS_CHECKS
    );

    let inmem_config = Arc::new(Mutex::new(
        InMemoryConfig::new(())
            .await
            .expect("Failed to create InMemoryConfig"),
    ));

    // The connection string is stored under `db.conn_str` as the JSON encoding of a TOML
    // string value.
    let conn_str = toml::Value::String(format!(
        "postgres://postgres:postgres@localhost:{}/test_db",
        host_port
    ));
    let conn_str_bytes = serde_json::to_vec(&conn_str).unwrap();
    inmem_config
        .lock()
        .await
        .set("db.conn_str".to_string(), conn_str_bytes)
        .await
        .expect("Failed to set connection string in config");

    let mut pgdb: PostgresDatabase<NilTR, NilTA, NilCA, InMemoryConfig<String, Vec<u8>>> =
        PostgresDatabase::new(inmem_config.clone())
            .await
            .expect("Failed to create PostgreSQL instance");

    // Read the connection string back out of the config for the verification pool below.
    let stored_conn_str = inmem_config
        .lock()
        .await
        .get("db.conn_str".to_string())
        .await
        .unwrap();
    let conn_str: toml::Value = serde_json::from_slice(&stored_conn_str).unwrap();
    let conn_str = conn_str.as_str().unwrap().to_owned();

    // Register the request first, then the attempt.
    let mock_request = NilTR;
    pgdb.register_transform_request(&mock_request)
        .await
        .expect("Failed to insert transform request");
    let mock_attempt = NilTA;
    pgdb.register_transform_attempt(&mock_attempt)
        .await
        .expect("Failed to insert transform attempt");

    // Verify the row through an independent pool rather than through `pgdb`.
    let pool = PgPoolOptions::new()
        .max_connections(20)
        .acquire_timeout(Duration::from_secs(5))
        .connect(&conn_str)
        .await
        .expect("Failed to create database connection pool");
    let (attempt_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM transform_attempts")
        .fetch_one(&pool)
        .await
        .expect("Failed to query transform_attempts table");
    assert_eq!(
        attempt_count, 1,
        "Transform attempt should be inserted successfully"
    );
}
/// Starts a throwaway PostgreSQL container, walks through registering a transform request,
/// registering and updating a transform attempt, and registering a consume attempt, then
/// checks that `consume_attempts` holds exactly one row.
///
/// Requires a working `docker` CLI on the host running the tests.
#[tokio::test]
async fn test_insert_consume_attempt() {
    // Single source of truth for the name used by `docker run`, `docker exec` and `docker stop`.
    const CONTAINER_NAME: &str = "orch-test-postgres-test-insert-consume-attempt";
    // Upper bound on the one-second readiness polls, so a container that never comes up
    // fails the test instead of hanging it forever.
    const MAX_READINESS_CHECKS: u32 = 60;

    let _ = env_logger::try_init();

    // Host directory mounted as the container's data volume; the nanosecond timestamp keeps
    // it distinct between runs.
    let unique_folder = env::temp_dir().join(format!(
        "shepherd_pgsql_testmount_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0)
    ));
    info!("Using unique folder: {:?}", unique_folder);

    // Random host port in 10000..20000 mapped onto the container's 5432.
    let host_port = 10000 + rand::random::<u16>() % 10000;
    let volume_arg = format!(
        "{}:/var/lib/postgresql/data",
        unique_folder.as_os_str().to_str().unwrap()
    );
    let port_arg = format!("{}:5432", host_port);
    let start_command = Command::new("docker")
        .args([
            "run",
            "--rm",
            "--name",
            CONTAINER_NAME,
            "-e",
            "POSTGRES_PASSWORD=postgres",
            "-e",
            "POSTGRES_DB=test_db",
            "-e",
            "PGDATA=/var/lib/postgresql/data/pgdata",
            "-v",
            volume_arg.as_str(),
            "-p",
            port_arg.as_str(),
            "-d",
            "postgres:latest",
        ])
        .output()
        .expect("Failed to start PostgreSQL Docker container");
    assert!(
        start_command.status.success(),
        "Failed to start PostgreSQL container, stdout: {}, stderr: {}",
        String::from_utf8_lossy(&start_command.stdout),
        String::from_utf8_lossy(&start_command.stderr)
    );

    // Stop the container when the test scope ends, on success and on failure alike.
    defer!({
        let stop_result = Command::new("docker")
            .args(["stop", CONTAINER_NAME])
            .output();
        if std::thread::panicking() {
            // A second panic while the test is already unwinding would abort the whole test
            // binary, so a failed cleanup is only reported in that case.
            if !matches!(&stop_result, Ok(output) if output.status.success()) {
                eprintln!("Failed to stop PostgreSQL container {}", CONTAINER_NAME);
            }
        } else {
            let stop_command = stop_result.expect("Failed to stop PostgreSQL Docker container");
            assert!(
                stop_command.status.success(),
                "Failed to stop PostgreSQL container"
            );
        }
    });

    // Poll `pg_isready` inside the container once per second, up to the bound above.
    let mut ready = false;
    for _ in 0..MAX_READINESS_CHECKS {
        let check_command = Command::new("docker")
            .args(["exec", CONTAINER_NAME, "pg_isready", "-U", "postgres"])
            .output()
            .expect("Failed to check PostgreSQL readiness");
        if check_command.status.success() {
            ready = true;
            break;
        }
        sleep(Duration::from_secs(1));
    }
    assert!(
        ready,
        "PostgreSQL container {} was not ready after {} readiness checks",
        CONTAINER_NAME, MAX_READINESS_CHECKS
    );

    // Here the plain `String` is JSON-encoded directly and later reused as-is for the
    // verification pool, instead of being read back out of the config.
    let conn_str = format!(
        "postgres://postgres:postgres@localhost:{}/test_db",
        host_port
    );
    let conn_str_bytes = serde_json::to_vec(&conn_str).unwrap();

    let inmem_config = Arc::new(Mutex::new(
        InMemoryConfig::new(())
            .await
            .expect("Failed to create InMemoryConfig"),
    ));
    inmem_config
        .lock()
        .await
        .set("db.conn_str".to_string(), conn_str_bytes)
        .await
        .expect("Failed to set connection string in config");

    let mut pgdb: PostgresDatabase<NilTR, NilTA, NilCA, InMemoryConfig<String, Vec<u8>>> =
        PostgresDatabase::new(inmem_config.clone())
            .await
            .expect("Failed to create PostgreSQL instance");

    // Request -> attempt -> attempt result -> consume attempt, in that order.
    let mock_request = NilTR;
    pgdb.register_transform_request(&mock_request)
        .await
        .expect("Failed to insert transform request");
    let mock_attempt = NilTA;
    pgdb.register_transform_attempt(&mock_attempt)
        .await
        .expect("Failed to insert transform attempt");
    let mock_attempt_result = WorkerManagerResult::<NilTA>::Success((), ((), (), Ok(())));
    pgdb.update_transform_attempt(&mock_attempt_result)
        .await
        .expect("Failed to update transform attempt");
    let mock_consume_attempt = NilCA;
    pgdb.register_consume_attempt(&mock_consume_attempt)
        .await
        .expect("Failed to insert consume attempt");

    // Verify the row through an independent pool rather than through `pgdb`.
    let pool = PgPoolOptions::new()
        .max_connections(20)
        .acquire_timeout(Duration::from_secs(5))
        .connect(&conn_str)
        .await
        .expect("Failed to create database connection pool");
    let (consume_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM consume_attempts")
        .fetch_one(&pool)
        .await
        .expect("Failed to query consume_attempts table");
    assert_eq!(
        consume_count, 1,
        "Consume attempt should be inserted successfully"
    );
}
/// Starts a throwaway PostgreSQL container, constructs a `PostgresDatabase` against it and
/// checks that `get_dyn_configs` returns no entries before anything has been registered.
///
/// Requires a working `docker` CLI on the host running the tests.
#[tokio::test]
async fn test_dyn_configs_empty_on_init() {
    // Single source of truth for the name used by `docker run`, `docker exec` and `docker stop`.
    const CONTAINER_NAME: &str = "orch-test-postgres-test-dyn-config-nil-on-init";
    // Upper bound on the one-second readiness polls, so a container that never comes up
    // fails the test instead of hanging it forever.
    const MAX_READINESS_CHECKS: u32 = 60;

    let _ = env_logger::try_init();

    // Host directory mounted as the container's data volume; the nanosecond timestamp keeps
    // it distinct between runs.
    let unique_folder = env::temp_dir().join(format!(
        "shepherd_pgsql_testmount_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0)
    ));
    info!("Using unique folder: {:?}", unique_folder);

    // Random host port in 10000..20000 mapped onto the container's 5432.
    let host_port = 10000 + rand::random::<u16>() % 10000;
    let volume_arg = format!(
        "{}:/var/lib/postgresql/data",
        unique_folder.as_os_str().to_str().unwrap()
    );
    let port_arg = format!("{}:5432", host_port);
    let start_command = Command::new("docker")
        .args([
            "run",
            "--rm",
            "--name",
            CONTAINER_NAME,
            "-e",
            "POSTGRES_PASSWORD=postgres",
            "-e",
            "POSTGRES_DB=test_db",
            "-e",
            "PGDATA=/var/lib/postgresql/data/pgdata",
            "-v",
            volume_arg.as_str(),
            "-p",
            port_arg.as_str(),
            "-d",
            "postgres:latest",
        ])
        .output()
        .expect("Failed to start PostgreSQL Docker container");
    assert!(
        start_command.status.success(),
        "Failed to start PostgreSQL container, stdout: {}, stderr: {}",
        String::from_utf8_lossy(&start_command.stdout),
        String::from_utf8_lossy(&start_command.stderr)
    );

    // Stop the container when the test scope ends, on success and on failure alike.
    defer!({
        let stop_result = Command::new("docker")
            .args(["stop", CONTAINER_NAME])
            .output();
        if std::thread::panicking() {
            // A second panic while the test is already unwinding would abort the whole test
            // binary, so a failed cleanup is only reported in that case.
            if !matches!(&stop_result, Ok(output) if output.status.success()) {
                eprintln!("Failed to stop PostgreSQL container {}", CONTAINER_NAME);
            }
        } else {
            let stop_command = stop_result.expect("Failed to stop PostgreSQL Docker container");
            assert!(
                stop_command.status.success(),
                "Failed to stop PostgreSQL container"
            );
        }
    });

    // Poll `pg_isready` inside the container once per second, up to the bound above.
    let mut ready = false;
    for _ in 0..MAX_READINESS_CHECKS {
        let check_command = Command::new("docker")
            .args(["exec", CONTAINER_NAME, "pg_isready", "-U", "postgres"])
            .output()
            .expect("Failed to check PostgreSQL readiness");
        if check_command.status.success() {
            ready = true;
            break;
        }
        sleep(Duration::from_secs(1));
    }
    assert!(
        ready,
        "PostgreSQL container {} was not ready after {} readiness checks",
        CONTAINER_NAME, MAX_READINESS_CHECKS
    );

    let inmem_config = Arc::new(Mutex::new(
        InMemoryConfig::new(())
            .await
            .expect("Failed to create InMemoryConfig"),
    ));

    // The connection string is stored under `db.conn_str` as the JSON encoding of a TOML
    // string value.
    let conn_str = toml::Value::String(format!(
        "postgres://postgres:postgres@localhost:{}/test_db",
        host_port
    ));
    let conn_str_bytes = serde_json::to_vec(&conn_str).unwrap();
    inmem_config
        .lock()
        .await
        .set("db.conn_str".to_string(), conn_str_bytes)
        .await
        .expect("Failed to set connection string in config");

    let mut pgdb: PostgresDatabase<NilTR, NilTA, NilCA, InMemoryConfig<String, Vec<u8>>> =
        PostgresDatabase::new(inmem_config.clone())
            .await
            .expect("Failed to create PostgreSQL instance");

    // Nothing has been registered yet, so no dynamic configs are expected.
    let cfgs = pgdb
        .get_dyn_configs()
        .await
        .expect("Failed to get dynamic configs");
    assert_eq!(cfgs.len(), 0, "Dynamic configs should be empty on init");
}
/// Test-only transform request whose sole payload is a list of `(key, value bytes)` pairs.
///
/// Its `TransformRequest` implementation returns a clone of this list from
/// `get_dyn_configs`; the identifier, input and output types are all `()`.
#[cfg(test)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DynCfgOnlyTR(Vec<(String, Vec<u8>)>);
#[cfg(test)]
impl TransformRequest for DynCfgOnlyTR {
    // This request carries no identifier, input or output of its own.
    type Identifier = ();
    type Input = ();
    type Output = ();

    /// Returns the unit identifier.
    fn request_id(&self) -> Self::Identifier {}

    /// Returns a reference to the unit input.
    fn input(&self) -> &Self::Input {
        &()
    }

    /// Returns an owned copy of the wrapped `(key, value bytes)` pairs.
    fn get_dyn_configs(&self) -> Vec<(String, Vec<u8>)> {
        let Self(entries) = self;
        entries.to_vec()
    }
}
/// Starts a throwaway PostgreSQL container, checks that `get_dyn_configs` is empty on a fresh
/// database, registers a `DynCfgOnlyTR` request carrying two key/value pairs and checks that
/// `get_dyn_configs` then returns two entries.
///
/// Requires a working `docker` CLI on the host running the tests.
#[tokio::test]
async fn test_dyn_configs_change_after_transform_request() {
    // Single source of truth for the name used by `docker run`, `docker exec` and `docker stop`.
    const CONTAINER_NAME: &str =
        "orch-test-postgres-test-dyn-config-change-after-transform-request";
    // Upper bound on the one-second readiness polls, so a container that never comes up
    // fails the test instead of hanging it forever.
    const MAX_READINESS_CHECKS: u32 = 60;

    let _ = env_logger::try_init();

    // Host directory mounted as the container's data volume; the nanosecond timestamp keeps
    // it distinct between runs.
    let unique_folder = env::temp_dir().join(format!(
        "shepherd_pgsql_testmount_{}",
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos())
            .unwrap_or(0)
    ));
    info!("Using unique folder: {:?}", unique_folder);

    // Random host port in 10000..20000 mapped onto the container's 5432.
    let host_port = 10000 + rand::random::<u16>() % 10000;
    let volume_arg = format!(
        "{}:/var/lib/postgresql/data",
        unique_folder.as_os_str().to_str().unwrap()
    );
    let port_arg = format!("{}:5432", host_port);
    let start_command = Command::new("docker")
        .args([
            "run",
            "--rm",
            "--name",
            CONTAINER_NAME,
            "-e",
            "POSTGRES_PASSWORD=postgres",
            "-e",
            "POSTGRES_DB=test_db",
            "-e",
            "PGDATA=/var/lib/postgresql/data/pgdata",
            "-v",
            volume_arg.as_str(),
            "-p",
            port_arg.as_str(),
            "-d",
            "postgres:latest",
        ])
        .output()
        .expect("Failed to start PostgreSQL Docker container");
    assert!(
        start_command.status.success(),
        "Failed to start PostgreSQL container, stdout: {}, stderr: {}",
        String::from_utf8_lossy(&start_command.stdout),
        String::from_utf8_lossy(&start_command.stderr)
    );

    // Stop the container when the test scope ends, on success and on failure alike.
    defer!({
        let stop_result = Command::new("docker")
            .args(["stop", CONTAINER_NAME])
            .output();
        if std::thread::panicking() {
            // A second panic while the test is already unwinding would abort the whole test
            // binary, so a failed cleanup is only reported in that case.
            if !matches!(&stop_result, Ok(output) if output.status.success()) {
                eprintln!("Failed to stop PostgreSQL container {}", CONTAINER_NAME);
            }
        } else {
            let stop_command = stop_result.expect("Failed to stop PostgreSQL Docker container");
            assert!(
                stop_command.status.success(),
                "Failed to stop PostgreSQL container"
            );
        }
    });

    // Poll `pg_isready` inside the container once per second, up to the bound above.
    let mut ready = false;
    for _ in 0..MAX_READINESS_CHECKS {
        let check_command = Command::new("docker")
            .args(["exec", CONTAINER_NAME, "pg_isready", "-U", "postgres"])
            .output()
            .expect("Failed to check PostgreSQL readiness");
        if check_command.status.success() {
            ready = true;
            break;
        }
        sleep(Duration::from_secs(1));
    }
    assert!(
        ready,
        "PostgreSQL container {} was not ready after {} readiness checks",
        CONTAINER_NAME, MAX_READINESS_CHECKS
    );

    // The connection string is stored under `db.conn_str` as the JSON encoding of a TOML
    // string value.
    let conn_str = toml::Value::String(format!(
        "postgres://postgres:postgres@localhost:{}/test_db",
        host_port
    ));
    let conn_str_bytes = serde_json::to_vec(&conn_str).unwrap();

    let inmem_config = Arc::new(Mutex::new(
        InMemoryConfig::new(())
            .await
            .expect("Failed to create InMemoryConfig"),
    ));
    inmem_config
        .lock()
        .await
        .set("db.conn_str".to_string(), conn_str_bytes)
        .await
        .expect("Failed to set connection string in config");

    let mut pgdb: PostgresDatabase<DynCfgOnlyTR, NilTA, NilCA, InMemoryConfig<String, Vec<u8>>> =
        PostgresDatabase::new(inmem_config.clone())
            .await
            .expect("Failed to create PostgreSQL instance");

    // Baseline: nothing registered yet.
    let cfgs = pgdb
        .get_dyn_configs()
        .await
        .expect("Failed to get dynamic configs");
    assert_eq!(cfgs.len(), 0, "Dynamic configs should be empty on init");

    // Register a request that carries two dynamic-config entries.
    let request = DynCfgOnlyTR(vec![
        ("key1".to_string(), vec![1, 2, 3]),
        ("key2".to_string(), vec![42]),
    ]);
    pgdb.register_transform_request(&request)
        .await
        .expect("Failed to register transform request");

    let cfgs = pgdb
        .get_dyn_configs()
        .await
        .expect("Failed to get dynamic configs after request");
    assert_eq!(cfgs.len(), 2, "Dynamic configs should contain 2 entries");
}