use super::rewind::{RewindState, RewindTo};
use super::{ExtraProducer, InfoMetric, info_metric, report_nonfatal_collect_error};
use crate::telemetry::settings::{MetricsSettings, ServiceNameFormat};
use crate::{Result, ServiceInfo};
use prometheus_client::encoding::text::{EncodeMetric, Encoder, SendSyncEncodeMetric, encode};
use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry;
use prometools::serde::InfoGauge;
use std::any::TypeId;
use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::OnceLock;
static REGISTRIES: OnceLock<Registries> = OnceLock::new();
enum MetricsServiceName {
Prefix(String),
Label(String, String),
}
impl MetricsServiceName {
fn new(name: &str, format: ServiceNameFormat) -> Self {
let name = name.to_owned();
match format {
ServiceNameFormat::MetricPrefix => Self::Prefix(name),
ServiceNameFormat::LabelWithName(label) => Self::Label(label, name),
}
}
}
#[doc(hidden)]
pub struct Registries {
main: parking_lot::RwLock<Registry>,
opt: parking_lot::RwLock<Registry>,
pub(super) info: parking_lot::RwLock<HashMap<TypeId, Box<dyn ErasedInfoMetric>>>,
service_name: MetricsServiceName,
extra_producers: parking_lot::RwLock<Vec<Box<dyn ExtraProducer>>>,
}
impl Registries {
pub(super) fn init(service_info: &ServiceInfo, settings: &MetricsSettings) -> bool {
let service_name = MetricsServiceName::new(
&service_info.name_in_metrics,
settings.service_name_format.clone(),
);
let mut first_install = false;
REGISTRIES.get_or_init(|| {
let regs = Registries {
main: Default::default(),
opt: Default::default(),
info: Default::default(),
service_name,
extra_producers: Default::default(),
};
first_install = true;
regs
});
first_install
}
pub(super) fn collect(buffer: &mut Vec<u8>, collect_optional: bool) -> Result<()> {
let registries = Self::get();
registries.collect_info_metrics(buffer)?;
encode_registry(buffer, ®istries.main.read())?;
if collect_optional {
encode_registry(buffer, ®istries.opt.read())?;
}
for producer in registries.extra_producers.read().iter() {
producer.produce(buffer);
truncate_eof(buffer);
}
Ok(())
}
fn collect_info_metrics(&self, buffer: &mut Vec<u8>) -> Result<()> {
let info_registry = self.info.read();
let mut registry = Registry::default();
for info_metric in info_registry.values() {
let info_gauge = InfoGauge::new(&**info_metric);
let info_gauge = RewindErrorEncode(info_gauge);
registry.register(info_metric.name(), info_metric.help(), info_gauge)
}
encode_registry(buffer, ®istry)
}
pub fn get_subsystem(
subsystem: &str,
optional: bool,
with_service_prefix: bool,
) -> impl DerefMut<Target = Registry> + 'static {
let registries = Self::get();
let registry = if optional {
®istries.opt
} else {
®istries.main
};
let mut prefix = Cow::Borrowed(subsystem);
if with_service_prefix && let MetricsServiceName::Prefix(service) = ®istries.service_name
{
prefix = format!("{service}_{subsystem}").into();
}
parking_lot::RwLockWriteGuard::map(registry.write(), move |mut reg| {
if let MetricsServiceName::Label(name, val) = ®istries.service_name {
reg = reg.sub_registry_with_label((name.into(), val.into()));
}
reg.sub_registry_with_prefix(prefix)
})
}
pub fn add_extra_producer(&self, producer: Box<dyn ExtraProducer>) {
self.extra_producers.write().push(producer);
}
#[cfg(test)]
pub fn is_initialized() -> bool {
REGISTRIES.get().is_some()
}
pub(super) fn get() -> &'static Registries {
REGISTRIES.get_or_init(|| Registries {
main: Default::default(),
opt: Default::default(),
info: Default::default(),
service_name: MetricsServiceName::Prefix("undefined".to_owned()),
extra_producers: Default::default(),
})
}
}
#[info_metric(crate_path = "crate")]
pub(super) struct BuildInfo {
pub(super) version: &'static str,
}
#[info_metric(crate_path = "crate")]
pub(super) struct RuntimeInfo {
pub(super) pid: u32,
}
pub(super) trait ErasedInfoMetric: erased_serde::Serialize + Send + Sync + 'static {
fn name(&self) -> &'static str;
fn help(&self) -> &'static str;
}
erased_serde::serialize_trait_object!(ErasedInfoMetric);
impl<M> ErasedInfoMetric for M
where
M: InfoMetric,
{
fn name(&self) -> &'static str {
M::NAME
}
fn help(&self) -> &'static str {
M::HELP
}
}
std::thread_local! {
static ENCODER_REWIND_STATE: RewindState = const { RewindState::new() };
}
struct RewindErrorEncode<M>(M);
impl<M: EncodeMetric> EncodeMetric for RewindErrorEncode<M> {
fn encode(&self, encoder: Encoder) -> std::io::Result<()> {
let mut res = self.0.encode(encoder);
if res.is_err() {
let _ = ENCODER_REWIND_STATE.try_with(|s| {
if s.is_active() {
s.rewind_to(RewindTo::LastNewline);
let err = std::mem::replace(&mut res, Ok(())).unwrap_err();
report_nonfatal_collect_error(&format_args!(
"encoding metric or family: {err}"
));
}
});
}
res
}
#[inline]
fn metric_type(&self) -> MetricType {
self.0.metric_type()
}
}
pub fn wrap_metric(metric: impl SendSyncEncodeMetric + 'static) -> Box<dyn SendSyncEncodeMetric> {
Box::new(RewindErrorEncode(metric))
}
fn encode_registry(buffer: &mut Vec<u8>, registry: &Registry<impl EncodeMetric>) -> Result<()> {
ENCODER_REWIND_STATE.with(|s| {
let mut writer = s.activate(buffer);
encode(&mut writer, registry)
})?;
truncate_eof(buffer);
Ok(())
}
fn truncate_eof(buffer: &mut Vec<u8>) {
const EOF_MARKER: &[u8] = b"# EOF\n";
if buffer.ends_with(EOF_MARKER) {
buffer.truncate(buffer.len() - EOF_MARKER.len());
}
}