#![allow(
clippy::panic,
clippy::panic_in_result_fn,
clippy::shadow_unrelated,
clippy::unwrap_used
)]
use std::env;
use zero_postgres::Error;
use zero_postgres::handler::DropHandler;
use zero_postgres::sync::Conn;
/// Opens the synchronous connection used by the tests in this file.
///
/// The URL comes from the `DATABASE_URL` environment variable and falls back to
/// `postgres://localhost/postgres` when the variable cannot be read. When the URL
/// contains no `sslmode=` setting, `sslmode=disable` is appended as a query parameter.
///
/// # Errors
///
/// Propagates the error returned by `Conn::new` for the final URL.
fn get_conn() -> Result<Conn, Error> {
    let mut url = env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://localhost/postgres".to_string());
    if !url.contains("sslmode=") {
        // Open a query string with `?`, or extend an existing one with `&`.
        let separator = if url.contains('?') { '&' } else { '?' };
        url.push(separator);
        url.push_str("sslmode=disable");
    }
    Conn::new(url.as_str())
}
/// Probes whether `portal` currently exists by trying to execute it.
///
/// Returns `true` when `lowlevel_execute` succeeds and `false` when it fails with
/// SQLSTATE `34000` (invalid_cursor_name). Any rows produced are discarded through a
/// `DropHandler`. Note that the probe really does execute the portal.
///
/// # Panics
///
/// Panics on any error other than SQLSTATE `34000`, since that means the probe itself
/// went wrong rather than the portal being absent.
fn portal_exists(conn: &mut Conn, portal: &str) -> bool {
    let mut sink = DropHandler::new();
    // NOTE(review): the `0` is presumably the row limit ("no limit") — confirm against
    // the `lowlevel_execute` signature.
    match conn.lowlevel_execute(portal, 0, &mut sink) {
        Ok(_) => true,
        Err(err) if err.sqlstate() == Some("34000") => false,
        Err(err) => panic!("Unexpected error while checking portal existence: {err}"),
    }
}
/// Named portal bound without an explicit `BEGIN`, then followed by a Sync.
///
/// Asserts that the portal can be executed right after the bind, and that probing it
/// after `lowlevel_sync` reports it as missing.
#[test]
fn en0a_named_portal_implicit_tx_sync() -> Result<(), Error> {
    const PORTAL: &str = "portal1";

    let mut conn = get_conn()?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(PORTAL, &select_one.wire_name(), ())?;

    let alive_before_sync = portal_exists(&mut conn, PORTAL);
    assert!(alive_before_sync, "Portal does not exist in tx");

    conn.lowlevel_sync()?;

    let alive_after_sync = portal_exists(&mut conn, PORTAL);
    assert!(!alive_after_sync, "Portal still exists after SYNC");

    // Best-effort Sync with the outcome ignored — presumably to leave the connection in
    // a clean protocol state after the failed probe; TODO confirm.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Named portal bound without an explicit `BEGIN`, then followed by a Flush.
///
/// Asserts that the portal can still be executed after `lowlevel_flush`.
#[test]
fn en0b_named_portal_implicit_tx_flush() -> Result<(), Error> {
    const PORTAL: &str = "portal1";

    let mut conn = get_conn()?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(PORTAL, &select_one.wire_name(), ())?;
    conn.lowlevel_flush()?;

    let alive_after_flush = portal_exists(&mut conn, PORTAL);
    assert!(
        alive_after_flush,
        "Portal gone after FLUSH (should still exist)"
    );

    // Best-effort Sync to end the test; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Named portal bound *before* `BEGIN`, with the transaction then committed.
///
/// Asserts that the portal is executable after `BEGIN` and reported missing after
/// `COMMIT`.
#[test]
fn en1_named_portal_before_begin_commit() -> Result<(), Error> {
    const PORTAL: &str = "portal1";

    let mut conn = get_conn()?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(PORTAL, &select_one.wire_name(), ())?;

    conn.query_drop("BEGIN")?;
    let alive_in_tx = portal_exists(&mut conn, PORTAL);
    assert!(alive_in_tx, "Portal does not exist in tx");

    conn.query_drop("COMMIT")?;
    let alive_after_commit = portal_exists(&mut conn, PORTAL);
    assert!(!alive_after_commit, "Portal still exists after COMMIT");

    // Best-effort Sync after the failed probe; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Named portal bound *before* `BEGIN`, with the transaction then rolled back.
///
/// Asserts that the portal is executable after `BEGIN` and reported missing after
/// `ROLLBACK`.
#[test]
fn en2_named_portal_before_begin_rollback() -> Result<(), Error> {
    const PORTAL: &str = "portal1";

    let mut conn = get_conn()?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(PORTAL, &select_one.wire_name(), ())?;

    conn.query_drop("BEGIN")?;
    let alive_in_tx = portal_exists(&mut conn, PORTAL);
    assert!(alive_in_tx, "Portal does not exist in tx");

    conn.query_drop("ROLLBACK")?;
    let alive_after_rollback = portal_exists(&mut conn, PORTAL);
    assert!(!alive_after_rollback, "Portal still exists after ROLLBACK");

    // Best-effort Sync after the failed probe; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Named portal bound *before* `BEGIN`, with the transaction aborted by an error.
///
/// Steps checked:
/// 1. the portal is executable after `BEGIN`;
/// 2. `SELECT 1/0` fails (this is what puts the transaction into the failed state);
/// 3. executing the portal afterwards fails;
/// 4. after `ROLLBACK` the portal is reported missing.
#[test]
fn en3_named_portal_before_begin_error() -> Result<(), Error> {
    let mut conn = get_conn()?;
    let stmt1 = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind("portal1", &stmt1.wire_name(), ())?;
    conn.query_drop("BEGIN")?;
    assert!(
        portal_exists(&mut conn, "portal1"),
        "Portal does not exist in tx"
    );

    // The rest of the test is only meaningful if this statement really failed, so check
    // it instead of discarding the result (same check as `p3_insert_error_rollback_gone`).
    let division = conn.query_drop("SELECT 1/0");
    assert!(division.is_err(), "Expected error from SELECT 1/0");

    let mut handler = DropHandler::new();
    let result = conn.lowlevel_execute("portal1", 0, &mut handler);
    assert!(result.is_err(), "Portal usable in aborted tx (should fail)");
    // Best-effort Sync after the failed Execute; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();

    conn.query_drop("ROLLBACK")?;
    assert!(
        !portal_exists(&mut conn, "portal1"),
        "Portal still exists after ERROR + ROLLBACK"
    );
    // Best-effort Sync after the failed probe; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Named portal bound *inside* an explicit transaction that is then committed.
///
/// Asserts that the portal is reported missing after `COMMIT`.
#[test]
fn en4_named_portal_within_tx_commit() -> Result<(), Error> {
    const PORTAL: &str = "portal1";

    let mut conn = get_conn()?;
    conn.query_drop("BEGIN")?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(PORTAL, &select_one.wire_name(), ())?;
    conn.query_drop("COMMIT")?;

    let alive_after_commit = portal_exists(&mut conn, PORTAL);
    assert!(
        !alive_after_commit,
        "Portal still exists after explicit COMMIT"
    );

    // Best-effort Sync after the failed probe; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Named portal bound *inside* an explicit transaction that is then rolled back.
///
/// Asserts that the portal is reported missing after `ROLLBACK`.
#[test]
fn en5_named_portal_within_tx_rollback() -> Result<(), Error> {
    const PORTAL: &str = "portal1";

    let mut conn = get_conn()?;
    conn.query_drop("BEGIN")?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(PORTAL, &select_one.wire_name(), ())?;
    conn.query_drop("ROLLBACK")?;

    let alive_after_rollback = portal_exists(&mut conn, PORTAL);
    assert!(
        !alive_after_rollback,
        "Portal still exists after explicit ROLLBACK"
    );

    // Best-effort Sync after the failed probe; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Named portal bound *inside* an explicit transaction that is aborted by an error.
///
/// Steps checked:
/// 1. `SELECT 1/0` fails (this is what puts the transaction into the failed state);
/// 2. executing the portal afterwards fails;
/// 3. after `ROLLBACK` the portal is reported missing.
#[test]
fn en6_named_portal_within_tx_error() -> Result<(), Error> {
    let mut conn = get_conn()?;
    conn.query_drop("BEGIN")?;
    let stmt1 = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind("portal1", &stmt1.wire_name(), ())?;

    // The rest of the test is only meaningful if this statement really failed, so check
    // it instead of discarding the result (same check as `p3_insert_error_rollback_gone`).
    let division = conn.query_drop("SELECT 1/0");
    assert!(division.is_err(), "Expected error from SELECT 1/0");

    let mut handler = DropHandler::new();
    let result = conn.lowlevel_execute("portal1", 0, &mut handler);
    assert!(result.is_err(), "Portal usable in aborted tx (should fail)");
    // Best-effort Sync after the failed Execute; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();

    conn.query_drop("ROLLBACK")?;
    assert!(
        !portal_exists(&mut conn, "portal1"),
        "Portal still exists after ROLLBACK from error"
    );
    // Best-effort Sync after the failed probe; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Unnamed portal bound without an explicit `BEGIN`, then followed by a Sync.
///
/// Asserts that the unnamed portal (empty name) is reported missing after
/// `lowlevel_sync`.
#[test]
fn eu1_unnamed_portal_implicit_tx_sync() -> Result<(), Error> {
    // The empty string addresses the unnamed portal.
    const UNNAMED: &str = "";

    let mut conn = get_conn()?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(UNNAMED, &select_one.wire_name(), ())?;
    conn.lowlevel_sync()?;

    let alive_after_sync = portal_exists(&mut conn, UNNAMED);
    assert!(!alive_after_sync, "Unnamed portal still exists after SYNC");

    // Best-effort Sync after the failed probe; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Unnamed portal bound without an explicit `BEGIN`, then followed by a Flush.
///
/// Asserts that the unnamed portal (empty name) can still be executed after
/// `lowlevel_flush`.
#[test]
fn eu3_unnamed_portal_implicit_tx_flush() -> Result<(), Error> {
    // The empty string addresses the unnamed portal.
    const UNNAMED: &str = "";

    let mut conn = get_conn()?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(UNNAMED, &select_one.wire_name(), ())?;
    conn.lowlevel_flush()?;

    let alive_after_flush = portal_exists(&mut conn, UNNAMED);
    assert!(
        alive_after_flush,
        "Unnamed portal gone after FLUSH (should still exist)"
    );

    // Best-effort Sync to end the test; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Unnamed portal bound without an explicit `BEGIN`, followed by a failing Execute on a
/// different portal.
///
/// A temp table holding a single `0` makes `SELECT 1/n` fail when executed. The test
/// binds the unnamed portal and a second portal (`err_portal`) for the failing
/// statement, executes `err_portal`, and asserts that:
/// 1. that execution really failed, and
/// 2. after the following Sync the unnamed portal is reported missing.
#[test]
fn eu2_unnamed_portal_implicit_tx_error() -> Result<(), Error> {
    let mut conn = get_conn()?;
    conn.query_drop("DROP TABLE IF EXISTS _err_test")?;
    conn.query_drop("CREATE TEMP TABLE _err_test (n int)")?;
    conn.query_drop("INSERT INTO _err_test VALUES (0)")?;
    let stmt1 = conn.prepare("SELECT 1")?;
    let err_stmt = conn.prepare("SELECT 1/n FROM _err_test")?;
    conn.lowlevel_bind("", &stmt1.wire_name(), ())?;
    conn.lowlevel_bind("err_portal", &err_stmt.wire_name(), ())?;

    let mut handler = DropHandler::new();
    let exec_result = conn.lowlevel_execute("err_portal", 0, &mut handler);
    // Without this check the test would pass even if no error occurred, and would then
    // be exercising the plain-Sync scenario instead of the error scenario.
    assert!(
        exec_result.is_err(),
        "Expected error from executing err_portal (SELECT 1/n with n = 0)"
    );
    // Best-effort Sync after the failed Execute; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();

    assert!(
        !portal_exists(&mut conn, ""),
        "Unnamed portal still exists after ERROR"
    );
    // Best-effort Sync after the failed probe; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();
    Ok(())
}
/// Binding the unnamed portal twice: the second bind should win.
///
/// Binds the unnamed portal first to `SELECT 1 as a` and then to `SELECT 2 as b`,
/// executes it once, and asserts that exactly one row with the value `2` comes back.
#[test]
fn eu4_unnamed_portal_replaced_by_new_bind() -> Result<(), Error> {
    use zero_postgres::handler::CollectHandler;

    let mut conn = get_conn()?;
    let first = conn.prepare("SELECT 1 as a")?;
    let second = conn.prepare("SELECT 2 as b")?;
    conn.lowlevel_bind("", &first.wire_name(), ())?;
    conn.lowlevel_bind("", &second.wire_name(), ())?;

    // Collect the rows produced by the single Execute of the unnamed portal.
    let rows: Vec<(i32,)> = {
        let mut collector = CollectHandler::new();
        conn.lowlevel_execute("", 0, &mut collector)?;
        collector.into_rows()
    };
    conn.lowlevel_sync()?;

    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].0, 2, "Expected result from stmt2, got {rows:?}");
    Ok(())
}
/// An INSERT run through the unnamed portal inside a transaction persists after
/// `COMMIT`.
///
/// The table is dropped before the final assertion so it is cleaned up even when the
/// assertion fails.
#[test]
fn p1_insert_commit_persists() -> Result<(), Error> {
    let mut conn = get_conn()?;
    conn.query_drop("DROP TABLE IF EXISTS portal_test_p1")?;
    conn.query_drop("CREATE TABLE portal_test_p1 (id int)")?;

    conn.query_drop("BEGIN")?;
    let insert = conn.prepare("INSERT INTO portal_test_p1 VALUES (1)")?;
    conn.lowlevel_bind("", &insert.wire_name(), ())?;
    let mut sink = DropHandler::new();
    conn.lowlevel_execute("", 0, &mut sink)?;
    conn.lowlevel_sync()?;
    conn.query_drop("COMMIT")?;

    let counted: Vec<(i64,)> = conn.query_collect("SELECT count(*) FROM portal_test_p1")?;
    conn.query_drop("DROP TABLE portal_test_p1")?;

    let (row_count,) = counted[0];
    assert_eq!(row_count, 1, "Expected 1 row after COMMIT");
    Ok(())
}
/// An INSERT run through the unnamed portal inside a transaction is gone after
/// `ROLLBACK`.
///
/// The table is dropped before the final assertion so it is cleaned up even when the
/// assertion fails.
#[test]
fn p2_insert_rollback_gone() -> Result<(), Error> {
    let mut conn = get_conn()?;
    conn.query_drop("DROP TABLE IF EXISTS portal_test_p2")?;
    conn.query_drop("CREATE TABLE portal_test_p2 (id int)")?;

    conn.query_drop("BEGIN")?;
    let insert = conn.prepare("INSERT INTO portal_test_p2 VALUES (1)")?;
    conn.lowlevel_bind("", &insert.wire_name(), ())?;
    let mut sink = DropHandler::new();
    conn.lowlevel_execute("", 0, &mut sink)?;
    conn.lowlevel_sync()?;
    conn.query_drop("ROLLBACK")?;

    let counted: Vec<(i64,)> = conn.query_collect("SELECT count(*) FROM portal_test_p2")?;
    conn.query_drop("DROP TABLE portal_test_p2")?;

    let (row_count,) = counted[0];
    assert_eq!(row_count, 0, "Expected 0 rows after ROLLBACK");
    Ok(())
}
/// An INSERT run through the unnamed portal is gone after a later error and
/// `ROLLBACK`.
///
/// After the insert, `SELECT 1/0` must fail; the transaction is then rolled back and
/// the row count is read (and the table dropped) over a second connection.
#[test]
fn p3_insert_error_rollback_gone() -> Result<(), Error> {
    let mut conn = get_conn()?;
    conn.query_drop("DROP TABLE IF EXISTS portal_test_p3")?;
    conn.query_drop("CREATE TABLE portal_test_p3 (id int)")?;

    conn.query_drop("BEGIN")?;
    let insert = conn.prepare("INSERT INTO portal_test_p3 VALUES (1)")?;
    conn.lowlevel_bind("", &insert.wire_name(), ())?;
    let mut sink = DropHandler::new();
    conn.lowlevel_execute("", 0, &mut sink)?;
    conn.lowlevel_sync()?;

    let division = conn.query_drop("SELECT 1/0");
    assert!(division.is_err(), "Expected error from SELECT 1/0");
    conn.query_drop("ROLLBACK")?;

    // Verification and cleanup go through a separate, fresh connection.
    let mut verifier = get_conn()?;
    let counted: Vec<(i64,)> = verifier.query_collect("SELECT count(*) FROM portal_test_p3")?;
    verifier.query_drop("DROP TABLE portal_test_p3")?;

    let (row_count,) = counted[0];
    assert_eq!(row_count, 0, "Expected 0 rows after ERROR + ROLLBACK");
    Ok(())
}
/// A named INSERT portal bound inside a transaction cannot be executed after
/// `ROLLBACK`.
///
/// Asserts that the Execute fails with SQLSTATE `34000` (invalid_cursor_name) and that
/// the table stays empty. The table is dropped before the final assertion so it is
/// cleaned up even when the assertion fails.
#[test]
fn p4_named_portal_within_tx_rollback_then_insert() -> Result<(), Error> {
    const PORTAL: &str = "portal_ins";

    let mut conn = get_conn()?;
    conn.query_drop("DROP TABLE IF EXISTS portal_test_p4")?;
    conn.query_drop("CREATE TABLE portal_test_p4 (id int)")?;

    conn.query_drop("BEGIN")?;
    let insert = conn.prepare("INSERT INTO portal_test_p4 VALUES (1)")?;
    conn.lowlevel_bind(PORTAL, &insert.wire_name(), ())?;
    conn.lowlevel_sync()?;
    conn.query_drop("ROLLBACK")?;

    let mut sink = DropHandler::new();
    let exec_result = conn.lowlevel_execute(PORTAL, 0, &mut sink);
    assert!(
        exec_result.is_err(),
        "Execute should fail on non-existent portal"
    );
    let exec_err = exec_result.unwrap_err();
    assert_eq!(
        exec_err.sqlstate(),
        Some("34000"),
        "Expected SQLSTATE 34000 (invalid_cursor_name)"
    );
    // Best-effort Sync after the failed Execute; the outcome is deliberately ignored.
    let _ = conn.lowlevel_sync();

    let counted: Vec<(i64,)> = conn.query_collect("SELECT count(*) FROM portal_test_p4")?;
    conn.query_drop("DROP TABLE portal_test_p4")?;

    let (row_count,) = counted[0];
    assert_eq!(
        row_count, 0,
        "Expected 0 rows - portal was destroyed by ROLLBACK"
    );
    Ok(())
}
/// A Sync sent inside an explicit transaction must not destroy a named portal.
///
/// Asserts that the portal is executable both right after the bind and again after
/// `lowlevel_sync`, then rolls the transaction back.
#[test]
fn sync_inside_explicit_tx_preserves_portal() -> Result<(), Error> {
    const PORTAL: &str = "portal1";

    let mut conn = get_conn()?;
    conn.query_drop("BEGIN")?;
    let select_one = conn.prepare("SELECT 1")?;
    conn.lowlevel_bind(PORTAL, &select_one.wire_name(), ())?;

    let alive_after_bind = portal_exists(&mut conn, PORTAL);
    assert!(alive_after_bind, "Portal should exist after bind");

    conn.lowlevel_sync()?;

    let alive_after_sync = portal_exists(&mut conn, PORTAL);
    assert!(
        alive_after_sync,
        "Portal was destroyed by SYNC inside explicit transaction!"
    );

    conn.query_drop("ROLLBACK")?;
    Ok(())
}