mod common;
use common::{test_hyper_params, test_result_path};
use futures::{StreamExt, TryStreamExt};
use hyperdb_api::{AsyncConnection, CreateMode, FromRow, HyperProcess, Result};
/// Starts a Hyper process and opens an async connection to a database for one test.
///
/// `name` is forwarded to both `test_result_path` (together with the `"hyper"`
/// argument) and `test_hyper_params`; every test in this file passes its own
/// distinct name.
///
/// The connection is opened with `CreateMode::CreateAndReplace` against the
/// endpoint reported by the freshly started process.
///
/// Returns the process handle together with the connection. Callers in this file
/// bind the handle to `_hyper` so it stays in scope for the whole test
/// (presumably dropping it would stop the server — confirm against `HyperProcess`).
///
/// # Errors
/// Propagates any error from the path/parameter helpers, from starting the
/// process, from `require_endpoint`, or from `AsyncConnection::connect`.
///
/// # Panics
/// Panics with the message `"path"` if `db_path.to_str()` yields `None`.
async fn fresh_async_conn(name: &str) -> Result<(HyperProcess, AsyncConnection)> {
    let db_path = test_result_path(name, "hyper")?;
    let params = test_hyper_params(name)?;
    // The parameters are passed by reference; the source had this borrow garbled
    // into a non-ASCII character, which does not compile.
    let hyper = HyperProcess::new(None, Some(&params))?;
    let endpoint = hyper.require_endpoint()?.to_string();
    let conn = AsyncConnection::connect(
        &endpoint,
        db_path.to_str().expect("path"),
        CreateMode::CreateAndReplace,
    )
    .await?;
    Ok((hyper, conn))
}
/// Reads a query result chunk by chunk and checks that the chunk lengths add up
/// to the number of rows inserted.
#[tokio::test(flavor = "current_thread")]
async fn execute_query_streaming_chunks() {
    let (_hyper, conn) = fresh_async_conn("async_exec_query_chunks").await.unwrap();

    conn.execute_command("CREATE TABLE t (v INT NOT NULL)")
        .await
        .unwrap();

    // Eight single-row inserts carrying the values 1 through 8.
    for i in 1..=8 {
        let insert = format!("INSERT INTO t VALUES ({i})");
        conn.execute_command(&insert).await.unwrap();
    }

    let mut rowset = conn
        .execute_query("SELECT v FROM t ORDER BY v")
        .await
        .unwrap();

    // The loop ends when `next_chunk` resolves to `None`.
    let mut seen_rows = 0;
    while let Some(chunk) = rowset.next_chunk().await.unwrap() {
        seen_rows += chunk.len();
    }
    assert_eq!(seen_rows, 8);

    // The result set is released explicitly before the connection is closed.
    drop(rowset);
    conn.close().await.unwrap();
}
/// Exercises the `fetch_*` helpers and `query_count` against a three-row table
/// in which one row has a NULL `name`.
#[tokio::test(flavor = "current_thread")]
async fn fetch_family_roundtrip() {
    let (_hyper, conn) = fresh_async_conn("async_fetch_family").await.unwrap();

    conn.execute_command("CREATE TABLE t (id INT NOT NULL, name TEXT)")
        .await
        .unwrap();
    conn.execute_command("INSERT INTO t VALUES (1, 'alice'), (2, 'bob'), (3, NULL)")
        .await
        .unwrap();

    // fetch_one: a single row, read by column position.
    let alice = conn
        .fetch_one("SELECT id, name FROM t WHERE id = 1")
        .await
        .unwrap();
    assert_eq!(alice.get::<i32>(0), Some(1));
    assert_eq!(alice.get::<String>(1), Some(String::from("alice")));

    // fetch_optional: `Some` when the id exists, `None` when it does not.
    let hit = conn
        .fetch_optional("SELECT id FROM t WHERE id = 2")
        .await
        .unwrap();
    assert!(hit.is_some());

    let miss = conn
        .fetch_optional("SELECT id FROM t WHERE id = 999")
        .await
        .unwrap();
    assert!(miss.is_none());

    // fetch_all: every inserted row comes back.
    let all_rows = conn
        .fetch_all("SELECT id FROM t ORDER BY id")
        .await
        .unwrap();
    assert_eq!(all_rows.len(), 3);

    // fetch_scalar: the target type is chosen by the binding's annotation.
    let row_total: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM t").await.unwrap();
    assert_eq!(row_total, 3);

    let first_name: Option<String> = conn
        .fetch_optional_scalar("SELECT name FROM t WHERE id = 1")
        .await
        .unwrap();
    assert_eq!(first_name, Some(String::from("alice")));

    // Two of the three rows carry a non-NULL name.
    let named = conn
        .query_count("SELECT COUNT(*) FROM t WHERE name IS NOT NULL")
        .await
        .unwrap();
    assert_eq!(named, 2);

    conn.close().await.unwrap();
}
/// Row type that the `*_as` tests in this file map query results into.
#[derive(Debug, PartialEq)]
struct User {
    /// Value of the `id` column.
    id: i32,
    /// Value of the `name` column; `None` when the column is NULL.
    name: Option<String>,
}
/// Builds a `User` by looking columns up by name rather than by position.
impl FromRow for User {
    /// Reads `id` via `get` and `name` via `get_opt`, propagating either error.
    fn from_row(row: hyperdb_api::RowAccessor<'_>) -> Result<Self> {
        let id: i32 = row.get("id")?;
        let name: Option<String> = row.get_opt("name")?;
        Ok(Self { id, name })
    }
}
/// Checks that `fetch_one_as` and `fetch_all_as` map rows into `User` values.
#[tokio::test(flavor = "current_thread")]
async fn fetch_as_struct_mapping() {
    let (_hyper, conn) = fresh_async_conn("async_fetch_as").await.unwrap();

    conn.execute_command("CREATE TABLE users (id INT NOT NULL, name TEXT)")
        .await
        .unwrap();
    conn.execute_command("INSERT INTO users VALUES (1, 'alice'), (2, 'bob')")
        .await
        .unwrap();

    // Single-row mapping: compare the whole struct against the inserted values.
    let alice: User = conn
        .fetch_one_as("SELECT id, name FROM users WHERE id = 1")
        .await
        .unwrap();
    let expected = User {
        id: 1,
        name: Some(String::from("alice")),
    };
    assert_eq!(alice, expected);

    // Multi-row mapping: only the number of mapped rows is checked here.
    let everyone: Vec<User> = conn
        .fetch_all_as("SELECT id, name FROM users ORDER BY id")
        .await
        .unwrap();
    assert_eq!(everyone.len(), 2);

    conn.close().await.unwrap();
}
/// Runs parameterised inserts through `command_params` and a parameterised
/// select through `query_params`.
#[tokio::test(flavor = "current_thread")]
async fn query_and_command_params() {
    // Both inserts use the same statement text with different bound values.
    const INSERT_ORDER: &str = "INSERT INTO orders VALUES ($1, $2)";

    let (_hyper, conn) = fresh_async_conn("async_params").await.unwrap();

    conn.execute_command("CREATE TABLE orders (id INT NOT NULL, total DOUBLE PRECISION)")
        .await
        .unwrap();

    // The value returned for the first insert is checked to be 1.
    let affected = conn
        .command_params(INSERT_ORDER, &[&1i32, &99.5_f64])
        .await
        .unwrap();
    assert_eq!(affected, 1);

    conn.command_params(INSERT_ORDER, &[&2i32, &200.0_f64])
        .await
        .unwrap();

    // Order 1 has total 99.5 and order 2 has total 200.0, so only order 2
    // exceeds the bound threshold of 100.0.
    let result = conn
        .query_params(
            "SELECT id FROM orders WHERE total > $1 ORDER BY id",
            &[&100.0_f64],
        )
        .await
        .unwrap();
    let matching = result.collect_rows().await.unwrap();
    assert_eq!(matching.len(), 1);

    let only = &matching[0];
    assert_eq!(only.get::<i32>(0), Some(2));

    conn.close().await.unwrap();
}
/// Checks `has_table` and `has_schema` for both existing and missing names.
#[tokio::test(flavor = "current_thread")]
async fn has_table_and_schema() {
    let (_hyper, conn) = fresh_async_conn("async_catalog").await.unwrap();

    // No table has been created yet, so the lookup must report absence.
    let found_before_create = conn.has_table("nope").await.unwrap();
    assert!(!found_before_create);

    conn.execute_command("CREATE TABLE kept (id INT)")
        .await
        .unwrap();
    let found_after_create = conn.has_table("kept").await.unwrap();
    assert!(found_after_create);

    // The test expects `public` to exist without having created it.
    let public_exists = conn.has_schema("public").await.unwrap();
    assert!(public_exists);

    let bogus_exists = conn.has_schema("nonexistent_schema").await.unwrap();
    assert!(!bogus_exists);

    conn.close().await.unwrap();
}
/// Smoke test for `ping`, `is_alive` and `server_version` on a fresh connection.
#[tokio::test(flavor = "current_thread")]
async fn ping_and_version() {
    let (_hyper, conn) = fresh_async_conn("async_ping").await.unwrap();

    conn.ping().await.unwrap();

    let alive = conn.is_alive();
    assert!(alive);

    // The version lookup is only exercised; its outcome is deliberately discarded.
    let _ = conn.server_version().await;

    conn.close().await.unwrap();
}
/// Sends a CREATE plus two INSERTs through `execute_batch` and verifies that
/// both rows are present afterwards.
#[tokio::test(flavor = "current_thread")]
async fn execute_batch_runs_all_statements() {
    let (_hyper, conn) = fresh_async_conn("async_batch").await.unwrap();

    let statements = [
        "CREATE TABLE b (id INT NOT NULL)",
        "INSERT INTO b VALUES (1)",
        "INSERT INTO b VALUES (2)",
    ];
    let batch_result = conn.execute_batch(&statements).await.unwrap();
    // Only a lower bound is asserted on the returned value (presumably a total
    // of affected rows — confirm against the `execute_batch` documentation).
    assert!(batch_result >= 2);

    let stored: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM b").await.unwrap();
    assert_eq!(stored, 2);

    conn.close().await.unwrap();
}
/// Inserts two rows inside a transaction, commits it, and checks that both rows
/// are visible through the connection afterwards.
#[tokio::test(flavor = "current_thread")]
async fn transaction_with_commit() {
    let (_hyper, mut conn) = fresh_async_conn("async_tx_commit").await.unwrap();

    conn.execute_command("CREATE TABLE t (v INT NOT NULL)")
        .await
        .unwrap();

    // The transaction handle is confined to this block so it is gone before the
    // connection is used directly again.
    {
        let txn = conn.transaction().await.unwrap();
        for insert in ["INSERT INTO t VALUES (1)", "INSERT INTO t VALUES (2)"] {
            txn.execute_command(insert).await.unwrap();
        }
        txn.commit().await.unwrap();
    }

    let committed: i64 = conn.fetch_scalar("SELECT COUNT(*) FROM t").await.unwrap();
    assert_eq!(committed, 2);

    conn.close().await.unwrap();
}
/// Collects a `stream_as::<User>` stream and checks it against both the inserted
/// values and the result of `fetch_all_as` for the same query.
#[tokio::test(flavor = "current_thread")]
async fn stream_as_happy_path() {
    let (_hyper, conn) = fresh_async_conn("async_stream_as_happy").await.unwrap();

    conn.execute_command("CREATE TABLE users (id INT NOT NULL, name TEXT)")
        .await
        .unwrap();
    conn.execute_command("INSERT INTO users VALUES (1, 'alice'), (2, 'bob'), (3, NULL)")
        .await
        .unwrap();

    // The stream is pinned and fully drained inside this block, so it is dropped
    // before the connection is used again.
    let streamed = {
        let user_stream = conn.stream_as::<User>("SELECT id, name FROM users ORDER BY id");
        tokio::pin!(user_stream);
        user_stream.try_collect::<Vec<User>>().await.unwrap()
    };

    let expected = vec![
        User {
            id: 1,
            name: Some(String::from("alice")),
        },
        User {
            id: 2,
            name: Some(String::from("bob")),
        },
        User { id: 3, name: None },
    ];
    assert_eq!(streamed.len(), 3);
    assert_eq!(streamed, expected);

    // Streaming and eager fetching must agree for the same query.
    let fetched: Vec<User> = conn
        .fetch_all_as("SELECT id, name FROM users ORDER BY id")
        .await
        .unwrap();
    assert_eq!(streamed, fetched);

    conn.close().await.unwrap();
}
/// Streams a large generated table through `stream_as::<User>` and checks the
/// row count plus the first and last rows.
///
/// The row count is presumably chosen to make the result span more than one
/// chunk; the chunk size is not visible in this file — confirm.
#[tokio::test(flavor = "current_thread")]
async fn stream_as_multi_chunk() {
    const ROWS: i32 = 140_000;

    let (_hyper, conn) = fresh_async_conn("async_stream_as_multi_chunk")
        .await
        .unwrap();

    conn.execute_command("CREATE TABLE big (id INT NOT NULL, name TEXT)")
        .await
        .unwrap();

    // Populate ids 1..=ROWS, each with the name 'user_<id>'.
    let populate = format!(
        "INSERT INTO big SELECT id, 'user_' || id::TEXT FROM GENERATE_SERIES(1, {ROWS}) AS id"
    );
    conn.execute_command(&populate).await.unwrap();

    let streamed = {
        let user_stream = conn.stream_as::<User>("SELECT id, name FROM big ORDER BY id");
        tokio::pin!(user_stream);
        user_stream.try_collect::<Vec<User>>().await.unwrap()
    };

    // One checked conversion serves both the length check and the last index.
    let expected_len = usize::try_from(ROWS).expect("row count fits usize");
    assert_eq!(streamed.len(), expected_len);

    let first_expected = User {
        id: 1,
        name: Some(String::from("user_1")),
    };
    assert_eq!(streamed[0], first_expected);

    let last_expected = User {
        id: ROWS,
        name: Some(format!("user_{ROWS}")),
    };
    assert_eq!(streamed[expected_len - 1], last_expected);

    conn.close().await.unwrap();
}
/// Checks that querying a missing table through `stream_as` surfaces the failure
/// as the first item of the stream, and that the connection can still be closed
/// afterwards.
#[tokio::test(flavor = "current_thread")]
async fn stream_as_submit_error() {
    let (_hyper, conn) = fresh_async_conn("async_stream_as_submit_error")
        .await
        .unwrap();

    // The stream is confined to this block so it is dropped before `close`.
    {
        let stream = conn.stream_as::<User>("SELECT id, name FROM nonexistent_table");
        tokio::pin!(stream);

        // A single match replaces the `is_some()` + `unwrap()` pair and reports
        // which of the two unexpected outcomes occurred.
        match stream.next().await {
            Some(Err(_)) => {}
            Some(Ok(user)) => panic!("expected an error item, got row {user:?}"),
            None => panic!("stream ended without yielding an error item"),
        }
    }

    conn.close().await.unwrap();
}
/// Checks that `stream_as` over a query with no matching rows collects into an
/// empty vector without error.
#[tokio::test(flavor = "current_thread")]
async fn stream_as_empty() {
    let (_hyper, conn) = fresh_async_conn("async_stream_as_empty").await.unwrap();

    conn.execute_command("CREATE TABLE empty_users (id INT NOT NULL, name TEXT)")
        .await
        .unwrap();

    // The table has no rows and the predicate is always false.
    let streamed = {
        let user_stream =
            conn.stream_as::<User>("SELECT id, name FROM empty_users WHERE 1=0");
        tokio::pin!(user_stream);
        user_stream.try_collect::<Vec<User>>().await.unwrap()
    };
    assert!(streamed.is_empty());

    conn.close().await.unwrap();
}
/// Checks that `stream_as::<User>` still maps rows when the query returns a
/// column (`extra`) that `User` does not read.
#[tokio::test(flavor = "current_thread")]
async fn stream_as_lenient_extra_column() {
    let (_hyper, conn) = fresh_async_conn("async_stream_as_lenient").await.unwrap();

    conn.execute_command("CREATE TABLE users_extra (id INT NOT NULL, name TEXT, extra TEXT)")
        .await
        .unwrap();
    conn.execute_command("INSERT INTO users_extra VALUES (1, 'alice', 'data')")
        .await
        .unwrap();

    // `SELECT *` returns all three columns; `User::from_row` reads only `id`
    // and `name`, by name.
    let streamed = {
        let user_stream = conn.stream_as::<User>("SELECT * FROM users_extra");
        tokio::pin!(user_stream);
        user_stream.try_collect::<Vec<User>>().await.unwrap()
    };
    assert_eq!(streamed.len(), 1);

    let expected = User {
        id: 1,
        name: Some(String::from("alice")),
    };
    assert_eq!(streamed[0], expected);

    conn.close().await.unwrap();
}