tako/plugins/
metrics.rs

1#![cfg_attr(docsrs, doc(cfg(feature = "plugins")))]
2//! Metrics and tracing plugin for integrating Tako's signal system with
3//! backends like Prometheus or OpenTelemetry.
4//!
5//! This plugin listens to application-level and route-level signals and
6//! updates metrics using an injected backend implementation. When the
7//! `metrics-prometheus` or `metrics-opentelemetry` features are enabled,
8//! a concrete backend is provided based on the selected feature, while
9//! the core plugin logic remains backend-agnostic.
10
11use anyhow::Result;
12use std::sync::Arc;
13
14#[cfg(feature = "signals")]
15use crate::signals::{Signal, app_events, ids};
16use crate::{plugins::TakoPlugin, router::Router};
17
18#[cfg(feature = "metrics-prometheus")]
19use crate::{Method, extractors::state::State, responder::Responder};
20#[cfg(feature = "metrics-prometheus")]
21use prometheus::{Encoder, Registry, TextEncoder};
22
23/// Common interface for metrics backends used by the metrics plugin.
24///
25/// Backend implementations translate Tako signals into metrics updates
26/// or tracing events in external systems.
27#[cfg(feature = "signals")]
28pub trait MetricsBackend: Send + Sync + 'static {
29  /// Called when a request is completed at the app level.
30  fn on_request_completed(&self, signal: &Signal);
31
32  /// Called when a route-level request is completed.
33  fn on_route_request_completed(&self, signal: &Signal);
34
35  /// Called when a connection is opened.
36  fn on_connection_opened(&self, signal: &Signal);
37
38  /// Called when a connection is closed.
39  fn on_connection_closed(&self, signal: &Signal);
40}
41
42/// Metrics plugin that subscribes to Tako's signal bus and forwards
43/// events to a configurable metrics backend.
44#[cfg(feature = "signals")]
45pub struct MetricsPlugin<B: MetricsBackend> {
46  backend: Arc<B>,
47}
48
49#[cfg(feature = "signals")]
50impl<B: MetricsBackend> Clone for MetricsPlugin<B> {
51  fn clone(&self) -> Self {
52    Self {
53      backend: Arc::clone(&self.backend),
54    }
55  }
56}
57
58#[cfg(feature = "signals")]
59impl<B: MetricsBackend> MetricsPlugin<B> {
60  /// Creates a new metrics plugin using the provided backend.
61  pub fn new(backend: B) -> Self {
62    Self {
63      backend: Arc::new(backend),
64    }
65  }
66}
67
68#[cfg(feature = "signals")]
69impl<B: MetricsBackend> TakoPlugin for MetricsPlugin<B> {
70  fn name(&self) -> &'static str {
71    "MetricsPlugin"
72  }
73
74  #[cfg(feature = "signals")]
75  fn setup(&self, _router: &Router) -> Result<()> {
76    let backend = self.backend.clone();
77    let app_arbiter = app_events();
78
79    // App-level request.completed metrics
80    app_arbiter.on(ids::REQUEST_COMPLETED, move |signal: Signal| {
81      let backend = backend.clone();
82      async move {
83        backend.on_request_completed(&signal);
84      }
85    });
86
87    // Connection lifetime metrics
88    let backend_conn = self.backend.clone();
89    app_arbiter.on(ids::CONNECTION_OPENED, move |signal: Signal| {
90      let backend = backend_conn.clone();
91      async move {
92        backend.on_connection_opened(&signal);
93      }
94    });
95
96    let backend_close = self.backend.clone();
97    app_arbiter.on(ids::CONNECTION_CLOSED, move |signal: Signal| {
98      let backend = backend_close.clone();
99      async move {
100        backend.on_connection_closed(&signal);
101      }
102    });
103
104    // Route-level request.completed metrics via prefix subscription
105    let backend_route = self.backend.clone();
106    let mut rx = app_arbiter.subscribe_prefix("route.request.");
107    tokio::spawn(async move {
108      while let Ok(signal) = rx.recv().await {
109        backend_route.on_route_request_completed(&signal);
110      }
111    });
112
113    Ok(())
114  }
115
116  #[cfg(not(feature = "signals"))]
117  fn setup(&self, _router: &Router) -> Result<()> {
118    // Metrics plugin is a no-op when signals are disabled.
119    Ok(())
120  }
121}
122
123/// Prometheus backend implementation.
124#[cfg(feature = "metrics-prometheus")]
125pub mod prometheus_backend {
126  use super::{MetricsBackend, Signal};
127  use prometheus::{IntCounterVec, Opts, Registry};
128  use std::sync::Arc;
129
130  /// Basic Prometheus metrics backend that tracks HTTP request counts
131  /// and connection counts using labels for method, path, and status.
132  pub struct PrometheusMetricsBackend {
133    registry: Registry,
134    http_requests_total: IntCounterVec,
135    http_route_requests_total: IntCounterVec,
136    connections_opened_total: IntCounterVec,
137    connections_closed_total: IntCounterVec,
138  }
139
140  impl PrometheusMetricsBackend {
141    pub fn new(registry: Registry) -> Self {
142      let http_requests_total = IntCounterVec::new(
143        Opts::new("tako_http_requests_total", "Total HTTP requests completed"),
144        &["method", "path", "status"],
145      )
146      .expect("failed to create http_requests_total metric");
147
148      let http_route_requests_total = IntCounterVec::new(
149        Opts::new(
150          "tako_route_requests_total",
151          "Total route-level HTTP requests completed",
152        ),
153        &["method", "path", "status"],
154      )
155      .expect("failed to create route_requests_total metric");
156
157      let connections_opened_total = IntCounterVec::new(
158        Opts::new("tako_connections_opened_total", "Total connections opened"),
159        &["remote_addr"],
160      )
161      .expect("failed to create connections_opened_total metric");
162
163      let connections_closed_total = IntCounterVec::new(
164        Opts::new("tako_connections_closed_total", "Total connections closed"),
165        &["remote_addr"],
166      )
167      .expect("failed to create connections_closed_total metric");
168
169      registry
170        .register(Box::new(http_requests_total.clone()))
171        .expect("failed to register http_requests_total");
172      registry
173        .register(Box::new(http_route_requests_total.clone()))
174        .expect("failed to register http_route_requests_total");
175      registry
176        .register(Box::new(connections_opened_total.clone()))
177        .expect("failed to register connections_opened_total");
178      registry
179        .register(Box::new(connections_closed_total.clone()))
180        .expect("failed to register connections_closed_total");
181
182      Self {
183        registry,
184        http_requests_total,
185        http_route_requests_total,
186        connections_opened_total,
187        connections_closed_total,
188      }
189    }
190
191    pub fn registry(&self) -> &Registry {
192      &self.registry
193    }
194  }
195
196  impl MetricsBackend for Arc<PrometheusMetricsBackend> {
197    fn on_request_completed(&self, signal: &Signal) {
198      let method = signal
199        .metadata
200        .get("method")
201        .map(String::as_str)
202        .unwrap_or("");
203      let path = signal
204        .metadata
205        .get("path")
206        .map(String::as_str)
207        .unwrap_or("");
208      let status = signal
209        .metadata
210        .get("status")
211        .map(String::as_str)
212        .unwrap_or("");
213      self
214        .http_requests_total
215        .with_label_values(&[method, path, status])
216        .inc();
217    }
218
219    fn on_route_request_completed(&self, signal: &Signal) {
220      let method = signal
221        .metadata
222        .get("method")
223        .map(String::as_str)
224        .unwrap_or("");
225      let path = signal
226        .metadata
227        .get("path")
228        .map(String::as_str)
229        .unwrap_or("");
230      let status = signal
231        .metadata
232        .get("status")
233        .map(String::as_str)
234        .unwrap_or("");
235      self
236        .http_route_requests_total
237        .with_label_values(&[method, path, status])
238        .inc();
239    }
240
241    fn on_connection_opened(&self, signal: &Signal) {
242      let addr = signal
243        .metadata
244        .get("remote_addr")
245        .map(String::as_str)
246        .unwrap_or("");
247      self
248        .connections_opened_total
249        .with_label_values(&[addr])
250        .inc();
251    }
252
253    fn on_connection_closed(&self, signal: &Signal) {
254      let addr = signal
255        .metadata
256        .get("remote_addr")
257        .map(String::as_str)
258        .unwrap_or("");
259      self
260        .connections_closed_total
261        .with_label_values(&[addr])
262        .inc();
263    }
264  }
265}
266
267/// OpenTelemetry backend implementation.
268#[cfg(feature = "metrics-opentelemetry")]
269pub mod opentelemetry_backend {
270  use super::{MetricsBackend, Signal};
271  use opentelemetry::KeyValue;
272  use opentelemetry::metrics::{Counter, Meter};
273
274  /// Basic OpenTelemetry metrics backend that records counters using the
275  /// global meter provider. Users are expected to configure an exporter
276  /// (e.g. OTLP, Prometheus) separately.
277  pub struct OtelMetricsBackend {
278    http_requests_total: Counter<u64>,
279    http_route_requests_total: Counter<u64>,
280    connections_opened_total: Counter<u64>,
281    connections_closed_total: Counter<u64>,
282  }
283
284  impl OtelMetricsBackend {
285    pub fn new(meter: Meter) -> Self {
286      let http_requests_total = meter.u64_counter("tako_http_requests_total").init();
287      let http_route_requests_total = meter.u64_counter("tako_route_requests_total").init();
288      let connections_opened_total = meter.u64_counter("tako_connections_opened_total").init();
289      let connections_closed_total = meter.u64_counter("tako_connections_closed_total").init();
290
291      Self {
292        http_requests_total,
293        http_route_requests_total,
294        connections_opened_total,
295        connections_closed_total,
296      }
297    }
298  }
299
300  impl MetricsBackend for OtelMetricsBackend {
301    fn on_request_completed(&self, signal: &Signal) {
302      let method = signal.metadata.get("method").cloned().unwrap_or_default();
303      let path = signal.metadata.get("path").cloned().unwrap_or_default();
304      let status = signal.metadata.get("status").cloned().unwrap_or_default();
305      self.http_requests_total.add(
306        1,
307        &[
308          KeyValue::new("method", method),
309          KeyValue::new("path", path),
310          KeyValue::new("status", status),
311        ],
312      );
313    }
314
315    fn on_route_request_completed(&self, signal: &Signal) {
316      let method = signal.metadata.get("method").cloned().unwrap_or_default();
317      let path = signal.metadata.get("path").cloned().unwrap_or_default();
318      let status = signal.metadata.get("status").cloned().unwrap_or_default();
319      self.http_route_requests_total.add(
320        1,
321        &[
322          KeyValue::new("method", method),
323          KeyValue::new("path", path),
324          KeyValue::new("status", status),
325        ],
326      );
327    }
328
329    fn on_connection_opened(&self, signal: &Signal) {
330      let addr = signal
331        .metadata
332        .get("remote_addr")
333        .cloned()
334        .unwrap_or_default();
335      self
336        .connections_opened_total
337        .add(1, &[KeyValue::new("remote_addr", addr)]);
338    }
339
340    fn on_connection_closed(&self, signal: &Signal) {
341      let addr = signal
342        .metadata
343        .get("remote_addr")
344        .cloned()
345        .unwrap_or_default();
346      self
347        .connections_closed_total
348        .add(1, &[KeyValue::new("remote_addr", addr)]);
349    }
350  }
351}
352
353#[cfg(feature = "metrics-prometheus")]
354#[derive(Clone)]
355pub struct PrometheusMetricsConfig {
356  /// HTTP path where the Prometheus scrape endpoint will be exposed.
357  pub endpoint_path: String,
358}
359
360#[cfg(feature = "metrics-prometheus")]
361impl Default for PrometheusMetricsConfig {
362  fn default() -> Self {
363    Self {
364      endpoint_path: "/metrics".to_string(),
365    }
366  }
367}
368
369#[cfg(feature = "metrics-prometheus")]
370impl PrometheusMetricsConfig {
371  /// Installs a Prometheus metrics backend and a scrape endpoint on the router.
372  pub fn install(self, router: &mut Router) -> Arc<Registry> {
373    let registry = Arc::new(Registry::new());
374    let backend = prometheus_backend::PrometheusMetricsBackend::new((*registry).clone());
375    let plugin = MetricsPlugin::new(Arc::new(backend));
376
377    router.plugin(plugin);
378    router.state(registry.clone());
379
380    let path = self.endpoint_path;
381    router.route(Method::GET, &path, prometheus_metrics_handler);
382
383    registry
384  }
385}
386
387#[cfg(feature = "metrics-prometheus")]
388async fn prometheus_metrics_handler(State(registry): State<Arc<Registry>>) -> impl Responder {
389  let encoder = TextEncoder::new();
390  let metric_families = registry.gather();
391
392  let mut buf = Vec::new();
393  if encoder.encode(&metric_families, &mut buf).is_err() {
394    return "failed to encode metrics".to_string();
395  }
396
397  String::from_utf8(buf).unwrap_or_default()
398}
399
400#[cfg(feature = "metrics-opentelemetry")]
401#[derive(Clone)]
402pub struct OtelMetricsConfig {
403  /// Name for the OpenTelemetry meter used by Tako.
404  pub meter_name: &'static str,
405}
406
407#[cfg(feature = "metrics-opentelemetry")]
408impl Default for OtelMetricsConfig {
409  fn default() -> Self {
410    Self { meter_name: "tako" }
411  }
412}
413
414#[cfg(feature = "metrics-opentelemetry")]
415impl OtelMetricsConfig {
416  /// Installs an OpenTelemetry metrics backend using the global meter provider.
417  pub fn install(self, router: &mut Router) {
418    use opentelemetry::global;
419
420    let meter = global::meter(self.meter_name);
421    let backend = opentelemetry_backend::OtelMetricsBackend::new(meter);
422    let plugin = MetricsPlugin::new(backend);
423
424    router.plugin(plugin);
425  }
426}