warm_channels 0.3.0

Always-ready HTTP client channels for gRPC or other RPC-like requests.
Documentation
use http::{Request, Response};
use lazy_static::lazy_static;
use pin_project_lite::pin_project;
use prometheus::{register_counter_vec, register_histogram_vec};
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use std::time::Instant;
use tonic::{Code, GrpcMethod};
use tower_service::Service;

const DEFAULT_HISTOGRAM_BUCKETS: [f64; 14] = [
    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
];

lazy_static! {
    static ref COUNTER_STARTED: prometheus::CounterVec = register_counter_vec!(
        "grpc_client_started_total",
        "Total number of client RPCs started.",
        &["grpc_service", "grpc_method"],
    )
    .unwrap();
    static ref COUNTER_HANDLED: prometheus::CounterVec = register_counter_vec!(
        "grpc_client_handled_total",
        "Total number of client RPCs completed, regardless of success or failure.",
        &["grpc_service", "grpc_method", "grpc_code"],
    )
    .unwrap();
    static ref HISTOGRAM: prometheus::HistogramVec = register_histogram_vec!(
        "grpc_client_handling_seconds",
        "Histogram for tracking client RPC duration",
        &["grpc_service", "grpc_method", "grpc_code"],
        DEFAULT_HISTOGRAM_BUCKETS.to_vec(),
    )
    .unwrap();
}

pin_project! {
    pub struct MetricsChannelFuture<F> {
        service: String,
        method: String,
        started_at: Option<Instant>,
        #[pin] inner: F,
    }
}

impl<F> MetricsChannelFuture<F> {
    pub fn new(service: String, method: String, inner: F) -> Self {
        Self {
            inner,
            started_at: None,
            service,
            method,
        }
    }
}

impl<F, O, E> Future for MetricsChannelFuture<F>
where
    F: Future<Output = Result<Response<O>, E>>,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let started_at = this.started_at.get_or_insert_with(|| {
            COUNTER_STARTED
                .with_label_values(&[this.service.as_str(), this.method.as_str()])
                .inc();
            Instant::now()
        });
        let v = ready!(this.inner.poll(cx));
        let code = v.as_ref().map_or(Code::Unknown, |resp| {
            resp.headers()
                .get("grpc-status")
                .map(|s| Code::from_bytes(s.as_bytes()))
                .unwrap_or(Code::Ok)
        });
        let code_str = format!("{:?}", code);
        let elapsed = Instant::now().duration_since(*started_at).as_secs_f64();
        COUNTER_HANDLED
            .with_label_values(&[
                this.service.as_str(),
                this.method.as_str(),
                code_str.as_str(),
            ])
            .inc();
        HISTOGRAM
            .with_label_values(&[
                this.service.as_str(),
                this.method.as_str(),
                code_str.as_str(),
            ])
            .observe(elapsed);
        Poll::Ready(v)
    }
}

pub struct MetricsChannel<T> {
    inner: T,
}

impl<T> MetricsChannel<T> {
    pub fn new(inner: T) -> Self {
        Self { inner }
    }
}

impl<I, O, T> Service<Request<I>> for MetricsChannel<T>
where
    T: Service<Request<I>, Response = Response<O>>,
    T::Future: Future<Output = Result<T::Response, T::Error>>,
{
    type Response = T::Response;
    type Error = T::Error;
    type Future = MetricsChannelFuture<T::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Request<I>) -> Self::Future {
        let (service, method) = req
            .extensions()
            .get::<GrpcMethod>()
            .map_or(("", ""), |gm| (gm.service(), gm.method()));
        MetricsChannelFuture::new(service.into(), method.into(), self.inner.call(req))
    }
}