tonic_prometheus_layer/lib.rs
1//!
2//!
3//! # Tonic Prometheus Layer
4//! A lightweight Prometheus metrics layer for Tonic gRPC client and server
5//!
6//! It provides the following metrics:
7//! * `grpc_server_handled_total`: a **Counter** for tracking the total number of completed gRPC server calls.
8//! * `grpc_server_started_total`: a **Counter** for tracking the total number of gRPC server calls started.
9//! The difference between this and `grpc_server_handled_total` equals the number of ongoing server requests.
10//! * `grpc_server_handling_seconds`: a **Histogram** for tracking gRPC server call duration.
11//! * `grpc_client_handled_total`: a **Counter** for tracking the total number of completed gRPC client calls.
12//! * `grpc_client_started_total`: a **Counter** for tracking the total number of gRPC client calls started.
13//! The difference between this and `grpc_client_handled_total` equals the number of ongoing client requests.
14//! * `grpc_client_handling_seconds`: a **Histogram** for tracking gRPC client call duration.
15//!
16//! ## Usage
17//!
18//! Add `tonic_prometheus_layer` to your `Cargo.toml`.
19//! ```not_rust
20//! [dependencies]
21//! tonic_prometheus_layer = "0.3.0"
22//! ```
23//!
24//! ## Server Instrumentation
25//!
26//! Add a new layer to your tonic instance:
27//! ```rust,no_run
28//! use std::net::SocketAddr;
29//! use std::str::FromStr;
30//!
31//! use rocket::{get, routes};
32//! use rocket::http::Status;
33//! use rocket::response::content::RawText;
34//! use rocket::config::Shutdown;
35//! use rocket::response::status::Custom;
36//! use tonic_prometheus_layer::metrics::GlobalSettings;
37//!
38//! use crate::api::server;
39//! use crate::proto::service_server::ServiceServer;
40//!
41//! mod api;
42//! mod proto;
43//!
44//! #[tokio::main]
45//! async fn main() {
46//! let addr: SocketAddr = "127.0.0.1:9090".parse().unwrap();
47//!
48//! let service = server::Server {};
49//!
50//! tonic_prometheus_layer::metrics::try_init_settings(GlobalSettings {
51//! histogram_buckets: vec![0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0],
52//! ..Default::default()
53//! }).unwrap();
54//!
55//! let metrics_layer = tonic_prometheus_layer::MetricsLayer::new();
56//!
57//! tokio::spawn(async {
58//! run_http_server("127.0.0.1:8090").await
59//! });
60//!
61//! tonic::transport::Server::builder()
62//! .layer(metrics_layer)
63//! .add_service(ServiceServer::new(service))
64//! .serve(addr.into())
65//! .await
66//! .unwrap();
67//! }
68//!
69//! #[get("/metrics")]
70//! async fn metrics() -> Custom<RawText<String>> {
71//! let body = tonic_prometheus_layer::metrics::encode_to_string().unwrap();
72//!
73//! Custom(Status::Ok, RawText(body))
74//! }
75//!
76//! pub async fn run_http_server(addr: &str) {
77//! let addr = SocketAddr::from_str(addr).unwrap();
78//!
79//! let config = rocket::config::Config {
80//! address: addr.ip(),
81//! port: addr.port(),
82//! shutdown: Shutdown {
83//! ctrlc: false,
84//! ..Default::default()
85//! },
86//! ..rocket::config::Config::release_default()
87//! };
88//!
89//! rocket::custom(config)
90//! .mount("/", routes![metrics])
91//! .launch()
92//! .await
93//! .unwrap();
94//! }
95//! ```
96//!
97//! ## Client Instrumentation
98//!
99//! Wrap each individual tonic client Channel object:
100//!
//! ```rust,no_run
102//! #[tokio::main]
103//! async fn main() {
104//! let channel = tonic::transport::Channel::from_static("http://localhost")
105//! .connect()
106//! .await
107//! .unwrap();
108//! let channel = tonic_prometheus_layer::MetricsChannel::new(channel);
109//! let mut client = tonic_health::pb::health_client::HealthClient::new(channel);
110//! }
111//! ```
112use std::future::Future;
113use std::num::NonZeroUsize;
114use std::pin::Pin;
115use std::task::{Context, Poll};
116use std::time::Instant;
117
118use pin_project::pin_project;
119use tonic::codegen::http::{request, response};
120use tonic::Code;
121use tower::{Layer, Service};
122
123use crate::metrics::{COUNTER_MP, GAUGE_MP, HISTOGRAM_MP};
124use crate::metrics::{COUNTER_SM, COUNTER_SMC, HISTOGRAM_SMC};
125
126mod client;
127pub mod metrics;
128
129/// Convert a `tonic::Code` to its canonical gRPC status code string.
130///
131/// Uses the standard uppercase/underscore names from the gRPC specification
132/// (<https://grpc.io/docs/guides/status-codes/>)
133/// (e.g. `OK`, `NOT_FOUND`, `INTERNAL`) instead of Rust Debug formatting
134/// (e.g. `Ok`, `NotFound`, `Internal`).
135fn grpc_code_to_str(code: Code) -> &'static str {
136 match code {
137 Code::Ok => "OK",
138 Code::Cancelled => "CANCELLED",
139 Code::Unknown => "UNKNOWN",
140 Code::InvalidArgument => "INVALID_ARGUMENT",
141 Code::DeadlineExceeded => "DEADLINE_EXCEEDED",
142 Code::NotFound => "NOT_FOUND",
143 Code::AlreadyExists => "ALREADY_EXISTS",
144 Code::PermissionDenied => "PERMISSION_DENIED",
145 Code::ResourceExhausted => "RESOURCE_EXHAUSTED",
146 Code::FailedPrecondition => "FAILED_PRECONDITION",
147 Code::Aborted => "ABORTED",
148 Code::OutOfRange => "OUT_OF_RANGE",
149 Code::Unimplemented => "UNIMPLEMENTED",
150 Code::Internal => "INTERNAL",
151 Code::Unavailable => "UNAVAILABLE",
152 Code::DataLoss => "DATA_LOSS",
153 Code::Unauthenticated => "UNAUTHENTICATED",
154 }
155}
156
157pub use client::MetricsChannel;
158
/// Layer that adds Prometheus instrumentation to a service.
///
/// The layer itself carries no state; it only exists so that it can be handed
/// to APIs that accept a layer (see the crate-level server example).
#[derive(Clone, Debug, Default)]
pub struct MetricsLayer {}

impl MetricsLayer {
    /// Creates a new layer. Equivalent to [`MetricsLayer::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
167
168impl<S> Layer<S> for MetricsLayer {
169 type Service = MetricsService<S>;
170
171 fn layer(&self, inner: S) -> Self::Service {
172 MetricsService { service: inner }
173 }
174}
175
/// Service wrapper produced by the metrics layer.
///
/// It holds the wrapped service and nothing else; all per-call bookkeeping
/// lives in the future returned from `call`.
#[derive(Clone, Debug)]
pub struct MetricsService<S> {
    /// The wrapped service every call is forwarded to.
    service: S,
}
180
181impl<S, B, C> Service<request::Request<B>> for MetricsService<S>
182where
183 S: Service<request::Request<B>, Response = response::Response<C>>,
184{
185 type Response = S::Response;
186 type Error = S::Error;
187 type Future = MetricsFuture<S::Future>;
188
189 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
190 self.service.poll_ready(cx)
191 }
192
193 fn call(&mut self, req: request::Request<B>) -> Self::Future {
194 let method = req.method().to_string();
195 let path = req.uri().path().to_owned();
196 let service_method_separator: Option<NonZeroUsize> = match path.chars().next() {
197 Some(first_char) if first_char == '/' => path[1..]
198 .find('/')
199 .map(|p| NonZeroUsize::new(p + 1).unwrap()),
200 _ => None,
201 };
202 let f = self.service.call(req);
203
204 MetricsFuture::new(method, path, service_method_separator, f)
205 }
206}
207
208#[pin_project]
209pub struct MetricsFuture<F> {
210 method: String,
211 path: String,
212 service_method_separator: Option<NonZeroUsize>,
213 started_at: Option<Instant>,
214 #[pin]
215 inner: F,
216}
217
218impl<F> MetricsFuture<F> {
219 pub fn new(
220 method: String,
221 path: String,
222 service_method_separator: Option<NonZeroUsize>,
223 inner: F,
224 ) -> Self {
225 Self {
226 started_at: None,
227 inner,
228 method,
229 path,
230 service_method_separator,
231 }
232 }
233}
234
235impl<F, B, E> Future for MetricsFuture<F>
236where
237 F: Future<Output = Result<response::Response<B>, E>>,
238{
239 type Output = F::Output;
240
241 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242 let this = self.project();
243
244 let (rpc_service, rpc_method) = match this.service_method_separator {
245 Some(sep) => (
246 &this.path[1..(*sep).into()],
247 &this.path[usize::from(*sep) + 1..],
248 ),
249 // If unparseable, say service is empty and method is the entire path
250 None => ("", this.path as &str),
251 };
252
253 let started_at = this.started_at.get_or_insert_with(|| {
254 GAUGE_MP
255 .with_label_values(&[this.method.as_str(), this.path.as_str()])
256 .inc();
257 COUNTER_SM
258 .with_label_values(&[rpc_service, rpc_method])
259 .inc();
260
261 Instant::now()
262 });
263
264 if let Poll::Ready(v) = this.inner.poll(cx) {
265 let code = v.as_ref().map_or(Code::Unknown, |resp| {
266 resp.headers()
267 .get("grpc-status")
268 .map(|s| Code::from_bytes(s.as_bytes()))
269 .unwrap_or(Code::Ok)
270 });
271 let code_str = grpc_code_to_str(code);
272 let elapsed = Instant::now().duration_since(*started_at).as_secs_f64();
273 COUNTER_MP
274 .with_label_values(&[this.method.as_str(), this.path.as_str()])
275 .inc();
276 COUNTER_SMC
277 .with_label_values(&[rpc_service, rpc_method, code_str])
278 .inc();
279 HISTOGRAM_MP
280 .with_label_values(&[this.method.as_str(), this.path.as_str()])
281 .observe(elapsed);
282 HISTOGRAM_SMC
283 .with_label_values(&[rpc_service, rpc_method, code_str])
284 .observe(elapsed);
285 GAUGE_MP
286 .with_label_values(&[this.method.as_str(), this.path.as_str()])
287 .dec();
288
289 Poll::Ready(v)
290 } else {
291 Poll::Pending
292 }
293 }
294}