#![allow(clippy::unwrap_used, unused_crate_dependencies)]
use std::collections::BTreeSet;
use schema_core::{DatabaseSchema, TableName};
use sources_core::{CaptureProvisioning, QualifiedTable};
use sources_postgres::{ReplicationConfig, WalChangeCapture};
use sqlx::PgPool;
use sqlx::postgres::PgPoolOptions;
use testcontainers_modules::postgres::Postgres;
use testcontainers_modules::testcontainers::runners::AsyncRunner;
/// Publication name shared by every test in this file: it is passed to
/// `ReplicationConfig::new` in `capture` and used as the `pubname` filter in
/// `published_tables`.
const PUBLICATION: &str = "flusso";
/// Builds a [`WalChangeCapture`] for the Postgres instance reachable at
/// `127.0.0.1:{port}`, using `user` / `password` against database `db`.
///
/// The same credentials feed both the [`ReplicationConfig`] (which also
/// receives the fixed `"flusso-test"` argument and [`PUBLICATION`]) and the
/// plain `postgres://` URL handed to the capture.
fn capture(port: u16, user: &str, password: &str, db: &str) -> WalChangeCapture {
    let connection_url = format!("postgres://{user}:{password}@127.0.0.1:{port}/{db}");
    let replication =
        ReplicationConfig::new("127.0.0.1", user, password, db, "flusso-test", PUBLICATION);
    WalChangeCapture::new(replication.with_port(port), connection_url)
}
/// Turns bare table names into a set of [`QualifiedTable`]s, each placed in
/// the `public` schema.
///
/// # Panics
///
/// Panics if `"public"` or any entry of `tables` is rejected by the
/// corresponding `try_new` constructor.
fn required(tables: &[&str]) -> BTreeSet<QualifiedTable> {
    let mut qualified = BTreeSet::new();
    for &name in tables {
        let schema = DatabaseSchema::try_new("public").unwrap();
        let table = TableName::try_new(name).unwrap();
        qualified.insert(QualifiedTable::new(schema, table));
    }
    qualified
}
/// Lists the tables currently in the [`PUBLICATION`] publication as
/// `schema.table` strings, in the order returned by the query's `ORDER BY 1`.
///
/// # Panics
///
/// Panics if the query against `pool` fails.
async fn published_tables(pool: &PgPool) -> Vec<String> {
    let sql = "SELECT schemaname || '.' || tablename FROM pg_publication_tables \
               WHERE pubname = $1 ORDER BY 1";
    let lookup = sqlx::query_scalar::<_, String>(sql).bind(PUBLICATION);
    lookup.fetch_all(pool).await.unwrap()
}
/// Scenario run as the container's `postgres` user against three freshly
/// created tables:
///
/// 1. with nothing published, the coverage report is unsatisfied, lists two
///    missing tables, is manageable and suggests `CREATE PUBLICATION`;
/// 2. `ensure_coverage(.., false)` leaves the publication empty;
/// 3. `ensure_coverage(.., true)` publishes `orders` and `users`;
/// 4. asking for a third table reports only that table as missing, suggests
///    `ALTER PUBLICATION`, and a further `ensure_coverage(.., true)` adds it;
/// 5. repeating the call succeeds and coverage stays satisfied.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn privileged_role_creates_extends_and_respects_opt_out() {
    // The container must stay bound for the whole test; dropping it early
    // would tear down the database.
    let container = Postgres::default().start().await.unwrap();
    let port = container.get_host_port_ipv4(5432).await.unwrap();
    let admin_dsn = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let admin_pool = PgPoolOptions::new().connect(&admin_dsn).await.unwrap();

    let setup = [
        "CREATE TABLE users (id int PRIMARY KEY)",
        "CREATE TABLE orders (id int PRIMARY KEY)",
        "CREATE TABLE items (id int PRIMARY KEY)",
    ];
    for ddl in setup {
        sqlx::query(ddl).execute(&admin_pool).await.unwrap();
    }

    let wal = capture(port, "postgres", "postgres", "postgres");

    // Nothing is published yet: both tables are reported missing.
    let users_and_orders = required(&["users", "orders"]);
    let initial = wal.inspect_coverage(&users_and_orders).await.unwrap();
    assert!(!initial.satisfied);
    assert_eq!(initial.missing.len(), 2);
    assert!(initial.manageable);
    assert!(initial.remediation[0].contains("CREATE PUBLICATION"));

    // Opt-out: passing `false` must not publish anything.
    wal.ensure_coverage(&users_and_orders, false).await.unwrap();
    let published = published_tables(&admin_pool).await;
    assert!(published.is_empty());

    // Passing `true` publishes exactly the two required tables.
    wal.ensure_coverage(&users_and_orders, true).await.unwrap();
    assert_eq!(
        published_tables(&admin_pool).await,
        ["public.orders", "public.users"]
    );
    let after_create = wal.inspect_coverage(&users_and_orders).await.unwrap();
    assert!(after_create.satisfied);

    // Widening the requirement reports only the new table as missing.
    let all_three = required(&["users", "orders", "items"]);
    let gap = wal.inspect_coverage(&all_three).await.unwrap();
    let only_items = required(&["items"]).into_iter().collect::<Vec<_>>();
    assert_eq!(gap.missing, only_items);
    assert!(gap.remediation[0].contains("ALTER PUBLICATION"));

    wal.ensure_coverage(&all_three, true).await.unwrap();
    assert_eq!(
        published_tables(&admin_pool).await,
        ["public.items", "public.orders", "public.users"]
    );

    // A repeated call on an already-covered set must still succeed.
    wal.ensure_coverage(&all_three, true).await.unwrap();
    let final_report = wal.inspect_coverage(&all_three).await.unwrap();
    assert!(final_report.satisfied);
}
/// Scenario run as a `reader` role that only has `SELECT` on the tables in
/// `public`: the coverage report must be unsatisfied, not manageable, carry at
/// least one blocker and still suggest `CREATE PUBLICATION`; a subsequent
/// `ensure_coverage(.., true)` must return `Ok` while leaving the publication
/// empty.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires docker"]
async fn read_only_role_reports_gap_without_creating() {
    // The container must stay bound for the whole test; dropping it early
    // would tear down the database.
    let container = Postgres::default().start().await.unwrap();
    let port = container.get_host_port_ipv4(5432).await.unwrap();
    let admin_dsn = format!("postgres://postgres:postgres@127.0.0.1:{port}/postgres");
    let admin_pool = PgPoolOptions::new().connect(&admin_dsn).await.unwrap();

    // The table is created before the GRANT so that `ALL TABLES` includes it.
    let setup = [
        "CREATE TABLE users (id int PRIMARY KEY)",
        "CREATE ROLE reader LOGIN PASSWORD 'reader'",
        "GRANT SELECT ON ALL TABLES IN SCHEMA public TO reader",
    ];
    for sql in setup {
        sqlx::query(sql).execute(&admin_pool).await.unwrap();
    }

    let wal = capture(port, "reader", "reader", "postgres");
    let users_only = required(&["users"]);

    let verdict = wal.inspect_coverage(&users_only).await.unwrap();
    assert!(!verdict.satisfied);
    assert!(
        !verdict.manageable,
        "a non-owner read-only role can't manage"
    );
    assert!(
        !verdict.blockers.is_empty(),
        "a non-manageable verdict must explain why"
    );
    assert!(verdict.remediation[0].contains("CREATE PUBLICATION"));

    // Even with `true`, the call is expected to succeed without publishing.
    wal.ensure_coverage(&users_only, true).await.unwrap();
    let published = published_tables(&admin_pool).await;
    assert!(published.is_empty(), "nothing should have been created");
}