// prometheus-32bitfix 0.13.1
//
// Prometheus instrumentation library for Rust applications.
// Copyright 2014 The Prometheus Authors
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::collections::btree_map::Entry as BEntry;
use std::collections::hash_map::Entry as HEntry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use parking_lot::RwLock;

use crate::errors::{Error, Result};
use crate::metrics::Collector;
use crate::proto;

use cfg_if::cfg_if;
use lazy_static::lazy_static;

/// Lock-protected state shared by every clone of a `Registry`.
#[derive(Default)]
struct RegistryCore {
    /// Registered collectors, keyed by the wrapping sum of their unique
    /// descriptor ids.
    pub collectors_by_id: HashMap<u64, Box<dyn Collector>>,
    /// Dimension hash (label names / help string) recorded for each
    /// fully-qualified metric name seen so far.
    pub dim_hashes_by_name: HashMap<String, u64>,
    /// Ids of every descriptor that belongs to a currently registered collector.
    pub desc_ids: HashSet<u64>,
    /// Optional common labels for all registered collectors.
    pub labels: Option<HashMap<String, String>>,
    /// Optional common namespace for all registered collectors.
    pub prefix: Option<String>,
}

impl std::fmt::Debug for RegistryCore {
    /// Summarizes the core as the number of registered collectors only.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let registered = self.collectors_by_id.len();
        write!(f, "RegistryCore ({} collectors)", registered)
    }
}

impl RegistryCore {
    fn register(&mut self, c: Box<dyn Collector>) -> Result<()> {
        let mut desc_id_set = HashSet::new();
        let mut collector_id: u64 = 0;

        for desc in c.desc() {
            // Is the desc_id unique?
            // (In other words: Is the fqName + constLabel combination unique?)
            if self.desc_ids.contains(&desc.id) {
                return Err(Error::AlreadyReg);
            }

            if let Some(hash) = self.dim_hashes_by_name.get(&desc.fq_name) {
                if *hash != desc.dim_hash {
                    return Err(Error::Msg(format!(
                        "a previously registered descriptor with the \
                         same fully-qualified name as {:?} has \
                         different label names or a different help \
                         string",
                        desc
                    )));
                }
            }

            self.dim_hashes_by_name
                .insert(desc.fq_name.clone(), desc.dim_hash);

            // If it is not a duplicate desc in this collector, add it to
            // the collector_id.
            if desc_id_set.insert(desc.id) {
                // The set did not have this value present, true is returned.
                collector_id = collector_id.wrapping_add(desc.id);
            } else {
                // The set did have this value present, false is returned.
                //
                // TODO: Should we allow duplicate descs within the same collector?
                return Err(Error::Msg(format!(
                    "a duplicate descriptor within the same \
                     collector the same fully-qualified name: {:?}",
                    desc.fq_name
                )));
            }
        }

        match self.collectors_by_id.entry(collector_id) {
            HEntry::Vacant(vc) => {
                self.desc_ids.extend(desc_id_set);
                vc.insert(c);
                Ok(())
            }
            HEntry::Occupied(_) => Err(Error::AlreadyReg),
        }
    }

    fn unregister(&mut self, c: Box<dyn Collector>) -> Result<()> {
        let mut id_set = Vec::new();
        let mut collector_id: u64 = 0;
        for desc in c.desc() {
            if !id_set.iter().any(|id| *id == desc.id) {
                id_set.push(desc.id);
                collector_id = collector_id.wrapping_add(desc.id);
            }
        }

        if self.collectors_by_id.remove(&collector_id).is_none() {
            return Err(Error::Msg(format!(
                "collector {:?} is not registered",
                c.desc()
            )));
        }

        for id in id_set {
            self.desc_ids.remove(&id);
        }

        // dim_hashes_by_name is left untouched as those must be consistent
        // throughout the lifetime of a program.
        Ok(())
    }

    fn gather(&self) -> Vec<proto::MetricFamily> {
        let mut mf_by_name = BTreeMap::new();

        for c in self.collectors_by_id.values() {
            let mfs = c.collect();
            for mut mf in mfs {
                // Prune empty MetricFamilies.
                if mf.get_metric().is_empty() {
                    continue;
                }

                let name = mf.get_name().to_owned();
                match mf_by_name.entry(name) {
                    BEntry::Vacant(entry) => {
                        entry.insert(mf);
                    }
                    BEntry::Occupied(mut entry) => {
                        let existent_mf = entry.get_mut();
                        let existent_metrics = existent_mf.mut_metric();

                        // TODO: check type.
                        // TODO: check consistency.
                        for metric in mf.take_metric().into_iter() {
                            existent_metrics.push(metric);
                        }
                    }
                }
            }
        }

        // TODO: metric_family injection hook.

        // Now that MetricFamilies are all set, sort their Metrics
        // lexicographically by their label values.
        for mf in mf_by_name.values_mut() {
            mf.mut_metric().sort_by(|m1, m2| {
                let lps1 = m1.get_label();
                let lps2 = m2.get_label();

                if lps1.len() != lps2.len() {
                    // This should not happen. The metrics are
                    // inconsistent. However, we have to deal with the fact, as
                    // people might use custom collectors or metric family injection
                    // to create inconsistent metrics. So let's simply compare the
                    // number of labels in this case. That will still yield
                    // reproducible sorting.
                    return lps1.len().cmp(&lps2.len());
                }

                for (lp1, lp2) in lps1.iter().zip(lps2.iter()) {
                    if lp1.get_value() != lp2.get_value() {
                        return lp1.get_value().cmp(lp2.get_value());
                    }
                }

                // We should never arrive here. Multiple metrics with the same
                // label set in the same scrape will lead to undefined ingestion
                // behavior. However, as above, we have to provide stable sorting
                // here, even for inconsistent metrics. So sort equal metrics
                // by their timestamp, with missing timestamps (implying "now")
                // coming last.
                m1.get_timestamp_ms().cmp(&m2.get_timestamp_ms())
            });
        }

        // Write out MetricFamilies sorted by their name.
        mf_by_name
            .into_iter()
            .map(|(_, mut m)| {
                // Add registry namespace prefix, if any.
                if let Some(ref namespace) = self.prefix {
                    let prefixed = format!("{}_{}", namespace, m.get_name());
                    m.set_name(prefixed);
                }

                // Add registry common labels, if any.
                if let Some(ref hmap) = self.labels {
                    let pairs: Vec<proto::LabelPair> = hmap
                        .iter()
                        .map(|(k, v)| {
                            let mut label = proto::LabelPair::default();
                            label.set_name(k.to_string());
                            label.set_value(v.to_string());
                            label
                        })
                        .collect();

                    for metric in m.mut_metric().iter_mut() {
                        let mut labels: Vec<_> = metric.take_label().into();
                        labels.append(&mut pairs.clone());
                        metric.set_label(labels.into());
                    }
                }
                m
            })
            .collect()
    }
}

/// A struct for registering Prometheus collectors, collecting their metrics, and gathering
/// them into `MetricFamilies` for exposition.
#[derive(Clone, Default, Debug)]
pub struct Registry {
    r: Arc<RwLock<RegistryCore>>,
}

impl Registry {
    /// `new` creates a Registry.
    pub fn new() -> Registry {
        Default::default()
    }

    /// Create a new registry, with optional custom prefix and labels.
    pub fn new_custom(
        prefix: Option<String>,
        labels: Option<HashMap<String, String>>,
    ) -> Result<Registry> {
        if let Some(ref namespace) = prefix {
            if namespace.is_empty() {
                return Err(Error::Msg("empty prefix namespace".to_string()));
            }
        }

        let reg = Registry::default();
        {
            let mut core = reg.r.write();
            core.prefix = prefix;
            core.labels = labels;
        }
        Ok(reg)
    }

    /// `register` registers a new [`Collector`] to be included in metrics
    /// collection. It returns an error if the descriptors provided by the
    /// [`Collector`] are invalid or if they — in combination with descriptors of
    /// already registered Collectors — do not fulfill the consistency and
    /// uniqueness criteria described in the documentation of [`Desc`](crate::core::Desc).
    ///
    /// If the provided [`Collector`] is equal to a [`Collector`] already registered
    /// (which includes the case of re-registering the same [`Collector`]), the
    /// AlreadyReg error returns.
    pub fn register(&self, c: Box<dyn Collector>) -> Result<()> {
        self.r.write().register(c)
    }

    /// `unregister` unregisters the [`Collector`] that equals the [`Collector`] passed
    /// in as an argument.  (Two Collectors are considered equal if their
    /// Describe method yields the same set of descriptors.) The function
    /// returns error when the [`Collector`] is not registered.
    pub fn unregister(&self, c: Box<dyn Collector>) -> Result<()> {
        self.r.write().unregister(c)
    }

    /// `gather` calls the Collect method of the registered Collectors and then
    /// gathers the collected metrics into a lexicographically sorted slice
    /// of MetricFamily protobufs.
    pub fn gather(&self) -> Vec<proto::MetricFamily> {
        self.r.read().gather()
    }
}

cfg_if! {
    if #[cfg(all(feature = "process", target_os = "linux"))] {
        // Installs a process collector describing the current process.
        fn register_default_process_collector(reg: &Registry) -> Result<()> {
            use crate::process_collector::ProcessCollector;

            reg.register(Box::new(ProcessCollector::for_self()))
        }
    } else {
        // Without the "process" feature on Linux there is nothing to install.
        fn register_default_process_collector(_reg: &Registry) -> Result<()> {
            Ok(())
        }
    }
}

lazy_static! {
    // Default registry for rust-prometheus, built on first access. Any
    // default process collector is installed here, and a failure to install
    // it panics.
    static ref DEFAULT_REGISTRY: Registry = {
        let registry = Registry::default();
        register_default_process_collector(&registry).unwrap();
        registry
    };
}

/// Default registry (global static).
///
/// This is the same registry used by the module-level `register`,
/// `unregister` and `gather` functions.
pub fn default_registry() -> &'static Registry {
    // Dereferencing the lazy static runs its initializer on first use.
    &*DEFAULT_REGISTRY
}

/// Registers a new [`Collector`] to be included in metrics collection. It
/// returns an error if the descriptors provided by the [`Collector`] are invalid or
/// if they - in combination with descriptors of already registered Collectors -
/// do not fulfill the consistency and uniqueness criteria described in the
/// [`Desc`](crate::core::Desc) documentation.
pub fn register(c: Box<dyn Collector>) -> Result<()> {
    DEFAULT_REGISTRY.register(c)
}

/// Unregisters the [`Collector`] that equals the [`Collector`] passed in as
/// an argument. (Two Collectors are considered equal if their Describe method
/// yields the same set of descriptors.) The function returns an error if a
/// [`Collector`] was not registered.
pub fn unregister(c: Box<dyn Collector>) -> Result<()> {
    DEFAULT_REGISTRY.unregister(c)
}

/// Return all `MetricFamily` of `DEFAULT_REGISTRY`.
pub fn gather() -> Vec<proto::MetricFamily> {
    DEFAULT_REGISTRY.gather()
}

// Unit tests for `Registry` and the module-level default-registry functions.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::thread;

    use super::*;
    use crate::counter::{Counter, CounterVec};
    use crate::desc::Desc;
    use crate::metrics::{Collector, Opts};
    use crate::proto;

    // Register/unregister round-trips on a private registry, including a
    // gather performed from another thread through a cloned handle.
    #[test]
    fn test_registry() {
        let r = Registry::new();

        let counter = Counter::new("test", "test help").unwrap();
        r.register(Box::new(counter.clone())).unwrap();
        counter.inc();

        // The clone shares state with `r`, so the spawned thread sees the
        // one registered counter family.
        let r1 = r.clone();
        let handler = thread::spawn(move || {
            let metric_families = r1.gather();
            assert_eq!(metric_families.len(), 1);
        });

        assert!(handler.join().is_ok());

        // Re-registering fails; unregistering succeeds exactly once; after
        // that the collector can be registered again.
        assert!(r.register(Box::new(counter.clone())).is_err());
        assert!(r.unregister(Box::new(counter.clone())).is_ok());
        assert!(r.unregister(Box::new(counter.clone())).is_err());
        assert!(r.register(Box::new(counter)).is_ok());

        let counter_vec =
            CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();

        r.register(Box::new(counter_vec.clone())).unwrap();
        counter_vec.with_label_values(&["1", "2"]).inc();
    }

    // Same round-trip against the global default registry, through both the
    // free functions and `default_registry()`.
    // NOTE(review): the counter is left registered at the end, and the global
    // registry is shared with any other test that uses it.
    #[test]
    fn test_default_registry() {
        let counter = Counter::new("test", "test help").unwrap();

        assert!(register(Box::new(counter.clone())).is_ok());
        assert_ne!(gather().len(), 0);
        assert_ne!(default_registry().gather().len(), 0);
        assert_eq!(gather().len(), default_registry().gather().len());

        assert!(unregister(Box::new(counter.clone())).is_ok());
        assert!(unregister(Box::new(counter.clone())).is_err());
        assert!(default_registry()
            .unregister(Box::new(counter.clone()))
            .is_err());
        assert!(register(Box::new(counter)).is_ok());
    }

    // Families come back sorted by name regardless of registration order,
    // and metrics within a family are sorted by their label values.
    #[test]
    fn test_gather_order() {
        let r = Registry::new();

        let counter_a = Counter::new("test_a_counter", "test help").unwrap();
        let counter_b = Counter::new("test_b_counter", "test help").unwrap();
        let counter_2 = Counter::new("test_2_counter", "test help").unwrap();
        r.register(Box::new(counter_b)).unwrap();
        r.register(Box::new(counter_2)).unwrap();
        r.register(Box::new(counter_a)).unwrap();

        let mfs = r.gather();
        assert_eq!(mfs.len(), 3);
        assert_eq!(mfs[0].get_name(), "test_2_counter");
        assert_eq!(mfs[1].get_name(), "test_a_counter");
        assert_eq!(mfs[2].get_name(), "test_b_counter");

        let r = Registry::new();
        let opts = Opts::new("test", "test help")
            .const_label("a", "1")
            .const_label("b", "2");
        let counter_vec = CounterVec::new(opts, &["cc", "c1", "a2", "c0"]).unwrap();
        r.register(Box::new(counter_vec.clone())).unwrap();

        // Each label set is incremented a distinct number of times (1..=4) so
        // the resulting order can be read back from the counter values.
        let mut map1 = HashMap::new();
        map1.insert("cc", "12");
        map1.insert("c1", "a1");
        map1.insert("a2", "0");
        map1.insert("c0", "hello");
        counter_vec.with(&map1).inc();

        let mut map2 = HashMap::new();
        map2.insert("cc", "12");
        map2.insert("c1", "0");
        map2.insert("a2", "0");
        map2.insert("c0", "hello");
        counter_vec.with(&map2).inc();
        counter_vec.with(&map2).inc();

        let mut map3 = HashMap::new();
        map3.insert("cc", "12");
        map3.insert("c1", "0");
        map3.insert("a2", "da");
        map3.insert("c0", "hello");
        counter_vec.with(&map3).inc();
        counter_vec.with(&map3).inc();
        counter_vec.with(&map3).inc();

        let mut map4 = HashMap::new();
        map4.insert("cc", "12");
        map4.insert("c1", "0");
        map4.insert("a2", "da");
        map4.insert("c0", "你好");
        counter_vec.with(&map4).inc();
        counter_vec.with(&map4).inc();
        counter_vec.with(&map4).inc();
        counter_vec.with(&map4).inc();

        // Expected exposition order:
        // # HELP test test help
        // # TYPE test counter
        // test{a="1",a2="0",b="2",c0="hello",c1="0",cc="12"} 2
        // test{a="1",a2="0",b="2",c0="hello",c1="a1",cc="12"} 1
        // test{a="1",a2="da",b="2",c0="hello",c1="0",cc="12"} 3
        // test{a="1",a2="da",b="2",c0="你好",c1="0",cc="12"} 4

        let mfs = r.gather();
        assert_eq!(mfs.len(), 1);
        let ms = mfs[0].get_metric();
        assert_eq!(ms.len(), 4);
        assert_eq!(ms[0].get_counter().get_value() as u64, 2);
        assert_eq!(ms[1].get_counter().get_value() as u64, 1);
        assert_eq!(ms[2].get_counter().get_value() as u64, 3);
        assert_eq!(ms[3].get_counter().get_value() as u64, 4);
    }

    // An empty prefix is rejected; a non-empty prefix is prepended, joined
    // with '_', to every gathered family name.
    #[test]
    fn test_with_prefix_gather() {
        assert!(Registry::new_custom(Some("".to_string()), None).is_err());

        let r = Registry::new_custom(Some("common_prefix".to_string()), None).unwrap();
        let counter_a = Counter::new("test_a_counter", "test help").unwrap();
        r.register(Box::new(counter_a)).unwrap();

        let mfs = r.gather();
        assert_eq!(mfs.len(), 1);
        assert_eq!(mfs[0].get_name(), "common_prefix_test_a_counter");
    }

    // Registry common labels are added to every metric of every family,
    // both plain counters and vector children.
    #[test]
    fn test_with_labels_gather() {
        let mut labels = HashMap::new();
        labels.insert("tkey".to_string(), "tvalue".to_string());

        let r = Registry::new_custom(None, Some(labels)).unwrap();
        let counter_a = Counter::new("test_a_counter", "test help").unwrap();
        r.register(Box::new(counter_a)).unwrap();
        let counter_vec =
            CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
        r.register(Box::new(counter_vec.clone())).unwrap();

        counter_vec.with_label_values(&["one", "two"]).inc();
        counter_vec.with_label_values(&["three", "four"]).inc();

        let mfs = r.gather();
        assert_eq!(mfs.len(), 2);
        assert_eq!(mfs[0].get_name(), "test_a_counter");
        assert_eq!(mfs[1].get_name(), "test_vec");

        let mut needle = proto::LabelPair::default();
        needle.set_name("tkey".to_string());
        needle.set_value("tvalue".to_string());
        let metrics = mfs[0].get_metric();
        for m in metrics {
            assert!(m.get_label().contains(&needle));
        }
        let metrics = mfs[1].get_metric();
        for m in metrics {
            assert!(m.get_label().contains(&needle));
        }
    }

    // A single collector exposing several counters, each with its own
    // descriptor. `collect` increments every counter before collecting it.
    struct MultipleCollector {
        descs: Vec<Desc>,
        counters: Vec<Counter>,
    }

    impl Collector for MultipleCollector {
        fn desc(&self) -> Vec<&Desc> {
            self.descs.iter().collect()
        }

        fn collect(&self) -> Vec<proto::MetricFamily> {
            self.counters
                .iter()
                .inspect(|c| c.inc())
                .map(|c| c.collect())
                .fold(Vec::new(), |mut acc, mfs| {
                    acc.extend(mfs);
                    acc
                })
        }
    }

    // A collector that reports several distinct descriptors registers fine.
    #[test]
    fn test_register_multiplecollector() {
        let counters = vec![
            Counter::new("c1", "c1 is a counter").unwrap(),
            Counter::new("c2", "c2 is a counter").unwrap(),
        ];

        let descs = counters.iter().map(|c| c.desc().into_iter().cloned()).fold(
            Vec::new(),
            |mut acc, ds| {
                acc.extend(ds);
                acc
            },
        );

        let mc = MultipleCollector { descs, counters };

        let r = Registry::new();
        r.register(Box::new(mc)).unwrap();
    }

    // A vector with no children yet produces an empty family, which `gather`
    // prunes; once a child exists the family is reported.
    #[test]
    fn test_prune_empty_metric_family() {
        let counter_vec =
            CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
        let r = Registry::new();
        r.register(Box::new(counter_vec.clone())).unwrap();
        assert!(r.gather().is_empty());
        counter_vec.with_label_values(&["1", "2"]).inc();
        assert!(!r.gather().is_empty());
    }
}