rustsim 0.0.1

High-performance agent-based modelling engine - top-level orchestration crate
Documentation
use arrow_schema::{DataType, Field};
use rustsim::prelude::*;

/// Drives twelve rows through a `TelemetryPipeline` and checks that the
/// statistics returned by `shutdown` account for every one of them.
#[test]
fn telemetry_pipeline_end_to_end() {
    let fields = vec![
        Field::new("step", DataType::Int64, false),
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let schema = schema_from_fields(fields);

    let mut pipeline =
        TelemetryPipeline::new(schema, 5, ClickHouseConfig::default(), 8).unwrap();

    // Twelve rows against batch_size=5: rows 0..10 are expected to leave the
    // builder as two auto-flushed batches that get drained to the writer
    // channel, while the last two rows wait in the builder until shutdown.
    for step in 0_i64..12 {
        let row = [
            ArrowValue::Int64(step),
            ArrowValue::Int64(1),
            ArrowValue::Float64(step as f64 * 0.1),
        ];
        // The result is discarded on purpose: push_row may report a send
        // failure from the writer channel (e.g. ClickHouse is unreachable),
        // and this test is about pipeline plumbing, not connectivity.
        let _ = pipeline.push_row(&row);
    }

    let stats = pipeline.shutdown().unwrap();

    // Every pushed row must show up as either enqueued/received or unsubmitted.
    assert_eq!(stats.rows_pushed, 12);
    assert_eq!(stats.rows_enqueued + stats.rows_unsubmitted, 12);
    assert_eq!(stats.writer.rows_received + stats.rows_unsubmitted, 12);

    // With a local ClickHouse all rows are delivered; without one (typical CI)
    // rows may be dropped after retries. In both cases the pipeline must not
    // panic and sent + dropped has to add up to what was pushed.
    assert_eq!(
        stats.writer.rows_sent + stats.rows_dropped(),
        stats.rows_pushed,
        "telemetry accounting should balance pushed vs sent+dropped rows"
    );
}

/// Exercises `CollectArrowBridge` batching on its own, so no ClickHouse
/// connection is needed for this check.
#[test]
fn arrow_bridge_batching() {
    let fields = vec![
        Field::new("step", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let mut bridge = CollectArrowBridge::new(schema_from_fields(fields), 5).unwrap();

    for step in 0..12i64 {
        let row = [
            ArrowValue::Int64(step),
            ArrowValue::Float64(step as f64 * 0.1),
        ];
        bridge.push_row(&row).unwrap();
    }

    let batches = bridge.take_batches().unwrap();

    // 12 rows at batch_size 5: two full batches plus one partial batch.
    assert_eq!(batches.len(), 3);

    // No row may be lost or duplicated across the collected batches.
    let mut total_rows: usize = 0;
    for batch in batches.iter() {
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, 12);
}

/// Smoke test: `detect_backend` must return without panicking and report
/// either the CPU or the CUDA backend.
#[test]
fn compute_backend_detection() {
    let detected = detect_backend();

    // Which of the two is reported does not matter here; only that it is one of them.
    let is_cpu = detected == ComputeBackend::Cpu;
    let is_cuda = detected == ComputeBackend::Cuda;
    assert!(is_cpu || is_cuda);
}