use tokio_stream::StreamExt;
use databend_driver::{Client, Connection};
use crate::common::{DEFAULT_DSN, INIT_LOG};
async fn prepare(name: &str) -> (Connection, String) {
let dsn = option_env!("TEST_DATABEND_DSN").unwrap_or(DEFAULT_DSN);
let table = format!("{name}_{}", chrono::Utc::now().timestamp());
let client = Client::new(dsn.to_string());
let conn = client.get_conn().await.unwrap();
(conn, table)
}
async fn prepare_data(name: &str) -> (Connection, String) {
let (conn, table) = prepare(name).await;
let sql_create = format!(
"CREATE TABLE `{table}` (
i64 Int64,
u64 UInt64,
f64 Float64,
s String,
s2 String,
d Date,
t DateTime
);"
);
conn.inner().exec(&sql_create).await.unwrap();
let sql_insert = format!(
"INSERT INTO `{table}` VALUES
(-1, 1, 1.0, '1', '1', '2011-03-06', '2011-03-06 06:20:00'),
(-2, 2, 2.0, '2', '2', '2012-05-31', '2012-05-31 11:20:00'),
(-3, 3, 3.0, '3', '2', '2016-04-04', '2016-04-04 11:30:00')"
);
conn.inner().exec(&sql_insert).await.unwrap();
(conn, table)
}
#[tokio::test]
async fn select_iter_tuple() {
let (conn, table) = prepare_data("select_iter_tuple").await;
type RowResult = (
i64,
u64,
f64,
String,
String,
chrono::NaiveDate,
chrono::NaiveDateTime,
);
let expected: Vec<RowResult> = vec![
(
-1,
1,
1.0,
"1".into(),
"1".into(),
chrono::NaiveDate::from_ymd_opt(2011, 3, 6).unwrap(),
chrono::DateTime::parse_from_rfc3339("2011-03-06T06:20:00Z")
.unwrap()
.naive_utc(),
),
(
-2,
2,
2.0,
"2".into(),
"2".into(),
chrono::NaiveDate::from_ymd_opt(2012, 5, 31).unwrap(),
chrono::DateTime::parse_from_rfc3339("2012-05-31T11:20:00Z")
.unwrap()
.naive_utc(),
),
(
-3,
3,
3.0,
"3".into(),
"2".into(),
chrono::NaiveDate::from_ymd_opt(2016, 4, 4).unwrap(),
chrono::DateTime::parse_from_rfc3339("2016-04-04T11:30:00Z")
.unwrap()
.naive_utc(),
),
];
let sql_select = format!("SELECT * FROM `{table}`");
let mut rows = conn.inner().query_iter(&sql_select).await.unwrap();
let mut row_count = 0;
while let Some(row) = rows.next().await {
let v: RowResult = row.unwrap().try_into().unwrap();
assert_eq!(v, expected[row_count]);
row_count += 1;
}
assert_eq!(row_count, 3);
let sql_drop = format!("DROP TABLE `{table}`");
conn.inner().exec(&sql_drop).await.unwrap();
}
#[tokio::test]
async fn select_iter_struct() {
let (conn, table) = prepare_data("select_iter_struct").await;
use databend_driver::TryFromRow;
#[derive(TryFromRow)]
struct RowResult {
i64: i64,
u64: u64,
f64: f64,
s: String,
s2: String,
d: chrono::NaiveDate,
t: chrono::NaiveDateTime,
}
let expected: Vec<RowResult> = vec![
RowResult {
i64: -1,
u64: 1,
f64: 1.0,
s: "1".into(),
s2: "1".into(),
d: chrono::NaiveDate::from_ymd_opt(2011, 3, 6).unwrap(),
t: chrono::DateTime::parse_from_rfc3339("2011-03-06T06:20:00Z")
.unwrap()
.naive_utc(),
},
RowResult {
i64: -2,
u64: 2,
f64: 2.0,
s: "2".into(),
s2: "2".into(),
d: chrono::NaiveDate::from_ymd_opt(2012, 5, 31).unwrap(),
t: chrono::DateTime::parse_from_rfc3339("2012-05-31T11:20:00Z")
.unwrap()
.naive_utc(),
},
RowResult {
i64: -3,
u64: 3,
f64: 3.0,
s: "3".into(),
s2: "2".into(),
d: chrono::NaiveDate::from_ymd_opt(2016, 4, 4).unwrap(),
t: chrono::DateTime::parse_from_rfc3339("2016-04-04T11:30:00Z")
.unwrap()
.naive_utc(),
},
];
let sql_select = format!("SELECT * FROM `{table}`");
let rows = conn.inner().query_iter(&sql_select).await.unwrap();
let results = rows.try_collect::<RowResult>().await.unwrap();
for (idx, v) in results.iter().enumerate() {
let expected_row = &expected[idx];
assert_eq!(v.i64, expected_row.i64);
assert_eq!(v.u64, expected_row.u64);
assert_eq!(v.f64, expected_row.f64);
assert_eq!(v.s, expected_row.s);
assert_eq!(v.s2, expected_row.s2);
assert_eq!(v.d, expected_row.d);
assert_eq!(v.t, expected_row.t);
}
let sql_drop = format!("DROP TABLE `{table}`");
conn.inner().exec(&sql_drop).await.unwrap();
}
#[tokio::test]
async fn select_numbers() {
let (conn, _) = prepare("select_numbers").await;
let rows = conn
.inner()
.query_iter("select * from NUMBERS(5)")
.await
.unwrap();
let ret: Vec<u64> = rows
.map(|r| r.unwrap().try_into().unwrap())
.collect::<Vec<(u64,)>>()
.await
.into_iter()
.map(|r| r.0)
.collect();
assert_eq!(ret, vec![0, 1, 2, 3, 4]);
}
#[tokio::test]
async fn select_multi_page() {
*INIT_LOG;
let (conn, _) = prepare("select_multi_page").await;
let n = 46000;
let sql = format!("select * from NUMBERS({n}) order by number");
let rows = conn.inner().query_iter(&sql).await.unwrap();
let ret: Vec<u64> = rows
.map(|r| r.unwrap().try_into().unwrap())
.collect::<Vec<(u64,)>>()
.await
.into_iter()
.map(|r| r.0)
.collect();
assert_eq!(ret, (0..n).collect::<Vec<u64>>());
}
#[tokio::test]
async fn select_sleep() {
let (conn, _) = prepare("select_sleep").await;
let mut rows = conn.inner().query_iter("select SLEEP(2);").await.unwrap();
let mut result = vec![];
while let Some(row) = rows.next().await {
let row: (u8,) = row.unwrap().try_into().unwrap();
result.push(row.0);
}
assert_eq!(result, vec![0]);
}