use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use prometheus::{
Encoder, HistogramOpts, HistogramVec, IntCounterVec, Opts, Registry, TextEncoder,
};
use crate::runtime::{
Context, Handler, HandlerResult, Layer, Outgoing, PublishMiddleware, PublishNext,
};
const DURATION_BUCKETS: &[f64] = &[
0.000_5, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
];
type PublishFut<'a> =
Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'a>>;
struct Inner {
registry: Registry,
consumed: IntCounterVec,
consume_duration: HistogramVec,
published: IntCounterVec,
}
#[derive(Clone)]
pub struct Metrics {
inner: Arc<Inner>,
}
impl std::fmt::Debug for Metrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Metrics").finish_non_exhaustive()
}
}
impl Metrics {
pub fn new() -> Result<Self, prometheus::Error> {
Self::with_registry(prometheus::default_registry().clone())
}
pub fn with_registry(registry: Registry) -> Result<Self, prometheus::Error> {
let consumed = IntCounterVec::new(
Opts::new(
"ruststream_messages_consumed_total",
"Messages handled, by name and outcome.",
),
&["name", "status"],
)?;
let consume_duration = HistogramVec::new(
HistogramOpts::new(
"ruststream_consume_duration_seconds",
"Handler execution time, by name.",
)
.buckets(DURATION_BUCKETS.to_vec()),
&["name"],
)?;
let published = IntCounterVec::new(
Opts::new(
"ruststream_messages_published_total",
"Messages published, by name and outcome.",
),
&["name", "status"],
)?;
registry.register(Box::new(consumed.clone()))?;
registry.register(Box::new(consume_duration.clone()))?;
registry.register(Box::new(published.clone()))?;
Ok(Self {
inner: Arc::new(Inner {
registry,
consumed,
consume_duration,
published,
}),
})
}
#[must_use]
pub fn registry(&self) -> &Registry {
&self.inner.registry
}
#[must_use]
pub fn consume_layer(&self) -> MetricsLayer {
MetricsLayer {
inner: Arc::clone(&self.inner),
}
}
#[must_use]
pub fn publish_layer(&self) -> MetricsPublish {
MetricsPublish {
inner: Arc::clone(&self.inner),
}
}
pub fn export(&self) -> Result<String, prometheus::Error> {
let mut buf = Vec::new();
let encoder = TextEncoder::new();
encoder.encode(&self.inner.registry.gather(), &mut buf)?;
Ok(String::from_utf8_lossy(&buf).into_owned())
}
}
const fn consume_status(result: HandlerResult) -> &'static str {
match result {
HandlerResult::Ack => "ack",
HandlerResult::Nack { .. } => "nack",
}
}
#[derive(Clone)]
pub struct MetricsLayer {
inner: Arc<Inner>,
}
impl std::fmt::Debug for MetricsLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MetricsLayer").finish_non_exhaustive()
}
}
impl<H> Layer<H> for MetricsLayer {
type Handler = MetricsHandler<H>;
fn layer(&self, inner: H) -> Self::Handler {
MetricsHandler {
inner,
metrics: Arc::clone(&self.inner),
}
}
}
#[derive(Clone)]
pub struct MetricsHandler<H> {
inner: H,
metrics: Arc<Inner>,
}
impl<H> std::fmt::Debug for MetricsHandler<H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MetricsHandler").finish_non_exhaustive()
}
}
impl<M, H> Handler<M> for MetricsHandler<H>
where
M: Sync,
H: Handler<M>,
{
fn handle(&self, msg: &M, ctx: &mut Context) -> impl Future<Output = HandlerResult> + Send {
let name = ctx.name().to_owned();
async move {
let started = std::time::Instant::now();
let result = self.inner.handle(msg, ctx).await;
self.metrics
.consume_duration
.with_label_values(&[name.as_str()])
.observe(started.elapsed().as_secs_f64());
self.metrics
.consumed
.with_label_values(&[name.as_str(), consume_status(result)])
.inc();
result
}
}
}
#[derive(Clone)]
pub struct MetricsPublish {
inner: Arc<Inner>,
}
impl std::fmt::Debug for MetricsPublish {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MetricsPublish").finish_non_exhaustive()
}
}
impl PublishMiddleware for MetricsPublish {
fn on_publish<'a>(&'a self, out: &'a mut Outgoing, next: PublishNext<'a>) -> PublishFut<'a> {
let name = out.name().to_owned();
Box::pin(async move {
let result = next.run(out).await;
let status = if result.is_ok() { "ok" } else { "error" };
self.inner
.published
.with_label_values(&[name.as_str(), status])
.inc();
result
})
}
}