#![expect(unused_crate_dependencies)]
mod common;
use clickhouse_arrow::prelude::*;
use clickhouse_arrow::test_utils::{ClickHouseContainer, arrow_tests};
use futures_util::StreamExt;
// Number of concurrent insert tasks spawned by `run`.
const INSERTS: usize = 3;
/// Entry point for the example.
///
/// Hands `run` to `common::run_example_with_cleanup` and propagates whatever
/// error that helper returns. A failure inside `run` is unwrapped, so it
/// surfaces as a panic inside the closure rather than as a returned error.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::any::Any + Send>> {
    let outcome =
        common::run_example_with_cleanup(|ch| async move { run(ch).await.unwrap() }, None).await;
    outcome?;
    Ok(())
}
/// Runs the concurrent insert/query example against the given ClickHouse container.
///
/// Steps:
/// 1. Builds an Arrow-format client with LZ4 compression and both chunked
///    protocol modes set to `ChunkedOptional`.
/// 2. Creates the example database and a table whose schema is taken from a
///    test batch.
/// 3. Spawns `INSERTS` tasks that each send a set of test batches through
///    `insert_many`, then waits for all of them to finish.
/// 4. Spawns several tasks that each run `SELECT *` on the table and print the
///    number of rows they read to stderr.
///
/// # Errors
///
/// Returns an error if building the client, creating the database, or creating
/// the table fails.
///
/// # Panics
///
/// Panics if any insert or query fails, or if a spawned task cannot be joined;
/// those paths use `unwrap`.
#[expect(clippy::disallowed_methods)]
async fn run(ch: &'static ClickHouseContainer) -> Result<()> {
    // Number of concurrent `SELECT` tasks; sizes the handle vector and bounds
    // the spawn loop so the two cannot drift apart.
    const QUERY_TASKS: usize = 3;
    // Batches sent by each insert task. Left as an untyped integer so it takes
    // the same type as `common::ROWS` in the row-count product below.
    let batches_per_insert = 50;

    let db = common::DB_NAME;
    let native_url = ch.get_native_url();
    let arrow_client = arrow_tests::setup_test_arrow_client(native_url, &ch.user, &ch.password)
        .with_compression(CompressionMethod::LZ4)
        .with_ext(|ext| {
            ext.with_chunked_send_mode(ChunkedProtocolMode::ChunkedOptional)
                .with_chunked_recv_mode(ChunkedProtocolMode::ChunkedOptional)
        })
        .build::<ArrowFormat>()
        .await?;

    arrow_tests::setup_database(db, &arrow_client).await?;
    // This batch is only used for its schema when creating the table.
    let schema_batch = arrow_tests::create_test_batch(1, false);
    let table = arrow_tests::setup_table(&arrow_client, db, &schema_batch.schema()).await?;

    // Phase 1: concurrent inserts, one task per `INSERTS`.
    let mut insert_tasks = tokio::task::JoinSet::<()>::new();
    for _ in 0..INSERTS {
        let table = table.clone();
        let arrow_client = arrow_client.clone();
        // The handle returned by `spawn` is not needed; tasks are awaited via `join_next`.
        drop(insert_tasks.spawn(async move {
            let qid = Qid::new();
            let batches = (0..batches_per_insert)
                .map(|_| arrow_tests::create_test_batch(common::ROWS, false))
                .collect::<Vec<_>>();
            let mut stream = arrow_client
                .insert_many(format!("INSERT INTO {table} FORMAT Native"), batches, Some(qid))
                .await
                .unwrap();
            // Drain the response stream so every per-batch result is checked.
            while let Some(result) = stream.next().await {
                result.unwrap();
            }
            eprintln!("Inserted {} Rows: {qid}", common::ROWS * batches_per_insert);
        }));
    }
    // Wait for every insert to finish before querying; a panicked task surfaces here.
    while let Some(result) = insert_tasks.join_next().await {
        result.unwrap();
    }

    // Phase 2: concurrent full-table reads. `collect` spawns all tasks up front,
    // so they run concurrently before any handle is awaited.
    let tasks: Vec<_> = (0..QUERY_TASKS)
        .map(|i| {
            let table = table.clone();
            let client = arrow_client.clone();
            tokio::spawn(async move {
                let query = format!("SELECT * FROM {table}");
                let mut stream = client.query(query, None).await.unwrap();
                let mut total_rows = 0;
                while let Some(maybe_batch) = stream.next().await {
                    total_rows += maybe_batch.unwrap().num_rows();
                }
                eprintln!("Task {i}, total rows = {total_rows}");
            })
        })
        .collect();
    for result in tasks {
        result.await.unwrap();
    }
    Ok(())
}