// zenoh-stats 1.7.1
//
// Internal crate for zenoh.
// Documentation
use std::{
    any::TypeId,
    cell::Cell,
    collections::{hash_map::Entry, HashMap},
    fmt,
    hash::Hash,
    sync::{Arc, RwLock},
    time::{Duration, Instant},
};

use prometheus_client::{
    collector::Collector,
    encoding::{DescriptorEncoder, EncodeLabelSet, MetricEncoder, NoLabelSet},
    metrics::{counter::Counter, family::MetricConstructor, TypedMetric},
    registry::Unit,
};

use crate::{
    keys::HistogramPerKey,
    labels::{DisconnectedLabels, LabelsSetRef, LinkLabels, TransportLabels},
    stats::StatsPath,
    StatsDirection,
};

/// Time a transport must have been marked as disconnected for before `collect`
/// removes its state from the family (the comparison is strictly greater-than).
const GARBAGE_COLLECTION_DELAY: Duration = Duration::from_secs(60);

/// Shared handle to a family of metrics of type `M`, organised per transport, per
/// optional link and per label set `S`.
///
/// The state lives behind an `Arc<RwLock<..>>`, so clones of the handle refer to the
/// same underlying family. `C` is the constructor used to create missing metrics and
/// defaults to a plain `fn() -> M`.
#[derive(Debug)]
pub(crate) struct TransportFamily<S, M, C = fn() -> M>(Arc<RwLock<TransportFamilyInner<S, M, C>>>);

/// Lock-protected state of a `TransportFamily`.
#[derive(Debug)]
struct TransportFamilyInner<S, M, C> {
    /// State of every transport currently tracked, including transports that are
    /// marked as disconnected but have not been garbage collected yet.
    transports: HashMap<TransportLabels, TransportState<S, M>>,
    /// Metrics, keyed by label set, that received the values drained from transports
    /// removed by garbage collection.
    disconnected: HashMap<S, M>,
    /// Constructor invoked whenever a metric has to be created.
    constructor: C,
}

/// Metrics recorded for a single transport.
#[derive(Debug)]
struct TransportState<S, M> {
    /// Metrics per link, then per label set. The `None` key holds the metrics that are
    /// not attached to a link, which is also where `remove_link` drains the metrics of
    /// a removed link.
    links: HashMap<Option<LinkLabels>, HashMap<S, M>>,
    /// Instant at which `remove_transport` marked the transport as disconnected;
    /// `None` while the transport is considered connected.
    disconnection: Option<Instant>,
}

impl<S, M> Default for TransportState<S, M> {
    /// Builds the state of a transport that has no metric yet and is not marked as
    /// disconnected.
    fn default() -> Self {
        let links = HashMap::default();
        Self {
            links,
            disconnection: None,
        }
    }
}

impl<S, M, C> Clone for TransportFamily<S, M, C> {
    /// Returns another handle to the same shared family state; only the `Arc` is
    /// cloned, the metrics themselves are not duplicated.
    fn clone(&self) -> Self {
        let shared = Arc::clone(&self.0);
        Self(shared)
    }
}

impl<S: Clone + Hash + Eq, M: Default> Default for TransportFamily<S, M> {
    /// Creates an empty family whose metrics are built with `M::default`.
    fn default() -> Self {
        // The default constructor type of the family is a plain function pointer.
        let constructor: fn() -> M = M::default;
        Self::new_with_constructor(constructor)
    }
}

impl<S: Clone + Hash + Eq, M, C: MetricConstructor<M>> TransportFamily<S, M, C> {
    pub fn new_with_constructor(constructor: C) -> Self {
        Self(Arc::new(RwLock::new(TransportFamilyInner {
            transports: Default::default(),
            disconnected: Default::default(),
            constructor,
        })))
    }
}

impl<S: StatsPath<M> + Clone + Hash + Eq, M: TransportMetric + Clone, C: MetricConstructor<M>>
    TransportFamily<S, M, C>
{
    /// Returns a clone of the metric registered for `labels` on the given `transport`
    /// and optional `link`, creating it with the family constructor when it does not
    /// exist yet.
    ///
    /// Accessing a transport also clears its disconnection mark, so a transport that is
    /// used again before being garbage collected keeps its recorded metrics.
    ///
    /// # Panics
    ///
    /// Panics if the family lock is poisoned.
    pub(crate) fn get_or_create_owned(
        &self,
        transport: &TransportLabels,
        link: Option<&LinkLabels>,
        labels: &S,
    ) -> M {
        let inner = &mut *self.0.write().unwrap();
        let family_entry = inner.transports.entry(transport.clone()).or_default();
        family_entry.disconnection = None;
        let link_entry = family_entry
            .links
            .entry(link.cloned())
            .or_default()
            .entry(labels.clone());
        let new_metric = || inner.constructor.new_metric();
        link_entry.or_insert_with(new_metric).clone()
    }

    /// Removes the metrics recorded for `link` on `transport` and drains each of them
    /// (through `TransportMetric::drain_into`) into the transport's link-less (`None`)
    /// metric with the same label set, creating that metric if needed.
    ///
    /// Does nothing when the transport or the link is unknown.
    ///
    /// # Panics
    ///
    /// Panics if the family lock is poisoned.
    pub(crate) fn remove_link(&self, transport: &TransportLabels, link: &LinkLabels) {
        let inner = &mut *self.0.write().unwrap();
        if let Some(family) = inner.transports.get_mut(transport) {
            if let Some(metrics) = family.links.remove(&Some(link.clone())) {
                for (labels, metric) in metrics {
                    let no_link_metrics = family.links.entry(None).or_default();
                    let new_metric = || inner.constructor.new_metric();
                    metric.drain_into(no_link_metrics.entry(labels).or_insert_with(new_metric));
                }
            }
        }
    }

    /// Marks `transport` as disconnected as of now.
    ///
    /// The transport state is kept until `collect` garbage-collects it, once it has
    /// been disconnected for more than `GARBAGE_COLLECTION_DELAY`. Calling this again
    /// on an already disconnected transport restarts that delay. Does nothing when the
    /// transport is unknown.
    ///
    /// # Panics
    ///
    /// Panics if the family lock is poisoned.
    pub(crate) fn remove_transport(&self, transport: &TransportLabels) {
        let inner = &mut *self.0.write().unwrap();
        if let Some(family_entry) = inner.transports.get_mut(transport) {
            family_entry.disconnection = Some(Instant::now());
        }
    }

    /// Takes a snapshot of the whole family.
    ///
    /// For every label set, the result holds the summed value over all contributing
    /// metrics, plus a per-transport breakdown in which each transport carries its own
    /// sum and the values of its link-attached metrics. Transports marked as
    /// disconnected are left out unless the thread-local `COLLECT_DISCONNECTED` flag
    /// is set.
    ///
    /// As a side effect, transports disconnected for more than
    /// `GARBAGE_COLLECTION_DELAY` are removed from the family after their metrics have
    /// been drained into the `disconnected` metrics.
    ///
    /// # Panics
    ///
    /// Panics if the family lock is poisoned.
    fn collect(&self) -> HashMap<S, (M::Collected, Vec<TransportCollected<M>>)> {
        let inner = self.0.read().unwrap();
        let now = Instant::now();
        let needs_garbage_collection = |disconnection: Option<Instant>| {
            matches!(
                disconnection,
                Some(instant) if now.duration_since(instant) > GARBAGE_COLLECTION_DELAY
            )
        };
        let mut garbage_collection = false;
        // Start from the values kept for transports that were already garbage collected.
        let mut results = inner
            .disconnected
            .iter()
            .map(|(s, m)| (s.clone(), (m.collect(), Vec::new())))
            .collect::<HashMap<S, (M::Collected, Vec<TransportCollected<M>>)>>();
        for (transport, state) in inner.transports.iter() {
            let disconnected = state.disconnection.is_some();
            garbage_collection |= needs_garbage_collection(state.disconnection);
            // NOTE(review): skipped transports are also left out of the per-label
            // totals, so a total can drop between a disconnection and the garbage
            // collection of that transport — confirm this is intended.
            if disconnected && !COLLECT_DISCONNECTED.get() {
                continue;
            }
            for (link, metrics) in state.links.iter() {
                for (labels, metric) in metrics {
                    let collected = metric.collect();
                    let transports = match results.entry(labels.clone()) {
                        Entry::Occupied(mut entry) => {
                            M::sum_collected(&mut entry.get_mut().0, &collected);
                            &mut entry.into_mut().1
                        }
                        Entry::Vacant(entry) => {
                            &mut entry.insert((collected.clone(), Vec::new())).1
                        }
                    };
                    // Transports are visited one after the other, so the entry of the
                    // current transport, if it exists, is always the last one pushed.
                    let links = match transports.last_mut() {
                        Some(t) if t.transport == *transport => {
                            M::sum_collected(&mut t.collected, &collected);
                            &mut t.links
                        }
                        _ => {
                            transports.push(TransportCollected {
                                transport: transport.clone(),
                                disconnected: DisconnectedLabels { disconnected },
                                collected: collected.clone(),
                                links: Vec::new(),
                            });
                            &mut transports.last_mut().unwrap().links
                        }
                    };
                    if let Some(link) = link {
                        links.push((link.clone(), collected));
                    }
                }
            }
        }
        // Release the read lock before taking the write lock below.
        drop(inner);
        if garbage_collection {
            let inner = &mut *self.0.write().unwrap();
            inner.transports.retain(|_, family| {
                // Checked again under the write lock: the transport may have been used
                // (and its disconnection mark cleared) since the snapshot was taken.
                if !needs_garbage_collection(family.disconnection) {
                    return true;
                }
                // Drain the metrics of every link, not only the link-less ones:
                // whatever is still attached to a link would otherwise be lost together
                // with the transport and vanish from the per-label totals.
                for (labels, metric) in family.links.drain().flat_map(|(_, metrics)| metrics) {
                    let new_metric = || inner.constructor.new_metric();
                    metric.drain_into(inner.disconnected.entry(labels).or_insert_with(new_metric));
                }
                false
            });
        }
        results
    }

    /// Feeds the current value of every metric of the family into `json` through
    /// `S::incr_stats`.
    ///
    /// Metrics kept for garbage-collected transports are reported without transport
    /// nor link; the metrics of every tracked transport (whether or not it is marked
    /// as disconnected) are reported with their transport and optional link.
    ///
    /// # Panics
    ///
    /// Panics if the family lock is poisoned.
    pub(crate) fn merge_stats(&self, direction: StatsDirection, json: &mut serde_json::Value) {
        let inner = self.0.read().unwrap();
        for (labels, metric) in inner.disconnected.iter() {
            S::incr_stats(direction, None, None, labels, metric.collect(), json);
        }
        for (transport, family) in inner.transports.iter() {
            for (link, metrics) in family.links.iter() {
                for (labels, metric) in metrics {
                    S::incr_stats(
                        direction,
                        Some(transport),
                        link.as_ref(),
                        labels,
                        metric.collect(),
                        json,
                    );
                }
            }
        }
    }
}

/// Operations a metric type must provide to be stored in a `TransportFamily`.
pub(crate) trait TransportMetric: 'static {
    /// Snapshot of the metric value, as produced by `collect`.
    type Collected: Clone;
    /// Transfers what `self` has accumulated into `other`; the family calls it on
    /// metrics it is about to discard.
    fn drain_into(&self, other: &Self);
    /// Returns a snapshot of the current value of the metric.
    fn collect(&self) -> Self::Collected;
    /// Accumulates the snapshot `other` into `collected`.
    fn sum_collected(collected: &mut Self::Collected, other: &Self::Collected);
    /// Encodes the snapshot `collected` with the label set `labels` using `encoder`.
    fn encode(
        encoder: &mut MetricEncoder,
        labels: &impl EncodeLabelSet,
        collected: &Self::Collected,
    ) -> fmt::Result;
}

impl TransportMetric for Counter {
    type Collected = u64;

    /// Increments `other` by the current value of this counter; this counter itself is
    /// not modified.
    fn drain_into(&self, other: &Self) {
        let drained = self.get();
        other.inc_by(drained);
    }

    /// Reads the current value of the counter.
    fn collect(&self) -> Self::Collected {
        self.get()
    }

    /// Adds `other` to `collected`.
    fn sum_collected(collected: &mut Self::Collected, other: &Self::Collected) {
        *collected += *other;
    }

    /// Encodes `collected` as a counter sample carrying `labels`, without exemplar.
    fn encode(
        encoder: &mut MetricEncoder,
        labels: &impl EncodeLabelSet,
        collected: &Self::Collected,
    ) -> fmt::Result {
        let mut family_encoder = encoder.encode_family(labels)?;
        family_encoder.encode_counter::<NoLabelSet, _, u64>(collected, None)
    }
}

/// Snapshot of the metrics of one transport for a given label set, as built by
/// `TransportFamily::collect`.
struct TransportCollected<M: TransportMetric> {
    /// Labels identifying the transport.
    transport: TransportLabels,
    /// Label telling whether the transport was marked as disconnected when collected.
    disconnected: DisconnectedLabels,
    /// Sum of the values collected for this transport, link-attached or not.
    collected: M::Collected,
    /// Values collected for each metric attached to a link of this transport.
    links: Vec<(LinkLabels, M::Collected)>,
}

// Per-thread switches read while metrics are collected and encoded. They all start as
// `false`.
thread_local! {
    /// When set, `TransportFamilyCollector::encode` also emits the `*_per_transport` metrics.
    pub(crate) static COLLECT_PER_TRANSPORT: Cell<bool> = const { Cell::new(false) };
    /// When set, `TransportFamilyCollector::encode` also emits the `*_per_link` metrics
    /// (provided at least one link-attached value was collected).
    pub(crate) static COLLECT_PER_LINK: Cell<bool> = const { Cell::new(false) };
    /// When set, `TransportFamily::collect` includes transports marked as disconnected.
    pub(crate) static COLLECT_DISCONNECTED: Cell<bool> = const { Cell::new(false) };
    /// When unset, collectors whose metric type is `HistogramPerKey` encode nothing.
    pub(crate) static COLLECT_PER_KEY: Cell<bool> = const { Cell::new(false) };
}

/// Collector exposing a `TransportFamily` under a metric name, help text and optional
/// unit.
#[derive(Debug)]
pub(crate) struct TransportFamilyCollector<S, M, C> {
    /// Name of the metric; also the prefix of the `_per_transport` and `_per_link`
    /// variants.
    pub(crate) name: String,
    /// Help text of the metric.
    pub(crate) help: String,
    /// Optional unit passed along when the metric descriptor is encoded.
    pub(crate) unit: Option<Unit>,
    /// Family whose metrics are collected and encoded.
    pub(crate) family: TransportFamily<S, M, C>,
}

impl<
        S: EncodeLabelSet + StatsPath<M> + Clone + Hash + Eq + fmt::Debug + Send + Sync + 'static,
        M: TypedMetric + TransportMetric + Clone + fmt::Debug + Send + Sync + 'static,
        C: MetricConstructor<M> + fmt::Debug + Send + Sync + 'static,
    > Collector for TransportFamilyCollector<S, M, C>
{
    /// Encodes a snapshot of the family.
    ///
    /// The totals per label set are always emitted under `name`. Depending on the
    /// thread-local `COLLECT_PER_TRANSPORT` and `COLLECT_PER_LINK` flags, the
    /// per-transport and per-link breakdowns are additionally emitted under
    /// `{name}_per_transport` and `{name}_per_link`. Nothing is emitted at all for
    /// `HistogramPerKey` metrics unless `COLLECT_PER_KEY` is set.
    fn encode(&self, mut encoder: DescriptorEncoder) -> fmt::Result {
        let is_per_key_histogram = TypeId::of::<M>() == TypeId::of::<HistogramPerKey>();
        if is_per_key_histogram && !COLLECT_PER_KEY.get() {
            return Ok(());
        }

        let snapshot = self.family.collect();
        let unit = self.unit.as_ref();

        // Totals per label set.
        let mut totals = encoder.encode_descriptor(&self.name, &self.help, unit, M::TYPE)?;
        for (labels, (total, _)) in &snapshot {
            M::encode(&mut totals, labels, total)?;
        }

        // Breakdown per transport.
        if COLLECT_PER_TRANSPORT.get() {
            let name = format!("{name}_per_transport", name = self.name);
            let help = format!("{help} (per transport)", help = self.help);
            let mut per_transport = encoder.encode_descriptor(&name, &help, unit, M::TYPE)?;
            for (labels, (_, transports)) in &snapshot {
                for transport in transports {
                    let transport_labels =
                        (LabelsSetRef(&transport.transport), transport.disconnected);
                    let full_labels = (LabelsSetRef(labels), transport_labels);
                    M::encode(&mut per_transport, &full_labels, &transport.collected)?;
                }
            }
        }

        // Breakdown per link, only when at least one link-attached value was collected.
        let has_link_values = || {
            snapshot
                .values()
                .any(|(_, transports)| transports.iter().any(|t| !t.links.is_empty()))
        };
        if COLLECT_PER_LINK.get() && has_link_values() {
            let name = format!("{name}_per_link", name = self.name);
            let help = format!("{help} (per link)", help = self.help);
            let mut per_link = encoder.encode_descriptor(&name, &help, unit, M::TYPE)?;
            for (labels, (_, transports)) in &snapshot {
                for transport in transports {
                    for (link, value) in &transport.links {
                        let link_labels = (LabelsSetRef(&transport.transport), LabelsSetRef(link));
                        let full_labels = (LabelsSetRef(labels), link_labels);
                        M::encode(&mut per_link, &full_labels, value)?;
                    }
                }
            }
        }

        Ok(())
    }
}