use chrono::NaiveDateTime;
use sql_middleware::middleware::PgConfig;
use sql_middleware::middleware::{
ConfigAndPool, ConversionMode, MiddlewarePoolConnection, PostgresOptions, QueryAndParams,
RowValues,
};
use sql_middleware::postgres::{
Params as PostgresParams, build_result_set as postgres_build_result_set,
};
#[cfg(feature = "postgres")]
use sql_middleware::typed_postgres::{Idle as PgIdle, PgConnection, PgManager};
use sql_middleware::{SqlMiddlewareDbError, convert_sql_params};
use std::env;
use std::vec;
use tokio::runtime::Runtime;
#[cfg(feature = "postgres")]
fn build_typed_pg_config(cfg: &PgConfig) -> tokio_postgres::Config {
cfg.to_tokio_config()
}
#[allow(clippy::too_many_lines)]
#[test]
fn test2_postgres_cr_and_del_tbls() -> Result<(), Box<dyn std::error::Error>> {
let mut cfg = PgConfig::new();
cfg.dbname = Some("testing".to_string());
cfg.host = Some("10.3.0.201".to_string());
cfg.port = Some(5432);
cfg.user = Some("testuser".to_string());
cfg.password = Some(env::var("TESTING_PG_PASSWORD").unwrap_or_default());
let rt = Runtime::new().unwrap();
rt.block_on(async {
let test_table = "test02_postgres_test";
let test_table_2 = "test02_postgres_test_2";
let stmt = format!(
"CREATE TABLE IF NOT EXISTS -- drop table event cascade
{test_table} (
event_id BIGSERIAL NOT NULL PRIMARY KEY,
espn_id BIGINT NOT NULL,
name TEXT NOT NULL,
ins_ts TIMESTAMP NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS -- drop table event cascade
{test_table_2} (
event_id BIGSERIAL NOT NULL PRIMARY KEY,
espn_id BIGINT NOT NULL,
name TEXT NOT NULL,
ins_ts TIMESTAMP NOT NULL DEFAULT now()
);"
);
#[cfg(feature = "postgres")]
let typed_pg_cfg = build_typed_pg_config(&cfg);
let config_and_pool = ConfigAndPool::new_postgres(PostgresOptions::new(cfg)).await?;
let conn = config_and_pool.get_connection().await?;
let mut pgconn = match conn {
MiddlewarePoolConnection::Postgres { client, .. } => client,
MiddlewarePoolConnection::Sqlite { .. } => {
panic!("Only postgres is supported in this test");
}
MiddlewarePoolConnection::Mssql { .. } => {
panic!("Only postgres is supported in this test");
}
#[cfg(feature = "turso")]
MiddlewarePoolConnection::Turso { .. } => {
panic!("Only postgres is supported in this test");
}
};
({
let tx = pgconn.transaction().await?;
{
tx.batch_execute(&stmt).await?;
};
tx.commit().await?;
Ok::<_, SqlMiddlewareDbError>(())
})?;
let query = format!("DELETE FROM {test_table};");
({
let tx = pgconn.transaction().await?;
{
tx.batch_execute(&query).await?;
};
tx.commit().await?;
Ok::<_, SqlMiddlewareDbError>(())
})?;
let query_and_params = QueryAndParams {
query: format!(
"INSERT INTO {test_table} (espn_id, name, ins_ts) VALUES ($1, $2, $3)"
),
params: vec![
RowValues::Int(123_456),
RowValues::Text("test name".to_string()),
RowValues::Timestamp(NaiveDateTime::parse_from_str(
"2021-08-06 16:00:00",
"%Y-%m-%d %H:%M:%S",
)?),
],
};
let converted_params = convert_sql_params::<PostgresParams>(
&query_and_params.params,
ConversionMode::Execute,
)?;
let tx = pgconn.transaction().await?;
tx.prepare(query_and_params.query.as_str()).await?;
tx.execute(query_and_params.query.as_str(), converted_params.as_refs())
.await?;
tx.commit().await?;
let query = format!("select * FROM {test_table};");
let result = ({
let tx = pgconn.transaction().await?;
let stmt = tx.prepare(&query).await?;
let result_set = { postgres_build_result_set(&stmt, &[], &tx).await? };
tx.commit().await?;
Ok::<_, SqlMiddlewareDbError>(result_set)
})?;
let expected_result = [sql_middleware::test_helpers::create_test_row(
vec![
"event_id".to_string(),
"espn_id".to_string(),
"name".to_string(),
"ins_ts".to_string(),
],
vec![
RowValues::Int(1),
RowValues::Int(123_456),
RowValues::Text("test name".to_string()),
RowValues::Timestamp(
NaiveDateTime::parse_from_str("2021-08-06 16:00:00", "%Y-%m-%d %H:%M:%S")
.unwrap(),
),
],
)];
let cols_to_actually_check = ["espn_id", "name", "ins_ts"];
for (index, row) in result.results.iter().enumerate() {
let left: Vec<RowValues> = row
.column_names
.iter()
.zip(&row.rows) .filter(|(col_name, _)| cols_to_actually_check.contains(&col_name.as_str()))
.map(|(_, value)| value.clone()) .collect();
let right: Vec<RowValues> = expected_result[index]
.column_names
.iter()
.zip(&expected_result[index].rows) .filter(|(col_name, _)| cols_to_actually_check.contains(&col_name.as_str()))
.map(|(_, value)| value.clone()) .collect();
assert_eq!(left, right);
}
#[cfg(feature = "postgres")]
{
let pool = PgManager::new(typed_pg_cfg).build_pool().await?;
let mut typed_conn: PgConnection<PgIdle> = PgConnection::from_pool(&pool).await?;
let typed_truncate = format!("TRUNCATE {test_table_2};");
typed_conn.execute_batch(&typed_truncate).await?;
let typed_ts =
NaiveDateTime::parse_from_str("2021-08-06 16:00:00", "%Y-%m-%d %H:%M:%S")?;
let mut tx = typed_conn.begin().await?;
let inserted = tx
.dml(
&format!(
"INSERT INTO {test_table_2} (espn_id, name, ins_ts) VALUES ($1, $2, $3)"
),
&[
RowValues::Int(123_456),
RowValues::Text("test name".to_string()),
RowValues::Timestamp(typed_ts),
],
)
.await?;
assert_eq!(inserted, 1);
let rs = tx
.select(
&format!(
"SELECT espn_id, name, ins_ts FROM {test_table_2} WHERE espn_id = $1"
),
&[RowValues::Int(123_456)],
)
.await?;
let _ = tx.commit().await?;
assert_eq!(rs.results.len(), 1);
let row = &rs.results[0];
assert_eq!(*row.get("espn_id").unwrap().as_int().unwrap(), 123_456);
assert_eq!(row.get("name").unwrap().as_text().unwrap(), "test name");
assert_eq!(row.get("ins_ts").unwrap().as_timestamp().unwrap(), typed_ts);
}
let query = format!("DROP TABLE {test_table};
DROP TABLE {test_table_2};");
({
let tx = pgconn.transaction().await?;
{
tx.batch_execute(&query).await?;
};
tx.commit().await?;
Ok::<_, SqlMiddlewareDbError>(())
})?;
Ok::<(), Box<dyn std::error::Error>>(())
})?;
Ok(())
}