foundations 5.7.3

A Rust service foundations library.
Documentation
use super::rewind::{RewindState, RewindTo};
use super::{ExtraProducer, InfoMetric, info_metric, report_nonfatal_collect_error};
use crate::telemetry::settings::{MetricsSettings, ServiceNameFormat};
use crate::{Result, ServiceInfo};
use prometheus_client::encoding::text::{EncodeMetric, Encoder, SendSyncEncodeMetric, encode};
use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry;
use prometools::serde::InfoGauge;
use std::any::TypeId;
use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::OnceLock;

/// Process-wide metrics registries.
///
/// Set at most once: either explicitly by `Registries::init`, or lazily by
/// `Registries::get`, which falls back to an `"undefined"` service name prefix.
static REGISTRIES: OnceLock<Registries> = OnceLock::new();

/// How the service name is attached to the metrics of this process.
///
/// Built from a `ServiceNameFormat` by `MetricsServiceName::new`.
enum MetricsServiceName {
    /// The service name is prepended to the subsystem name
    /// (`{service}_{subsystem}`) when a subsystem asks for the service prefix.
    Prefix(String),
    /// The service name is attached as a label: `(label name, service name)`.
    Label(String, String),
}

impl MetricsServiceName {
    /// Pairs the service `name` with the requested `format`.
    ///
    /// `MetricPrefix` yields [`MetricsServiceName::Prefix`] holding the name;
    /// `LabelWithName(label)` yields [`MetricsServiceName::Label`] holding the
    /// label name followed by the service name.
    fn new(name: &str, format: ServiceNameFormat) -> Self {
        match format {
            ServiceNameFormat::MetricPrefix => Self::Prefix(name.to_owned()),
            ServiceNameFormat::LabelWithName(label) => Self::Label(label, name.to_owned()),
        }
    }
}

/// The set of metric registries shared by the whole process.
#[doc(hidden)]
pub struct Registries {
    // NOTE: we intentionally use a lock without poisoning here to not
    // panic the threads if they just share telemetry with failed thread.
    //
    // Registry for non-optional metrics; always encoded on collection.
    main: parking_lot::RwLock<Registry>,
    // Registry for optional metrics; encoded only when the collector asks for
    // optional metrics.
    opt: parking_lot::RwLock<Registry>,
    // Info metrics keyed by `TypeId`; they are registered into a fresh,
    // temporary registry on every collection.
    pub(super) info: parking_lot::RwLock<HashMap<TypeId, Box<dyn ErasedInfoMetric>>>,
    // How the service name is applied to sub-registries (prefix or label).
    service_name: MetricsServiceName,
    // Additional producers that write into the output buffer after the
    // registries have been encoded.
    extra_producers: parking_lot::RwLock<Vec<Box<dyn ExtraProducer>>>,
}

impl Registries {
    pub(super) fn init(service_info: &ServiceInfo, settings: &MetricsSettings) -> bool {
        let service_name = MetricsServiceName::new(
            &service_info.name_in_metrics,
            settings.service_name_format.clone(),
        );

        let mut first_install = false;

        // FIXME(nox): Due to prometheus-client 0.18 not supporting the creation of
        // registries with specific label values, we use `MetricsServiceName::Label`
        // directly in `Registries::get_subsystem`.
        REGISTRIES.get_or_init(|| {
            let regs = Registries {
                main: Default::default(),
                opt: Default::default(),
                info: Default::default(),
                service_name,
                extra_producers: Default::default(),
            };
            first_install = true;
            regs
        });

        first_install
    }

    pub(super) fn collect(buffer: &mut Vec<u8>, collect_optional: bool) -> Result<()> {
        let registries = Self::get();

        registries.collect_info_metrics(buffer)?;

        encode_registry(buffer, &registries.main.read())?;

        if collect_optional {
            encode_registry(buffer, &registries.opt.read())?;
        }

        for producer in registries.extra_producers.read().iter() {
            producer.produce(buffer);
            truncate_eof(buffer);
        }

        Ok(())
    }

    fn collect_info_metrics(&self, buffer: &mut Vec<u8>) -> Result<()> {
        let info_registry = self.info.read();
        let mut registry = Registry::default();

        for info_metric in info_registry.values() {
            let info_gauge = InfoGauge::new(&**info_metric);
            let info_gauge = RewindErrorEncode(info_gauge);

            registry.register(info_metric.name(), info_metric.help(), info_gauge)
        }

        encode_registry(buffer, &registry)
    }

    pub fn get_subsystem(
        subsystem: &str,
        optional: bool,
        with_service_prefix: bool,
    ) -> impl DerefMut<Target = Registry> + 'static {
        let registries = Self::get();
        let registry = if optional {
            &registries.opt
        } else {
            &registries.main
        };

        let mut prefix = Cow::Borrowed(subsystem);
        if with_service_prefix && let MetricsServiceName::Prefix(service) = &registries.service_name
        {
            prefix = format!("{service}_{subsystem}").into();
        }

        parking_lot::RwLockWriteGuard::map(registry.write(), move |mut reg| {
            if let MetricsServiceName::Label(name, val) = &registries.service_name {
                reg = reg.sub_registry_with_label((name.into(), val.into()));
            }
            reg.sub_registry_with_prefix(prefix)
        })
    }

    pub fn add_extra_producer(&self, producer: Box<dyn ExtraProducer>) {
        self.extra_producers.write().push(producer);
    }

    #[cfg(test)]
    pub fn is_initialized() -> bool {
        REGISTRIES.get().is_some()
    }

    pub(super) fn get() -> &'static Registries {
        REGISTRIES.get_or_init(|| Registries {
            main: Default::default(),
            opt: Default::default(),
            info: Default::default(),
            service_name: MetricsServiceName::Prefix("undefined".to_owned()),
            extra_producers: Default::default(),
        })
    }
}

// NOTE(review): the doc comment below is presumably consumed by
// `#[info_metric]` (e.g. as the metric's help text) — confirm before
// rewording it.
/// Build and version information
#[info_metric(crate_path = "crate")]
pub(super) struct BuildInfo {
    // Presumably the version string of the running build; the value is
    // supplied by whoever constructs the metric — TODO confirm.
    pub(super) version: &'static str,
}

// NOTE(review): the doc comment below is presumably consumed by
// `#[info_metric]` (e.g. as the metric's help text) — confirm before
// rewording it.
/// Information about the process runtime
#[info_metric(crate_path = "crate")]
pub(super) struct RuntimeInfo {
    // Presumably the OS process id of the running service; the value is
    // supplied by whoever constructs the metric — TODO confirm.
    pub(super) pid: u32,
}

/// Type-erased view of an info metric, so that metrics of different concrete
/// types can be stored together as `Box<dyn ErasedInfoMetric>`.
///
/// Implemented for every `InfoMetric` by the blanket impl in this file.
pub(super) trait ErasedInfoMetric: erased_serde::Serialize + Send + Sync + 'static {
    /// Name under which the metric is registered when info metrics are collected.
    fn name(&self) -> &'static str;

    /// Help text registered alongside the metric.
    fn help(&self) -> &'static str;
}

// NOTE(review): expected to implement `serde::Serialize` for
// `dyn ErasedInfoMetric`, which is what lets the boxed metric be handed to
// `InfoGauge` — see the erased-serde documentation.
erased_serde::serialize_trait_object!(ErasedInfoMetric);

/// Every [`InfoMetric`] is usable as an [`ErasedInfoMetric`]: the erased
/// accessors simply surface the metric's associated constants.
impl<M: InfoMetric> ErasedInfoMetric for M {
    fn name(&self) -> &'static str {
        <M as InfoMetric>::NAME
    }

    fn help(&self) -> &'static str {
        <M as InfoMetric>::HELP
    }
}

std::thread_local! {
    // Per-thread rewind state. `encode_registry` activates it around the output
    // buffer while a registry is being encoded, and `RewindErrorEncode::encode`
    // consults it to roll back the output of a metric that failed to encode.
    static ENCODER_REWIND_STATE: RewindState = const { RewindState::new() };
}

/// Wrapper around a metric that, when encoding fails while the thread's rewind
/// state is active, rewinds the output and reports the error as non-fatal
/// instead of propagating it.
struct RewindErrorEncode<M>(M);

impl<M: EncodeMetric> EncodeMetric for RewindErrorEncode<M> {
    /// Encodes the wrapped metric.
    ///
    /// On failure inside an active rewindable encoder, the output is rewound
    /// to the last newline (so no partial line is left behind), the error is
    /// reported as non-fatal and `Ok(())` is returned. Outside of a rewindable
    /// encoder the error is returned unchanged.
    fn encode(&self, encoder: Encoder) -> std::io::Result<()> {
        let failure = match self.0.encode(encoder) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };

        // The error stays in `unhandled` unless the rewind path consumes it.
        let mut unhandled = Some(failure);

        let _ = ENCODER_REWIND_STATE.try_with(|state| {
            if !state.is_active() {
                return;
            }

            state.rewind_to(RewindTo::LastNewline);
            if let Some(err) = unhandled.take() {
                report_nonfatal_collect_error(&format_args!(
                    "encoding metric or family: {err}"
                ));
            }
        });

        match unhandled {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }

    /// Forwards the metric type of the wrapped metric.
    #[inline]
    fn metric_type(&self) -> MetricType {
        self.0.metric_type()
    }
}

/// Boxes `metric` behind the crate-private error-recovering wrapper, so that
/// callers get its behavior without the wrapper type being exposed.
pub fn wrap_metric(metric: impl SendSyncEncodeMetric + 'static) -> Box<dyn SendSyncEncodeMetric> {
    let wrapped = RewindErrorEncode(metric);
    Box::new(wrapped)
}

/// Appends the text encoding of `registry` to `buffer`, without the trailing
/// `# EOF` marker.
///
/// The thread's rewind state is activated around `buffer` for the duration of
/// the encoding.
///
/// # Errors
///
/// Returns the encoder's error; in that case the buffer is left as the
/// encoder wrote it and no marker is stripped.
fn encode_registry(buffer: &mut Vec<u8>, registry: &Registry<impl EncodeMetric>) -> Result<()> {
    let encoded = ENCODER_REWIND_STATE.with(|state| {
        let mut rewindable = state.activate(buffer);
        encode(&mut rewindable, registry)
    });
    encoded?;

    truncate_eof(buffer);

    Ok(())
}

/// Removes a single trailing `# EOF\n` marker from `buffer`, if present.
///
/// Buffers that do not end with the marker are left untouched.
fn truncate_eof(buffer: &mut Vec<u8>) {
    const EOF_MARKER: &[u8] = b"# EOF\n";

    let Some(body) = buffer.strip_suffix(EOF_MARKER) else {
        return;
    };

    let kept = body.len();
    buffer.truncate(kept);
}