pub mod histogram;
mod registration;
pub mod status;
pub(crate) mod task;
/// Name prefix reserved for metrics owned by the runtime; `child_label` panics on any
/// label whose prefixed name starts with it.
pub(crate) const METRICS_PREFIX: &str = "runtime";
pub use commonware_runtime_macros::{EncodeLabelSet, EncodeLabelValue, EncodeStruct};
pub use prometheus_client::{
collector, encoding,
encoding::{
CounterValueEncoder, DescriptorEncoder, EncodeCounterValue, EncodeExemplarTime,
EncodeExemplarValue, EncodeGaugeValue, EncodeLabel, EncodeLabelKey,
EncodeLabelSet as EncodeLabelSetTrait, EncodeLabelValue as EncodeLabelValueTrait,
EncodeMetric, ExemplarValueEncoder, GaugeValueEncoder, LabelEncoder, LabelKeyEncoder,
LabelSetEncoder, LabelValueEncoder, MetricEncoder, NoLabelSet,
},
metrics::{MetricType, TypedMetric},
registry,
registry::Metric,
};
/// Plain `prometheus_client` metric types, re-exported without the [`Registered`] wrapper
/// that the crate-level aliases (`Counter`, `Gauge`, ...) add.
pub mod raw {
pub use prometheus_client::metrics::{
counter::Counter,
family::{self, Family},
gauge::Gauge,
histogram::Histogram,
};
}
use commonware_utils::sync::Mutex;
use prometheus_client::encoding::{
text::{encode, encode_eof},
MetricEncoder as PromMetricEncoder,
};
pub use registration::Registration;
use std::{
any::Any,
borrow::Cow,
collections::{BTreeMap, HashMap},
ops::Deref,
sync::{atomic::Ordering, Arc, Weak},
};
/// Integer type accepted by [`GaugeExt`]: `i64` on targets that have 64-bit atomics.
#[cfg(target_has_atomic = "64")]
pub type GaugeValue = i64;
/// Integer type accepted by [`GaugeExt`]: `i32` on targets without 64-bit atomics.
#[cfg(not(target_has_atomic = "64"))]
pub type GaugeValue = i32;
/// A [`raw::Counter`] wrapped in a [`Registered`] handle.
pub type Counter = Registered<raw::Counter>;
/// A [`raw::Gauge`] wrapped in a [`Registered`] handle.
pub type Gauge = Registered<raw::Gauge>;
/// A [`raw::Histogram`] wrapped in a [`Registered`] handle.
pub type Histogram = Registered<raw::Histogram>;
/// A family of counters keyed by label set `L`, wrapped in a [`Registered`] handle.
pub type CounterFamily<L> = Registered<raw::Family<L, raw::Counter>>;
/// A family of gauges keyed by label set `L`, wrapped in a [`Registered`] handle.
pub type GaugeFamily<L> = Registered<raw::Family<L, raw::Gauge>>;
/// Fallible gauge setters that accept any value convertible into [`GaugeValue`].
pub trait GaugeExt {
/// Converts `value` into a [`GaugeValue`] and stores it in the gauge.
///
/// # Errors
///
/// Returns the conversion error when `value` cannot be represented as a [`GaugeValue`].
fn try_set<T: TryInto<GaugeValue>>(&self, value: T) -> Result<GaugeValue, T::Error>;
/// Converts `value` into a [`GaugeValue`] and raises the gauge to it when it is larger
/// than the value currently stored.
///
/// # Errors
///
/// Returns the conversion error when `value` cannot be represented as a [`GaugeValue`].
fn try_set_max<T: TryInto<GaugeValue> + Copy>(&self, value: T) -> Result<GaugeValue, T::Error>;
}
impl GaugeExt for raw::Gauge {
    /// Converts `value` and stores it, returning whatever `Gauge::set` reports.
    ///
    /// The conversion happens first, so a failed conversion never touches the gauge.
    fn try_set<T: TryInto<GaugeValue>>(&self, value: T) -> Result<GaugeValue, T::Error> {
        let converted: GaugeValue = value.try_into()?;
        let reported = self.set(converted);
        Ok(reported)
    }

    /// Converts `value` and applies an atomic `fetch_max` with relaxed ordering.
    ///
    /// On success the value held by the gauge *before* the update is returned.
    fn try_set_max<T: TryInto<GaugeValue> + Copy>(&self, value: T) -> Result<GaugeValue, T::Error> {
        let candidate: GaugeValue = value.try_into()?;
        let previous = self.inner().fetch_max(candidate, Ordering::Relaxed);
        Ok(previous)
    }
}
pub use histogram::HistogramExt;
/// Convenience constructors layered on [`crate::Metrics`]: each method builds a default
/// (or caller-configured) metric and passes it to `register`.
pub trait MetricsExt: crate::Metrics {
/// Registers a default-initialized counter under `name` with the given `help` text.
fn counter<N: Into<String>, H: Into<String>>(&self, name: N, help: H) -> Counter {
self.register(name, help, raw::Counter::default())
}
/// Registers a default-initialized gauge under `name` with the given `help` text.
fn gauge<N: Into<String>, H: Into<String>>(&self, name: N, help: H) -> Gauge {
self.register(name, help, raw::Gauge::default())
}
/// Registers a histogram under `name` whose bucket boundaries are taken from `buckets`.
fn histogram<N: Into<String>, H: Into<String>, I>(
&self,
name: N,
help: H,
buckets: I,
) -> Histogram
where
I: IntoIterator<Item = f64>,
{
self.register(name, help, raw::Histogram::new(buckets))
}
/// Registers an empty metric family keyed by label set `S` whose members are `M`.
fn family<N, H, S, M>(&self, name: N, help: H) -> Registered<raw::Family<S, M>>
where
N: Into<String>,
H: Into<String>,
S: Clone + std::hash::Hash + Eq,
M: Default,
raw::Family<S, M>: Metric,
{
self.register(name, help, raw::Family::<S, M>::default())
}
}
// Blanket impl: every `crate::Metrics` implementor gets the helpers above.
impl<T: crate::Metrics> MetricsExt for T {}
/// Checks that `label` matches `[a-zA-Z][a-zA-Z0-9_]*`.
///
/// # Panics
///
/// Panics when `label` is empty, does not start with an ASCII letter, or contains a
/// character other than an ASCII letter, ASCII digit, or underscore.
pub fn validate_label(label: &str) {
    let mut remaining = label.chars();
    let starts_with_letter =
        matches!(remaining.next(), Some(first) if first.is_ascii_alphabetic());
    assert!(
        starts_with_letter,
        "label must start with [a-zA-Z]: {label}"
    );
    // `remaining` has already consumed the first character.
    let has_invalid_char = remaining.any(|c| !(c.is_ascii_alphanumeric() || c == '_'));
    assert!(
        !has_invalid_char,
        "label must only contain [a-zA-Z0-9_]: {label}"
    );
}
/// Inserts or replaces `key` in `attributes`, keeping the list sorted by key.
///
/// `attributes` must already be sorted by key (which holds when it is only ever modified
/// through this function), because the position is found by binary search.
///
/// Returns `true` when a new pair was inserted and `false` when the value of an existing
/// key was overwritten.
pub fn add_attribute(
    attributes: &mut Vec<(String, String)>,
    key: &str,
    value: impl std::fmt::Display,
) -> bool {
    let value = value.to_string();
    // Compare against the borrowed key so an owned `String` is only allocated on insert.
    match attributes.binary_search_by(|(existing, _)| existing.as_str().cmp(key)) {
        Ok(pos) => {
            attributes[pos].1 = value;
            false
        }
        Err(pos) => {
            attributes.insert(pos, (key.to_owned(), value));
            true
        }
    }
}
/// Returns `true` when `full` is exactly `name`, or is `name` preceded by a prefix that
/// ends in `_` (for example `storage_items` matches `items`).
#[cfg(any(test, feature = "test-utils"))]
fn matches_metric_name(full: &str, name: &str) -> bool {
    match full.strip_suffix(name) {
        // `name` is not a suffix of `full` at all.
        None => false,
        // Nothing precedes the suffix: exact match.
        Some("") => true,
        // Otherwise the leading part must end on a `_` separator.
        Some(scope) => scope.ends_with('_'),
    }
}
/// Returns `true` when the text-encoded `metrics` contain a sample whose name matches
/// `name` and whose value, compared as text, equals `value`.
///
/// A sample name matches when it equals `name` or ends with `_` followed by `name`, so
/// callers may omit the scope prefix. Comment lines (starting with `#`) are ignored, and a
/// label set, if present, is skipped. Only the first whitespace-separated token after the
/// name and labels is compared, so a trailing timestamp does not affect the result.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn has_metric_value(metrics: &str, name: &str, value: impl std::fmt::Display) -> bool {
    // Byte offset of the `}` closing a label set. Braces inside quoted label values (and
    // backslash-escaped quotes inside those values) are skipped, so a value such as
    // `path="/a/{id}"` does not end the label set early.
    fn label_set_end(labels: &str) -> Option<usize> {
        let mut in_quotes = false;
        let mut escaped = false;
        for (offset, c) in labels.char_indices() {
            if escaped {
                escaped = false;
                continue;
            }
            match c {
                '\\' if in_quotes => escaped = true,
                '"' => in_quotes = !in_quotes,
                '}' if !in_quotes => return Some(offset),
                _ => {}
            }
        }
        None
    }

    let value = value.to_string();
    metrics.lines().any(|line| {
        let line = line.trim();
        if line.starts_with('#') {
            return false;
        }
        // The sample name ends at the label set or at the first whitespace.
        let Some(sample_end) = line.find(|c: char| c == '{' || c.is_whitespace()) else {
            return false;
        };
        if !matches_metric_name(&line[..sample_end], name) {
            return false;
        }
        let mut rest = &line[sample_end..];
        if let Some(labeled) = rest.strip_prefix('{') {
            let Some(labels_end) = label_set_end(labeled) else {
                return false;
            };
            // `}` is one byte wide, so `+ 1` lands on a character boundary.
            rest = &labeled[labels_end + 1..];
        }
        // The value must be separated from the name/labels by whitespace.
        if !rest.chars().next().is_some_and(char::is_whitespace) {
            return false;
        }
        rest.split_whitespace().next() == Some(value.as_str())
    })
}
/// Sums the `runtime_tasks_running` samples of kind `Task` whose `name` label starts with
/// `prefix`, as read from the encoded output of `metrics`.
///
/// Lines that cannot be parsed (missing `name` label, non-integer value) are skipped.
#[cfg(any(test, feature = "test-utils"))]
pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize {
    let snapshot = metrics.encode();
    let mut running = 0usize;
    for line in snapshot.lines() {
        let is_task_sample =
            line.starts_with("runtime_tasks_running{") && line.contains("kind=\"Task\"");
        if !is_task_sample {
            continue;
        }
        // Text between `name="` and the next quote is the task name.
        let Some(after_name) = line.split("name=\"").nth(1) else {
            continue;
        };
        let Some(task_name) = after_name.split('"').next() else {
            continue;
        };
        if !task_name.starts_with(prefix) {
            continue;
        }
        // The sample value is the last space-separated token on the line.
        let Some(raw_count) = line.trim_end().rsplit(' ').next() else {
            continue;
        };
        if let Ok(count) = raw_count.parse::<usize>() {
            running += count;
        }
    }
    running
}
/// Writes the `# HELP` and `# TYPE` header lines for metric family `name` to `writer`.
///
/// # Errors
///
/// Propagates the first error returned by `writer`.
fn encode_descriptor<W>(
    writer: &mut W,
    name: &str,
    help: &str,
    metric_type: MetricType,
) -> Result<(), std::fmt::Error>
where
    W: std::fmt::Write,
{
    let pieces = [
        "# HELP ",
        name,
        " ",
        help,
        "\n# TYPE ",
        name,
        " ",
        metric_type.as_str(),
        "\n",
    ];
    for piece in pieces {
        writer.write_str(piece)?;
    }
    Ok(())
}
/// Joins `prefix` and `name` with an underscore; an empty `prefix` yields `name` unchanged.
pub(crate) fn prefixed_name(prefix: &str, name: &str) -> String {
    match prefix {
        "" => name.to_owned(),
        scope => [scope, name].join("_"),
    }
}
/// Validates `label` and returns it prefixed with `prefix`.
///
/// # Panics
///
/// Panics when `label` fails `validate_label`, or when the prefixed name starts with
/// `METRICS_PREFIX`.
pub(crate) fn child_label(prefix: &str, label: &str) -> String {
    validate_label(label);
    let scoped = prefixed_name(prefix, label);
    if scoped.starts_with(METRICS_PREFIX) {
        panic!("using runtime label is not allowed");
    }
    scoped
}
/// Releases one registration claim on metric `id` when dropped.
struct RegistryGuard {
    /// Slot id of the metric this guard holds a claim on.
    id: usize,
    /// Weak handle, so an outstanding guard does not keep the registry alive.
    registry: Weak<Mutex<RegistryInner>>,
}

impl Drop for RegistryGuard {
    fn drop(&mut self) {
        // If the registry is already gone there is nothing left to release.
        if let Some(registry) = self.registry.upgrade() {
            registry.lock().release_registration(self.id);
        }
    }
}
/// A metric together with the registration that keeps it in the registry.
///
/// Dereferences to the wrapped metric `M`.
#[must_use = "registered metrics are removed when the returned handle is dropped"]
pub struct Registered<M> {
    /// Shared metric instance.
    metric: Arc<M>,
    /// Registration handle that travels with every clone of this value.
    registration: Registration,
}

// Written by hand so that cloning does not require `M: Clone`.
impl<M> Clone for Registered<M> {
    fn clone(&self) -> Self {
        let metric = Arc::clone(&self.metric);
        let registration = self.registration.clone();
        Self {
            metric,
            registration,
        }
    }
}
impl<M> Registered<M> {
    /// Wraps `metric` in an `Arc` and pairs it with an existing `registration`.
    pub fn with_registration(metric: M, registration: Registration) -> Self {
        let metric = Arc::new(metric);
        Self {
            metric,
            registration,
        }
    }

    /// Borrows the wrapped metric.
    pub fn metric(&self) -> &M {
        &self.metric
    }
}
/// Family lookups that build the label set `S` from a borrowed `Q`.
impl<S, M, C> Registered<raw::Family<S, M, C>>
where
    S: Clone + std::hash::Hash + Eq,
    C: raw::family::MetricConstructor<M>,
{
    /// Calls `get` with the label set converted from `label_set`.
    pub fn get_by<Q>(&self, label_set: &Q) -> Option<impl Deref<Target = M> + '_>
    where
        for<'a> S: From<&'a Q>,
    {
        let key: S = label_set.into();
        self.get(&key)
    }

    /// Calls `get_or_create` with the label set converted from `label_set`.
    pub fn get_or_create_by<Q>(&self, label_set: &Q) -> impl Deref<Target = M> + '_
    where
        for<'a> S: From<&'a Q>,
    {
        let key: S = label_set.into();
        self.get_or_create(&key)
    }

    /// Calls `remove` with the label set converted from `label_set`.
    pub fn remove_by<Q>(&self, label_set: &Q) -> bool
    where
        for<'a> S: From<&'a Q>,
    {
        let key: S = label_set.into();
        self.remove(&key)
    }
}
/// Lets a `Registered<M>` be used wherever `&M` methods are needed.
impl<M> Deref for Registered<M> {
    type Target = M;

    fn deref(&self) -> &Self::Target {
        &self.metric
    }
}
/// Prints only the wrapped metric; the registration is left out (non-exhaustive output).
impl<M: std::fmt::Debug> std::fmt::Debug for Registered<M> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Registered");
        builder.field("metric", self.metric());
        builder.finish_non_exhaustive()
    }
}
/// Label key/value pairs attached to a registered metric.
type MetricAttributes = Vec<(Cow<'static, str>, Cow<'static, str>)>;
/// Identity of a registered metric: family name plus its attributes.
type MetricKey = (String, MetricAttributes);
/// Callback that appends a metric's encoded sample lines to the given buffer.
type SampleEncoder = dyn Fn(&mut String) -> Result<(), std::fmt::Error> + Send + Sync;
/// Pieces of a metric entry gathered by `register` and handed to `insert_metric_entry`,
/// which adds the bookkeeping fields and stores the result as a `MetricEntry`.
struct PendingMetricEntry {
// Name of the family the metric belongs to.
family_name: String,
// Labels attached to every sample of the metric.
attributes: MetricAttributes,
// Callback that encodes the metric's current samples.
encode_samples: Box<SampleEncoder>,
// Type-erased handle to the metric.
metric_any: Arc<dyn Any + Send + Sync>,
}
/// Newtype around `Arc<M>` that forwards `Debug` and `EncodeMetric` to the inner metric.
///
/// `create_sample_encoder` uses it to register an `Arc`-held metric with an upstream
/// registry.
pub(crate) struct SharedMetric<M>(pub(crate) Arc<M>);
impl<M: std::fmt::Debug> std::fmt::Debug for SharedMetric<M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl<M: EncodeMetric> EncodeMetric for SharedMetric<M> {
// Delegates sample encoding to the wrapped metric.
fn encode(&self, encoder: PromMetricEncoder<'_>) -> Result<(), std::fmt::Error> {
self.0.encode(encoder)
}
// Reports the wrapped metric's type.
fn metric_type(&self) -> MetricType {
self.0.metric_type()
}
}
/// Builds the callback that appends the encoded sample lines of `metric` to a buffer.
///
/// The metric is registered under `name` in a private upstream registry carrying `labels`.
/// On every call that registry is text-encoded and each line not starting with `#`
/// (that is, everything except descriptor and EOF lines) is appended to the output.
///
/// # Panics
///
/// The returned callback panics if encoding the private registry fails.
fn create_sample_encoder<M>(
    name: String,
    labels: MetricAttributes,
    metric: Arc<M>,
) -> Box<SampleEncoder>
where
    M: Metric,
{
    let mut registry = registry::Registry::with_labels(labels.into_iter());
    // Help text is left empty: descriptor lines are filtered out below.
    registry.register(name, "", SharedMetric(metric));
    Box::new(move |samples| {
        let mut encoded = String::new();
        encode(&mut encoded, &registry).expect("encoding temporary metric registry failed");
        for line in encoded.lines() {
            if line.starts_with('#') {
                continue;
            }
            samples.push_str(line);
            samples.push('\n');
        }
        Ok(())
    })
}
fn owned_attributes(attributes: Vec<(String, String)>) -> MetricAttributes {
attributes
.into_iter()
.map(|(k, v)| (Cow::Owned(k), Cow::Owned(v)))
.collect()
}
/// Appends a trailing `.` to `help`.
fn normalize_help(help: String) -> String {
    let mut normalized = help;
    normalized.push('.');
    normalized
}
/// A live metric stored in one slot of `RegistryInner::metrics`.
struct MetricEntry {
// Family this metric belongs to (key into `RegistryInner::families`).
family_name: String,
// Labels that, together with `family_name`, identify the metric.
attributes: MetricAttributes,
// Callback that encodes the metric's current samples.
encode_samples: Box<SampleEncoder>,
// Type-erased handle, downcast when the same key is registered again.
metric_any: Arc<dyn Any + Send + Sync>,
// Number of outstanding registrations; the entry is dropped when it reaches zero.
claims: usize,
// Position of this metric's id inside its family's `metric_ids`.
family_index: usize,
}
/// Metadata shared by all metrics registered under one family name.
#[derive(Debug)]
struct MetricFamily {
// Normalized help text; every registration in the family must use the same text.
help: String,
// Metric type shared by every member of the family.
metric_type: MetricType,
// Cached `# HELP` / `# TYPE` header, built once when the family is created.
descriptor: String,
// Ids of the metrics that currently belong to the family.
metric_ids: Vec<usize>,
}
/// Cloneable handle to a shared metric registry; clones refer to the same state.
#[derive(Clone)]
pub struct Registry {
// Shared, mutex-protected registry state.
inner: Arc<Mutex<RegistryInner>>,
}
/// State behind a [`Registry`].
struct RegistryInner {
// Metric slots indexed by metric id; `None` marks a slot whose metric was removed.
metrics: Vec<Option<MetricEntry>>,
// Ids of removed metrics, handed out again before new ids are minted.
free_metric_ids: Vec<usize>,
// Families by name; the `BTreeMap` makes `encode` iterate them in sorted order.
families: BTreeMap<String, MetricFamily>,
// Maps (family name, attributes) to the id of the live metric with that identity.
keys: HashMap<MetricKey, usize>,
// Next never-used metric id.
next_metric_id: usize,
}
/// The default registry is an empty one, identical to `Registry::new()`.
impl Default for Registry {
fn default() -> Self {
Self::new()
}
}
impl Registry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        let inner = Arc::new(Mutex::new(RegistryInner::new()));
        Self { inner }
    }

    /// Registers `metric` under `name` with `attributes`, or attaches to the metric already
    /// registered under the same name and attributes.
    ///
    /// The returned handle carries a guard holding a weak reference back to this registry.
    pub(crate) fn register<M>(
        &self,
        name: String,
        help: String,
        attributes: Vec<(String, String)>,
        metric: Arc<M>,
    ) -> Registered<M>
    where
        M: Metric,
    {
        let handle = Arc::downgrade(&self.inner);
        self.inner
            .lock()
            .register(handle, name, help, attributes, metric)
    }

    /// Encodes every registered metric while holding the registry lock.
    pub fn encode(&self) -> String {
        let inner = self.inner.lock();
        inner.encode()
    }
}
impl RegistryInner {
/// Creates empty registry state: no metrics, families, or keys, with ids starting at 0.
fn new() -> Self {
Self {
metrics: Vec::new(),
free_metric_ids: Vec::new(),
families: BTreeMap::new(),
keys: HashMap::new(),
next_metric_id: 0,
}
}
fn register<M>(
&mut self,
registry: Weak<Mutex<Self>>,
name: String,
help: String,
attributes: Vec<(String, String)>,
metric: Arc<M>,
) -> Registered<M>
where
M: Metric,
{
let attributes = owned_attributes(attributes);
let help = normalize_help(help);
let metric_type = metric.metric_type();
let encode_samples =
create_sample_encoder(name.clone(), attributes.clone(), metric.clone());
let key = (name.clone(), attributes.clone());
if let Some(existing_id) = self.keys.get(&key).copied() {
let entry = self.metric_ref(existing_id);
if let Some(family) = self.families.get(&name) {
assert_eq!(
family.help, help,
"metric family `{}` registered with inconsistent help text",
name
);
}
let existing_metric = Arc::clone(&entry.metric_any)
.downcast::<M>()
.unwrap_or_else(|_| {
panic!(
"duplicate metric `{}` with attributes {:?} registered with different type",
key.0, key.1
)
});
self.claim_registration(existing_id);
return Registered {
metric: existing_metric,
registration: Registration::from(RegistryGuard {
id: existing_id,
registry,
}),
};
}
self.assert_family_matches(&name, &help, metric_type);
let id = self.allocate_metric_id();
let registration = Registration::from(RegistryGuard { id, registry });
let metric_any: Arc<dyn Any + Send + Sync> = metric.clone();
self.insert_metric_entry(
id,
help,
metric_type,
PendingMetricEntry {
family_name: name,
attributes,
encode_samples,
metric_any,
},
);
Registered {
metric,
registration,
}
}
/// Returns the slot for `id`, appending an empty slot first when `id` is exactly one
/// past the end of `metrics`.
///
/// Indexing panics if `id` is further past the end than that.
fn metric_slot_mut(&mut self, id: usize) -> &mut Option<MetricEntry> {
if id == self.metrics.len() {
self.metrics.push(None);
}
&mut self.metrics[id]
}
/// Borrows the live entry stored under `id`.
///
/// # Panics
///
/// Panics when `id` is out of range or its slot is empty.
fn metric_ref(&self, id: usize) -> &MetricEntry {
    let slot = self.metrics.get(id);
    slot.and_then(|entry| entry.as_ref())
        .expect("metric id missing from registry")
}
/// Mutably borrows the live entry stored under `id`.
///
/// # Panics
///
/// Panics when `id` is out of range or its slot is empty.
fn metric_mut(&mut self, id: usize) -> &mut MetricEntry {
    let slot = self.metrics.get_mut(id);
    slot.and_then(|entry| entry.as_mut())
        .expect("metric id missing from registry")
}
/// Returns an id for a new metric, reusing a freed id when one is available.
///
/// # Panics
///
/// Panics if the id counter would overflow `usize`.
fn allocate_metric_id(&mut self) -> usize {
    match self.free_metric_ids.pop() {
        Some(recycled) => recycled,
        None => {
            let fresh = self.next_metric_id;
            self.next_metric_id = fresh.checked_add(1).expect("metric id overflow");
            fresh
        }
    }
}
/// Checks that `help` and `metric_type` agree with the existing family `name`, if any.
///
/// # Panics
///
/// Panics when the family exists and its help text or metric type differs.
fn assert_family_matches(&self, name: &str, help: &str, metric_type: MetricType) {
    // A family that does not exist yet cannot conflict.
    let Some(existing) = self.families.get(name) else {
        return;
    };
    assert_eq!(
        existing.help, help,
        "metric family `{}` registered with inconsistent help text",
        name
    );
    // Types are compared through their string names.
    assert_eq!(
        existing.metric_type.as_str(),
        metric_type.as_str(),
        "metric family `{}` registered with inconsistent metric type",
        name
    );
}
/// Stores a new metric under `id`: records its key, adds it to its family (creating the
/// family and its cached descriptor on first use), and fills the metric slot with a
/// claim count of one.
///
/// # Panics
///
/// Panics if encoding the descriptor of a newly created family fails.
fn insert_metric_entry(
    &mut self,
    id: usize,
    help: String,
    metric_type: MetricType,
    entry: PendingMetricEntry,
) {
    let PendingMetricEntry {
        family_name,
        attributes,
        encode_samples,
        metric_any,
    } = entry;

    self.keys
        .insert((family_name.clone(), attributes.clone()), id);

    let family = self
        .families
        .entry(family_name.clone())
        .or_insert_with(|| {
            // First member of the family: build and cache the HELP/TYPE header.
            let mut descriptor = String::new();
            encode_descriptor(&mut descriptor, &family_name, &help, metric_type)
                .expect("encoding cached descriptor failed");
            MetricFamily {
                help,
                metric_type,
                descriptor,
                metric_ids: Vec::new(),
            }
        });

    // Remember where the id sits in the family so it can be swap-removed later.
    let family_index = family.metric_ids.len();
    family.metric_ids.push(id);

    let fresh = MetricEntry {
        family_name,
        attributes,
        encode_samples,
        metric_any,
        claims: 1,
        family_index,
    };
    self.metric_slot_mut(id).replace(fresh);
}
/// Adds one claim to the metric stored under `id`.
///
/// # Panics
///
/// Panics when the slot is empty or the claim count would overflow.
fn claim_registration(&mut self, id: usize) {
    let claims = &mut self.metric_mut(id).claims;
    *claims = claims
        .checked_add(1)
        .expect("registration claims overflow");
}
/// Removes one claim from the metric stored under `id` and drops the metric once no
/// claims remain.
///
/// # Panics
///
/// Panics when the slot is empty or the claim count is already zero.
fn release_registration(&mut self, id: usize) {
    let remaining = {
        let entry = self.metric_mut(id);
        entry.claims = entry
            .claims
            .checked_sub(1)
            .expect("registration claim count underflow");
        entry.claims
    };
    if remaining == 0 {
        self.drop_metric_entry(id);
    }
}
/// Removes the metric stored under `id` from every index: its slot, its key, and its
/// family (dropping the family when it becomes empty), then marks `id` as reusable.
///
/// Panics when the slot is empty, the family is missing, or the family's bookkeeping
/// does not point back at `id`.
fn drop_metric_entry(&mut self, id: usize) {
// Empty the slot and take ownership of the entry.
let metric = self
.metrics
.get_mut(id)
.and_then(Option::take)
.expect("metric id missing from registry");
let MetricEntry {
family_name,
attributes,
family_index,
..
} = metric;
let key = (family_name, attributes);
// Only remove the key if it still maps to this id.
if self.keys.get(&key).copied() == Some(id) {
self.keys.remove(&key);
}
let (family_name, _) = key;
let (swapped_metric_id, remove_family) = {
let family = self
.families
.get_mut(&family_name)
.expect("family missing during unregister");
// `swap_remove` moves the last id into `family_index` (unless the removed id was last).
let removed = family.metric_ids.swap_remove(family_index);
assert_eq!(removed, id, "family index mismatch during unregister");
let swapped = family.metric_ids.get(family_index).copied();
(swapped, family.metric_ids.is_empty())
};
// Fix up the stored index of the metric that was moved into the vacated position.
if let Some(swapped_metric_id) = swapped_metric_id {
self.metric_mut(swapped_metric_id).family_index = family_index;
}
if remove_family {
self.families.remove(&family_name);
}
self.free_metric_ids.push(id);
}
/// Encodes all families in name order, followed by the EOF marker.
///
/// A family whose metrics produce no sample lines is skipped entirely, descriptor
/// included.
///
/// # Panics
///
/// Panics if a sample encoder or the EOF encoder reports an error.
pub fn encode(&self) -> String {
    let mut output = String::new();
    // Scratch buffer reused across families.
    let mut samples = String::new();
    for family in self.families.values() {
        samples.clear();
        for &metric_id in &family.metric_ids {
            let encode_samples = &self.metric_ref(metric_id).encode_samples;
            encode_samples(&mut samples).expect("encoding live metric samples failed");
        }
        if !samples.is_empty() {
            output.push_str(&family.descriptor);
            output.push_str(&samples);
        }
    }
    encode_eof(&mut output).expect("encoding EOF failed");
    output
}
}
/// A view of a [`Registry`] that prepends `prefix` to every metric name it registers.
pub(crate) struct Scope {
// Registry that receives the registrations.
registry: Registry,
// Prefix joined to metric names (and to nested scope prefixes) with `_`.
prefix: String,
}
/// Registration interface shared by [`Registry`] and [`Scope`].
pub(crate) trait Register {
/// Registers `metric` under `name` with the given `help` text.
fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M>;
/// Creates a scope whose registrations are prefixed with `prefix`.
fn sub_registry(&mut self, prefix: &str) -> Scope;
}
impl Register for Registry {
fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M> {
validate_label(name);
Self::register(
self,
name.to_string(),
help.to_string(),
Vec::new(),
Arc::new(metric),
)
}
fn sub_registry(&mut self, prefix: &str) -> Scope {
validate_label(prefix);
Scope {
registry: self.clone(),
prefix: prefix.to_string(),
}
}
}
impl Register for Scope {
    /// Validates `name`, prepends this scope's prefix, and registers `metric` without
    /// attributes.
    fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M> {
        validate_label(name);
        Registry::register(
            &self.registry,
            prefixed_name(&self.prefix, name),
            help.to_string(),
            Vec::new(),
            Arc::new(metric),
        )
    }

    /// Validates `prefix` and returns a nested scope whose prefix is this scope's prefix
    /// joined with `prefix`.
    fn sub_registry(&mut self, prefix: &str) -> Scope {
        validate_label(prefix);
        let registry = self.registry.clone();
        let prefix = prefixed_name(&self.prefix, prefix);
        Self { registry, prefix }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{deterministic, Metrics as _, Runner, Spawner, Supervisor as _};
use commonware_macros::test_traced;
use futures::future;
use std::sync::mpsc::{self, TryRecvError};
// Unlabeled samples match by full name or by `_`-separated suffix, and only on equal value.
#[test]
fn test_has_metric_value_unlabeled() {
let metrics = "# HELP storage_items_tracked items\nstorage_items_tracked 2\n";
assert!(has_metric_value(metrics, "items_tracked", 2));
assert!(has_metric_value(metrics, "storage_items_tracked", 2));
// A name that is not a suffix of the sample name must not match.
assert!(!has_metric_value(metrics, "items_tracked_extra", 2));
assert!(!has_metric_value(metrics, "items_tracked", 3));
}
// The label set between the sample name and the value is skipped.
#[test]
fn test_has_metric_value_labeled() {
let metrics = r#"storage_init_items_tracked{index="2"} 2"#;
assert!(has_metric_value(metrics, "items_tracked", 2));
assert!(has_metric_value(metrics, "storage_init_items_tracked", 2));
}
// `count_running_tasks` follows tasks as they are spawned and aborted, matching by prefix.
#[test_traced]
fn test_count_running_tasks() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
assert_eq!(
count_running_tasks(&context, "worker"),
0,
"no worker tasks initially"
);
// Spawn a task that never completes on its own.
let handle1 = context.child("worker").spawn(|_| async move {
future::pending::<()>().await;
});
let count = count_running_tasks(&context, "worker");
assert_eq!(count, 1, "worker task should be running");
assert_eq!(
count_running_tasks(&context, "other"),
0,
"no tasks with 'other' prefix"
);
// A nested child is also counted under the "worker" prefix.
let handle2 = context
.child("worker")
.child("child")
.spawn(|_| async move {
future::pending::<()>().await;
});
let count = count_running_tasks(&context, "worker");
assert_eq!(count, 2, "both worker and worker_child should be counted");
handle1.abort();
let _ = handle1.await;
let count = count_running_tasks(&context, "worker");
assert_eq!(count, 1, "only worker_child should remain");
handle2.abort();
let _ = handle2.await;
assert_eq!(
count_running_tasks(&context, "worker"),
0,
"all worker tasks should be stopped"
);
});
}
// The same metric name under two different child labels registers without panicking.
#[test_traced]
fn test_no_duplicate_metrics() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let c1 = raw::Counter::<u64>::default();
let _metric_a = context.child("a").register("test", "help", c1);
let c2 = raw::Counter::<u64>::default();
let _metric_b = context.child("b").register("test", "help", c2);
});
}
// Registering the same name under the same child label returns the already-registered metric.
#[test_traced]
fn test_duplicate_metrics_reuse_existing_handle() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let c1 = raw::Counter::<u64>::default();
let metric_a = context.child("a").register("test", "help", c1);
let c2 = raw::Counter::<u64>::default();
let metric_b = context.child("a").register("test", "help", c2);
// Both handles point at the same underlying counter.
assert!(std::ptr::eq(metric_a.metric(), metric_b.metric()));
metric_a.inc();
metric_b.inc_by(2);
let encoded = context.encode();
assert!(encoded.contains("a_test_total 3"));
});
}
// The claim count changes with `register` calls and with the drop of the last handle
// sharing a registration; cloning a handle does not add a claim.
#[test]
fn test_claims_track_register_calls_not_handle_clones() {
let registry = Registry::new();
let key: MetricKey = ("votes".to_string(), Vec::new());
let first = registry.register(
key.0.clone(),
"vote count".to_string(),
Vec::new(),
Arc::new(raw::Counter::<u64>::default()),
);
let first_clone = first.clone();
let id = {
let registry = registry.inner.lock();
let id = *registry.keys.get(&key).expect("metric key missing");
// One register call, one clone: still a single claim.
assert_eq!(registry.metric_ref(id).claims, 1);
id
};
let second = registry.register(
key.0,
"vote count".to_string(),
Vec::new(),
Arc::new(raw::Counter::<u64>::default()),
);
let second_clone = second.clone();
{
let registry = registry.inner.lock();
assert_eq!(registry.metric_ref(id).claims, 2);
}
// Dropping the originals releases nothing while their clones are alive.
drop(first);
drop(second);
{
let registry = registry.inner.lock();
assert_eq!(registry.metric_ref(id).claims, 2);
}
drop(second_clone);
{
let registry = registry.inner.lock();
assert_eq!(registry.metric_ref(id).claims, 1);
}
// Dropping the last handle removes the metric and its family.
drop(first_clone);
let registry = registry.inner.lock();
assert!(
registry.keys.is_empty(),
"keys left behind: {:?}",
registry.keys
);
assert!(
registry.families.is_empty(),
"families left behind: {:?}",
registry.families
);
}
// Re-registering an existing name and label with a different metric type must panic.
#[test]
#[should_panic(expected = "registered with different type")]
fn test_duplicate_metrics_different_type_panics() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let counter = raw::Counter::<u64>::default();
let _metric_a = context.child("a").register("test", "help", counter);
let gauge = raw::Gauge::<i64>::default();
let _metric_b = context.child("a").register("test", "help", gauge);
});
}
// A duplicate registration made before the original claim is released keeps the metric
// alive after that release.
#[test]
fn test_duplicate_register_acquires_during_last_drop_window() {
let registry = Registry::new();
let key: MetricKey = ("votes".to_string(), Vec::new());
let original = registry.register(
key.0.clone(),
"vote count".to_string(),
Vec::new(),
Arc::new(raw::Counter::<u64>::default()),
);
let original_metric = Arc::clone(&original.metric);
// Suppress the handle's own drop; the claim is released by hand below.
let _original = std::mem::ManuallyDrop::new(original);
let original_id = {
let registry = registry.inner.lock();
*registry.keys.get(&key).expect("metric key missing")
};
let duplicate = registry.register(
key.0,
"vote count".to_string(),
Vec::new(),
Arc::new(raw::Counter::<u64>::default()),
);
assert!(Arc::ptr_eq(&original_metric, &duplicate.metric));
// Release the original claim; the duplicate's claim must keep the entry registered.
registry.inner.lock().release_registration(original_id);
duplicate.inc_by(7);
let encoded = registry.encode();
assert!(
encoded.contains("votes_total 7"),
"last drop removed duplicate registration: {encoded}"
);
drop(duplicate);
let registry = registry.inner.lock();
assert!(
registry.keys.is_empty(),
"keys left behind: {:?}",
registry.keys
);
assert!(
registry.families.is_empty(),
"families left behind: {:?}",
registry.families
);
}
// The value wrapped by a `Registration` is dropped only when the last clone of the
// handle is dropped.
#[test]
fn test_registered_with_registration_notifies_on_last_drop() {
// Sends a message from its destructor so the test can observe when it runs.
struct NotifyOnDrop(mpsc::Sender<&'static str>);
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
let _ = self.0.send("dropped");
}
}
let (tx, rx) = mpsc::channel();
let registered = Registered::with_registration(
raw::Counter::<u64>::default(),
Registration::from(NotifyOnDrop(tx)),
);
let clone = registered.clone();
drop(registered);
// A clone is still alive, so nothing has been sent yet.
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
drop(clone);
assert_eq!(rx.recv().unwrap(), "dropped");
// The sender was dropped together with the guard.
assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
}
// Test helper: registers a counter pre-incremented to `value`, with no attributes.
fn register_counter(registry: &Registry, name: &str, help: &str, value: u64) -> Counter {
let counter = raw::Counter::<u64>::default();
counter.inc_by(value);
registry.register(
name.to_string(),
help.to_string(),
Vec::new(),
Arc::new(counter),
)
}
// Encoding twice gives identical output, and families appear sorted by name
// regardless of registration order.
#[test]
fn test_encode_is_deterministic() {
    let registry = Registry::default();
    // Register out of alphabetical order on purpose.
    let _beta = register_counter(&registry, "beta", "beta counter", 2);
    let _alpha = register_counter(&registry, "alpha", "alpha counter", 1);
    let first = registry.encode();
    let second = registry.encode();
    assert_eq!(first, second);
    let alpha = first
        .find("# TYPE alpha")
        .expect("alpha family header present");
    let beta = first
        .find("# TYPE beta")
        .expect("beta family header present");
    assert!(alpha < beta, "families emitted in sorted order: {first}");
}
// The EOF marker appears exactly once, at the very end of the output.
#[test]
fn test_encode_emits_single_eof() {
    let registry = Registry::default();
    let _a = register_counter(&registry, "a", "help", 1);
    let _b = register_counter(&registry, "b", "help", 2);
    let encoded = registry.encode();
    assert_eq!(encoded.matches("# EOF").count(), 1);
    assert!(
        encoded.ends_with("# EOF\n"),
        "must terminate with EOF: {encoded}"
    );
}
// Counters are encoded with `_total`; histograms with `_bucket`, `_sum`, and `_count`.
#[test]
fn test_encode_type_aware_suffixes() {
    let registry = Registry::default();
    let _requests = register_counter(&registry, "requests", "request count", 3);
    let histogram = raw::Histogram::new([0.1, 1.0, 10.0]);
    histogram.observe(0.5);
    let _histogram = registry.register(
        "latency".to_string(),
        "latency seconds".to_string(),
        Vec::new(),
        Arc::new(histogram),
    );
    let encoded = registry.encode();
    assert!(
        encoded.contains("requests_total 3"),
        "counter _total suffix: {encoded}"
    );
    assert!(
        encoded.contains("latency_bucket"),
        "histogram _bucket suffix: {encoded}"
    );
    assert!(
        encoded.contains("latency_sum"),
        "histogram _sum suffix: {encoded}"
    );
    assert!(
        encoded.contains("latency_count"),
        "histogram _count suffix: {encoded}"
    );
}
// Metrics that share a name but differ in attributes are grouped under one HELP/TYPE header.
#[test]
fn test_encode_shares_family_header_across_attributes() {
let registry = Registry::default();
let c1 = raw::Counter::<u64>::default();
c1.inc();
let _c1 = registry.register(
"votes".to_string(),
"vote count".to_string(),
vec![("epoch".to_string(), "1".to_string())],
Arc::new(c1),
);
let c2 = raw::Counter::<u64>::default();
c2.inc_by(2);
let _c2 = registry.register(
"votes".to_string(),
"vote count".to_string(),
vec![("epoch".to_string(), "2".to_string())],
Arc::new(c2),
);
let encoded = registry.encode();
assert_eq!(
encoded.matches("# HELP votes").count(),
1,
"single HELP: {encoded}"
);
assert_eq!(
encoded.matches("# TYPE votes").count(),
1,
"single TYPE: {encoded}"
);
// Each metric still emits its own labeled sample.
assert!(encoded.contains("votes_total{epoch=\"1\"} 1"));
assert!(encoded.contains("votes_total{epoch=\"2\"} 2"));
}
// Registering directly on the registry adds no prefix to the metric name.
#[test]
fn test_encode_registers_without_prefix() {
    let registry = Registry::default();
    let _registered = register_counter(&registry, "votes", "vote count", 1);
    let encoded = registry.encode();
    assert!(
        encoded.contains("votes_total 1"),
        "no prefix applied: {encoded}"
    );
    assert!(
        encoded.starts_with("# HELP votes"),
        "family header at start: {encoded}"
    );
}
// A registered family that has produced no samples emits nothing, not even its header.
#[test]
fn test_encode_suppresses_empty_family() {
    let registry = Registry::default();
    let empty_family = raw::Family::<Vec<(String, String)>, raw::Counter>::default();
    let _empty_family = registry.register(
        "votes".to_string(),
        "vote count".to_string(),
        Vec::new(),
        Arc::new(empty_family),
    );
    let _ticks = register_counter(&registry, "ticks", "tick count", 1);
    let encoded = registry.encode();
    assert!(!encoded.contains("votes"), "empty family leaked: {encoded}");
    assert!(
        encoded.contains("ticks_total 1"),
        "populated metric missing: {encoded}"
    );
    assert_eq!(encoded.matches("# EOF").count(), 1);
}
// For the same metrics, this registry's output must equal the upstream
// `prometheus_client` registry's text encoding byte for byte.
#[test]
fn test_encode_matches_upstream_registry() {
let counter = raw::Counter::<u64>::default();
counter.inc_by(7);
let gauge = raw::Gauge::<i64>::default();
gauge.set(-3);
let histogram = raw::Histogram::new([0.1, 1.0]);
histogram.observe(0.5);
// Register clones of the metrics in our registry.
let ours = Registry::default();
let _latency = ours.register(
"latency".to_string(),
"request latency seconds".to_string(),
Vec::new(),
Arc::new(histogram.clone()),
);
let _level = ours.register(
"level".to_string(),
"current level".to_string(),
Vec::new(),
Arc::new(gauge.clone()),
);
let _votes = ours.register(
"votes".to_string(),
"number of votes".to_string(),
Vec::new(),
Arc::new(counter.clone()),
);
let ours_encoded = ours.encode();
// Register the same metrics, in the same name order, in the upstream registry.
let mut theirs = registry::Registry::default();
theirs.register("latency", "request latency seconds", histogram);
theirs.register("level", "current level", gauge);
theirs.register("votes", "number of votes", counter);
let mut theirs_encoded = String::new();
encode(&mut theirs_encoded, &theirs).expect("upstream encode failed");
assert_eq!(
ours_encoded, theirs_encoded,
"output diverged from upstream prometheus-client registry"
);
}
// Interleaving drops and re-registrations of one key must leave nothing behind once
// every handle is gone.
#[test]
fn test_shuffled_duplicate_drops_do_not_leave_registry_entries() {
let registry = Registry::new();
let mut handles = Vec::new();
// Eight registrations of the same key.
for _ in 0..8 {
handles.push(registry.register(
"votes".to_string(),
"vote count".to_string(),
Vec::new(),
Arc::new(raw::Counter::<u64>::default()),
));
}
// Drop handles at scattered positions, registering a replacement after each drop.
for index in [3, 0, 6, 1] {
let _ = handles.swap_remove(index);
handles.push(registry.register(
"votes".to_string(),
"vote count".to_string(),
Vec::new(),
Arc::new(raw::Counter::<u64>::default()),
));
}
drop(handles);
let registry = registry.inner.lock();
assert!(
registry.keys.is_empty(),
"keys left behind: {:?}",
registry.keys
);
assert!(
registry.families.is_empty(),
"families left behind: {:?}",
registry.families
);
}
}