1#![cfg_attr(docsrs, feature(doc_cfg))]
21#![deny(missing_docs)]
22
23use opendal_core::raw::*;
24use opendal_layer_observe_metrics_common as observe;
25use prometheus_client::encoding::EncodeLabel;
26use prometheus_client::encoding::EncodeLabelSet;
27use prometheus_client::encoding::LabelSetEncoder;
28use prometheus_client::metrics::counter::Counter;
29use prometheus_client::metrics::family::Family;
30use prometheus_client::metrics::family::MetricConstructor;
31use prometheus_client::metrics::gauge::Gauge;
32use prometheus_client::metrics::histogram::Histogram;
33use prometheus_client::registry::Metric;
34use prometheus_client::registry::Registry;
35use prometheus_client::registry::Unit;
36
37#[derive(Clone)]
80pub struct PrometheusClientLayer {
81 interceptor: PrometheusClientInterceptor,
82}
83
84impl PrometheusClientLayer {
85 pub fn builder() -> PrometheusClientLayerBuilder {
87 PrometheusClientLayerBuilder::default()
88 }
89}
90
91impl<A: Access> Layer<A> for PrometheusClientLayer {
92 type LayeredAccess = observe::MetricsAccessor<A, PrometheusClientInterceptor>;
93
94 fn layer(&self, inner: A) -> Self::LayeredAccess {
95 observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
96 }
97}
98
/// Configuration builder for [`PrometheusClientLayer`].
///
/// Holds the histogram bucket boundaries and label options that are applied
/// when [`PrometheusClientLayerBuilder::register`] creates the metric families.
pub struct PrometheusClientLayerBuilder {
    /// Bucket boundaries for the byte-size histograms.
    bytes_buckets: Vec<f64>,
    /// Bucket boundaries for the byte-rate histograms.
    bytes_rate_buckets: Vec<f64>,
    /// Bucket boundaries for the entry-count histogram.
    entries_buckets: Vec<f64>,
    /// Bucket boundaries for the entry-rate histogram.
    entries_rate_buckets: Vec<f64>,
    /// Bucket boundaries for the duration histograms.
    duration_seconds_buckets: Vec<f64>,
    /// Bucket boundaries for the time-to-first-byte histogram.
    ttfb_buckets: Vec<f64>,
    /// When `true`, the `root` label is left out of every encoded label set.
    disable_label_root: bool,
}
109
110impl Default for PrometheusClientLayerBuilder {
111 fn default() -> Self {
112 Self {
113 bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
114 bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
115 entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
116 entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
117 duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
118 ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
119 disable_label_root: false,
120 }
121 }
122}
123
124impl PrometheusClientLayerBuilder {
125 pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
127 if !buckets.is_empty() {
128 self.bytes_buckets = buckets;
129 }
130 self
131 }
132
133 pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
135 if !buckets.is_empty() {
136 self.bytes_rate_buckets = buckets;
137 }
138 self
139 }
140
141 pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
143 if !buckets.is_empty() {
144 self.entries_buckets = buckets;
145 }
146 self
147 }
148
149 pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
151 if !buckets.is_empty() {
152 self.entries_rate_buckets = buckets;
153 }
154 self
155 }
156
157 pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
159 if !buckets.is_empty() {
160 self.duration_seconds_buckets = buckets;
161 }
162 self
163 }
164
165 pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
167 if !buckets.is_empty() {
168 self.ttfb_buckets = buckets;
169 }
170 self
171 }
172
173 pub fn disable_label_root(mut self, disable: bool) -> Self {
176 self.disable_label_root = disable;
177 self
178 }
179
180 pub fn register(self, registry: &mut Registry) -> PrometheusClientLayer {
203 let operation_bytes =
204 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
205 buckets: self.bytes_buckets.clone(),
206 });
207 let operation_bytes_rate =
208 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
209 buckets: self.bytes_rate_buckets.clone(),
210 });
211 let operation_entries =
212 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
213 buckets: self.entries_buckets.clone(),
214 });
215 let operation_entries_rate =
216 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
217 buckets: self.entries_rate_buckets.clone(),
218 });
219 let operation_duration_seconds =
220 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
221 buckets: self.duration_seconds_buckets.clone(),
222 });
223 let operation_errors_total = Family::<OperationLabels, Counter>::default();
224 let operation_executing = Family::<OperationLabels, Gauge>::default();
225 let operation_ttfb_seconds =
226 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
227 buckets: self.ttfb_buckets.clone(),
228 });
229
230 let http_executing = Family::<OperationLabels, Gauge>::default();
231 let http_request_bytes =
232 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
233 buckets: self.bytes_buckets.clone(),
234 });
235 let http_request_bytes_rate =
236 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
237 buckets: self.bytes_rate_buckets.clone(),
238 });
239 let http_request_duration_seconds =
240 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
241 buckets: self.duration_seconds_buckets.clone(),
242 });
243 let http_response_bytes =
244 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
245 buckets: self.bytes_buckets.clone(),
246 });
247 let http_response_bytes_rate =
248 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
249 buckets: self.bytes_rate_buckets.clone(),
250 });
251 let http_response_duration_seconds =
252 Family::<OperationLabels, Histogram, _>::new_with_constructor(HistogramConstructor {
253 buckets: self.duration_seconds_buckets.clone(),
254 });
255 let http_connection_errors_total = Family::<OperationLabels, Counter>::default();
256 let http_status_errors_total = Family::<OperationLabels, Counter>::default();
257
258 register_metric(
259 registry,
260 operation_bytes.clone(),
261 observe::MetricValue::OperationBytes(0),
262 );
263 register_metric(
264 registry,
265 operation_bytes_rate.clone(),
266 observe::MetricValue::OperationBytesRate(0.0),
267 );
268 register_metric(
269 registry,
270 operation_entries.clone(),
271 observe::MetricValue::OperationEntries(0),
272 );
273 register_metric(
274 registry,
275 operation_entries_rate.clone(),
276 observe::MetricValue::OperationEntriesRate(0.0),
277 );
278 register_metric(
279 registry,
280 operation_duration_seconds.clone(),
281 observe::MetricValue::OperationDurationSeconds(Duration::default()),
282 );
283 register_metric(
284 registry,
285 operation_errors_total.clone(),
286 observe::MetricValue::OperationErrorsTotal,
287 );
288 register_metric(
289 registry,
290 operation_executing.clone(),
291 observe::MetricValue::OperationExecuting(0),
292 );
293 register_metric(
294 registry,
295 operation_ttfb_seconds.clone(),
296 observe::MetricValue::OperationTtfbSeconds(Duration::default()),
297 );
298
299 register_metric(
300 registry,
301 http_executing.clone(),
302 observe::MetricValue::HttpExecuting(0),
303 );
304 register_metric(
305 registry,
306 http_request_bytes.clone(),
307 observe::MetricValue::HttpRequestBytes(0),
308 );
309 register_metric(
310 registry,
311 http_request_bytes_rate.clone(),
312 observe::MetricValue::HttpRequestBytesRate(0.0),
313 );
314 register_metric(
315 registry,
316 http_request_duration_seconds.clone(),
317 observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
318 );
319 register_metric(
320 registry,
321 http_response_bytes.clone(),
322 observe::MetricValue::HttpResponseBytes(0),
323 );
324 register_metric(
325 registry,
326 http_response_bytes_rate.clone(),
327 observe::MetricValue::HttpResponseBytesRate(0.0),
328 );
329 register_metric(
330 registry,
331 http_response_duration_seconds.clone(),
332 observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
333 );
334 register_metric(
335 registry,
336 http_connection_errors_total.clone(),
337 observe::MetricValue::HttpConnectionErrorsTotal,
338 );
339 register_metric(
340 registry,
341 http_status_errors_total.clone(),
342 observe::MetricValue::HttpStatusErrorsTotal,
343 );
344
345 PrometheusClientLayer {
346 interceptor: PrometheusClientInterceptor {
347 operation_bytes,
348 operation_bytes_rate,
349 operation_entries,
350 operation_entries_rate,
351 operation_duration_seconds,
352 operation_errors_total,
353 operation_executing,
354 operation_ttfb_seconds,
355
356 http_executing,
357 http_request_bytes,
358 http_request_bytes_rate,
359 http_request_duration_seconds,
360 http_response_bytes,
361 http_response_bytes_rate,
362 http_response_duration_seconds,
363 http_connection_errors_total,
364 http_status_errors_total,
365
366 disable_label_root: self.disable_label_root,
367 },
368 }
369 }
370}
371
/// Metric constructor that creates histograms with a fixed set of bucket
/// boundaries; used as the constructor type of the histogram families.
#[derive(Clone)]
struct HistogramConstructor {
    /// Bucket boundaries passed to every histogram this constructor creates.
    buckets: Vec<f64>,
}
376
377impl MetricConstructor<Histogram> for HistogramConstructor {
378 fn new_metric(&self) -> Histogram {
379 Histogram::new(self.buckets.iter().cloned())
380 }
381}
382
/// Interceptor that records observed metric values into `prometheus-client`
/// metric families.
///
/// Each field is one family keyed by `OperationLabels`; the matching
/// `observe::MetricValue` variant is routed to it in `observe`.
#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct PrometheusClientInterceptor {
    // Operation-level families.
    operation_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_entries: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_entries_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
    operation_errors_total: Family<OperationLabels, Counter>,
    operation_executing: Family<OperationLabels, Gauge>,
    operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,

    // HTTP-level families.
    http_executing: Family<OperationLabels, Gauge>,
    http_request_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_response_bytes: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramConstructor>,
    http_connection_errors_total: Family<OperationLabels, Counter>,
    http_status_errors_total: Family<OperationLabels, Counter>,

    // Copied into every `OperationLabels` so encoding can skip the `root` label.
    disable_label_root: bool,
}
407
408impl observe::MetricsIntercept for PrometheusClientInterceptor {
409 fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
410 let labels = OperationLabels {
411 labels,
412 disable_label_root: self.disable_label_root,
413 };
414 match value {
415 observe::MetricValue::OperationBytes(v) => self
416 .operation_bytes
417 .get_or_create(&labels)
418 .observe(v as f64),
419 observe::MetricValue::OperationBytesRate(v) => {
420 self.operation_bytes_rate.get_or_create(&labels).observe(v)
421 }
422 observe::MetricValue::OperationEntries(v) => self
423 .operation_entries
424 .get_or_create(&labels)
425 .observe(v as f64),
426 observe::MetricValue::OperationEntriesRate(v) => self
427 .operation_entries_rate
428 .get_or_create(&labels)
429 .observe(v),
430 observe::MetricValue::OperationDurationSeconds(v) => self
431 .operation_duration_seconds
432 .get_or_create(&labels)
433 .observe(v.as_secs_f64()),
434 observe::MetricValue::OperationErrorsTotal => {
435 self.operation_errors_total.get_or_create(&labels).inc();
436 }
437 observe::MetricValue::OperationExecuting(v) => {
438 self.operation_executing
439 .get_or_create(&labels)
440 .inc_by(v as i64);
441 }
442 observe::MetricValue::OperationTtfbSeconds(v) => self
443 .operation_ttfb_seconds
444 .get_or_create(&labels)
445 .observe(v.as_secs_f64()),
446
447 observe::MetricValue::HttpExecuting(v) => {
448 self.http_executing.get_or_create(&labels).inc_by(v as i64);
449 }
450 observe::MetricValue::HttpRequestBytes(v) => self
451 .http_request_bytes
452 .get_or_create(&labels)
453 .observe(v as f64),
454 observe::MetricValue::HttpRequestBytesRate(v) => self
455 .http_request_bytes_rate
456 .get_or_create(&labels)
457 .observe(v),
458 observe::MetricValue::HttpRequestDurationSeconds(v) => self
459 .http_request_duration_seconds
460 .get_or_create(&labels)
461 .observe(v.as_secs_f64()),
462 observe::MetricValue::HttpResponseBytes(v) => self
463 .http_response_bytes
464 .get_or_create(&labels)
465 .observe(v as f64),
466 observe::MetricValue::HttpResponseBytesRate(v) => self
467 .http_response_bytes_rate
468 .get_or_create(&labels)
469 .observe(v),
470 observe::MetricValue::HttpResponseDurationSeconds(v) => self
471 .http_response_duration_seconds
472 .get_or_create(&labels)
473 .observe(v.as_secs_f64()),
474 observe::MetricValue::HttpConnectionErrorsTotal => {
475 self.http_connection_errors_total
476 .get_or_create(&labels)
477 .inc();
478 }
479 observe::MetricValue::HttpStatusErrorsTotal => {
480 self.http_status_errors_total.get_or_create(&labels).inc();
481 }
482 _ => {}
483 };
484 }
485}
486
/// Label set used as the key of every metric family in this layer.
///
/// Wraps the shared `observe::MetricLabels` together with the flag that
/// decides whether the `root` label is written when the set is encoded.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct OperationLabels {
    /// Labels supplied by the metrics layer for the observed event.
    labels: observe::MetricLabels,
    /// When `true`, the `root` label is skipped during encoding.
    disable_label_root: bool,
}
492
493impl EncodeLabelSet for OperationLabels {
494 fn encode(&self, encoder: &mut LabelSetEncoder<'_>) -> std::fmt::Result {
495 (observe::LABEL_SCHEME, self.labels.scheme).encode(encoder.encode_label())?;
496 (observe::LABEL_NAMESPACE, self.labels.namespace.as_ref())
497 .encode(encoder.encode_label())?;
498 if !self.disable_label_root {
499 (observe::LABEL_ROOT, self.labels.root.as_ref()).encode(encoder.encode_label())?;
500 }
501 (observe::LABEL_OPERATION, self.labels.operation).encode(encoder.encode_label())?;
502
503 if let Some(error) = &self.labels.error {
504 (observe::LABEL_ERROR, error.into_static()).encode(encoder.encode_label())?;
505 }
506 if let Some(code) = &self.labels.status_code {
507 (observe::LABEL_STATUS_CODE, code.as_str()).encode(encoder.encode_label())?;
508 }
509 if let Some(service_operation) = self.labels.service_operation {
510 (observe::LABEL_SERVICE_OPERATION, service_operation).encode(encoder.encode_label())?;
511 }
512 Ok(())
513 }
514}
515
516fn register_metric(registry: &mut Registry, metric: impl Metric, value: observe::MetricValue) {
517 let ((name, unit), help) = (value.name_with_unit(), value.help());
518
519 if let Some(unit) = unit {
520 registry.register_with_unit(name, help, Unit::Other(unit.to_string()), metric);
521 } else {
522 registry.register(name, help, metric);
523 }
524}