use rustsim_io::arrow::ArrowValue;
use rustsim_io::bridge::{BridgeError, BridgeMetrics, CollectArrowBridge};
use rustsim_io::clickhouse::{
ClickHouseConfig, ClickHouseError, ClickHouseWriter, WriterMetricsSnapshot, WriterStats,
};
use thiserror::Error;
use tracing::{debug, warn};
/// Errors returned by [`TelemetryPipeline`].
///
/// Both variants wrap the underlying component error unchanged. The `#[from]` conversions let
/// `?` lift bridge and writer errors directly into this type.
#[derive(Debug, Error)]
pub enum TelemetryError {
    /// Failure reported by the `CollectArrowBridge` (construction, row push, flush, or
    /// taking batches).
    #[error("bridge error: {0}")]
    Bridge(#[from] BridgeError),
    /// Failure reported by the `ClickHouseWriter` (construction or enqueueing a batch).
    #[error("clickhouse error: {0}")]
    ClickHouse(#[from] ClickHouseError),
}
/// Point-in-time counters for a running [`TelemetryPipeline`], as returned by
/// `TelemetryPipeline::stats`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TelemetryPipelineStats {
    /// Rows successfully accepted by the bridge via `push_row`.
    pub rows_pushed: u64,
    /// Batches successfully handed to the writer's `send`.
    pub batches_enqueued: u64,
    /// Total rows contained in the batches counted by `batches_enqueued`.
    pub rows_enqueued: u64,
    /// Failed `send` attempts. A batch that is restored and fails again on a later drain is
    /// counted once per attempt.
    pub enqueue_errors: u64,
    /// Snapshot of the bridge's own metrics (`CollectArrowBridge::metrics`).
    pub bridge: BridgeMetrics,
    /// Snapshot of the writer's own metrics (`ClickHouseWriter::metrics`).
    pub writer: WriterMetricsSnapshot,
}
/// Final accounting returned by `TelemetryPipeline::shutdown`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TelemetryShutdownStats {
    /// Rows successfully accepted by the bridge over the pipeline's lifetime.
    pub rows_pushed: u64,
    /// Batches successfully handed to the writer's `send`, including those sent during shutdown.
    pub batches_enqueued: u64,
    /// Total rows contained in the batches counted by `batches_enqueued`.
    pub rows_enqueued: u64,
    /// Failed `send` attempts over the pipeline's lifetime.
    pub enqueue_errors: u64,
    /// Batches taken from the bridge at shutdown that were never handed to the writer:
    /// the batch whose `send` failed plus every batch after it.
    pub batches_unsubmitted: u64,
    /// Total rows contained in the batches counted by `batches_unsubmitted`.
    pub rows_unsubmitted: u64,
    /// Final statistics returned by `ClickHouseWriter::shutdown`.
    pub writer: WriterStats,
}
impl TelemetryShutdownStats {
    /// Number of batches that did not make it to ClickHouse.
    ///
    /// This is the sum of batches never submitted at shutdown and batches the writer
    /// reports as failed.
    pub fn batches_dropped(&self) -> u64 {
        let failed_in_writer = self.writer.batches_failed;
        failed_in_writer + self.batches_unsubmitted
    }

    /// Number of rows that did not make it to ClickHouse.
    ///
    /// This is the sum of rows never submitted at shutdown and rows the writer reports
    /// as failed.
    pub fn rows_dropped(&self) -> u64 {
        let failed_in_writer = self.writer.rows_failed;
        failed_in_writer + self.rows_unsubmitted
    }

    /// Returns `true` when nothing was dropped and the writer's sent-row count matches
    /// the number of rows pushed into the pipeline.
    pub fn all_rows_delivered(&self) -> bool {
        let nothing_dropped = self.rows_dropped() == 0;
        nothing_dropped && self.writer.rows_sent == self.rows_pushed
    }
}
/// Pushes rows through a [`CollectArrowBridge`] and forwards each completed batch to a
/// [`ClickHouseWriter`], keeping counters of what was accepted and enqueued.
pub struct TelemetryPipeline {
    // Accumulates pushed rows into batches; completed batches are drained from here.
    bridge: CollectArrowBridge,
    // Receives completed batches via `send`; shut down in `TelemetryPipeline::shutdown`.
    writer: ClickHouseWriter,
    // Rows accepted by `bridge.push_row`.
    rows_pushed: u64,
    // Batches (and their rows) successfully handed to `writer.send`.
    batches_enqueued: u64,
    rows_enqueued: u64,
    // Failed `writer.send` attempts.
    enqueue_errors: u64,
}
impl TelemetryPipeline {
pub fn new(
schema: arrow_schema::SchemaRef,
batch_size: usize,
ch_config: ClickHouseConfig,
channel_capacity: usize,
) -> Result<Self, TelemetryError> {
let bridge = CollectArrowBridge::new(schema, batch_size)?;
let writer = ClickHouseWriter::new(ch_config, channel_capacity)?;
Ok(Self {
bridge,
writer,
rows_pushed: 0,
batches_enqueued: 0,
rows_enqueued: 0,
enqueue_errors: 0,
})
}
pub fn push_row(&mut self, values: &[ArrowValue]) -> Result<(), TelemetryError> {
self.bridge.push_row(values)?;
self.rows_pushed += 1;
self.drain_ready_batches()?;
Ok(())
}
pub fn flush(&mut self) -> Result<(), TelemetryError> {
self.bridge.flush()?;
self.drain_ready_batches()?;
Ok(())
}
pub fn stats(&self) -> TelemetryPipelineStats {
TelemetryPipelineStats {
rows_pushed: self.rows_pushed,
batches_enqueued: self.batches_enqueued,
rows_enqueued: self.rows_enqueued,
enqueue_errors: self.enqueue_errors,
bridge: self.bridge.metrics(),
writer: self.writer.metrics(),
}
}
pub fn shutdown(mut self) -> Result<TelemetryShutdownStats, TelemetryError> {
let batches = self.bridge.take_batches()?;
let mut batches_unsubmitted = 0u64;
let mut rows_unsubmitted = 0u64;
let mut pending = batches.into_iter();
while let Some(batch) = pending.next() {
let rows = batch.num_rows() as u64;
match self.writer.send(batch) {
Ok(()) => {
self.batches_enqueued += 1;
self.rows_enqueued += rows;
}
Err(e) => {
self.enqueue_errors += 1;
rows_unsubmitted += rows;
batches_unsubmitted += 1;
for remaining in pending {
rows_unsubmitted += remaining.num_rows() as u64;
batches_unsubmitted += 1;
}
warn!(error = %e, batches_unsubmitted, rows_unsubmitted, "failed to enqueue remaining telemetry batches during shutdown");
break;
}
}
}
let writer = self.writer.shutdown();
Ok(TelemetryShutdownStats {
rows_pushed: self.rows_pushed,
batches_enqueued: self.batches_enqueued,
rows_enqueued: self.rows_enqueued,
enqueue_errors: self.enqueue_errors,
batches_unsubmitted,
rows_unsubmitted,
writer,
})
}
fn drain_ready_batches(&mut self) -> Result<(), TelemetryError> {
let drained = self.bridge.drain_completed();
let mut pending = drained.into_iter();
let mut enqueued = 0usize;
while let Some(batch) = pending.next() {
let rows = batch.num_rows() as u64;
if let Err(e) = self.writer.send(batch.clone()) {
self.enqueue_errors += 1;
warn!(error = %e, "failed to send batch to ClickHouse writer");
let mut unsent = Vec::new();
unsent.push(batch);
unsent.extend(pending);
self.bridge.restore_completed(unsent);
return Err(e.into());
}
self.batches_enqueued += 1;
self.rows_enqueued += rows;
enqueued += 1;
debug!(
rows,
batches_enqueued = self.batches_enqueued,
"drained completed batch to writer"
);
}
if enqueued > 0 {
debug!(
enqueued,
"completed telemetry batches enqueued successfully"
);
}
Ok(())
}
}