#![cfg(feature = "postgres")]
use narwhal_core::schema::TableSchema;
use narwhal_core::{ConnectionConfig, ConnectionParams, DatabaseDriver, DynConnection, TableKind};
use narwhal_drivers::postgres::PostgresDriver;
use narwhal_schema_diff::diff;
use narwhal_schema_diff::emit::{DdlEmitter, PostgresEmitter};
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::postgres::Postgres;
use uuid::Uuid;
struct TwinHarness {
_container: testcontainers::ContainerAsync<Postgres>,
driver: PostgresDriver,
src_config: ConnectionConfig,
tgt_config: ConnectionConfig,
password: String,
}
impl TwinHarness {
async fn start() -> Self {
let container = Postgres::default().start().await.expect("start postgres");
let port = container
.get_host_port_ipv4(5432)
.await
.expect("postgres port");
let make_cfg = |name: &str, db: &str| ConnectionConfig {
id: Uuid::new_v4(),
name: name.into(),
driver: PostgresDriver::NAME.into(),
params: ConnectionParams::with(|p| {
p.host = Some("127.0.0.1".into());
p.port = Some(port);
p.database = Some(db.into());
p.username = Some("postgres".into());
}),
};
let mut h = Self {
_container: container,
driver: PostgresDriver::new(),
src_config: make_cfg("source", "src_db"),
tgt_config: make_cfg("target", "tgt_db"),
password: "postgres".into(),
};
let mut admin = make_cfg("admin", "postgres");
admin.params.database = Some("postgres".into());
let mut admin_conn = h
.driver
.connect(&admin, Some(&h.password))
.await
.expect("admin connect");
admin_conn
.execute("CREATE DATABASE src_db", &[])
.await
.expect("create src_db");
admin_conn
.execute("CREATE DATABASE tgt_db", &[])
.await
.expect("create tgt_db");
let _ = admin_conn.close().await;
h.password = h.password.clone();
h
}
async fn connect_src(&self) -> Box<dyn DynConnection> {
self.driver
.connect(&self.src_config, Some(&self.password))
.await
.expect("connect source")
}
async fn connect_tgt(&self) -> Box<dyn DynConnection> {
self.driver
.connect(&self.tgt_config, Some(&self.password))
.await
.expect("connect target")
}
}
async fn run(conn: &mut Box<dyn DynConnection>, sql: &str) {
conn.execute(sql, &[])
.await
.unwrap_or_else(|e| panic!("execute `{sql}` failed: {e}"));
}
async fn introspect(conn: &mut Box<dyn DynConnection>) -> Vec<TableSchema> {
let catalog = conn.list_all_tables().await.expect("list_all_tables");
let mut out = Vec::new();
for (schema, tables) in catalog {
for table in tables {
if !matches!(table.kind, TableKind::Table) {
continue;
}
if let Ok(ts) = conn.describe_table(&schema.name, &table.name).await {
out.push(ts);
}
}
}
out
}
#[tokio::test]
#[ignore = "requires docker"]
async fn schema_diff_postgres_round_trip() {
let h = TwinHarness::start().await;
let mut src = h.connect_src().await;
run(
&mut src,
"CREATE TABLE users (
id serial PRIMARY KEY,
email text NOT NULL,
created_at timestamp DEFAULT now()
)",
)
.await;
run(&mut src, "CREATE INDEX users_email_idx ON users (email)").await;
let mut tgt = h.connect_tgt().await;
run(
&mut tgt,
"CREATE TABLE users (
id serial PRIMARY KEY,
email text
)",
)
.await;
let source_tables = introspect(&mut src).await;
let target_tables = introspect(&mut tgt).await;
let initial = diff(&source_tables, &target_tables);
assert!(
!initial.is_empty(),
"test precondition: the schemas must differ before the round-trip"
);
let ddl = PostgresEmitter::new()
.emit(&initial)
.expect("emit postgres DDL");
for stmt in split_statements(&ddl) {
run(&mut tgt, &stmt).await;
}
let target_after = introspect(&mut tgt).await;
let after = diff(&source_tables, &target_after);
assert!(
after.is_empty(),
"post-migration diff must be empty.\nDDL applied:\n{ddl}\nstill-differing:\n{after:#?}",
);
let _ = src.close().await;
let _ = tgt.close().await;
}
fn split_statements(ddl: &str) -> Vec<String> {
let mut out = Vec::new();
let mut buf = String::new();
for line in ddl.split('\n') {
if line.trim_start().starts_with("--") || line.trim().is_empty() {
continue;
}
buf.push_str(line);
buf.push('\n');
if line.trim_end().ends_with(';') {
out.push(std::mem::take(&mut buf));
}
}
out
}