use std::pin::Pin;
use std::rc::Rc;
use futures::StreamExt;
use opentelemetry::metrics::MeterProvider as _;
use opentelemetry_otlp::{MetricExporter, WithExportConfig as _};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use crate::RunMode;
use crate::nodes::{FutStream, RunParams, StreamOperators};
use crate::types::*;
#[derive(Debug, Clone)]
pub struct OtlpConfig {
pub endpoint: String,
pub service_name: String,
}
pub trait OtlpPush<T: Element> {
#[must_use]
fn otlp_push(self: &Rc<Self>, metric_name: &str, config: OtlpConfig) -> Rc<dyn Node>;
}
impl<T: Element + Send + std::fmt::Display + 'static> OtlpPush<T> for dyn Stream<T> {
fn otlp_push(self: &Rc<Self>, metric_name: &str, config: OtlpConfig) -> Rc<dyn Node> {
let metric_name = metric_name.to_string();
let consumer = Box::new(move |ctx: RunParams, source: Pin<Box<dyn FutStream<T>>>| {
push_consumer(metric_name, config, ctx, source)
});
self.consume_async(consumer)
}
}
async fn push_consumer<T: Element + Send + std::fmt::Display>(
metric_name: String,
config: OtlpConfig,
ctx: RunParams,
mut source: Pin<Box<dyn FutStream<T>>>,
) -> anyhow::Result<()> {
if matches!(ctx.run_mode, RunMode::HistoricalFrom(_)) {
while source.next().await.is_some() {}
return Ok(());
}
let exporter = MetricExporter::builder()
.with_http()
.with_endpoint(&config.endpoint)
.build()
.map_err(|e| anyhow::anyhow!("otlp_push: failed to build exporter: {e}"))?;
let reader = PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_millis(500))
.build();
let resource = opentelemetry_sdk::Resource::builder_empty()
.with_service_name(config.service_name)
.build();
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(resource)
.build();
let meter = provider.meter("wingfoil");
let gauge = meter.f64_gauge(metric_name).build();
while let Some((_time, value)) = source.next().await {
let s = value.to_string();
let v: f64 = s.parse().unwrap_or_else(|_| {
log::warn!("otlp_push: could not parse {s:?} as f64, recording 0.0");
0.0
});
gauge.record(v, &[]);
}
drop(provider);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{NanoTime, RunFor, RunMode, nodes::*};
use std::time::Duration;
#[test]
fn historical_mode_drains_without_connecting() {
let config = OtlpConfig {
endpoint: "http://127.0.0.1:1".into(),
service_name: "test".into(),
};
let counter = ticker(Duration::from_millis(10)).count();
let node = counter.otlp_push("test_metric", config);
node.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Cycles(5))
.unwrap();
}
#[test]
fn bad_endpoint_is_handled_gracefully() {
let config = OtlpConfig {
endpoint: "http://127.0.0.1:1".into(),
service_name: "test".into(),
};
let counter = ticker(Duration::from_millis(50)).count();
let node = counter.otlp_push("bad_endpoint_metric", config);
let _ = node.run(RunMode::RealTime, RunFor::Cycles(1));
}
}