use rustsim::crowd_telemetry::{
crowd_arrow_schema, crowd_observer, CROWD_BATCH_SIZE, CROWD_NUM_COLUMNS,
};
use rustsim::rustsim_core::prelude::{AgentStore, VecStore};
use rustsim::rustsim_crowd::common::Pedestrian;
use rustsim::rustsim_crowd::prelude::{
recommended_cell_size, step_scratch_store_observed, CrowdAgent, Scratch, SocialForceModel,
WallSegment,
};
use rustsim::rustsim_crowd::social_force;
use rustsim::rustsim_io::bridge::CollectArrowBridge;
/// Number of agents inserted into the store (also reported as `n_agents`).
const N: usize = 60;
/// Number of simulation ticks to run; each tick index is handed to the observer.
const TICKS: i64 = 30;
/// Time step passed to parameter validation and to every step call
/// (printed with an `s` suffix, i.e. seconds).
const DT: f64 = 0.05;
/// Radius given to every `Pedestrian` (presumably metres — confirm against `Pedestrian::new`).
const RADIUS: f64 = 0.25;
/// Desired speed given to every `Pedestrian` (presumably m/s — confirm against `Pedestrian::new`).
const DESIRED_SPEED: f64 = 1.34;
fn main() {
let mut store = VecStore::<CrowdAgent>::new();
let cols = (N as f64).sqrt().ceil() as usize;
for i in 0..N {
let r = i / cols;
let c = i % cols;
let id = (i as u64) + 1;
let pos = [c as f64 * 1.5, r as f64 * 1.5];
let dest = [pos[0] + 30.0, pos[1]];
let ped = Pedestrian::new(pos, [0.0, 0.0], RADIUS, DESIRED_SPEED, dest);
store.insert(CrowdAgent { id, ped });
}
let walls = vec![WallSegment {
a: [-10.0, -1.0],
b: [200.0, -1.0],
}];
let mut bridge = CollectArrowBridge::new(crowd_arrow_schema(), CROWD_BATCH_SIZE)
.expect("crowd_arrow_schema is always valid");
let params = social_force::Params::default();
params
.validate(DT)
.expect("default Params should validate at dt = 0.05 s");
let cell = recommended_cell_size(social_force::neighbor_cutoff(¶ms));
let mut scratch = Scratch::new(cell);
let mut peds_buf: Vec<Pedestrian> = Vec::with_capacity(N);
println!("# crowd_telemetry_demo");
println!("# n_agents = {N}, ticks = {TICKS}, dt = {DT} s");
println!(
"# schema = {} columns ({})",
CROWD_NUM_COLUMNS,
crowd_arrow_schema()
.fields()
.iter()
.map(|f| f.name().as_str())
.collect::<Vec<_>>()
.join(", ")
);
println!("# auto-flush batch_size = {CROWD_BATCH_SIZE}");
for tick in 0..TICKS {
let mut obs = crowd_observer(&mut bridge, tick);
step_scratch_store_observed(
&SocialForceModel,
&mut store,
&walls,
¶ms,
DT,
&mut scratch,
&mut peds_buf,
&mut obs,
);
}
let batches = bridge
.take_batches()
.expect("bridge drain should not fail on the canonical schema");
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
println!();
println!(
"Drained {} RecordBatch(es), {total_rows} total rows.",
batches.len()
);
println!(
"Expected: {} rows ({} agents × {} ticks).",
N * TICKS as usize,
N,
TICKS
);
assert_eq!(
total_rows,
N * TICKS as usize,
"row count must match the deterministic per-agent-per-tick contract"
);
if let Some(first) = batches.first() {
println!();
println!(
"First batch: {} rows × {} columns, schema fields = {:?}",
first.num_rows(),
first.num_columns(),
first
.schema()
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<Vec<_>>()
);
}
println!();
println!("In a production pipeline, replace the `take_batches` drain");
println!("with `rustsim::telemetry::TelemetryPipeline::push_row` for");
println!("ClickHouse delivery, or feed `batches` to a Parquet writer.");
}