rustsim 0.0.1

High-performance agent-based modelling engine - top-level orchestration crate
Documentation
//! Headless telemetry example.
//!
//! Demonstrates running a simple simulation with no rendering, pushing
//! per-step rows through [`TelemetryPipeline`] to a ClickHouse endpoint.
//!
//! ## Running
//!
//! By default this example exits immediately without connecting to
//! ClickHouse — the pipeline is constructed only if the
//! `CLICKHOUSE_URL` environment variable is set. This keeps the example
//! runnable in CI without external dependencies.
//!
//! To run against a local ClickHouse:
//!
//! ```sh
//! docker run -d --rm --name ch -p 8123:8123 clickhouse/clickhouse-server
//! # create the destination table:
//! curl -X POST 'http://localhost:8123/?query=CREATE+TABLE+demo+(step+Int64,+agent_id+Int64,+x+Float64,+v+Float64)+ENGINE=Memory'
//! CLICKHOUSE_URL=http://localhost:8123 \
//!     cargo run --example headless_telemetry
//! ```
//!
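//! With TLS + auth + compression (auth is applied only when both
//! `CLICKHOUSE_USER` and `CLICKHOUSE_PASSWORD` are set; gzip is enabled by
//! building with the `clickhouse-gzip` feature):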
//!
//! ```sh
//! CLICKHOUSE_URL=https://ch.example.com:8443 \
//! CLICKHOUSE_USER=writer CLICKHOUSE_PASSWORD=secret \
//! cargo run --example headless_telemetry --features clickhouse-gzip
//! ```

use std::sync::Arc;
use std::time::Duration;

use arrow_schema::{DataType, Field, Schema};
use rustsim::telemetry::{TelemetryError, TelemetryPipeline};
use rustsim_io::arrow::ArrowValue;
use rustsim_io::clickhouse::ClickHouseConfig;

/// Runs a small headless simulation and streams one row per agent per step
/// through a [`TelemetryPipeline`] configured from environment variables.
///
/// Returns `Ok(())` immediately, without building a pipeline, when
/// `CLICKHOUSE_URL` cannot be read from the environment.
///
/// # Errors
///
/// Propagates any [`TelemetryError`] returned while constructing the
/// pipeline, pushing a row, or shutting the pipeline down.
fn main() -> Result<(), TelemetryError> {
    // --- Skip quickly when no endpoint is configured ---------------------
    let Ok(url) = std::env::var("CLICKHOUSE_URL") else {
        eprintln!(
            "CLICKHOUSE_URL not set; skipping. \
             Set CLICKHOUSE_URL=http://localhost:8123 to run end-to-end."
        );
        return Ok(());
    };

    // --- Build ClickHouse config ----------------------------------------
    let mut cfg = ClickHouseConfig::new(url, "demo")
        .with_max_batch_rows(1024)
        .with_max_retries(3)
        .with_base_retry_delay(Duration::from_millis(100))
        .with_timeouts(Duration::from_secs(5), Duration::from_secs(30));

    // Auth is opt-in and needs both halves. A half-configured environment is
    // almost certainly a mistake, so say so instead of silently connecting
    // without credentials.
    match (
        std::env::var("CLICKHOUSE_USER"),
        std::env::var("CLICKHOUSE_PASSWORD"),
    ) {
        (Ok(user), Ok(pass)) => cfg = cfg.with_auth(user, pass),
        (Err(_), Err(_)) => {}
        (Ok(_), Err(_)) => eprintln!(
            "warning: CLICKHOUSE_USER is set but CLICKHOUSE_PASSWORD is not; \
             connecting without authentication."
        ),
        (Err(_), Ok(_)) => eprintln!(
            "warning: CLICKHOUSE_PASSWORD is set but CLICKHOUSE_USER is not; \
             connecting without authentication."
        ),
    }

    #[cfg(feature = "clickhouse-gzip")]
    {
        cfg = cfg.with_gzip(true);
    }

    // --- Arrow schema for our rows --------------------------------------
    // Column order here must match the value order passed to `push_row` below.
    let schema = Arc::new(Schema::new(vec![
        Field::new("step", DataType::Int64, false),
        Field::new("agent_id", DataType::Int64, false),
        Field::new("x", DataType::Float64, false),
        Field::new("v", DataType::Float64, false),
    ]));

    // --- Build the pipeline ---------------------------------------------
    let batch_size = 512;
    let channel_capacity = 8;
    let mut pipeline = TelemetryPipeline::new(schema, batch_size, cfg, channel_capacity)?;

    // --- Run a headless simulation --------------------------------------
    // Trivial 1D ballistic agents; replace with your actual model step.
    let n_agents: i64 = 256;
    let n_steps: i64 = 100;
    let dt = 0.1_f64;
    let mut velocities: Vec<f64> = (0..n_agents).map(|i| 0.01 * (i as f64 + 1.0)).collect();
    // Size positions from velocities so the two vectors cannot drift apart.
    let mut positions = vec![0.0_f64; velocities.len()];

    for step in 0..n_steps {
        // Advance every agent: integrate position, then apply drag.
        for (x, v) in positions.iter_mut().zip(velocities.iter_mut()) {
            *x += *v * dt;
            *v *= 0.999; // trivial drag
        }

        // Emit one row per agent; counting in i64 avoids a usize -> i64 cast.
        for (agent_id, (x, v)) in (0_i64..).zip(positions.iter().zip(velocities.iter())) {
            pipeline.push_row(&[
                ArrowValue::Int64(step),
                ArrowValue::Int64(agent_id),
                ArrowValue::Float64(*x),
                ArrowValue::Float64(*v),
            ])?;
        }
    }

    // --- Shutdown + report ----------------------------------------------
    let stats = pipeline.shutdown()?;
    println!(
        "rows_pushed={} rows_sent={} all_delivered={}",
        stats.rows_pushed,
        stats.writer.rows_sent,
        stats.all_rows_delivered()
    );
    Ok(())
}