use arrow_array::builder::Int64Builder;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use rustsim_io::clickhouse::{ClickHouseConfig, ClickHouseError, ClickHouseWriter};
use std::sync::Arc;
use std::time::Duration;
fn make_batch(n: usize) -> RecordBatch {
let mut builder = Int64Builder::new();
for i in 0..n {
builder.append_value(i as i64);
}
let array = builder.finish();
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
}
#[test]
fn clickhouse_config_builder_helpers_work() {
let config = ClickHouseConfig::new("http://localhost:8123", "metrics")
.with_max_batch_rows(42)
.with_max_retries(5)
.with_base_retry_delay(Duration::from_millis(25))
.with_max_retry_delay(Duration::from_secs(2));
assert_eq!(config.url, "http://localhost:8123");
assert_eq!(config.table, "metrics");
assert_eq!(config.max_batch_rows, 42);
assert_eq!(config.max_retries, 5);
assert_eq!(config.base_retry_delay, Duration::from_millis(25));
assert_eq!(config.max_retry_delay, Duration::from_secs(2));
}
#[test]
fn clickhouse_writer_sends_and_shuts_down() {
let config = ClickHouseConfig::default();
let writer = ClickHouseWriter::new(config, 16).unwrap();
let initial = writer.metrics();
assert_eq!(initial.outstanding_batches, 0);
assert_eq!(initial.batches_received, 0);
assert_eq!(initial.rows_received, 0);
assert_eq!(initial.batches_sent, 0);
assert_eq!(initial.rows_sent, 0);
assert_eq!(initial.errors, 0);
writer.send(make_batch(10)).unwrap();
writer.send(make_batch(20)).unwrap();
let stats = writer.shutdown();
assert_eq!(stats.batches_received, 2);
assert_eq!(stats.rows_received, 30);
if stats.errors == 0 {
assert_eq!(stats.batches_sent, 2);
assert_eq!(stats.rows_sent, 30);
assert_eq!(stats.batches_failed, 0);
assert_eq!(stats.rows_failed, 0);
} else {
assert!(
stats.errors > 0,
"errors should be reported when ClickHouse is unavailable"
);
assert!(stats.batches_failed > 0);
assert!(stats.rows_failed > 0);
}
assert!(stats.max_outstanding_batches >= 1);
}
#[test]
fn clickhouse_writer_handles_empty_shutdown() {
let config = ClickHouseConfig::default();
let writer = ClickHouseWriter::new(config, 4).unwrap();
let stats = writer.shutdown();
assert_eq!(stats.batches_received, 0);
assert_eq!(stats.rows_received, 0);
assert_eq!(stats.batches_sent, 0);
assert_eq!(stats.rows_sent, 0);
assert_eq!(stats.batches_failed, 0);
assert_eq!(stats.rows_failed, 0);
}
#[test]
fn clickhouse_writer_rejects_invalid_configuration() {
let err = ClickHouseWriter::new(ClickHouseConfig::default(), 0)
.err()
.unwrap();
assert!(matches!(err, ClickHouseError::Config(_)));
let err = ClickHouseWriter::new(
ClickHouseConfig {
max_retries: 0,
..ClickHouseConfig::default()
},
4,
)
.err()
.unwrap();
assert!(matches!(err, ClickHouseError::Config(_)));
}
#[test]
fn clickhouse_writer_pool_drains_batches_across_workers() {
let config = ClickHouseConfig::new("http://127.0.0.1:1", "metrics")
.with_workers(4)
.with_max_retries(1)
.with_timeouts(Duration::from_millis(50), Duration::from_millis(50));
let writer = ClickHouseWriter::new(config, 32).unwrap();
for _ in 0..16 {
writer.send(make_batch(3)).unwrap();
}
let stats = writer.shutdown();
assert_eq!(stats.batches_received, 16);
assert_eq!(stats.rows_received, 48);
assert_eq!(stats.batches_failed + stats.batches_sent, 16);
assert_eq!(stats.rows_failed + stats.rows_sent, 48);
}
#[test]
fn clickhouse_config_with_workers_clamps_zero_to_one() {
let config = ClickHouseConfig::new("http://localhost:8123", "t").with_workers(0);
assert_eq!(config.workers, 1);
}