xitca-postgres 0.4.0

An async PostgreSQL client
Documentation
use core::future::IntoFuture;

use xitca_postgres::{
    Client, ExecuteBlocking, Postgres,
    error::{DbError, RuntimeError, SqlState},
    statement::Statement,
    types::Type,
};

/// Connects to the local test database and returns the [`Client`] half of the connection.
///
/// A current-thread tokio runtime is built just for this connection. It first runs the
/// connect call to completion, and is then moved onto a freshly spawned OS thread where it
/// blocks on the driver future.
///
/// # Panics
///
/// Panics if the runtime cannot be built or the connection attempt fails.
fn connect() -> Client {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let (client, driver) = runtime
        .block_on(async {
            Postgres::new("postgres://postgres:postgres@localhost:5432")
                .connect()
                .await
        })
        .unwrap();

    // The runtime is moved into the thread together with the driver so both live as long as
    // the driver future is being run.
    std::thread::spawn(move || runtime.block_on(driver.into_future()));

    client
}

/// Inserts three `(name, age)` pairs through a bound statement using the blocking API and
/// checks that the `returning` clause yields exactly those three rows, in insertion order,
/// readable both by column name and by column index.
#[test]
fn query_unnamed() {
    let client = connect();

    "CREATE TEMPORARY TABLE foo (name TEXT, age INT);"
        .execute_blocking(&client)
        .unwrap();

    // One (TEXT, INT4) pair per inserted row.
    let param_types = [Type::TEXT, Type::INT4, Type::TEXT, Type::INT4, Type::TEXT, Type::INT4];

    let statement = Statement::named(
        "INSERT INTO foo (name, age) VALUES ($1, $2), ($3, $4), ($5, $6) returning name, age",
        &param_types,
    );

    let mut rows = statement
        .bind_dyn(&[&"alice", &20i32, &"bob", &30i32, &"carol", &40i32])
        .query_blocking(&client)
        .unwrap()
        .into_iter();

    // First row: name looked up by column name, age by index.
    let first = rows.next().unwrap().unwrap();
    assert_eq!(first.get::<&str>("name"), "alice");
    assert_eq!(first.get::<i32>(1), 20);

    // Second row: name looked up by index, age by column name.
    let second = rows.next().unwrap().unwrap();
    assert_eq!(second.get::<&str>(0), "bob");
    assert_eq!(second.get::<i32>("age"), 30);

    // Third row: both columns looked up by name.
    let third = rows.next().unwrap().unwrap();
    assert_eq!(third.get::<&str>("name"), "carol");
    assert_eq!(third.get::<i32>("age"), 40);

    // Nothing may follow the three inserted rows.
    assert!(rows.next().is_none());
}

/// Starts a ten second `pg_sleep` on a worker thread, cancels it from the test thread after
/// three seconds, and checks that the worker's query fails with a database error whose code
/// is `QUERY_CANCELED`.
#[test]
fn cancel_query_blocking() {
    use std::{thread, time::Duration};

    let client = connect();

    // The token is taken before the client is moved into the worker thread.
    let token = client.cancel_token();

    let worker = thread::spawn(move || "SELECT pg_sleep(10)".execute_blocking(&client));

    // Wait before cancelling; the sleep query runs for ten seconds on the worker.
    thread::sleep(Duration::from_secs(3));

    token.query_cancel_blocking().unwrap();

    // First unwrap: the worker thread did not panic. `unwrap_err`: the query itself failed.
    let query_err = worker.join().unwrap().unwrap_err();

    let db_err = query_err.downcast_ref::<DbError>().unwrap();
    assert_eq!(db_err.code(), &SqlState::QUERY_CANCELED);
}

// This test deadlocks on a single-threaded tokio runtime, so it explicitly asks for the
// multi-threaded flavor.
//
// It checks that calling the blocking cancel API from inside a tokio context is rejected
// with `RuntimeError::RequireNoTokio`.
#[tokio::test(flavor = "multi_thread")]
async fn cancel_query_blocking_in_tokio() {
    use std::time::Duration;

    let (client, driver) = Postgres::new("postgres://postgres:postgres@localhost:5432")
        .connect()
        .await
        .unwrap();

    // The driver runs as a task on the test's own runtime.
    tokio::spawn(driver.into_future());

    // The token is taken before the client is moved into the worker thread.
    let token = client.cancel_token();

    // The worker runs on a plain OS thread; its handle is dropped without being joined.
    std::thread::spawn(move || "SELECT pg_sleep(10)".execute_blocking(&client));

    tokio::time::sleep(Duration::from_secs(3)).await;

    // Called from async test code, the blocking cancel must fail.
    let cancel_err = token.query_cancel_blocking().unwrap_err();

    let runtime_err = cancel_err.downcast_ref::<RuntimeError>().unwrap();
    assert_eq!(runtime_err, &RuntimeError::RequireNoTokio);
}