// rustsim 0.0.1
//
// High-performance agent-based modelling engine - top-level orchestration crate
// Documentation
use arrow_schema::{DataType, Field};
use rustsim::prelude::*;

/// Minimal agent type used as a fixture by the tests in this file.
///
/// `id` is what the `Agent` impl returns; `x` and `vx` are the two `f32`
/// values exchanged through the `SoaExtractable` impl.
#[derive(Debug, Clone)]
struct Particle {
    // Identifier handed back by `Agent::id`.
    id: AgentId,
    // Value stored in SoA column 0 (named "x").
    x: f32,
    // Value stored in SoA column 1 (named "vx").
    vx: f32,
}

impl Agent for Particle {
    /// Returns the identifier stored in this particle.
    fn id(&self) -> AgentId {
        let Self { id, .. } = *self;
        id
    }
}

impl SoaExtractable for Particle {
    /// A particle contributes two `f32` columns: `x` and `vx`.
    fn num_columns() -> usize {
        2
    }

    /// Column labels, listed in the same order the row methods index them.
    fn column_names() -> Vec<&'static str> {
        ["x", "vx"].to_vec()
    }

    /// Appends this particle's `x` to `columns[0]` and its `vx` to `columns[1]`.
    fn extract_row(&self, columns: &mut [Vec<f32>]) {
        let Self { x, vx, .. } = *self;
        columns[0].push(x);
        columns[1].push(vx);
    }

    /// Overwrites `x` and `vx` with entry `row` of `columns[0]` and `columns[1]`.
    fn write_back_row(&mut self, columns: &[&[f32]], row: usize) {
        // Columns are read and assigned one at a time, `x` first.
        let x_column = columns[0];
        self.x = x_column[row];
        let vx_column = columns[1];
        self.vx = vx_column[row];
    }
}

/// Two rows pushed without a flush must be counted as pushed and pending in
/// the bridge, while nothing is reported as enqueued or completed.
#[test]
fn telemetry_pipeline_stats_report_rows_without_flushing() {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let schema = schema_from_fields(fields);

    let config = ClickHouseConfig {
        url: String::from("http://127.0.0.1:1"),
        ..ClickHouseConfig::default()
    };
    let mut pipeline = TelemetryPipeline::new(schema, 10, config, 4).unwrap();

    // A freshly built pipeline reports zero for every counter checked here.
    let initial = pipeline.stats();
    assert_eq!(initial.rows_pushed, 0);
    assert_eq!(initial.batches_enqueued, 0);
    assert_eq!(initial.enqueue_errors, 0);
    assert_eq!(initial.bridge.pending_rows, 0);
    assert_eq!(initial.writer.outstanding_batches, 0);

    let rows = [
        [ArrowValue::Int64(1), ArrowValue::Float64(1.0)],
        [ArrowValue::Int64(2), ArrowValue::Float64(2.0)],
    ];
    for row in &rows {
        pipeline.push_row(row).unwrap();
    }

    // Both rows are visible as pushed/pending; none have been enqueued.
    let after_push = pipeline.stats();
    assert_eq!(after_push.rows_pushed, 2);
    assert_eq!(after_push.batches_enqueued, 0);
    assert_eq!(after_push.rows_enqueued, 0);
    assert_eq!(after_push.bridge.pending_rows, 2);
    assert_eq!(after_push.bridge.completed_batches, 0);
    assert_eq!(after_push.bridge.total_rows, 2);
}

/// After `flush`, the two pushed rows must no longer be pending in the bridge
/// and must be accounted for by `rows_enqueued` plus `bridge.total_rows`.
#[test]
fn telemetry_pipeline_flush_moves_partial_batch_into_delivery_path() {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let schema = schema_from_fields(fields);

    let config = ClickHouseConfig {
        url: String::from("http://127.0.0.1:1"),
        ..ClickHouseConfig::default()
    };
    let mut pipeline = TelemetryPipeline::new(schema, 10, config, 4).unwrap();

    let rows = [
        [ArrowValue::Int64(1), ArrowValue::Float64(1.0)],
        [ArrowValue::Int64(2), ArrowValue::Float64(2.0)],
    ];
    for row in &rows {
        pipeline.push_row(row).unwrap();
    }

    // The flush outcome is deliberately discarded; only the counters matter.
    // NOTE(review): presumably flush may fail because nothing listens on the
    // configured URL — confirm.
    let _ = pipeline.flush();

    let after_flush = pipeline.stats();
    assert_eq!(after_flush.rows_pushed, 2);
    assert_eq!(after_flush.bridge.pending_rows, 0);
    assert_eq!(after_flush.bridge.completed_batches, 0);

    let accounted_rows = after_flush.rows_enqueued + after_flush.bridge.total_rows as u64;
    assert_eq!(accounted_rows, 2);
}

/// Uploading four particles must report a resident size equal to four
/// `AgentId`s plus eight `f32`s (four particles times the two exported columns).
#[test]
fn device_soa_store_reports_resident_bytes() {
    let mut store = VecStore::new();
    for id in 1..=4 {
        let particle = Particle {
            id,
            x: id as f32,
            vx: 1.0,
        };
        store.insert(particle);
    }

    let device = DeviceSoaStore::upload::<Particle, _>(&store);

    let id_bytes = 4 * std::mem::size_of::<AgentId>();
    let column_bytes = 8 * std::mem::size_of::<f32>();
    assert_eq!(device.resident_bytes(), id_bytes + column_bytes);
}

/// For 2 000 agents and a 500 µs kernel, the derived metrics must be
/// 0.5 ms and 4 000 000 agents per second, within the tolerances below.
#[test]
fn accel_step_result_reports_derived_throughput_metrics() {
    let result = AccelStepResult {
        backend: ComputeBackend::Cpu,
        agent_count: 2_000,
        kernel_us: 500,
    };

    let kernel_ms_error = (result.kernel_ms() - 0.5).abs();
    assert!(kernel_ms_error < 1e-10);

    let throughput_error = (result.agents_per_second() - 4_000_000.0).abs();
    assert!(throughput_error < 1.0);
}