use arrow_schema::{DataType, Field};
use rustsim::prelude::*;
/// End-to-end accounting check for `TelemetryPipeline`.
///
/// Pushes twelve three-column rows (`step`, `id`, `value`) into a pipeline and
/// then verifies that the statistics returned by `shutdown()` balance: every
/// pushed row must be counted either on the delivered side (enqueued /
/// received / sent) or on the lost side (unsubmitted / dropped).
#[test]
fn telemetry_pipeline_end_to_end() {
    let columns = vec![
        Field::new("step", DataType::Int64, false),
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    // NOTE(review): `5` and `8` are presumably the batch size and the queue
    // capacity — confirm against `TelemetryPipeline::new`.
    let mut pipeline = TelemetryPipeline::new(
        schema_from_fields(columns),
        5,
        ClickHouseConfig::default(),
        8,
    )
    .unwrap();

    (0..12).for_each(|step| {
        let row = [
            ArrowValue::Int64(step),
            ArrowValue::Int64(1),
            ArrowValue::Float64(step as f64 * 0.1),
        ];
        // The push result is discarded on purpose: this test only checks the
        // bookkeeping below, which has explicit slots for rows that never
        // made it through.
        let _ = pipeline.push_row(&row);
    });

    let stats = pipeline.shutdown().unwrap();
    let unsubmitted = stats.rows_unsubmitted;

    assert_eq!(stats.rows_pushed, 12);
    assert_eq!(stats.rows_enqueued + unsubmitted, 12);
    assert_eq!(stats.writer.rows_received + unsubmitted, 12);

    let accounted_for = stats.writer.rows_sent + stats.rows_dropped();
    assert_eq!(
        accounted_for,
        stats.rows_pushed,
        "telemetry accounting should balance pushed vs sent+dropped rows"
    );
}
/// Checks how `CollectArrowBridge` groups pushed rows into batches.
///
/// Twelve two-column rows (`step`, `value`) are pushed into a bridge built
/// with `5` as its second argument (NOTE(review): presumably the batch size —
/// confirm against `CollectArrowBridge::new`). The test expects
/// `take_batches()` to yield three batches containing twelve rows in total.
#[test]
fn arrow_bridge_batching() {
    let columns = vec![
        Field::new("step", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ];
    let mut bridge = CollectArrowBridge::new(schema_from_fields(columns), 5).unwrap();

    for step in 0..12i64 {
        let row = [
            ArrowValue::Int64(step),
            ArrowValue::Float64(step as f64 * 0.1),
        ];
        // Unlike a best-effort producer, every push here must succeed.
        bridge.push_row(&row).unwrap();
    }

    let batches = bridge.take_batches().unwrap();
    assert_eq!(batches.len(), 3);

    // Sum the row counts across all batches to make sure nothing was lost.
    let total_rows: usize = batches
        .iter()
        .fold(0, |running, batch| running + batch.num_rows());
    assert_eq!(total_rows, 12);
}
/// Verifies that `detect_backend()` reports one of the two backends this test
/// accepts: `ComputeBackend::Cpu` or `ComputeBackend::Cuda`.
#[test]
fn compute_backend_detection() {
    let detected = detect_backend();
    assert!(matches!(
        detected,
        ComputeBackend::Cpu | ComputeBackend::Cuda
    ));
}