1#![cfg_attr(docsrs, doc(cfg(feature = "plugins")))]
2use anyhow::Result;
12use std::sync::Arc;
13
14#[cfg(feature = "signals")]
15use crate::signals::{Signal, app_events, ids};
16use crate::{plugins::TakoPlugin, router::Router};
17
18#[cfg(feature = "metrics-prometheus")]
19use crate::{Method, extractors::state::State, responder::Responder};
20#[cfg(feature = "metrics-prometheus")]
21use prometheus::{Encoder, Registry, TextEncoder};
22
23#[cfg(feature = "signals")]
28pub trait MetricsBackend: Send + Sync + 'static {
29 fn on_request_completed(&self, signal: &Signal);
31
32 fn on_route_request_completed(&self, signal: &Signal);
34
35 fn on_connection_opened(&self, signal: &Signal);
37
38 fn on_connection_closed(&self, signal: &Signal);
40}
41
42#[cfg(feature = "signals")]
45pub struct MetricsPlugin<B: MetricsBackend> {
46 backend: Arc<B>,
47}
48
49#[cfg(feature = "signals")]
50impl<B: MetricsBackend> Clone for MetricsPlugin<B> {
51 fn clone(&self) -> Self {
52 Self {
53 backend: Arc::clone(&self.backend),
54 }
55 }
56}
57
58#[cfg(feature = "signals")]
59impl<B: MetricsBackend> MetricsPlugin<B> {
60 pub fn new(backend: B) -> Self {
62 Self {
63 backend: Arc::new(backend),
64 }
65 }
66}
67
68#[cfg(feature = "signals")]
69impl<B: MetricsBackend> TakoPlugin for MetricsPlugin<B> {
70 fn name(&self) -> &'static str {
71 "MetricsPlugin"
72 }
73
74 #[cfg(feature = "signals")]
75 fn setup(&self, _router: &Router) -> Result<()> {
76 let backend = self.backend.clone();
77 let app_arbiter = app_events();
78
79 app_arbiter.on(ids::REQUEST_COMPLETED, move |signal: Signal| {
81 let backend = backend.clone();
82 async move {
83 backend.on_request_completed(&signal);
84 }
85 });
86
87 let backend_conn = self.backend.clone();
89 app_arbiter.on(ids::CONNECTION_OPENED, move |signal: Signal| {
90 let backend = backend_conn.clone();
91 async move {
92 backend.on_connection_opened(&signal);
93 }
94 });
95
96 let backend_close = self.backend.clone();
97 app_arbiter.on(ids::CONNECTION_CLOSED, move |signal: Signal| {
98 let backend = backend_close.clone();
99 async move {
100 backend.on_connection_closed(&signal);
101 }
102 });
103
104 let backend_route = self.backend.clone();
106 let mut rx = app_arbiter.subscribe_prefix("route.request.");
107 tokio::spawn(async move {
108 while let Ok(signal) = rx.recv().await {
109 backend_route.on_route_request_completed(&signal);
110 }
111 });
112
113 Ok(())
114 }
115
116 #[cfg(not(feature = "signals"))]
117 fn setup(&self, _router: &Router) -> Result<()> {
118 Ok(())
120 }
121}
122
123#[cfg(feature = "metrics-prometheus")]
125pub mod prometheus_backend {
126 use super::{MetricsBackend, Signal};
127 use prometheus::{IntCounterVec, Opts, Registry};
128 use std::sync::Arc;
129
130 pub struct PrometheusMetricsBackend {
133 registry: Registry,
134 http_requests_total: IntCounterVec,
135 http_route_requests_total: IntCounterVec,
136 connections_opened_total: IntCounterVec,
137 connections_closed_total: IntCounterVec,
138 }
139
140 impl PrometheusMetricsBackend {
141 pub fn new(registry: Registry) -> Self {
142 let http_requests_total = IntCounterVec::new(
143 Opts::new("tako_http_requests_total", "Total HTTP requests completed"),
144 &["method", "path", "status"],
145 )
146 .expect("failed to create http_requests_total metric");
147
148 let http_route_requests_total = IntCounterVec::new(
149 Opts::new(
150 "tako_route_requests_total",
151 "Total route-level HTTP requests completed",
152 ),
153 &["method", "path", "status"],
154 )
155 .expect("failed to create route_requests_total metric");
156
157 let connections_opened_total = IntCounterVec::new(
158 Opts::new("tako_connections_opened_total", "Total connections opened"),
159 &["remote_addr"],
160 )
161 .expect("failed to create connections_opened_total metric");
162
163 let connections_closed_total = IntCounterVec::new(
164 Opts::new("tako_connections_closed_total", "Total connections closed"),
165 &["remote_addr"],
166 )
167 .expect("failed to create connections_closed_total metric");
168
169 registry
170 .register(Box::new(http_requests_total.clone()))
171 .expect("failed to register http_requests_total");
172 registry
173 .register(Box::new(http_route_requests_total.clone()))
174 .expect("failed to register http_route_requests_total");
175 registry
176 .register(Box::new(connections_opened_total.clone()))
177 .expect("failed to register connections_opened_total");
178 registry
179 .register(Box::new(connections_closed_total.clone()))
180 .expect("failed to register connections_closed_total");
181
182 Self {
183 registry,
184 http_requests_total,
185 http_route_requests_total,
186 connections_opened_total,
187 connections_closed_total,
188 }
189 }
190
191 pub fn registry(&self) -> &Registry {
192 &self.registry
193 }
194 }
195
196 impl MetricsBackend for Arc<PrometheusMetricsBackend> {
197 fn on_request_completed(&self, signal: &Signal) {
198 let method = signal
199 .metadata
200 .get("method")
201 .map(String::as_str)
202 .unwrap_or("");
203 let path = signal
204 .metadata
205 .get("path")
206 .map(String::as_str)
207 .unwrap_or("");
208 let status = signal
209 .metadata
210 .get("status")
211 .map(String::as_str)
212 .unwrap_or("");
213 self
214 .http_requests_total
215 .with_label_values(&[method, path, status])
216 .inc();
217 }
218
219 fn on_route_request_completed(&self, signal: &Signal) {
220 let method = signal
221 .metadata
222 .get("method")
223 .map(String::as_str)
224 .unwrap_or("");
225 let path = signal
226 .metadata
227 .get("path")
228 .map(String::as_str)
229 .unwrap_or("");
230 let status = signal
231 .metadata
232 .get("status")
233 .map(String::as_str)
234 .unwrap_or("");
235 self
236 .http_route_requests_total
237 .with_label_values(&[method, path, status])
238 .inc();
239 }
240
241 fn on_connection_opened(&self, signal: &Signal) {
242 let addr = signal
243 .metadata
244 .get("remote_addr")
245 .map(String::as_str)
246 .unwrap_or("");
247 self
248 .connections_opened_total
249 .with_label_values(&[addr])
250 .inc();
251 }
252
253 fn on_connection_closed(&self, signal: &Signal) {
254 let addr = signal
255 .metadata
256 .get("remote_addr")
257 .map(String::as_str)
258 .unwrap_or("");
259 self
260 .connections_closed_total
261 .with_label_values(&[addr])
262 .inc();
263 }
264 }
265}
266
267#[cfg(feature = "metrics-opentelemetry")]
269pub mod opentelemetry_backend {
270 use super::{MetricsBackend, Signal};
271 use opentelemetry::KeyValue;
272 use opentelemetry::metrics::{Counter, Meter};
273
274 pub struct OtelMetricsBackend {
278 http_requests_total: Counter<u64>,
279 http_route_requests_total: Counter<u64>,
280 connections_opened_total: Counter<u64>,
281 connections_closed_total: Counter<u64>,
282 }
283
284 impl OtelMetricsBackend {
285 pub fn new(meter: Meter) -> Self {
286 let http_requests_total = meter.u64_counter("tako_http_requests_total").init();
287 let http_route_requests_total = meter.u64_counter("tako_route_requests_total").init();
288 let connections_opened_total = meter.u64_counter("tako_connections_opened_total").init();
289 let connections_closed_total = meter.u64_counter("tako_connections_closed_total").init();
290
291 Self {
292 http_requests_total,
293 http_route_requests_total,
294 connections_opened_total,
295 connections_closed_total,
296 }
297 }
298 }
299
300 impl MetricsBackend for OtelMetricsBackend {
301 fn on_request_completed(&self, signal: &Signal) {
302 let method = signal.metadata.get("method").cloned().unwrap_or_default();
303 let path = signal.metadata.get("path").cloned().unwrap_or_default();
304 let status = signal.metadata.get("status").cloned().unwrap_or_default();
305 self.http_requests_total.add(
306 1,
307 &[
308 KeyValue::new("method", method),
309 KeyValue::new("path", path),
310 KeyValue::new("status", status),
311 ],
312 );
313 }
314
315 fn on_route_request_completed(&self, signal: &Signal) {
316 let method = signal.metadata.get("method").cloned().unwrap_or_default();
317 let path = signal.metadata.get("path").cloned().unwrap_or_default();
318 let status = signal.metadata.get("status").cloned().unwrap_or_default();
319 self.http_route_requests_total.add(
320 1,
321 &[
322 KeyValue::new("method", method),
323 KeyValue::new("path", path),
324 KeyValue::new("status", status),
325 ],
326 );
327 }
328
329 fn on_connection_opened(&self, signal: &Signal) {
330 let addr = signal
331 .metadata
332 .get("remote_addr")
333 .cloned()
334 .unwrap_or_default();
335 self
336 .connections_opened_total
337 .add(1, &[KeyValue::new("remote_addr", addr)]);
338 }
339
340 fn on_connection_closed(&self, signal: &Signal) {
341 let addr = signal
342 .metadata
343 .get("remote_addr")
344 .cloned()
345 .unwrap_or_default();
346 self
347 .connections_closed_total
348 .add(1, &[KeyValue::new("remote_addr", addr)]);
349 }
350 }
351}
352
353#[cfg(feature = "metrics-prometheus")]
354#[derive(Clone)]
355pub struct PrometheusMetricsConfig {
356 pub endpoint_path: String,
358}
359
360#[cfg(feature = "metrics-prometheus")]
361impl Default for PrometheusMetricsConfig {
362 fn default() -> Self {
363 Self {
364 endpoint_path: "/metrics".to_string(),
365 }
366 }
367}
368
369#[cfg(feature = "metrics-prometheus")]
370impl PrometheusMetricsConfig {
371 pub fn install(self, router: &mut Router) -> Arc<Registry> {
373 let registry = Arc::new(Registry::new());
374 let backend = prometheus_backend::PrometheusMetricsBackend::new((*registry).clone());
375 let plugin = MetricsPlugin::new(Arc::new(backend));
376
377 router.plugin(plugin);
378 router.state(registry.clone());
379
380 let path = self.endpoint_path;
381 router.route(Method::GET, &path, prometheus_metrics_handler);
382
383 registry
384 }
385}
386
387#[cfg(feature = "metrics-prometheus")]
388async fn prometheus_metrics_handler(State(registry): State<Arc<Registry>>) -> impl Responder {
389 let encoder = TextEncoder::new();
390 let metric_families = registry.gather();
391
392 let mut buf = Vec::new();
393 if encoder.encode(&metric_families, &mut buf).is_err() {
394 return "failed to encode metrics".to_string();
395 }
396
397 String::from_utf8(buf).unwrap_or_default()
398}
399
400#[cfg(feature = "metrics-opentelemetry")]
401#[derive(Clone)]
402pub struct OtelMetricsConfig {
403 pub meter_name: &'static str,
405}
406
407#[cfg(feature = "metrics-opentelemetry")]
408impl Default for OtelMetricsConfig {
409 fn default() -> Self {
410 Self { meter_name: "tako" }
411 }
412}
413
414#[cfg(feature = "metrics-opentelemetry")]
415impl OtelMetricsConfig {
416 pub fn install(self, router: &mut Router) {
418 use opentelemetry::global;
419
420 let meter = global::meter(self.meter_name);
421 let backend = opentelemetry_backend::OtelMetricsBackend::new(meter);
422 let plugin = MetricsPlugin::new(backend);
423
424 router.plugin(plugin);
425 }
426}