rustsim 0.0.1

High-performance agent-based modelling engine - top-level orchestration crate
Documentation
//! End-to-end headless telemetry pipeline.
//!
//! [`TelemetryPipeline`] connects data collection to ClickHouse ingestion:
//!
//! 1. Rows are pushed via [`push_row`](TelemetryPipeline::push_row).
//! 2. The internal [`CollectArrowBridge`] batches rows into Arrow `RecordBatch` values.
//! 3. Completed batches are drained to a background [`ClickHouseWriter`] with
//!    bounded-channel backpressure.
//! 4. [`shutdown`](TelemetryPipeline::shutdown) flushes remaining data and returns statistics.
//!
//! The pipeline also exposes lightweight observability snapshots via
//! [`stats`](TelemetryPipeline::stats) so callers can inspect batching progress,
//! pending rows, and writer-side backlog without shutting the pipeline down.
//!
//! # Delivery semantics
//!
//! Delivery is tracked in three stages:
//! - rows accepted into the pipeline
//! - batches successfully enqueued to the background writer
//! - batches ultimately sent or dropped by the writer
//!
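//! If enqueue to the writer fails during a normal `push_row`/`flush`, completed
//! batches are restored to the bridge so they are not silently lost, and the
//! error is returned to the caller. During [`shutdown`](TelemetryPipeline::shutdown),
//! batches that cannot be enqueued are instead reported as unsubmitted in
//! [`TelemetryShutdownStats`] rather than returned as an error.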
//!
//! [`CollectArrowBridge`]: rustsim_io::bridge::CollectArrowBridge
//! [`ClickHouseWriter`]: rustsim_io::clickhouse::ClickHouseWriter

use rustsim_io::arrow::ArrowValue;
use rustsim_io::bridge::{BridgeError, BridgeMetrics, CollectArrowBridge};
use rustsim_io::clickhouse::{
    ClickHouseConfig, ClickHouseError, ClickHouseWriter, WriterMetricsSnapshot, WriterStats,
};
use thiserror::Error;
use tracing::{debug, warn};

/// Errors that can occur in the telemetry pipeline.
///
/// Both variants are generated with `#[from]`, so `?` converts bridge and
/// writer errors into a `TelemetryError` automatically.
#[derive(Debug, Error)]
pub enum TelemetryError {
    /// An Arrow bridge/batching error.
    #[error("bridge error: {0}")]
    Bridge(#[from] BridgeError),
    /// A ClickHouse writer error (channel closed, HTTP failure, invalid config).
    #[error("clickhouse error: {0}")]
    ClickHouse(#[from] ClickHouseError),
}

/// Lightweight observability snapshot for [`TelemetryPipeline`].
///
/// Produced by [`TelemetryPipeline::stats`] without stopping the pipeline.
/// "Enqueued" here means handed to the background writer, not delivered to
/// ClickHouse; writer-side progress is reported in the `writer` field.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TelemetryPipelineStats {
    /// Total rows accepted via `push_row`.
    ///
    /// Rows still buffered in the bridge, or in batches restored to the bridge
    /// after a failed enqueue, are counted here but not in `rows_enqueued`.
    pub rows_pushed: u64,
    /// Number of Arrow batches successfully handed off to the writer.
    pub batches_enqueued: u64,
    /// Number of rows contained in batches handed off to the writer.
    pub rows_enqueued: u64,
    /// Number of enqueue failures observed while handing batches to the writer.
    ///
    /// Incremented once per failed send; the batches remaining in that drain
    /// are not attempted again in the same call, so they add no further count.
    pub enqueue_errors: u64,
    /// Current bridge-side batching metrics.
    pub bridge: BridgeMetrics,
    /// Current writer-side live metrics.
    pub writer: WriterMetricsSnapshot,
}

/// End-to-end delivery summary returned by [`TelemetryPipeline::shutdown`].
///
/// The writer-side outcome of every enqueued batch lives in `writer`.
/// `batches_unsubmitted`/`rows_unsubmitted` cover batches that never reached
/// the writer during shutdown. The helper methods combine the two into
/// drop/delivery totals.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TelemetryShutdownStats {
    /// Total rows accepted via `push_row`.
    pub rows_pushed: u64,
    /// Number of batches successfully handed off to the writer.
    ///
    /// Includes batches enqueued both during normal operation and during
    /// shutdown.
    pub batches_enqueued: u64,
    /// Number of rows contained in batches successfully handed off to the writer.
    pub rows_enqueued: u64,
    /// Number of enqueue failures observed while handing batches to the writer.
    ///
    /// Shutdown contributes at most one failure, because it stops sending after
    /// the first failed batch.
    pub enqueue_errors: u64,
    /// Number of batches that could not be handed off during shutdown.
    ///
    /// Counts the batch whose send failed plus every batch queued after it.
    pub batches_unsubmitted: u64,
    /// Number of rows in batches that could not be handed off during shutdown.
    pub rows_unsubmitted: u64,
    /// Final writer-side delivery statistics.
    pub writer: WriterStats,
}

impl TelemetryShutdownStats {
    /// Batches that were not durably delivered.
    ///
    /// Sum of the batches never handed to the writer during shutdown and the
    /// batches the writer itself reports as failed.
    pub fn batches_dropped(&self) -> u64 {
        let Self {
            batches_unsubmitted,
            writer,
            ..
        } = self;
        batches_unsubmitted + writer.batches_failed
    }

    /// Rows that were not durably delivered.
    ///
    /// Sum of the rows never handed to the writer during shutdown and the rows
    /// the writer itself reports as failed.
    pub fn rows_dropped(&self) -> u64 {
        let writer_failed = self.writer.rows_failed;
        writer_failed + self.rows_unsubmitted
    }

    /// Returns `true` when nothing was dropped and the writer reports having
    /// sent exactly as many rows as were accepted via `push_row`.
    pub fn all_rows_delivered(&self) -> bool {
        let nothing_dropped = self.rows_dropped() == 0;
        nothing_dropped && self.writer.rows_sent == self.rows_pushed
    }
}

/// End-to-end headless telemetry pipeline.
///
/// Collects rows into Arrow batches via [`CollectArrowBridge`],
/// then drains completed batches to a background [`ClickHouseWriter`]
/// with bounded-channel backpressure.
///
/// [`CollectArrowBridge`]: rustsim_io::bridge::CollectArrowBridge
/// [`ClickHouseWriter`]: rustsim_io::clickhouse::ClickHouseWriter
pub struct TelemetryPipeline {
    /// Row-to-`RecordBatch` batcher; also holds batches restored after a
    /// failed enqueue.
    bridge: CollectArrowBridge,
    /// Background writer that receives completed batches.
    writer: ClickHouseWriter,
    /// Rows accepted by the bridge via `push_row`.
    rows_pushed: u64,
    /// Batches successfully passed to `writer.send`.
    batches_enqueued: u64,
    /// Rows contained in the batches counted by `batches_enqueued`.
    rows_enqueued: u64,
    /// Failed `writer.send` attempts (one per failed drain/shutdown pass).
    enqueue_errors: u64,
}

impl TelemetryPipeline {
    /// Create a new pipeline.
    ///
    /// # Arguments
    ///
    /// - `schema` - Arrow schema for the data rows.
    /// - `batch_size` - number of rows per Arrow batch.
    /// - `ch_config` - ClickHouse connection and retry configuration.
    /// - `channel_capacity` - bounded channel size (backpressure threshold).
    pub fn new(
        schema: arrow_schema::SchemaRef,
        batch_size: usize,
        ch_config: ClickHouseConfig,
        channel_capacity: usize,
    ) -> Result<Self, TelemetryError> {
        let bridge = CollectArrowBridge::new(schema, batch_size)?;
        let writer = ClickHouseWriter::new(ch_config, channel_capacity)?;
        Ok(Self {
            bridge,
            writer,
            rows_pushed: 0,
            batches_enqueued: 0,
            rows_enqueued: 0,
            enqueue_errors: 0,
        })
    }

    /// Push a single row of agent/model data.
    ///
    /// Automatically drains completed batches to the writer.
    pub fn push_row(&mut self, values: &[ArrowValue]) -> Result<(), TelemetryError> {
        self.bridge.push_row(values)?;
        self.rows_pushed += 1;
        self.drain_ready_batches()?;
        Ok(())
    }

    /// Force any currently buffered partial batch to be flushed and enqueued.
    pub fn flush(&mut self) -> Result<(), TelemetryError> {
        self.bridge.flush()?;
        self.drain_ready_batches()?;
        Ok(())
    }

    /// Snapshot current pipeline metrics without shutting the pipeline down.
    pub fn stats(&self) -> TelemetryPipelineStats {
        TelemetryPipelineStats {
            rows_pushed: self.rows_pushed,
            batches_enqueued: self.batches_enqueued,
            rows_enqueued: self.rows_enqueued,
            enqueue_errors: self.enqueue_errors,
            bridge: self.bridge.metrics(),
            writer: self.writer.metrics(),
        }
    }

    /// Flush any remaining rows, attempt to enqueue them, then shut down.
    ///
    /// Returns an end-to-end delivery summary even when some batches could not
    /// be handed off to the background writer during shutdown.
    pub fn shutdown(mut self) -> Result<TelemetryShutdownStats, TelemetryError> {
        let batches = self.bridge.take_batches()?;
        let mut batches_unsubmitted = 0u64;
        let mut rows_unsubmitted = 0u64;
        let mut pending = batches.into_iter();

        while let Some(batch) = pending.next() {
            let rows = batch.num_rows() as u64;
            match self.writer.send(batch) {
                Ok(()) => {
                    self.batches_enqueued += 1;
                    self.rows_enqueued += rows;
                }
                Err(e) => {
                    self.enqueue_errors += 1;
                    rows_unsubmitted += rows;
                    batches_unsubmitted += 1;
                    for remaining in pending {
                        rows_unsubmitted += remaining.num_rows() as u64;
                        batches_unsubmitted += 1;
                    }
                    warn!(error = %e, batches_unsubmitted, rows_unsubmitted, "failed to enqueue remaining telemetry batches during shutdown");
                    break;
                }
            }
        }

        let writer = self.writer.shutdown();
        Ok(TelemetryShutdownStats {
            rows_pushed: self.rows_pushed,
            batches_enqueued: self.batches_enqueued,
            rows_enqueued: self.rows_enqueued,
            enqueue_errors: self.enqueue_errors,
            batches_unsubmitted,
            rows_unsubmitted,
            writer,
        })
    }

    fn drain_ready_batches(&mut self) -> Result<(), TelemetryError> {
        let drained = self.bridge.drain_completed();
        let mut pending = drained.into_iter();
        let mut enqueued = 0usize;

        while let Some(batch) = pending.next() {
            let rows = batch.num_rows() as u64;
            if let Err(e) = self.writer.send(batch.clone()) {
                self.enqueue_errors += 1;
                warn!(error = %e, "failed to send batch to ClickHouse writer");
                let mut unsent = Vec::new();
                unsent.push(batch);
                unsent.extend(pending);
                self.bridge.restore_completed(unsent);
                return Err(e.into());
            }
            self.batches_enqueued += 1;
            self.rows_enqueued += rows;
            enqueued += 1;
            debug!(
                rows,
                batches_enqueued = self.batches_enqueued,
                "drained completed batch to writer"
            );
        }

        if enqueued > 0 {
            debug!(
                enqueued,
                "completed telemetry batches enqueued successfully"
            );
        }

        Ok(())
    }
}