use clickhouse::{Client, Compression};
#[tokio::test]
async fn wait_end_of_query() {
let client = prepare_database!();
let scenarios = vec![
(false, 3), (true, 0), ];
for (wait_end_of_query, expected_rows) in scenarios {
let result = max_execution_time(client.clone(), wait_end_of_query).await;
assert_eq!(
result, expected_rows,
"wait_end_of_query: {wait_end_of_query}, expected_rows: {expected_rows}"
);
}
}
async fn max_execution_time(mut client: Client, wait_end_of_query: bool) -> u8 {
if wait_end_of_query {
client = client.with_setting("wait_end_of_query", "1")
}
let mut cursor = client
.with_compression(Compression::None)
.with_setting("max_execution_time", "0.1")
.with_setting("max_block_size", "1")
.query("SELECT sleepEachRow(0.03) AS s FROM system.numbers LIMIT 5")
.fetch::<u8>()
.unwrap();
let mut i = 0;
let err = loop {
match cursor.next().await {
Ok(Some(_)) => i += 1,
Ok(None) => panic!("DB exception hasn't been found"),
Err(err) => break err,
}
};
let err_s = err.to_string();
assert!(
err_s.contains("TIMEOUT_EXCEEDED"),
"expected TIMEOUT_EXCEEDED in error string, got {err_s:?}; original: {err:?}"
);
i
}
#[cfg(feature = "lz4")]
#[tokio::test]
async fn deferred_lz4() {
let client = prepare_database!().with_compression(Compression::Lz4);
client
.query("CREATE TABLE test(no UInt32) ENGINE = MergeTree ORDER BY no")
.execute()
.await
.unwrap();
#[derive(serde::Serialize, clickhouse::Row)]
struct Row {
no: u32,
}
let part_count = 100;
let part_size = 100_000;
for i in 0..part_count {
let mut insert = client.insert::<Row>("test").await.unwrap();
for j in 0..part_size {
let row = Row {
no: i * part_size + j,
};
insert.write(&row).await.unwrap();
}
insert.end().await.unwrap();
}
let mut cursor = client
.with_setting("max_execution_time", "0.1")
.query("SELECT no FROM test")
.fetch::<u32>()
.unwrap();
let mut i = 0;
let err = loop {
match cursor.next().await {
Ok(Some(_)) => i += 1,
Ok(None) => panic!("DB exception hasn't been found"),
Err(err) => break err,
}
};
assert_ne!(i, 0);
let err_s = err.to_string();
assert!(
err_s.contains("TIMEOUT_EXCEEDED"),
"expected TIMEOUT_EXCEEDED in error string, got {err_s:?}; original: {err:?}"
);
}