Skip to main content

tonic_prometheus_layer/
lib.rs

1//!
2//!
3//! # Tonic Prometheus Layer
//! A lightweight Prometheus metrics layer for Tonic gRPC clients and servers.
5//!
6//! It provides the following metrics:
7//! * `grpc_server_handled_total`: a **Counter** for tracking the total number of completed gRPC server calls.
8//! * `grpc_server_started_total`: a **Counter** for tracking the total number of gRPC server calls started.
9//!    The difference between this and `grpc_server_handled_total` equals the number of ongoing server requests.
10//! * `grpc_server_handling_seconds`: a **Histogram** for tracking gRPC server call duration.
11//! * `grpc_client_handled_total`: a **Counter** for tracking the total number of completed gRPC client calls.
12//! * `grpc_client_started_total`: a **Counter** for tracking the total number of gRPC client calls started.
13//!    The difference between this and `grpc_client_handled_total` equals the number of ongoing client requests.
14//! * `grpc_client_handling_seconds`: a **Histogram** for tracking gRPC client call duration.
15//!
16//! ## Usage
17//!
18//! Add `tonic_prometheus_layer` to your `Cargo.toml`.
19//! ```not_rust
20//! [dependencies]
21//! tonic_prometheus_layer = "0.3.0"
22//! ```
23//!
24//! ## Server Instrumentation
25//!
26//! Add a new layer to your tonic instance:
27//! ```rust,no_run
28//! use std::net::SocketAddr;
29//! use std::str::FromStr;
30//!
31//! use rocket::{get, routes};
32//! use rocket::http::Status;
33//! use rocket::response::content::RawText;
34//! use rocket::config::Shutdown;
35//! use rocket::response::status::Custom;
36//! use tonic_prometheus_layer::metrics::GlobalSettings;
37//!
38//! use crate::api::server;
39//! use crate::proto::service_server::ServiceServer;
40//!
41//! mod api;
42//! mod proto;
43//!
44//! #[tokio::main]
45//! async fn main() {
46//!     let addr: SocketAddr = "127.0.0.1:9090".parse().unwrap();
47//!
48//!     let service = server::Server {};
49//!
50//!     tonic_prometheus_layer::metrics::try_init_settings(GlobalSettings {
51//!         histogram_buckets: vec![0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0],
52//!         ..Default::default()
53//!     }).unwrap();
54//!
55//!     let metrics_layer = tonic_prometheus_layer::MetricsLayer::new();
56//!
57//!     tokio::spawn(async {
58//!         run_http_server("127.0.0.1:8090").await
59//!     });
60//!
61//!     tonic::transport::Server::builder()
62//!         .layer(metrics_layer)
63//!         .add_service(ServiceServer::new(service))
64//!         .serve(addr.into())
65//!         .await
66//!         .unwrap();
67//! }
68//!
69//! #[get("/metrics")]
70//! async fn metrics() -> Custom<RawText<String>> {
71//!     let body = tonic_prometheus_layer::metrics::encode_to_string().unwrap();
72//!
73//!     Custom(Status::Ok, RawText(body))
74//! }
75//!
76//! pub async fn run_http_server(addr: &str) {
77//!     let addr = SocketAddr::from_str(addr).unwrap();
78//!
79//!     let config = rocket::config::Config {
80//!         address: addr.ip(),
81//!         port: addr.port(),
82//!         shutdown: Shutdown {
83//!             ctrlc: false,
84//!             ..Default::default()
85//!         },
86//!         ..rocket::config::Config::release_default()
87//!     };
88//!
89//!     rocket::custom(config)
90//!         .mount("/", routes![metrics])
91//!         .launch()
92//!         .await
93//!         .unwrap();
94//! }
95//! ```
96//!
97//! ## Client Instrumentation
98//!
//! Wrap each tonic client `Channel` in a [`MetricsChannel`]:
100//!
//! ```rust,no_run
102//! #[tokio::main]
103//! async fn main() {
104//!     let channel = tonic::transport::Channel::from_static("http://localhost")
105//!         .connect()
106//!         .await
107//!         .unwrap();
108//!     let channel = tonic_prometheus_layer::MetricsChannel::new(channel);
109//!     let mut client = tonic_health::pb::health_client::HealthClient::new(channel);
110//! }
111//! ```
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use pin_project::{pin_project, pinned_drop};
use tonic::codegen::http::{request, response};
use tonic::Code;
use tower::{Layer, Service};

use crate::metrics::{COUNTER_MP, GAUGE_MP, HISTOGRAM_MP};
use crate::metrics::{COUNTER_SM, COUNTER_SMC, HISTOGRAM_SMC};
125
126mod client;
127pub mod metrics;
128
129/// Convert a `tonic::Code` to its canonical gRPC status code string.
130///
131/// Uses the standard uppercase/underscore names from the gRPC specification
132/// (<https://grpc.io/docs/guides/status-codes/>)
133/// (e.g. `OK`, `NOT_FOUND`, `INTERNAL`) instead of Rust Debug formatting
134/// (e.g. `Ok`, `NotFound`, `Internal`).
135fn grpc_code_to_str(code: Code) -> &'static str {
136    match code {
137        Code::Ok => "OK",
138        Code::Cancelled => "CANCELLED",
139        Code::Unknown => "UNKNOWN",
140        Code::InvalidArgument => "INVALID_ARGUMENT",
141        Code::DeadlineExceeded => "DEADLINE_EXCEEDED",
142        Code::NotFound => "NOT_FOUND",
143        Code::AlreadyExists => "ALREADY_EXISTS",
144        Code::PermissionDenied => "PERMISSION_DENIED",
145        Code::ResourceExhausted => "RESOURCE_EXHAUSTED",
146        Code::FailedPrecondition => "FAILED_PRECONDITION",
147        Code::Aborted => "ABORTED",
148        Code::OutOfRange => "OUT_OF_RANGE",
149        Code::Unimplemented => "UNIMPLEMENTED",
150        Code::Internal => "INTERNAL",
151        Code::Unavailable => "UNAVAILABLE",
152        Code::DataLoss => "DATA_LOSS",
153        Code::Unauthenticated => "UNAUTHENTICATED",
154    }
155}
156
157pub use client::MetricsChannel;
158
159#[derive(Clone, Default)]
160pub struct MetricsLayer {}
161
162impl MetricsLayer {
163    pub fn new() -> Self {
164        Default::default()
165    }
166}
167
168impl<S> Layer<S> for MetricsLayer {
169    type Service = MetricsService<S>;
170
171    fn layer(&self, inner: S) -> Self::Service {
172        MetricsService { service: inner }
173    }
174}
175
176#[derive(Clone)]
177pub struct MetricsService<S> {
178    service: S,
179}
180
181impl<S, B, C> Service<request::Request<B>> for MetricsService<S>
182where
183    S: Service<request::Request<B>, Response = response::Response<C>>,
184{
185    type Response = S::Response;
186    type Error = S::Error;
187    type Future = MetricsFuture<S::Future>;
188
189    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
190        self.service.poll_ready(cx)
191    }
192
193    fn call(&mut self, req: request::Request<B>) -> Self::Future {
194        let method = req.method().to_string();
195        let path = req.uri().path().to_owned();
196        let service_method_separator: Option<NonZeroUsize> = match path.chars().next() {
197            Some(first_char) if first_char == '/' => path[1..]
198                .find('/')
199                .map(|p| NonZeroUsize::new(p + 1).unwrap()),
200            _ => None,
201        };
202        let f = self.service.call(req);
203
204        MetricsFuture::new(method, path, service_method_separator, f)
205    }
206}
207
208#[pin_project]
209pub struct MetricsFuture<F> {
210    method: String,
211    path: String,
212    service_method_separator: Option<NonZeroUsize>,
213    started_at: Option<Instant>,
214    #[pin]
215    inner: F,
216}
217
218impl<F> MetricsFuture<F> {
219    pub fn new(
220        method: String,
221        path: String,
222        service_method_separator: Option<NonZeroUsize>,
223        inner: F,
224    ) -> Self {
225        Self {
226            started_at: None,
227            inner,
228            method,
229            path,
230            service_method_separator,
231        }
232    }
233}
234
235impl<F, B, E> Future for MetricsFuture<F>
236where
237    F: Future<Output = Result<response::Response<B>, E>>,
238{
239    type Output = F::Output;
240
241    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242        let this = self.project();
243
244        let (rpc_service, rpc_method) = match this.service_method_separator {
245            Some(sep) => (
246                &this.path[1..(*sep).into()],
247                &this.path[usize::from(*sep) + 1..],
248            ),
249            // If unparseable, say service is empty and method is the entire path
250            None => ("", this.path as &str),
251        };
252
253        let started_at = this.started_at.get_or_insert_with(|| {
254            GAUGE_MP
255                .with_label_values(&[this.method.as_str(), this.path.as_str()])
256                .inc();
257            COUNTER_SM
258                .with_label_values(&[rpc_service, rpc_method])
259                .inc();
260
261            Instant::now()
262        });
263
264        if let Poll::Ready(v) = this.inner.poll(cx) {
265            let code = v.as_ref().map_or(Code::Unknown, |resp| {
266                resp.headers()
267                    .get("grpc-status")
268                    .map(|s| Code::from_bytes(s.as_bytes()))
269                    .unwrap_or(Code::Ok)
270            });
271            let code_str = grpc_code_to_str(code);
272            let elapsed = Instant::now().duration_since(*started_at).as_secs_f64();
273            COUNTER_MP
274                .with_label_values(&[this.method.as_str(), this.path.as_str()])
275                .inc();
276            COUNTER_SMC
277                .with_label_values(&[rpc_service, rpc_method, code_str])
278                .inc();
279            HISTOGRAM_MP
280                .with_label_values(&[this.method.as_str(), this.path.as_str()])
281                .observe(elapsed);
282            HISTOGRAM_SMC
283                .with_label_values(&[rpc_service, rpc_method, code_str])
284                .observe(elapsed);
285            GAUGE_MP
286                .with_label_values(&[this.method.as_str(), this.path.as_str()])
287                .dec();
288
289            Poll::Ready(v)
290        } else {
291            Poll::Pending
292        }
293    }
294}