//! rustsim-io 0.0.1
//!
//! Arrow batch builders, CSV bridge, and ClickHouse writer for rustsim.
//!
//! Documentation
use arrow_array::builder::Int64Builder;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use rustsim_io::clickhouse::{ClickHouseConfig, ClickHouseError, ClickHouseWriter};
use std::sync::Arc;
use std::time::Duration;

/// Builds a single-column [`RecordBatch`] containing the values `0..n`.
///
/// The batch has one non-nullable `Int64` column named `"id"`; row `i`
/// holds the value `i`.
///
/// # Panics
///
/// Panics if `RecordBatch::try_new` rejects the schema/column pair.
fn make_batch(n: usize) -> RecordBatch {
    // Describe the layout first: one required Int64 column called "id".
    let id_field = Field::new("id", DataType::Int64, false);
    let schema = Arc::new(Schema::new(vec![id_field]));

    // Fill the column with the row indices 0, 1, ..., n - 1.
    let mut ids = Int64Builder::new();
    (0..n).for_each(|row| ids.append_value(row as i64));

    RecordBatch::try_new(schema, vec![Arc::new(ids.finish())]).unwrap()
}

/// Checks that `ClickHouseConfig::new` records the URL and table name and
/// that each `with_*` helper used here stores its argument in the
/// corresponding config field.
#[test]
fn clickhouse_config_builder_helpers_work() {
    let base_delay = Duration::from_millis(25);
    let max_delay = Duration::from_secs(2);

    let built = ClickHouseConfig::new("http://localhost:8123", "metrics")
        .with_max_batch_rows(42)
        .with_max_retries(5)
        .with_base_retry_delay(base_delay)
        .with_max_retry_delay(max_delay);

    // Constructor arguments.
    assert_eq!(built.url, "http://localhost:8123");
    assert_eq!(built.table, "metrics");

    // Values supplied through the builder helpers.
    assert_eq!(built.max_batch_rows, 42);
    assert_eq!(built.max_retries, 5);
    assert_eq!(built.base_retry_delay, base_delay);
    assert_eq!(built.max_retry_delay, max_delay);
}

/// Sends two batches (10 and 20 rows) through a writer built from the
/// default config and checks the counters returned by `shutdown`.
///
/// The test is written to pass both with and without a reachable ClickHouse
/// server: the "received" counters are always checked, while the
/// sent/failed counters are checked according to whether any errors were
/// reported.
#[test]
fn clickhouse_writer_sends_and_shuts_down() {
    let writer = ClickHouseWriter::new(ClickHouseConfig::default(), 16).unwrap();

    // Before anything is sent, every counter read here must be zero.
    let before = writer.metrics();
    assert_eq!(before.outstanding_batches, 0);
    assert_eq!(before.batches_received, 0);
    assert_eq!(before.rows_received, 0);
    assert_eq!(before.batches_sent, 0);
    assert_eq!(before.rows_sent, 0);
    assert_eq!(before.errors, 0);

    // Two batches, 10 + 20 = 30 rows in total.
    for rows in [10, 20] {
        writer.send(make_batch(rows)).unwrap();
    }

    let stats = writer.shutdown();
    assert_eq!(stats.batches_received, 2);
    assert_eq!(stats.rows_received, 30);

    // When ClickHouse is not running (typical CI), all sends fail and
    // errors > 0. When it is running locally, batches_sent == 2 and
    // errors == 0. Either way, the writer must not panic and must drain
    // cleanly.
    if stats.errors != 0 {
        assert!(
            stats.errors > 0,
            "errors should be reported when ClickHouse is unavailable"
        );
        assert!(stats.batches_failed > 0);
        assert!(stats.rows_failed > 0);
    } else {
        assert_eq!(stats.batches_sent, 2);
        assert_eq!(stats.rows_sent, 30);
        assert_eq!(stats.batches_failed, 0);
        assert_eq!(stats.rows_failed, 0);
    }

    assert!(stats.max_outstanding_batches >= 1);
}

/// Shuts a writer down without sending anything and checks that every
/// received/sent/failed counter in the returned stats is zero.
#[test]
fn clickhouse_writer_handles_empty_shutdown() {
    let idle_writer = ClickHouseWriter::new(ClickHouseConfig::default(), 4).unwrap();
    let final_stats = idle_writer.shutdown();

    // Nothing was submitted ...
    assert_eq!(final_stats.batches_received, 0);
    assert_eq!(final_stats.rows_received, 0);
    // ... so nothing can have been delivered ...
    assert_eq!(final_stats.batches_sent, 0);
    assert_eq!(final_stats.rows_sent, 0);
    // ... or have failed.
    assert_eq!(final_stats.batches_failed, 0);
    assert_eq!(final_stats.rows_failed, 0);
}

/// Checks that `ClickHouseWriter::new` returns `ClickHouseError::Config`
/// for two invalid setups: a zero second argument, and a config whose
/// `max_retries` is zero.
#[test]
fn clickhouse_writer_rejects_invalid_configuration() {
    // Case 1: default config, but the second constructor argument is 0
    // (presumably the channel capacity -- confirm against the writer API).
    let zero_arg_result = ClickHouseWriter::new(ClickHouseConfig::default(), 0);
    let zero_arg_err = zero_arg_result.err().unwrap();
    assert!(matches!(zero_arg_err, ClickHouseError::Config(_)));

    // Case 2: `max_retries` is set directly on the struct rather than via a
    // builder helper, so the field is exactly 0 when the writer sees it.
    let no_retry_config = ClickHouseConfig {
        max_retries: 0,
        ..ClickHouseConfig::default()
    };
    let no_retry_result = ClickHouseWriter::new(no_retry_config, 4);
    let no_retry_err = no_retry_result.err().unwrap();
    assert!(matches!(no_retry_err, ClickHouseError::Config(_)));
}

/// Pushes 16 three-row batches through a writer configured with 4 workers
/// and checks that the stats returned by `shutdown` account for every
/// batch and every row.
#[test]
fn clickhouse_writer_pool_drains_batches_across_workers() {
    // Target port 1 on loopback, where no server is expected to listen, with
    // a single retry and 50 ms timeouts, so each delivery attempt fails
    // quickly. Only channel/shutdown accounting is exercised here, not real
    // ClickHouse delivery.
    let short_timeout = Duration::from_millis(50);
    let pool_config = ClickHouseConfig::new("http://127.0.0.1:1", "metrics")
        .with_workers(4)
        .with_max_retries(1)
        .with_timeouts(short_timeout, short_timeout);
    let writer = ClickHouseWriter::new(pool_config, 32).unwrap();

    // 16 batches of 3 rows each = 48 rows.
    (0..16).for_each(|_| {
        writer.send(make_batch(3)).unwrap();
    });

    let totals = writer.shutdown();
    assert_eq!(totals.batches_received, 16);
    assert_eq!(totals.rows_received, 48);

    // With no server listening every batch is expected to be dropped, but
    // the pool must still drain cleanly: across the 4 workers, each batch
    // and row has to be counted as either sent or failed.
    assert_eq!(totals.batches_failed + totals.batches_sent, 16);
    assert_eq!(totals.rows_failed + totals.rows_sent, 48);
}

/// Checks that requesting zero workers through `with_workers` leaves the
/// config's `workers` field at 1.
#[test]
fn clickhouse_config_with_workers_clamps_zero_to_one() {
    let effective_workers = ClickHouseConfig::new("http://localhost:8123", "t")
        .with_workers(0)
        .workers;

    assert_eq!(effective_workers, 1);
}