use clickhouse_arrow::prelude::*;
use clickhouse_arrow::test_utils::ClickHouseContainer;
use clickhouse_arrow::test_utils::arrow_tests::{self, BatchConfig};
use futures_util::stream::{self, StreamExt};
/// Parses a count such as `"500"`, `"10k"`, `"5M"` or `"2MM"` into a `usize`.
///
/// Whitespace around the input, and between the number and its suffix, is ignored.
/// Recognised suffixes (checked in this order; the first matching suffix wins):
///
/// | suffix      | multiplier |
/// |-------------|------------|
/// | `MM` / `mm` | 1,000,000  |
/// | `M` / `m`   | 1,000      |
/// | `K` / `k`   | 1,000      |
///
/// NOTE(review): `M` scales by one *thousand* here (the "mille" convention, with
/// `MM` meaning million), not one million — confirm this is what callers expect.
///
/// Returns `None` if the numeric part is not a valid `usize`, or if applying the
/// multiplier would overflow `usize`.
#[allow(dead_code)]
pub(crate) fn parse_number(s: &str) -> Option<usize> {
    // `MM`/`mm` must come before `M`/`m`, otherwise "2MM" would be split as "2M" + "M".
    const SUFFIXES: [(&str, usize); 6] = [
        ("MM", 1_000_000),
        ("mm", 1_000_000),
        ("M", 1_000),
        ("m", 1_000),
        ("K", 1_000),
        ("k", 1_000),
    ];

    let s = s.trim();
    // Once a suffix matches, only that interpretation is tried: "1Mm" strips "m"
    // and then fails to parse "1M", yielding `None`.
    let (digits, multiplier) = SUFFIXES
        .iter()
        .find_map(|&(suffix, mult)| s.strip_suffix(suffix).map(|base| (base.trim(), mult)))
        .unwrap_or((s, 1));

    // `checked_mul` turns an overflowing scaled value into `None` instead of a
    // debug-mode panic or a silent release-mode wrap-around.
    digits.parse::<usize>().ok()?.checked_mul(multiplier)
}
/// Renders `n` in decimal with a `,` between every group of three digits,
/// counting from the right (e.g. `1234567` becomes `"1,234,567"`).
#[allow(dead_code)]
pub(crate) fn format_number(n: usize) -> String {
    let digits = n.to_string();
    // Chunk from the least-significant end so only the leading group can be short,
    // then restore most-significant-first order before joining.
    digits
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(|group| group.iter().copied().map(char::from).collect::<String>())
        .collect::<Vec<_>>()
        .join(",")
}
/// Estimates the average number of bytes one row occupies for `config`.
///
/// Generates a sample batch with `arrow_tests::create_test_batch_with_config` and
/// divides the batch's `get_array_memory_size()` by the number of sample rows.
#[allow(clippy::cast_precision_loss)]
#[allow(dead_code)]
pub(crate) fn calculate_bytes_per_row(config: &BatchConfig) -> f64 {
    let sample_rows = 100_000;
    let sample = arrow_tests::create_test_batch_with_config(sample_rows, config);
    let total_bytes = sample.get_array_memory_size() as f64;
    total_bytes / sample_rows as f64
}
/// Builds the [`ArrowClient`] the benchmarks use against the container `ch`.
///
/// Starts from `arrow_tests::setup_test_arrow_client` with the container's native URL
/// and credentials, restricts it to IPv4, disables compression, and sets the
/// extension option `fast_mode_size` to 16 before building with [`ArrowFormat`].
///
/// `_workers` is accepted but currently unused.
///
/// # Errors
///
/// Returns any error produced by `build::<ArrowFormat>()`.
#[allow(dead_code)]
pub(crate) async fn setup_benchmark_client(
    ch: &ClickHouseContainer,
    _workers: usize,
) -> Result<ArrowClient> {
    let builder =
        arrow_tests::setup_test_arrow_client(ch.get_native_url(), &ch.user, &ch.password)
            .with_ipv4_only(true)
            .with_compression(CompressionMethod::None);

    let mut tuned = builder.options().clone();
    tuned.ext = tuned.ext.with_fast_mode_size(16);

    builder.with_options(tuned).build::<ArrowFormat>().await
}
/// Inserts `total_rows` generated rows into `table`, running up to `concurrent`
/// inserts at a time.
///
/// The rows are split into `total_rows.div_ceil(batch_size)` batches. Every batch holds
/// `batch_size` rows except possibly the last, which holds the remainder. When
/// `config.unique_id` is set, each batch is generated with the row offset of its
/// first row, passed as the id offset. Each insert's response stream is drained.
///
/// A `concurrent` of zero is treated by `for_each_concurrent` as "no limit".
///
/// # Panics
///
/// Panics if `batch_size` is zero, if starting an insert fails (after printing the
/// error to stderr), or if any item of an insert's response stream is an error.
#[allow(dead_code)]
pub(crate) async fn insert_concurrent(
    client: ArrowClient,
    table: String,
    total_rows: usize,
    batch_size: usize,
    concurrent: usize,
    config: &BatchConfig,
) {
    // Without this, `div_ceil(0)` below panics with an opaque divide-by-zero message.
    assert!(batch_size > 0, "insert_concurrent: batch_size must be greater than zero");

    let query = format!("INSERT INTO {table} FORMAT NATIVE");
    let num_batches = total_rows.div_ceil(batch_size);
    let config = *config;

    // Each future yields `()`, so drive them directly rather than collecting a
    // throwaway `Vec<()>`.
    stream::iter(0..num_batches)
        .for_each_concurrent(concurrent, |batch_idx| {
            // Every future is awaited before this function returns, so borrowing the
            // query is enough; no per-batch `String` clone is needed.
            let q = query.as_str();
            let c = client.clone();
            async move {
                let rows_inserted_so_far = batch_idx * batch_size;
                let rows_remaining = total_rows.saturating_sub(rows_inserted_so_far);
                let this_batch_size = rows_remaining.min(batch_size);
                let id_offset = config.unique_id.then_some(rows_inserted_so_far);
                let batch = arrow_tests::create_test_batch_with_config_offset(
                    this_batch_size,
                    &config,
                    id_offset,
                );
                let mut stream = c
                    .insert(q, batch, None)
                    .await
                    .inspect_err(|e| eprintln!("Insert error on batch {batch_idx}\n{e:?}"))
                    .unwrap();
                while let Some(result) = stream.next().await {
                    result.unwrap();
                }
            }
        })
        .await;
}
/// Prints the benchmark schema described by `config` to stderr.
///
/// The `INCLUDE_ID`, `UNIQUE_ID` and `RAND` flags are always printed. Column kinds
/// with a zero count are omitted. Signed integers, unsigned integers and floats are
/// each grouped onto one comma-separated line. Bool, UTF8, binary and timestamp
/// kinds get a line of their own; UTF8 and binary also show their configured length.
#[allow(unused)]
pub(crate) fn print_schema_config(config: &BatchConfig) {
    // Collects `LABEL=count` for each listed column kind whose count is non-zero,
    // keeping the listed order. A macro (rather than a typed helper) keeps this
    // independent of the exact integer type of each `BatchConfig` field.
    macro_rules! nonzero_counts {
        ($($label:literal => $count:expr),+ $(,)?) => {{
            let mut entries: Vec<String> = Vec::new();
            $(
                if $count > 0 {
                    entries.push(format!(concat!($label, "={}"), $count));
                }
            )+
            entries
        }};
    }

    // One indented line per group; empty groups print nothing.
    let print_group = |entries: Vec<String>| {
        if !entries.is_empty() {
            eprintln!(" {}", entries.join(", "));
        }
    };

    eprintln!("Schema Configuration:");
    eprintln!(" INCLUDE_ID={} (Int64 'id' column for ORDER BY)", config.include_id);
    eprintln!(" UNIQUE_ID={} (unique IDs across batches)", config.unique_id);

    print_group(nonzero_counts![
        "INT8" => config.int8,
        "INT16" => config.int16,
        "INT32" => config.int32,
        "INT64" => config.int64,
    ]);
    print_group(nonzero_counts![
        "UINT8" => config.uint8,
        "UINT16" => config.uint16,
        "UINT32" => config.uint32,
        "UINT64" => config.uint64,
    ]);
    print_group(nonzero_counts![
        "FLOAT32" => config.float32,
        "FLOAT64" => config.float64,
    ]);

    if config.bool > 0 {
        eprintln!(" BOOL={}", config.bool);
    }
    if config.utf8 > 0 {
        eprintln!(" UTF8={} (len={})", config.utf8, config.utf8_len);
    }
    if config.binary > 0 {
        eprintln!(" BINARY={} (len={})", config.binary, config.binary_len);
    }
    if config.timestamp > 0 {
        eprintln!(" TIMESTAMP={}", config.timestamp);
    }
    eprintln!(" RAND={} (random vs sequential data)", config.rand);
}
/// Prints `params` to stderr as a single-column `comfy_table` table.
///
/// The table uses the `UTF8_FULL` preset with `title` as a bold header, followed by
/// one `key: value` row per entry, in order.
#[allow(unused)]
pub(crate) fn print_params_table(title: &str, params: &[(&str, String)]) {
    use comfy_table::presets::UTF8_FULL;
    use comfy_table::{Attribute, Cell, Table};

    let header = Cell::new(title).add_attribute(Attribute::Bold);
    let mut table = Table::new();
    let _ = table.load_preset(UTF8_FULL).set_header(vec![header]);

    params
        .iter()
        .map(|(key, value)| format!("{key}: {value}"))
        .for_each(|row| {
            let _ = table.add_row(vec![row]);
        });

    eprintln!("{table}");
}
/// Prints a one-line summary of `config` to stderr.
///
/// Every non-zero column count is listed, in a fixed order from `INT8` through
/// `TIMESTAMP`. The `INCLUDE_ID`, `UNIQUE_ID` and `RAND` flags are always appended.
#[allow(unused)]
pub(crate) fn print_schema_summary(config: &BatchConfig) {
    // Collects `LABEL=count` for each listed column kind whose count is non-zero,
    // keeping the listed order. A macro keeps this independent of the exact integer
    // type of each `BatchConfig` field.
    macro_rules! nonzero_counts {
        ($($label:literal => $count:expr),+ $(,)?) => {{
            let mut entries: Vec<String> = Vec::new();
            $(
                if $count > 0 {
                    entries.push(format!(concat!($label, "={}"), $count));
                }
            )+
            entries
        }};
    }

    let mut parts = nonzero_counts![
        "INT8" => config.int8,
        "INT16" => config.int16,
        "INT32" => config.int32,
        "INT64" => config.int64,
        "UINT8" => config.uint8,
        "UINT16" => config.uint16,
        "UINT32" => config.uint32,
        "UINT64" => config.uint64,
        "FLOAT32" => config.float32,
        "FLOAT64" => config.float64,
        "BOOL" => config.bool,
        "UTF8" => config.utf8,
        "BINARY" => config.binary,
        "TIMESTAMP" => config.timestamp,
    ];
    parts.extend([
        format!("INCLUDE_ID={}", config.include_id),
        format!("UNIQUE_ID={}", config.unique_id),
        format!("RAND={}", config.rand),
    ]);

    eprintln!(" {}", parts.join(", "));
}