use std::time::Duration;
use narwhal_core::{
ConnectionConfig, ConnectionParams, DatabaseDriver, Error, IsolationLevel, Value,
};
use narwhal_drivers::postgres::PostgresDriver;
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::postgres::Postgres;
use uuid::Uuid;
struct Harness {
_container: testcontainers::ContainerAsync<Postgres>,
driver: PostgresDriver,
config: ConnectionConfig,
password: String,
}
impl Harness {
async fn start() -> Self {
let container = Postgres::default()
.start()
.await
.expect("start postgres container");
let port = container
.get_host_port_ipv4(5432)
.await
.expect("postgres host port");
let config = ConnectionConfig {
id: Uuid::nil(),
name: "it".into(),
driver: PostgresDriver::NAME.into(),
params: ConnectionParams::with(|p| {
p.host = Some("127.0.0.1".into());
p.port = Some(port);
p.database = Some("postgres".into());
p.username = Some("postgres".into());
}),
};
Self {
_container: container,
driver: PostgresDriver::new(),
config,
password: "postgres".into(),
}
}
async fn connect(&self) -> Box<dyn narwhal_core::DynConnection> {
self.driver
.connect(&self.config, Some(&self.password))
.await
.expect("driver connect")
}
}
#[tokio::test]
#[ignore = "requires docker"]
async fn round_trip_select_and_parameter_binding() {
let h = Harness::start().await;
let mut conn = h.connect().await;
conn.execute(
"CREATE TABLE items (id SERIAL PRIMARY KEY, name TEXT NOT NULL, qty INT)",
&[],
)
.await
.unwrap();
let insert = conn
.execute(
"INSERT INTO items (name, qty) VALUES ($1, $2)",
&[Value::String("widget".into()), Value::Int(7)],
)
.await
.unwrap();
assert_eq!(insert.rows_affected, Some(1));
let select = conn
.execute(
"SELECT name, qty FROM items WHERE qty >= $1",
&[Value::Int(1)],
)
.await
.unwrap();
assert_eq!(select.rows.len(), 1);
assert_eq!(
select.rows[0].get(0).map(Value::render),
Some("widget".into())
);
assert_eq!(select.rows[0].get(1).map(Value::render), Some("7".into()));
}
#[tokio::test]
#[ignore = "requires docker"]
async fn streaming_consumes_rows_lazily() {
let h = Harness::start().await;
let mut conn = h.connect().await;
conn.execute("CREATE TABLE nums (n INT)", &[])
.await
.unwrap();
conn.execute("INSERT INTO nums SELECT generate_series(1, 100)", &[])
.await
.unwrap();
let mut stream = conn
.stream("SELECT n FROM nums ORDER BY n", &[])
.await
.unwrap();
assert_eq!(stream.columns().len(), 1);
let mut total: i64 = 0;
let mut count: i64 = 0;
while let Some(row) = stream.next_row().await.unwrap() {
if let Some(Value::Int(n)) = row.get(0) {
total += *n;
count += 1;
}
}
assert_eq!(count, 100);
assert_eq!(total, (1..=100).sum::<i64>());
}
#[tokio::test]
#[ignore = "requires docker"]
async fn transaction_rollback_discards_changes() {
let h = Harness::start().await;
let mut conn = h.connect().await;
conn.execute("CREATE TABLE counters (k TEXT PRIMARY KEY, v INT)", &[])
.await
.unwrap();
conn.begin_with(IsolationLevel::Serializable).await.unwrap();
conn.execute(
"INSERT INTO counters VALUES ($1, $2)",
&[Value::String("a".into()), Value::Int(1)],
)
.await
.unwrap();
conn.rollback().await.unwrap();
let select = conn
.execute("SELECT count(*) FROM counters", &[])
.await
.unwrap();
assert_eq!(select.rows[0].get(0).map(Value::render), Some("0".into()));
}
#[tokio::test]
#[ignore = "requires docker"]
async fn savepoint_partial_rollback() {
let h = Harness::start().await;
let mut conn = h.connect().await;
conn.execute("CREATE TABLE t (n INT)", &[]).await.unwrap();
conn.begin().await.unwrap();
conn.execute("INSERT INTO t VALUES (1)", &[]).await.unwrap();
conn.savepoint("sp1").await.unwrap();
conn.execute("INSERT INTO t VALUES (2)", &[]).await.unwrap();
conn.rollback_to_savepoint("sp1").await.unwrap();
conn.release_savepoint("sp1").await.unwrap();
conn.commit().await.unwrap();
let result = conn
.execute("SELECT n FROM t ORDER BY n", &[])
.await
.unwrap();
assert_eq!(result.rows.len(), 1);
assert_eq!(result.rows[0].get(0).map(Value::render), Some("1".into()));
}
#[tokio::test]
#[ignore = "requires docker"]
async fn cancel_in_flight_query() {
let h = Harness::start().await;
let conn = h.connect().await;
let cancel = conn.cancel_handle().expect("postgres exposes cancel");
let task: tokio::task::JoinHandle<Result<_, Error>> = tokio::spawn(async move {
let mut conn = conn;
conn.execute("SELECT pg_sleep(30)", &[]).await
});
tokio::time::sleep(Duration::from_millis(500)).await;
cancel.cancel().await.expect("cancel succeeds");
let result = task.await.expect("join cancel task");
match result {
Err(Error::Cancelled) => {}
Err(other) => panic!("expected Cancelled, got {other:?}"),
Ok(_) => panic!("query was not cancelled"),
}
}
#[tokio::test]
#[ignore = "requires docker"]
async fn schema_introspection() {
let h = Harness::start().await;
let mut conn = h.connect().await;
conn.execute(
"CREATE TABLE products (
id SERIAL PRIMARY KEY,
sku TEXT NOT NULL UNIQUE,
price NUMERIC(10, 2) DEFAULT 0
)",
&[],
)
.await
.unwrap();
let schemas = conn.list_schemas().await.unwrap();
assert!(schemas.iter().any(|s| s.name == "public"));
let tables = conn.list_tables("public").await.unwrap();
assert!(tables.iter().any(|t| t.name == "products"));
let schema = conn.describe_table("public", "products").await.unwrap();
assert_eq!(schema.columns.len(), 3);
let id = schema
.columns
.iter()
.find(|c| c.name == "id")
.expect("id column");
assert!(id.primary_key);
}
#[tokio::test]
#[ignore]
async fn describe_table_reports_indexes_and_foreign_keys() {
let harness = Harness::start().await;
let mut conn = harness.connect().await;
conn.execute(
"CREATE TABLE customers (id SERIAL PRIMARY KEY, email TEXT NOT NULL UNIQUE)",
&[],
)
.await
.unwrap();
conn.execute(
"CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL REFERENCES customers(id) ON DELETE CASCADE,
placed_at TIMESTAMP NOT NULL,
CONSTRAINT uniq_orders UNIQUE (customer_id, placed_at)
)",
&[],
)
.await
.unwrap();
conn.execute(
"CREATE INDEX idx_orders_placed_at ON orders(placed_at)",
&[],
)
.await
.unwrap();
let schema = conn.describe_table("public", "orders").await.unwrap();
assert!(
schema
.indexes
.iter()
.any(|i| i.name == "idx_orders_placed_at" && !i.unique && !i.primary)
);
assert!(schema.indexes.iter().any(|i| i.primary));
let fks = &schema.foreign_keys;
assert_eq!(fks.len(), 1);
assert_eq!(fks[0].columns, vec!["customer_id"]);
assert_eq!(fks[0].referenced_table, "customers");
assert_eq!(fks[0].referenced_columns, vec!["id"]);
assert_eq!(
fks[0].on_delete,
Some(narwhal_core::ReferentialAction::Cascade)
);
assert!(
schema
.unique_constraints
.iter()
.any(|u| u.name == "uniq_orders" && u.columns == vec!["customer_id", "placed_at"])
);
}