// rustsim 0.0.1 — high-performance agent-based modelling engine,
// top-level orchestration crate.
//! Crowd → Arrow telemetry adapter.
//!
//! Bridges [`rustsim_crowd::CrowdObserver`] (and the layered 2.5-D
//! [`rustsim_crowd::LayeredObserver`]) into the umbrella's Arrow
//! telemetry surface so a tick-by-tick stream of pedestrian state can
//! flow into [`crate::telemetry::TelemetryPipeline`] (ClickHouse), a
//! [`rustsim_io::CollectArrowBridge`] (in-memory `RecordBatch`es,
//! Parquet, custom sinks), or any other consumer of `RecordBatch`
//! without consumer-side glue.
//!
//! The umbrella owns this adapter rather than `rustsim-crowd` itself
//! because `rustsim-crowd` deliberately has no `arrow_*` /
//! `rustsim-io` dependency — it is a domain crate and stays that way.
//! Adding the adapter here keeps the dependency arrows pointing
//! down: `rustsim` → `{rustsim-crowd, rustsim-io}`, never the other
//! way around.
//!
//! # 2-D drive
//!
//! ```ignore
//! use rustsim::crowd_telemetry::{push_crowd_row, crowd_arrow_schema, CROWD_BATCH_SIZE};
//! use rustsim::rustsim_io::CollectArrowBridge;
//! use rustsim::rustsim_crowd::prelude::*;
//!
//! let mut bridge = CollectArrowBridge::new(crowd_arrow_schema(), CROWD_BATCH_SIZE).unwrap();
//! let mut tick: i64 = 0;
//! // ... per tick:
//! step_scratch_store_observed(
//!     &SocialForceModel,
//!     &mut store,
//!     &walls,
//!     &params,
//!     dt,
//!     &mut scratch,
//!     &mut peds_buf,
//!     &mut |id, ped: &Pedestrian| {
//!         push_crowd_row(&mut bridge, tick, id, ped).expect("schema mismatch");
//!     },
//! );
//! tick += 1;
//! ```
//!
//! # Schema
//!
//! [`crowd_arrow_schema`] returns a fixed eight-column schema:
//!
//! | column          | type     | meaning                                  |
//! |-----------------|----------|------------------------------------------|
//! | `tick`          | `Int64`  | monotonic simulation tick (caller-owned) |
//! | `agent_id`      | `Int64`  | [`rustsim_core::AgentId`] cast to `i64`  |
//! | `pos_x`         | `Float64`| world-frame position (m)                 |
//! | `pos_y`         | `Float64`| world-frame position (m)                 |
//! | `vel_x`         | `Float64`| world-frame velocity (m/s)               |
//! | `vel_y`         | `Float64`| world-frame velocity (m/s)               |
//! | `radius`        | `Float64`| body radius (m)                          |
//! | `desired_speed` | `Float64`| free-flow speed setpoint (m/s)           |
//!
//! The schema is stable: adding a column is a breaking change and
//! requires a major version bump.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};

use rustsim_core::prelude::AgentId;
use rustsim_crowd::Pedestrian;
use rustsim_io::arrow::ArrowValue;
use rustsim_io::bridge::{BridgeError, CollectArrowBridge};

/// Default number of rows per Arrow `RecordBatch` for crowd telemetry.
///
/// 8 192 rows sits in the usual Arrow / ClickHouse sweet spot. It is big
/// enough that per-batch overhead is amortised. It is small enough that a
/// single batch stays around 0.5 MiB (8 192 rows × 8 columns × 8 bytes).
/// Custom sinks may hand any other size to [`CollectArrowBridge::new`];
/// the schema and row layout do not depend on it.
pub const CROWD_BATCH_SIZE: usize = 8 * 1024;

/// Column count of [`crowd_arrow_schema`]. The tests compare the schema
/// against this value to catch accidental schema drift.
pub const CROWD_NUM_COLUMNS: usize = 8;

/// Arrow schema for one row of crowd telemetry.
///
/// See the module docs for the column order and meaning. Returns a
/// freshly allocated [`SchemaRef`] every call (Arrow `SchemaRef` is
/// cheaply cloneable, so callers should typically clone once per
/// pipeline at startup rather than per row).
pub fn crowd_arrow_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("tick", DataType::Int64, false),
        Field::new("agent_id", DataType::Int64, false),
        Field::new("pos_x", DataType::Float64, false),
        Field::new("pos_y", DataType::Float64, false),
        Field::new("vel_x", DataType::Float64, false),
        Field::new("vel_y", DataType::Float64, false),
        Field::new("radius", DataType::Float64, false),
        Field::new("desired_speed", DataType::Float64, false),
    ]))
}

/// Append one crowd-telemetry row for `(tick, id, ped)` to `bridge`.
///
/// Values are written in [`crowd_arrow_schema`] column order.
///
/// This is the per-row primitive that [`crowd_observer`] wraps into a
/// [`rustsim_crowd::CrowdObserver`]-compatible closure. Call it directly in
/// these cases:
///
/// - to interleave crowd rows with rows from other subsystems on the same
///   bridge;
/// - to log only a subset of agents (e.g. behind a filter);
/// - to handle the error instead of panicking.
///
/// `id` is stored via an `as` cast to `i64`. NOTE(review): if `AgentId` is
/// `u64`, ids above `i64::MAX` are bit-reinterpreted as negative values.
/// Confirm that ids never reach that range, or cast back with `as u64`
/// downstream.
///
/// # Errors
///
/// Propagates the [`BridgeError`] from the bridge's `push_row`. This is
/// expected only when the bridge was built with a schema other than
/// [`crowd_arrow_schema`], a mis-configuration that should be treated as a
/// programmer error.
pub fn push_crowd_row(
    bridge: &mut CollectArrowBridge,
    tick: i64,
    id: AgentId,
    ped: &Pedestrian,
) -> Result<(), BridgeError> {
    let agent = id as i64;

    // Must stay in lock-step with the field order in `crowd_arrow_schema`.
    let row = [
        ArrowValue::Int64(tick),
        ArrowValue::Int64(agent),
        ArrowValue::Float64(ped.pos[0]),
        ArrowValue::Float64(ped.pos[1]),
        ArrowValue::Float64(ped.vel[0]),
        ArrowValue::Float64(ped.vel[1]),
        ArrowValue::Float64(ped.radius),
        ArrowValue::Float64(ped.desired_speed),
    ];
    bridge.push_row(&row)
}

/// Wrap `bridge` in a closure usable as a [`rustsim_crowd::CrowdObserver`].
///
/// Each `(id, &Pedestrian)` handed to the closure is appended to `bridge`
/// through [`push_crowd_row`]. Every row is stamped with the `tick` captured
/// when the closure was created.
///
/// The closure keeps the mutable borrow of `bridge` for its whole lifetime,
/// so build a fresh closure each tick. If crowd rows and non-crowd rows must
/// share the bridge within a single tick, call [`push_crowd_row`] directly
/// instead.
///
/// # Panics
///
/// The returned closure panics when [`push_crowd_row`] fails. That is
/// expected only if `bridge` was not built from [`crowd_arrow_schema`], a
/// programmer error rather than a runtime condition. Keeping the observer
/// infallible keeps the tick loop simple. Long-running services that must
/// degrade gracefully should call [`push_crowd_row`] from their own closure
/// and handle the error.
pub fn crowd_observer<'a>(
    bridge: &'a mut CollectArrowBridge,
    tick: i64,
) -> impl FnMut(AgentId, &Pedestrian) + 'a {
    move |agent: AgentId, pedestrian: &Pedestrian| {
        let pushed = push_crowd_row(bridge, tick, agent, pedestrian);
        pushed.expect(
            "crowd_observer: bridge schema does not match crowd_arrow_schema; \
             this is a programmer error, fix the bridge construction",
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use rustsim_core::prelude::VecStore;
    use rustsim_core::store::AgentStore;
    use rustsim_crowd::common::Pedestrian;
    use rustsim_crowd::prelude::{
        recommended_cell_size, social_force, step_scratch_store_observed, CrowdAgent, Scratch,
        SocialForceModel,
    };

    /// Build `n` stationary pedestrians (ids `1..=n`) on a square-ish grid
    /// with 1.5 m spacing. Each pedestrian's destination is 30 m further
    /// along +x. A single horizontal wall runs along `y = -1`, just below
    /// the grid.
    fn fixture(n: usize) -> (VecStore<CrowdAgent>, Vec<rustsim_crowd::WallSegment>) {
        let mut store = VecStore::<CrowdAgent>::new();
        let cols = (n as f64).sqrt().ceil() as usize;
        for i in 0..n {
            let r = i / cols;
            let c = i % cols;
            let id = (i as u64) + 1;
            let pos = [c as f64 * 1.5, r as f64 * 1.5];
            let dest = [pos[0] + 30.0, pos[1]];
            let ped = Pedestrian::new(pos, [0.0, 0.0], 0.25, 1.34, dest);
            store.insert(CrowdAgent { id, ped });
        }
        let walls = vec![rustsim_crowd::WallSegment {
            a: [-10.0, -1.0],
            b: [200.0, -1.0],
        }];
        (store, walls)
    }

    #[test]
    fn schema_is_stable_eight_columns() {
        let s = crowd_arrow_schema();
        assert_eq!(s.fields().len(), CROWD_NUM_COLUMNS);
        let names: Vec<&str> = s.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            names,
            vec![
                "tick",
                "agent_id",
                "pos_x",
                "pos_y",
                "vel_x",
                "vel_y",
                "radius",
                "desired_speed"
            ]
        );
        // Also pin the data types from the module-level table, so a silent
        // retype (e.g. Int64 -> Int32) counts as schema drift too.
        let types: Vec<DataType> = s.fields().iter().map(|f| f.data_type().clone()).collect();
        assert_eq!(
            types,
            vec![
                DataType::Int64,
                DataType::Int64,
                DataType::Float64,
                DataType::Float64,
                DataType::Float64,
                DataType::Float64,
                DataType::Float64,
                DataType::Float64,
            ]
        );
        for f in s.fields() {
            assert!(!f.is_nullable(), "{} must be non-nullable", f.name());
        }
    }

    #[test]
    fn end_to_end_observed_drive_emits_one_row_per_agent_per_tick() {
        // Wire `step_scratch_store_observed` into a `CollectArrowBridge`
        // via `crowd_observer` and check the resulting `RecordBatch`
        // stream:
        //
        //   1. row count == n_agents * n_ticks (no dropped rows on the
        //      auto-flush boundary);
        //   2. schema matches `crowd_arrow_schema` so the bridge can
        //      be wired straight into a `TelemetryPipeline`;
        //   3. tick column is monotone non-decreasing across batches
        //      (the bridge does not reorder rows);
        //   4. within every tick each agent_id appears exactly once,
        //      every tick in `0..n_ticks` is present, and all ticks see
        //      the same N agents. This is the once-per-agent-per-tick
        //      contract downstream analyst-facing pipelines depend on.
        //      A global per-id count alone cannot prove it: a duplicate
        //      in one tick could mask a missing row in another.
        const N: usize = 200;
        const TICKS: i64 = 60;
        const DT: f64 = 0.05;

        let (mut store, walls) = fixture(N);
        let params = social_force::Params::default();
        let cell = recommended_cell_size(social_force::neighbor_cutoff(&params));
        let mut scratch = Scratch::new(cell);
        let mut peds_buf: Vec<Pedestrian> = Vec::new();

        let mut bridge = CollectArrowBridge::new(crowd_arrow_schema(), CROWD_BATCH_SIZE).unwrap();

        for tick in 0..TICKS {
            // Re-derive the observer closure each tick so the bridge
            // borrow does not outlive the call.
            let mut obs = crowd_observer(&mut bridge, tick);
            step_scratch_store_observed(
                &SocialForceModel,
                &mut store,
                &walls,
                &params,
                DT,
                &mut scratch,
                &mut peds_buf,
                &mut obs,
            );
        }

        let batches = bridge.take_batches().unwrap();
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(
            total_rows,
            N * TICKS as usize,
            "expected one row per agent per tick"
        );

        // Schema check — every batch must agree with crowd_arrow_schema.
        let expected = crowd_arrow_schema();
        for b in &batches {
            assert_eq!(b.schema(), expected);
        }

        // Row-level invariants: group agent ids by tick, rejecting duplicates.
        let mut last_tick: i64 = -1;
        let mut ids_per_tick: std::collections::BTreeMap<i64, std::collections::HashSet<i64>> =
            std::collections::BTreeMap::new();
        for b in &batches {
            let tick_col = b
                .column_by_name("tick")
                .unwrap()
                .as_any()
                .downcast_ref::<arrow_array::Int64Array>()
                .unwrap();
            let id_col = b
                .column_by_name("agent_id")
                .unwrap()
                .as_any()
                .downcast_ref::<arrow_array::Int64Array>()
                .unwrap();
            for i in 0..b.num_rows() {
                let t = tick_col.value(i);
                assert!(t >= last_tick, "tick column must be monotone");
                last_tick = t;
                let id = id_col.value(i);
                let first_sighting = ids_per_tick.entry(t).or_default().insert(id);
                assert!(first_sighting, "agent {id} observed more than once in tick {t}");
            }
        }

        let observed_ticks: Vec<i64> = ids_per_tick.keys().copied().collect();
        assert_eq!(
            observed_ticks,
            (0..TICKS).collect::<Vec<i64>>(),
            "every tick must emit rows"
        );
        let first_tick_ids = &ids_per_tick[&0];
        assert_eq!(first_tick_ids.len(), N, "every agent must be observed");
        for (t, ids) in &ids_per_tick {
            assert_eq!(
                ids, first_tick_ids,
                "tick {t} observed a different agent set than tick 0"
            );
        }
    }
}