1#![warn(
91 future_incompatible,
92 missing_debug_implementations,
93 missing_docs,
94 nonstandard_style,
95 rust_2018_idioms,
96 unreachable_pub,
97 unused
98)]
99#![cfg_attr(
100 docsrs,
101 feature(doc_cfg, doc_auto_cfg),
102 deny(rustdoc::broken_intra_doc_links)
103)]
104#![doc(
105 html_logo_url = "https://raw.githubusercontent.com/open-telemetry/opentelemetry-rust/main/assets/logo.svg"
106)]
107#![cfg_attr(test, deny(warnings))]
108
109use once_cell::sync::{Lazy, OnceCell};
110use opentelemetry::{otel_error, otel_warn, InstrumentationScope, Key, Value};
111use opentelemetry_sdk::{
112 error::OTelSdkResult,
113 metrics::{
114 data::{self, ResourceMetrics},
115 reader::MetricReader,
116 InstrumentKind, ManualReader, MetricResult, Pipeline, Temporality,
117 },
118 Resource,
119};
120use prometheus::{
121 core::Desc,
122 proto::{LabelPair, MetricFamily, MetricType},
123};
124use std::{
125 any::TypeId,
126 borrow::Cow,
127 collections::{BTreeMap, HashMap},
128 sync::{Arc, Mutex},
129};
130use std::{fmt, sync::Weak};
131
132const TARGET_INFO_NAME: &str = "target_info";
133const TARGET_INFO_DESCRIPTION: &str = "Target metadata";
134
135const SCOPE_INFO_METRIC_NAME: &str = "otel_scope_info";
136const SCOPE_INFO_DESCRIPTION: &str = "Instrumentation Scope metadata";
137
138const SCOPE_INFO_KEYS: [&str; 2] = ["otel_scope_name", "otel_scope_version"];
139
140const COUNTER_SUFFIX: &str = "_total";
143
144mod config;
145mod resource_selector;
146mod utils;
147
148pub use config::ExporterBuilder;
149pub use resource_selector::ResourceSelector;
150
151pub fn exporter() -> ExporterBuilder {
153 ExporterBuilder::default()
154}
155
156#[derive(Debug)]
158pub struct PrometheusExporter {
159 reader: Arc<ManualReader>,
160}
161
162impl MetricReader for PrometheusExporter {
163 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
164 self.reader.register_pipeline(pipeline)
165 }
166
167 fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
168 self.reader.collect(rm)
169 }
170
171 fn force_flush(&self) -> OTelSdkResult {
172 self.reader.force_flush()
173 }
174
175 fn shutdown(&self) -> OTelSdkResult {
176 self.reader.shutdown()
177 }
178
179 fn temporality(&self, _kind: InstrumentKind) -> Temporality {
182 Temporality::Cumulative
183 }
184}
185
186struct Collector {
187 reader: Arc<ManualReader>,
188 disable_target_info: bool,
189 without_units: bool,
190 without_counter_suffixes: bool,
191 disable_scope_info: bool,
192 create_target_info_once: OnceCell<MetricFamily>,
193 resource_labels_once: OnceCell<Vec<LabelPair>>,
194 namespace: Option<String>,
195 inner: Mutex<CollectorInner>,
196 resource_selector: ResourceSelector,
197}
198
199#[derive(Default)]
200struct CollectorInner {
201 scope_infos: HashMap<InstrumentationScope, MetricFamily>,
202 metric_families: HashMap<String, MetricFamily>,
203}
204
205static HISTOGRAM_TYPES: Lazy<[TypeId; 3]> = Lazy::new(|| {
208 [
209 TypeId::of::<data::Histogram<i64>>(),
210 TypeId::of::<data::Histogram<u64>>(),
211 TypeId::of::<data::Histogram<f64>>(),
212 ]
213});
214static SUM_TYPES: Lazy<[TypeId; 3]> = Lazy::new(|| {
215 [
216 TypeId::of::<data::Sum<i64>>(),
217 TypeId::of::<data::Sum<u64>>(),
218 TypeId::of::<data::Sum<f64>>(),
219 ]
220});
221static GAUGE_TYPES: Lazy<[TypeId; 3]> = Lazy::new(|| {
222 [
223 TypeId::of::<data::Gauge<i64>>(),
224 TypeId::of::<data::Gauge<u64>>(),
225 TypeId::of::<data::Gauge<f64>>(),
226 ]
227});
228
229impl Collector {
230 fn metric_type_and_name(&self, m: &data::Metric) -> Option<(MetricType, Cow<'static, str>)> {
231 let mut name = self.get_name(m);
232
233 let data = m.data.as_any();
234 let type_id = data.type_id();
235
236 if HISTOGRAM_TYPES.contains(&type_id) {
237 Some((MetricType::HISTOGRAM, name))
238 } else if GAUGE_TYPES.contains(&type_id) {
239 Some((MetricType::GAUGE, name))
240 } else if SUM_TYPES.contains(&type_id) {
241 let is_monotonic = if let Some(v) = data.downcast_ref::<data::Sum<i64>>() {
242 v.is_monotonic
243 } else if let Some(v) = data.downcast_ref::<data::Sum<u64>>() {
244 v.is_monotonic
245 } else if let Some(v) = data.downcast_ref::<data::Sum<f64>>() {
246 v.is_monotonic
247 } else {
248 false
249 };
250
251 if is_monotonic {
252 if !self.without_counter_suffixes {
253 name = format!("{name}{COUNTER_SUFFIX}").into();
254 }
255 Some((MetricType::COUNTER, name))
256 } else {
257 Some((MetricType::GAUGE, name))
258 }
259 } else {
260 None
261 }
262 }
263
264 fn get_name(&self, m: &data::Metric) -> Cow<'static, str> {
265 let name = utils::sanitize_name(&m.name);
266 let unit_suffixes = if self.without_units {
267 None
268 } else {
269 utils::get_unit_suffixes(&m.unit)
270 };
271 match (&self.namespace, unit_suffixes) {
272 (Some(namespace), Some(suffix)) => Cow::Owned(format!("{namespace}{name}_{suffix}")),
273 (Some(namespace), None) => Cow::Owned(format!("{namespace}{name}")),
274 (None, Some(suffix)) => Cow::Owned(format!("{name}_{suffix}")),
275 (None, None) => name,
276 }
277 }
278}
279
280impl prometheus::core::Collector for Collector {
281 fn desc(&self) -> Vec<&Desc> {
282 Vec::new()
283 }
284
285 fn collect(&self) -> Vec<MetricFamily> {
286 let mut inner = match self.inner.lock() {
287 Ok(guard) => guard,
288 Err(err) => {
289 otel_error!(
290 name: "MetricScrapeFailed",
291 message = err.to_string(),
292 );
293 return Vec::new();
294 }
295 };
296
297 let mut metrics = ResourceMetrics {
298 resource: Resource::builder_empty().build(),
299 scope_metrics: vec![],
300 };
301 if let Err(err) = self.reader.collect(&mut metrics) {
302 otel_error!(
303 name: "MetricScrapeFailed",
304 message = err.to_string(),
305 );
306 return vec![];
307 }
308 let mut res = Vec::with_capacity(metrics.scope_metrics.len() + 1);
309
310 let target_info = self.create_target_info_once.get_or_init(|| {
311 create_info_metric(TARGET_INFO_NAME, TARGET_INFO_DESCRIPTION, &metrics.resource)
313 });
314
315 if !self.disable_target_info && !metrics.resource.is_empty() {
316 res.push(target_info.clone())
317 }
318
319 let resource_labels = self
320 .resource_labels_once
321 .get_or_init(|| self.resource_selector.select(&metrics.resource));
322
323 for scope_metrics in metrics.scope_metrics {
324 let scope_labels = if !self.disable_scope_info {
325 if scope_metrics.scope.attributes().count() > 0 {
326 let scope_info = inner
327 .scope_infos
328 .entry(scope_metrics.scope.clone())
329 .or_insert_with_key(create_scope_info_metric);
330 res.push(scope_info.clone());
331 }
332
333 let mut labels =
334 Vec::with_capacity(1 + scope_metrics.scope.version().is_some() as usize);
335 let mut name = LabelPair::new();
336 name.set_name(SCOPE_INFO_KEYS[0].into());
337 name.set_value(scope_metrics.scope.name().to_string());
338 labels.push(name);
339 if let Some(version) = &scope_metrics.scope.version() {
340 let mut l_version = LabelPair::new();
341 l_version.set_name(SCOPE_INFO_KEYS[1].into());
342 l_version.set_value(version.to_string());
343 labels.push(l_version);
344 }
345
346 if !resource_labels.is_empty() {
347 labels.extend(resource_labels.iter().cloned());
348 }
349 labels
350 } else {
351 Vec::new()
352 };
353
354 for metrics in scope_metrics.metrics {
355 let (metric_type, name) = match self.metric_type_and_name(&metrics) {
356 Some((metric_type, name)) => (metric_type, name),
357 _ => continue,
358 };
359
360 let mfs = &mut inner.metric_families;
361 let (drop, help) = validate_metrics(&name, &metrics.description, metric_type, mfs);
362 if drop {
363 continue;
364 }
365
366 let description = help.unwrap_or_else(|| metrics.description.into());
367 let data = metrics.data.as_any();
368
369 if let Some(hist) = data.downcast_ref::<data::Histogram<i64>>() {
370 add_histogram_metric(&mut res, hist, description, &scope_labels, name);
371 } else if let Some(hist) = data.downcast_ref::<data::Histogram<u64>>() {
372 add_histogram_metric(&mut res, hist, description, &scope_labels, name);
373 } else if let Some(hist) = data.downcast_ref::<data::Histogram<f64>>() {
374 add_histogram_metric(&mut res, hist, description, &scope_labels, name);
375 } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
376 add_sum_metric(&mut res, sum, description, &scope_labels, name);
377 } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
378 add_sum_metric(&mut res, sum, description, &scope_labels, name);
379 } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
380 add_sum_metric(&mut res, sum, description, &scope_labels, name);
381 } else if let Some(g) = data.downcast_ref::<data::Gauge<u64>>() {
382 add_gauge_metric(&mut res, g, description, &scope_labels, name);
383 } else if let Some(g) = data.downcast_ref::<data::Gauge<i64>>() {
384 add_gauge_metric(&mut res, g, description, &scope_labels, name);
385 } else if let Some(g) = data.downcast_ref::<data::Gauge<f64>>() {
386 add_gauge_metric(&mut res, g, description, &scope_labels, name);
387 }
388 }
389 }
390
391 res
392 }
393}
394
395fn get_attrs(kvs: &mut dyn Iterator<Item = (&Key, &Value)>, extra: &[LabelPair]) -> Vec<LabelPair> {
400 let mut keys_map = BTreeMap::<String, Vec<String>>::new();
401 for (key, value) in kvs {
402 let key = utils::sanitize_prom_kv(key.as_str());
403 keys_map
404 .entry(key)
405 .and_modify(|v| v.push(value.to_string()))
406 .or_insert_with(|| vec![value.to_string()]);
407 }
408
409 let mut res = Vec::with_capacity(keys_map.len() + extra.len());
410
411 for (key, mut values) in keys_map.into_iter() {
412 values.sort_unstable();
413
414 let mut lp = LabelPair::new();
415 lp.set_name(key);
416 lp.set_value(values.join(";"));
417 res.push(lp);
418 }
419
420 if !extra.is_empty() {
421 res.extend(&mut extra.iter().cloned());
422 }
423
424 res
425}
426
427fn validate_metrics(
428 name: &str,
429 description: &str,
430 metric_type: MetricType,
431 mfs: &mut HashMap<String, MetricFamily>,
432) -> (bool, Option<String>) {
433 if let Some(existing) = mfs.get(name) {
434 if existing.get_field_type() != metric_type {
435 otel_warn!(
436 name: "MetricValidationFailed",
437 message = "Instrument type conflict, using existing type definition",
438 metric_type = format!("Instrument {name}, Existing: {:?}, dropped: {:?}", existing.get_field_type(), metric_type).as_str(),
439 );
440 return (true, None);
441 }
442 if existing.help() != description {
443 otel_warn!(
444 name: "MetricValidationFailed",
445 message = "Instrument description conflict, using existing",
446 metric_description = format!("Instrument {name}, Existing: {:?}, dropped: {:?}", existing.help().to_string(), description.to_string()).as_str(),
447 );
448 return (false, Some(existing.help().to_string()));
449 }
450 (false, None)
451 } else {
452 let mut mf = MetricFamily::default();
453 mf.set_name(name.into());
454 mf.set_help(description.to_string());
455 mf.set_field_type(metric_type);
456 mfs.insert(name.to_string(), mf);
457
458 (false, None)
459 }
460}
461
462fn add_histogram_metric<T: Numeric>(
463 res: &mut Vec<MetricFamily>,
464 histogram: &data::Histogram<T>,
465 description: String,
466 extra: &[LabelPair],
467 name: Cow<'static, str>,
468) {
469 for dp in &histogram.data_points {
473 let kvs = get_attrs(
474 &mut dp.attributes.iter().map(|kv| (&kv.key, &kv.value)),
475 extra,
476 );
477 let bounds_len = dp.bounds.len();
478 let (bucket, _) = dp.bounds.iter().enumerate().fold(
479 (Vec::with_capacity(bounds_len), 0),
480 |(mut acc, mut count), (i, bound)| {
481 count += dp.bucket_counts[i];
482
483 let mut b = prometheus::proto::Bucket::default();
484 b.set_upper_bound(*bound);
485 b.set_cumulative_count(count);
486 acc.push(b);
487 (acc, count)
488 },
489 );
490
491 let mut h = prometheus::proto::Histogram::default();
492 h.set_sample_sum(dp.sum.as_f64());
493 h.set_sample_count(dp.count);
494 h.set_bucket(bucket);
495 let mut pm = prometheus::proto::Metric::default();
496 pm.set_label(kvs);
497 pm.set_histogram(h);
498
499 let mut mf = prometheus::proto::MetricFamily::default();
500 mf.set_name(name.to_string());
501 mf.set_help(description.clone());
502 mf.set_field_type(prometheus::proto::MetricType::HISTOGRAM);
503 mf.set_metric(vec![pm]);
504 res.push(mf);
505 }
506}
507
508fn add_sum_metric<T: Numeric>(
509 res: &mut Vec<MetricFamily>,
510 sum: &data::Sum<T>,
511 description: String,
512 extra: &[LabelPair],
513 name: Cow<'static, str>,
514) {
515 let metric_type = if sum.is_monotonic {
516 MetricType::COUNTER
517 } else {
518 MetricType::GAUGE
519 };
520
521 for dp in &sum.data_points {
522 let kvs = get_attrs(
523 &mut dp.attributes.iter().map(|kv| (&kv.key, &kv.value)),
524 extra,
525 );
526
527 let mut pm = prometheus::proto::Metric::default();
528 pm.set_label(kvs);
529
530 if sum.is_monotonic {
531 let mut c = prometheus::proto::Counter::default();
532 c.set_value(dp.value.as_f64());
533 pm.set_counter(c);
534 } else {
535 let mut g = prometheus::proto::Gauge::default();
536 g.set_value(dp.value.as_f64());
537 pm.set_gauge(g);
538 }
539
540 let mut mf = prometheus::proto::MetricFamily::default();
541 mf.set_name(name.to_string());
542 mf.set_help(description.clone());
543 mf.set_field_type(metric_type);
544 mf.set_metric(vec![pm]);
545 res.push(mf);
546 }
547}
548
549fn add_gauge_metric<T: Numeric>(
550 res: &mut Vec<MetricFamily>,
551 gauge: &data::Gauge<T>,
552 description: String,
553 extra: &[LabelPair],
554 name: Cow<'static, str>,
555) {
556 for dp in &gauge.data_points {
557 let kvs = get_attrs(
558 &mut dp.attributes.iter().map(|kv| (&kv.key, &kv.value)),
559 extra,
560 );
561
562 let mut g = prometheus::proto::Gauge::default();
563 g.set_value(dp.value.as_f64());
564 let mut pm = prometheus::proto::Metric::default();
565 pm.set_label(kvs);
566 pm.set_gauge(g);
567
568 let mut mf = prometheus::proto::MetricFamily::default();
569 mf.set_name(name.to_string());
570 mf.set_help(description.to_string());
571 mf.set_field_type(MetricType::GAUGE);
572 mf.set_metric(vec![pm]);
573 res.push(mf);
574 }
575}
576
577fn create_info_metric(
578 target_info_name: &str,
579 target_info_description: &str,
580 resource: &Resource,
581) -> MetricFamily {
582 let mut g = prometheus::proto::Gauge::default();
583 g.set_value(1.0);
584
585 let mut m = prometheus::proto::Metric::default();
586 m.set_label(get_attrs(
587 &mut resource.iter(),
588 &[],
589 ));
590 m.set_gauge(g);
591
592 let mut mf = MetricFamily::default();
593 mf.set_name(target_info_name.into());
594 mf.set_help(target_info_description.into());
595 mf.set_field_type(MetricType::GAUGE);
596 mf.set_metric(vec![m]);
597 mf
598}
599
600fn create_scope_info_metric(scope: &InstrumentationScope) -> MetricFamily {
601 let mut g = prometheus::proto::Gauge::default();
602 g.set_value(1.0);
603
604 let mut labels = Vec::with_capacity(1 + scope.version().is_some() as usize);
605 let mut name = LabelPair::new();
606 name.set_name(SCOPE_INFO_KEYS[0].into());
607 name.set_value(scope.name().to_string());
608 labels.push(name);
609 if let Some(version) = &scope.version() {
610 let mut v_label = LabelPair::new();
611 v_label.set_name(SCOPE_INFO_KEYS[1].into());
612 v_label.set_value(version.to_string());
613 labels.push(v_label);
614 }
615
616 let mut m = prometheus::proto::Metric::default();
617 m.set_label(labels);
618 m.set_gauge(g);
619
620 let mut mf = MetricFamily::default();
621 mf.set_name(SCOPE_INFO_METRIC_NAME.into());
622 mf.set_help(SCOPE_INFO_DESCRIPTION.into());
623 mf.set_field_type(MetricType::GAUGE);
624 mf.set_metric(vec![m]);
625 mf
626}
627
628trait Numeric: fmt::Debug {
629 fn as_f64(&self) -> f64;
631}
632
633impl Numeric for u64 {
634 fn as_f64(&self) -> f64 {
635 *self as f64
636 }
637}
638
639impl Numeric for i64 {
640 fn as_f64(&self) -> f64 {
641 *self as f64
642 }
643}
644
645impl Numeric for f64 {
646 fn as_f64(&self) -> f64 {
647 *self
648 }
649}