use arrow_schema::{DataType, Field};
use rustsim::prelude::*;
/// Minimal agent type used as a fixture by the tests in this file.
///
/// Carries an identifier plus two `f32` fields, `x` and `vx`, which are the
/// values exposed as columns by the `SoaExtractable` implementation.
#[derive(Debug, Clone)]
struct Particle {
    /// Identifier returned by `Agent::id`.
    id: AgentId,
    /// First `f32` column ("x").
    x: f32,
    /// Second `f32` column ("vx").
    vx: f32,
}
impl Agent for Particle {
    /// Returns the identifier stored in the particle.
    fn id(&self) -> AgentId {
        self.id
    }
}
impl SoaExtractable for Particle {
    /// Number of `f32` columns a particle contributes (`x` and `vx`).
    fn num_columns() -> usize {
        2
    }

    /// Column names, in the same order used by `extract_row` and
    /// `write_back_row`.
    fn column_names() -> Vec<&'static str> {
        vec!["x", "vx"]
    }

    /// Appends this particle's `x` to column 0 and `vx` to column 1.
    ///
    /// # Panics
    ///
    /// Panics if `columns` holds fewer than two columns.
    fn extract_row(&self, columns: &mut [Vec<f32>]) {
        let x_column = &mut columns[0];
        x_column.push(self.x);

        let vx_column = &mut columns[1];
        vx_column.push(self.vx);
    }

    /// Overwrites `x` and `vx` with the values stored at `row` in columns 0
    /// and 1 respectively.
    ///
    /// # Panics
    ///
    /// Panics if `columns` holds fewer than two columns or if `row` is out of
    /// bounds for either of them.
    fn write_back_row(&mut self, columns: &[&[f32]], row: usize) {
        let x_column = columns[0];
        self.x = x_column[row];

        let vx_column = columns[1];
        self.vx = vx_column[row];
    }
}
/// Checks that `TelemetryPipeline::stats` starts at zero and that pushing two
/// rows (without calling `flush`) is reflected as pushed/pending rows while
/// nothing is reported as enqueued or completed.
#[test]
fn telemetry_pipeline_stats_report_rows_without_flushing() {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let schema = schema_from_fields(fields);
    // NOTE(review): port 1 on loopback is presumably chosen so that no real
    // ClickHouse server is ever contacted — confirm.
    let config = ClickHouseConfig {
        url: String::from("http://127.0.0.1:1"),
        ..ClickHouseConfig::default()
    };
    let mut pipeline = TelemetryPipeline::new(schema, 10, config, 4).unwrap();

    // A freshly built pipeline reports all-zero counters.
    let initial = pipeline.stats();
    assert_eq!(initial.rows_pushed, 0);
    assert_eq!(initial.batches_enqueued, 0);
    assert_eq!(initial.enqueue_errors, 0);
    assert_eq!(initial.bridge.pending_rows, 0);
    assert_eq!(initial.writer.outstanding_batches, 0);

    // Push two rows matching the (id, value) schema.
    for (id, value) in [(1, 1.0), (2, 2.0)] {
        pipeline
            .push_row(&[ArrowValue::Int64(id), ArrowValue::Float64(value)])
            .unwrap();
    }

    // Both rows are counted and still pending; nothing has been enqueued.
    let after_push = pipeline.stats();
    assert_eq!(after_push.rows_pushed, 2);
    assert_eq!(after_push.batches_enqueued, 0);
    assert_eq!(after_push.rows_enqueued, 0);
    assert_eq!(after_push.bridge.pending_rows, 2);
    assert_eq!(after_push.bridge.completed_batches, 0);
    assert_eq!(after_push.bridge.total_rows, 2);
}
/// Checks that flushing a partially filled pipeline leaves no pending rows in
/// the bridge and that the two pushed rows are still accounted for, either as
/// enqueued rows or in the bridge's total.
#[test]
fn telemetry_pipeline_flush_moves_partial_batch_into_delivery_path() {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let schema = schema_from_fields(fields);
    // NOTE(review): port 1 on loopback is presumably chosen so that no real
    // ClickHouse server is ever contacted — confirm.
    let config = ClickHouseConfig {
        url: String::from("http://127.0.0.1:1"),
        ..ClickHouseConfig::default()
    };
    let mut pipeline = TelemetryPipeline::new(schema, 10, config, 4).unwrap();

    // Push two rows matching the (id, value) schema.
    for (id, value) in [(1, 1.0), (2, 2.0)] {
        pipeline
            .push_row(&[ArrowValue::Int64(id), ArrowValue::Float64(value)])
            .unwrap();
    }

    // The flush outcome is deliberately ignored: the assertions below only
    // inspect the counters, whichever way the flush call ended.
    let _ = pipeline.flush();

    let after_flush = pipeline.stats();
    assert_eq!(after_flush.rows_pushed, 2);
    assert_eq!(after_flush.bridge.pending_rows, 0);
    assert_eq!(after_flush.bridge.completed_batches, 0);
    // The rows must show up in exactly one of the two counters.
    assert_eq!(
        after_flush.rows_enqueued + after_flush.bridge.total_rows as u64,
        2
    );
}
/// Checks that a `DeviceSoaStore` uploaded from four particles reports a
/// resident size equal to four ids plus eight `f32` values.
#[test]
fn device_soa_store_reports_resident_bytes() {
    let mut store = VecStore::new();
    for id in 1..=4 {
        let particle = Particle {
            id,
            x: id as f32,
            vx: 1.0,
        };
        store.insert(particle);
    }

    let device = DeviceSoaStore::upload::<Particle, _>(&store);

    // Four agents, each contributing one id and two f32 columns (x, vx).
    let id_bytes = 4 * std::mem::size_of::<AgentId>();
    let column_bytes = 8 * std::mem::size_of::<f32>();
    let expected = id_bytes + column_bytes;
    assert_eq!(device.resident_bytes(), expected);
}
/// Checks the derived metrics of an `AccelStepResult`: with `kernel_us = 500`
/// and `agent_count = 2_000`, `kernel_ms()` must be 0.5 and
/// `agents_per_second()` must be 4,000,000 (within tolerance).
#[test]
fn accel_step_result_reports_derived_throughput_metrics() {
    let step = AccelStepResult {
        backend: ComputeBackend::Cpu,
        agent_count: 2_000,
        kernel_us: 500,
    };

    let kernel_ms_error = (step.kernel_ms() - 0.5).abs();
    assert!(kernel_ms_error < 1e-10);

    let throughput_error = (step.agents_per_second() - 4_000_000.0).abs();
    assert!(throughput_error < 1.0);
}