rustsim 0.0.1

High-performance agent-based modelling engine - top-level orchestration crate
Documentation
//! End-to-end crowd telemetry example: pedestrian dynamics →
//! `CrowdObserver` → `CollectArrowBridge` → in-memory `RecordBatch`
//! stream.
//!
//! This is the canonical wiring for the
//! [`rustsim::crowd_telemetry`] adapter. Run it to see how a
//! production pipeline turns 2-D pedestrian state into a stable
//! eight-column Arrow stream that can be drained into
//! [`TelemetryPipeline`] (ClickHouse), serialised to Parquet, or
//! handed to any other Arrow consumer:
//!
//! 1. Build a `VecStore<CrowdAgent>` of pedestrians.
//! 2. Construct a `CollectArrowBridge` against
//!    [`crowd_arrow_schema()`] with [`CROWD_BATCH_SIZE`].
//! 3. Each tick, drive the store with
//!    [`step_scratch_store_observed`] and pass a
//!    [`crowd_observer(&mut bridge, tick)`] closure as the observer.
//! 4. After the loop, drain the bridge into `RecordBatch`es and
//!    print a summary the same way a real pipeline would forward
//!    them to a sink.
//!
//! Run with:
//!
//! ```bash
//! cargo run -p rustsim --example crowd_telemetry_demo --release
//! ```
//!
//! The example deliberately uses a small population (60 agents × 30
//! ticks) so the entire batch stream fits in stdout. For
//! production-scale wiring, replace the in-memory drain with a
//! [`TelemetryPipeline::push_row`] forward; the schema and observer
//! contract are the same.
//!
//! [`TelemetryPipeline`]: rustsim::telemetry::TelemetryPipeline
//! [`TelemetryPipeline::push_row`]: rustsim::telemetry::TelemetryPipeline::push_row

use rustsim::crowd_telemetry::{
    crowd_arrow_schema, crowd_observer, CROWD_BATCH_SIZE, CROWD_NUM_COLUMNS,
};
use rustsim::rustsim_core::prelude::{AgentStore, VecStore};
use rustsim::rustsim_crowd::common::Pedestrian;
use rustsim::rustsim_crowd::prelude::{
    recommended_cell_size, step_scratch_store_observed, CrowdAgent, Scratch, SocialForceModel,
    WallSegment,
};
use rustsim::rustsim_crowd::social_force;
use rustsim::rustsim_io::bridge::CollectArrowBridge;

const N: usize = 60;
const TICKS: i64 = 30;
const DT: f64 = 0.05;
const RADIUS: f64 = 0.25;
const DESIRED_SPEED: f64 = 1.34;

fn main() {
    // ---- 1. Build the agent store ----------------------------------
    let mut store = VecStore::<CrowdAgent>::new();
    let cols = (N as f64).sqrt().ceil() as usize;
    for i in 0..N {
        let r = i / cols;
        let c = i % cols;
        let id = (i as u64) + 1;
        let pos = [c as f64 * 1.5, r as f64 * 1.5];
        let dest = [pos[0] + 30.0, pos[1]];
        let ped = Pedestrian::new(pos, [0.0, 0.0], RADIUS, DESIRED_SPEED, dest);
        store.insert(CrowdAgent { id, ped });
    }
    let walls = vec![WallSegment {
        a: [-10.0, -1.0],
        b: [200.0, -1.0],
    }];

    // ---- 2. Build the Arrow bridge ---------------------------------
    let mut bridge = CollectArrowBridge::new(crowd_arrow_schema(), CROWD_BATCH_SIZE)
        .expect("crowd_arrow_schema is always valid");

    // Validate parameters once at startup.
    let params = social_force::Params::default();
    params
        .validate(DT)
        .expect("default Params should validate at dt = 0.05 s");

    // Allocate scratch buffers once and reuse for the whole run.
    let cell = recommended_cell_size(social_force::neighbor_cutoff(&params));
    let mut scratch = Scratch::new(cell);
    let mut peds_buf: Vec<Pedestrian> = Vec::with_capacity(N);

    // ---- 3. Drive the store with the observer ----------------------
    println!("# crowd_telemetry_demo");
    println!("# n_agents = {N}, ticks = {TICKS}, dt = {DT} s");
    println!(
        "# schema = {} columns ({})",
        CROWD_NUM_COLUMNS,
        crowd_arrow_schema()
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect::<Vec<_>>()
            .join(", ")
    );
    println!("# auto-flush batch_size = {CROWD_BATCH_SIZE}");

    for tick in 0..TICKS {
        // Re-derive the observer each tick so the bridge borrow is
        // scoped to the call.
        let mut obs = crowd_observer(&mut bridge, tick);
        step_scratch_store_observed(
            &SocialForceModel,
            &mut store,
            &walls,
            &params,
            DT,
            &mut scratch,
            &mut peds_buf,
            &mut obs,
        );
    }

    // ---- 4. Drain and summarise ------------------------------------
    let batches = bridge
        .take_batches()
        .expect("bridge drain should not fail on the canonical schema");
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();

    println!();
    println!(
        "Drained {} RecordBatch(es), {total_rows} total rows.",
        batches.len()
    );
    println!(
        "Expected: {} rows ({} agents × {} ticks).",
        N * TICKS as usize,
        N,
        TICKS
    );
    assert_eq!(
        total_rows,
        N * TICKS as usize,
        "row count must match the deterministic per-agent-per-tick contract"
    );

    // Show a peek of the first batch so a reader can see the column
    // layout in action.
    if let Some(first) = batches.first() {
        println!();
        println!(
            "First batch: {} rows × {} columns, schema fields = {:?}",
            first.num_rows(),
            first.num_columns(),
            first
                .schema()
                .fields()
                .iter()
                .map(|f| f.name().clone())
                .collect::<Vec<_>>()
        );
    }

    println!();
    println!("In a production pipeline, replace the `take_batches` drain");
    println!("with `rustsim::telemetry::TelemetryPipeline::push_row` for");
    println!("ClickHouse delivery, or feed `batches` to a Parquet writer.");
}