use std::sync::Arc;
use opentelemetry::metrics::{Gauge, Meter, MeterProvider};
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use buswatch_types::Snapshot;
#[derive(Debug, Clone)]
pub struct OtelConfig {
pub endpoint: String,
pub service_name: String,
}
impl OtelConfig {
pub fn builder() -> OtelConfigBuilder {
OtelConfigBuilder::default()
}
}
#[derive(Debug, Default)]
pub struct OtelConfigBuilder {
endpoint: Option<String>,
service_name: Option<String>,
}
impl OtelConfigBuilder {
pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = Some(endpoint.into());
self
}
pub fn service_name(mut self, name: impl Into<String>) -> Self {
self.service_name = Some(name.into());
self
}
pub fn build(self) -> OtelConfig {
OtelConfig {
endpoint: self
.endpoint
.unwrap_or_else(|| "http://localhost:4318".to_string()),
service_name: self.service_name.unwrap_or_else(|| "buswatch".to_string()),
}
}
}
pub struct OtelExporter {
meter: Meter,
_provider: Arc<SdkMeterProvider>,
read_count: Gauge<u64>,
read_backlog: Gauge<u64>,
read_pending: Gauge<u64>,
read_rate: Gauge<f64>,
write_count: Gauge<u64>,
write_pending: Gauge<u64>,
write_rate: Gauge<f64>,
}
impl OtelExporter {
pub fn new(config: &OtelConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
use opentelemetry_otlp::MetricExporter;
use opentelemetry_sdk::metrics::PeriodicReader;
use opentelemetry_sdk::Resource;
let exporter = MetricExporter::builder()
.with_http()
.with_endpoint(format!("{}/v1/metrics", config.endpoint))
.build()?;
let reader = PeriodicReader::builder(exporter).build();
let resource = Resource::builder()
.with_service_name(config.service_name.clone())
.build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(resource)
.build();
let meter = provider.meter("buswatch");
let provider = Arc::new(provider);
let read_count = meter
.u64_gauge("buswatch.read.count")
.with_description("Total messages read from a topic")
.build();
let read_backlog = meter
.u64_gauge("buswatch.read.backlog")
.with_description("Estimated message backlog (unread messages)")
.build();
let read_pending = meter
.u64_gauge("buswatch.read.pending")
.with_description("Number of pending read operations")
.build();
let read_rate = meter
.f64_gauge("buswatch.read.rate")
.with_description("Read rate in messages per second")
.build();
let write_count = meter
.u64_gauge("buswatch.write.count")
.with_description("Total messages written to a topic")
.build();
let write_pending = meter
.u64_gauge("buswatch.write.pending")
.with_description("Number of pending write operations")
.build();
let write_rate = meter
.f64_gauge("buswatch.write.rate")
.with_description("Write rate in messages per second")
.build();
Ok(Self {
meter,
_provider: provider,
read_count,
read_backlog,
read_pending,
read_rate,
write_count,
write_pending,
write_rate,
})
}
pub fn record(&self, snapshot: &Snapshot) {
for (module_name, module_metrics) in &snapshot.modules {
for (topic, read_metrics) in &module_metrics.reads {
let attributes = [
KeyValue::new("module", module_name.clone()),
KeyValue::new("topic", topic.clone()),
];
self.read_count.record(read_metrics.count, &attributes);
if let Some(backlog) = read_metrics.backlog {
self.read_backlog.record(backlog, &attributes);
}
if let Some(pending) = read_metrics.pending {
self.read_pending.record(pending.as_micros(), &attributes);
}
if let Some(rate) = read_metrics.rate {
self.read_rate.record(rate, &attributes);
}
}
for (topic, write_metrics) in &module_metrics.writes {
let attributes = [
KeyValue::new("module", module_name.clone()),
KeyValue::new("topic", topic.clone()),
];
self.write_count.record(write_metrics.count, &attributes);
if let Some(pending) = write_metrics.pending {
self.write_pending.record(pending.as_micros(), &attributes);
}
if let Some(rate) = write_metrics.rate {
self.write_rate.record(rate, &attributes);
}
}
}
}
pub fn meter(&self) -> &Meter {
&self.meter
}
}
impl std::fmt::Debug for OtelExporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OtelExporter")
.field("meter", &"Meter { ... }")
.finish()
}
}