use std::sync::Arc;
use std::time::Duration;
use arrow_schema::{DataType, Field, Schema};
use rustsim::telemetry::{TelemetryError, TelemetryPipeline};
use rustsim_io::arrow::ArrowValue;
use rustsim_io::clickhouse::ClickHouseConfig;
fn main() -> Result<(), TelemetryError> {
let Ok(url) = std::env::var("CLICKHOUSE_URL") else {
eprintln!(
"CLICKHOUSE_URL not set; skipping. \
Set CLICKHOUSE_URL=http://localhost:8123 to run end-to-end."
);
return Ok(());
};
let mut cfg = ClickHouseConfig::new(url, "demo")
.with_max_batch_rows(1024)
.with_max_retries(3)
.with_base_retry_delay(Duration::from_millis(100))
.with_timeouts(Duration::from_secs(5), Duration::from_secs(30));
if let (Ok(user), Ok(pass)) = (
std::env::var("CLICKHOUSE_USER"),
std::env::var("CLICKHOUSE_PASSWORD"),
) {
cfg = cfg.with_auth(user, pass);
}
#[cfg(feature = "clickhouse-gzip")]
{
cfg = cfg.with_gzip(true);
}
let schema = Arc::new(Schema::new(vec![
Field::new("step", DataType::Int64, false),
Field::new("agent_id", DataType::Int64, false),
Field::new("x", DataType::Float64, false),
Field::new("v", DataType::Float64, false),
]));
let batch_size = 512;
let channel_capacity = 8;
let mut pipeline = TelemetryPipeline::new(schema, batch_size, cfg, channel_capacity)?;
let n_agents: i64 = 256;
let n_steps: i64 = 100;
let mut positions = vec![0.0_f64; n_agents as usize];
let mut velocities: Vec<f64> = (0..n_agents).map(|i| 0.01 * (i as f64 + 1.0)).collect();
let dt = 0.1_f64;
for step in 0..n_steps {
for i in 0..n_agents as usize {
positions[i] += velocities[i] * dt;
velocities[i] *= 0.999; }
for (i, (x, v)) in positions.iter().zip(velocities.iter()).enumerate() {
pipeline.push_row(&[
ArrowValue::Int64(step),
ArrowValue::Int64(i as i64),
ArrowValue::Float64(*x),
ArrowValue::Float64(*v),
])?;
}
}
let stats = pipeline.shutdown()?;
println!(
"rows_pushed={} rows_sent={} all_delivered={}",
stats.rows_pushed,
stats.writer.rows_sent,
stats.all_rows_delivered()
);
Ok(())
}