crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! KIP-714 client metrics receiver: subscription config, client-instance
//! registry, OTLP decode, and the Prometheus + OTLP sinks.

pub(crate) mod config;
pub(crate) mod manager;
pub(crate) mod otlp;
pub(crate) mod otlp_sink;
pub(crate) mod prometheus_sink;
pub(crate) use manager::ClientMetricsManager;

use std::sync::Arc;
use std::time::Duration;

use self::otlp_sink::OtlpForwarder;
use self::prometheus_sink::ClientMetricsCollector;

/// Default `telemetry.max.bytes` (1 MiB), matching Kafka.
pub(crate) const DEFAULT_TELEMETRY_MAX_BYTES: i32 = 1_048_576;
/// Staleness TTL for the Prometheus snapshot.
pub(crate) const PROM_SNAPSHOT_TTL: Duration = Duration::from_mins(5);

/// Broker-held bundle: the manager (instance state + matching) plus the two
/// sinks. The Prometheus collector is shared with the metrics registry.
pub(crate) struct ClientMetrics {
    pub manager: ClientMetricsManager,
    pub prometheus: Arc<ClientMetricsCollector>,
    pub otlp: OtlpForwarder,
}

impl ClientMetrics {
    /// `otlp_endpoint` is `None` when OTLP forwarding is disabled.
    pub(crate) fn new(telemetry_max_bytes: i32, otlp_endpoint: Option<String>) -> Self {
        let otlp = match otlp_endpoint {
            Some(ep) => OtlpForwarder::spawn(ep, 256),
            None => OtlpForwarder::disabled(),
        };
        Self {
            manager: ClientMetricsManager::new(telemetry_max_bytes),
            prometheus: Arc::new(ClientMetricsCollector::new(PROM_SNAPSHOT_TTL)),
            otlp,
        }
    }
}