1pub mod histogram;
4mod registration;
5pub mod status;
6pub(crate) mod task;
7
8pub(crate) const METRICS_PREFIX: &str = "runtime";
10
11pub use commonware_runtime_macros::{EncodeLabelSet, EncodeLabelValue, EncodeStruct};
12pub use prometheus_client::{
13 collector, encoding,
14 encoding::{
15 CounterValueEncoder, DescriptorEncoder, EncodeCounterValue, EncodeExemplarTime,
16 EncodeExemplarValue, EncodeGaugeValue, EncodeLabel, EncodeLabelKey,
17 EncodeLabelSet as EncodeLabelSetTrait, EncodeLabelValue as EncodeLabelValueTrait,
18 EncodeMetric, ExemplarValueEncoder, GaugeValueEncoder, LabelEncoder, LabelKeyEncoder,
19 LabelSetEncoder, LabelValueEncoder, MetricEncoder, NoLabelSet,
20 },
21 metrics::{MetricType, TypedMetric},
22 registry,
23 registry::Metric,
24};
25
26pub mod raw {
29 pub use prometheus_client::metrics::{
30 counter::Counter,
31 family::{self, Family},
32 gauge::Gauge,
33 histogram::Histogram,
34 };
35}
36
37use commonware_utils::sync::Mutex;
38use prometheus_client::encoding::{
39 text::{encode, encode_eof},
40 MetricEncoder as PromMetricEncoder,
41};
42pub use registration::Registration;
43use std::{
44 any::Any,
45 borrow::Cow,
46 collections::{BTreeMap, HashMap},
47 ops::Deref,
48 sync::{atomic::Ordering, Arc, Weak},
49};
50
51#[cfg(target_has_atomic = "64")]
56pub type GaugeValue = i64;
57#[cfg(not(target_has_atomic = "64"))]
58pub type GaugeValue = i32;
59
60pub type Counter = Registered<raw::Counter>;
62pub type Gauge = Registered<raw::Gauge>;
64pub type Histogram = Registered<raw::Histogram>;
66pub type CounterFamily<L> = Registered<raw::Family<L, raw::Counter>>;
68pub type GaugeFamily<L> = Registered<raw::Family<L, raw::Gauge>>;
70
71pub trait GaugeExt {
73 fn try_set<T: TryInto<GaugeValue>>(&self, value: T) -> Result<GaugeValue, T::Error>;
75
76 fn try_set_max<T: TryInto<GaugeValue> + Copy>(&self, value: T) -> Result<GaugeValue, T::Error>;
78}
79
80impl GaugeExt for raw::Gauge {
81 fn try_set<T: TryInto<GaugeValue>>(&self, value: T) -> Result<GaugeValue, T::Error> {
82 let value = value.try_into()?;
83 Ok(self.set(value))
84 }
85
86 fn try_set_max<T: TryInto<GaugeValue> + Copy>(&self, value: T) -> Result<GaugeValue, T::Error> {
87 let value = value.try_into()?;
88 Ok(self.inner().fetch_max(value, Ordering::Relaxed))
89 }
90}
91
92pub use histogram::HistogramExt;
93
94pub trait MetricsExt: crate::Metrics {
96 fn counter<N: Into<String>, H: Into<String>>(&self, name: N, help: H) -> Counter {
98 self.register(name, help, raw::Counter::default())
99 }
100
101 fn gauge<N: Into<String>, H: Into<String>>(&self, name: N, help: H) -> Gauge {
103 self.register(name, help, raw::Gauge::default())
104 }
105
106 fn histogram<N: Into<String>, H: Into<String>, I>(
108 &self,
109 name: N,
110 help: H,
111 buckets: I,
112 ) -> Histogram
113 where
114 I: IntoIterator<Item = f64>,
115 {
116 self.register(name, help, raw::Histogram::new(buckets))
117 }
118
119 fn family<N, H, S, M>(&self, name: N, help: H) -> Registered<raw::Family<S, M>>
121 where
122 N: Into<String>,
123 H: Into<String>,
124 S: Clone + std::hash::Hash + Eq,
125 M: Default,
126 raw::Family<S, M>: Metric,
127 {
128 self.register(name, help, raw::Family::<S, M>::default())
129 }
130}
131
132impl<T: crate::Metrics> MetricsExt for T {}
133
134pub fn validate_label(label: &str) {
141 let mut chars = label.chars();
142 assert!(
143 chars.next().is_some_and(|c| c.is_ascii_alphabetic()),
144 "label must start with [a-zA-Z]: {label}"
145 );
146 assert!(
147 chars.all(|c| c.is_ascii_alphanumeric() || c == '_'),
148 "label must only contain [a-zA-Z0-9_]: {label}"
149 );
150}
151
152pub fn add_attribute(
156 attributes: &mut Vec<(String, String)>,
157 key: &str,
158 value: impl std::fmt::Display,
159) -> bool {
160 let key_string = key.to_string();
161 let value_string = value.to_string();
162
163 match attributes.binary_search_by(|(k, _)| k.cmp(&key_string)) {
164 Ok(pos) => {
165 attributes[pos].1 = value_string;
166 false
167 }
168 Err(pos) => {
169 attributes.insert(pos, (key_string, value_string));
170 true
171 }
172 }
173}
174
175#[cfg(any(test, feature = "test-utils"))]
176fn matches_metric_name(full: &str, name: &str) -> bool {
177 full == name
178 || full
179 .strip_suffix(name)
180 .is_some_and(|prefix| prefix.ends_with('_'))
181}
182
183#[cfg(any(test, feature = "test-utils"))]
188#[must_use]
189pub fn has_metric_value(metrics: &str, name: &str, value: impl std::fmt::Display) -> bool {
190 let value = value.to_string();
191 metrics.lines().any(|line| {
192 let line = line.trim();
193 if line.starts_with('#') {
194 return false;
195 }
196
197 let Some(sample_end) = line.find(|c: char| c == '{' || c.is_whitespace()) else {
198 return false;
199 };
200 let sample_name = &line[..sample_end];
201 if !matches_metric_name(sample_name, name) {
202 return false;
203 }
204
205 let mut rest = &line[sample_end..];
206 if let Some(labeled) = rest.strip_prefix('{') {
207 let Some(labels_end) = labeled.find('}') else {
208 return false;
209 };
210 rest = &labeled[labels_end + 1..];
211 }
212 if !rest.chars().next().is_some_and(char::is_whitespace) {
213 return false;
214 }
215
216 rest.split_whitespace().next() == Some(value.as_str())
217 })
218}
219
220#[cfg(any(test, feature = "test-utils"))]
262pub fn count_running_tasks(metrics: &impl crate::Metrics, prefix: &str) -> usize {
263 let encoded = metrics.encode();
264 encoded
265 .lines()
266 .filter_map(|line| {
267 if !line.starts_with("runtime_tasks_running{") || !line.contains("kind=\"Task\"") {
268 return None;
269 }
270 let name = line.split("name=\"").nth(1)?.split('"').next()?;
271 if !name.starts_with(prefix) {
272 return None;
273 }
274 line.trim_end().rsplit(' ').next()?.parse::<usize>().ok()
275 })
276 .sum()
277}
278
279fn encode_descriptor<W>(
284 writer: &mut W,
285 name: &str,
286 help: &str,
287 metric_type: MetricType,
288) -> Result<(), std::fmt::Error>
289where
290 W: std::fmt::Write,
291{
292 writer.write_str("# HELP ")?;
293 writer.write_str(name)?;
294 writer.write_str(" ")?;
295 writer.write_str(help)?;
296 writer.write_str("\n# TYPE ")?;
297 writer.write_str(name)?;
298 writer.write_str(" ")?;
299 writer.write_str(metric_type.as_str())?;
300 writer.write_str("\n")?;
301 Ok(())
302}
303
304pub(crate) fn prefixed_name(prefix: &str, name: &str) -> String {
306 if prefix.is_empty() {
307 name.to_string()
308 } else {
309 format!("{prefix}_{name}")
310 }
311}
312
313pub(crate) fn child_label(prefix: &str, label: &str) -> String {
316 validate_label(label);
317 let name = prefixed_name(prefix, label);
318 assert!(
319 !name.starts_with(METRICS_PREFIX),
320 "using runtime label is not allowed"
321 );
322 name
323}
324
325struct RegistryGuard {
326 id: usize,
327 registry: Weak<Mutex<RegistryInner>>,
328}
329
330impl Drop for RegistryGuard {
331 fn drop(&mut self) {
332 let Some(registry) = self.registry.upgrade() else {
333 return;
334 };
335 registry.lock().release_registration(self.id);
336 }
337}
338
339#[must_use = "registered metrics are removed when the returned handle is dropped"]
341pub struct Registered<M> {
342 metric: Arc<M>,
343 registration: Registration,
344}
345
346impl<M> Clone for Registered<M> {
347 fn clone(&self) -> Self {
348 Self {
349 metric: self.metric.clone(),
350 registration: self.registration.clone(),
351 }
352 }
353}
354
355impl<M> Registered<M> {
356 pub fn with_registration(metric: M, registration: Registration) -> Self {
362 Self {
363 metric: Arc::new(metric),
364 registration,
365 }
366 }
367
368 pub fn metric(&self) -> &M {
369 self.metric.as_ref()
370 }
371}
372
373impl<S, M, C> Registered<raw::Family<S, M, C>>
374where
375 S: Clone + std::hash::Hash + Eq,
376 C: raw::family::MetricConstructor<M>,
377{
378 pub fn get_by<Q>(&self, label_set: &Q) -> Option<impl Deref<Target = M> + '_>
379 where
380 for<'a> S: From<&'a Q>,
381 {
382 let label_set = S::from(label_set);
383 self.get(&label_set)
384 }
385
386 pub fn get_or_create_by<Q>(&self, label_set: &Q) -> impl Deref<Target = M> + '_
387 where
388 for<'a> S: From<&'a Q>,
389 {
390 let label_set = S::from(label_set);
391 self.get_or_create(&label_set)
392 }
393
394 pub fn remove_by<Q>(&self, label_set: &Q) -> bool
395 where
396 for<'a> S: From<&'a Q>,
397 {
398 let label_set = S::from(label_set);
399 self.remove(&label_set)
400 }
401}
402
403impl<M> Deref for Registered<M> {
404 type Target = M;
405
406 fn deref(&self) -> &Self::Target {
407 self.metric()
408 }
409}
410
411impl<M: std::fmt::Debug> std::fmt::Debug for Registered<M> {
412 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413 f.debug_struct("Registered")
414 .field("metric", self.metric())
415 .finish_non_exhaustive()
416 }
417}
418
419type MetricAttributes = Vec<(Cow<'static, str>, Cow<'static, str>)>;
420type MetricKey = (String, MetricAttributes);
421type SampleEncoder = dyn Fn(&mut String) -> Result<(), std::fmt::Error> + Send + Sync;
422
423struct PendingMetricEntry {
424 family_name: String,
425 attributes: MetricAttributes,
426 encode_samples: Box<SampleEncoder>,
427 metric_any: Arc<dyn Any + Send + Sync>,
428}
429
430pub(crate) struct SharedMetric<M>(pub(crate) Arc<M>);
431
432impl<M: std::fmt::Debug> std::fmt::Debug for SharedMetric<M> {
433 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 self.0.fmt(f)
435 }
436}
437
438impl<M: EncodeMetric> EncodeMetric for SharedMetric<M> {
439 fn encode(&self, encoder: PromMetricEncoder<'_>) -> Result<(), std::fmt::Error> {
440 self.0.encode(encoder)
441 }
442
443 fn metric_type(&self) -> MetricType {
444 self.0.metric_type()
445 }
446}
447
448fn create_sample_encoder<M>(
449 name: String,
450 labels: MetricAttributes,
451 metric: Arc<M>,
452) -> Box<SampleEncoder>
453where
454 M: Metric,
455{
456 let mut registry = registry::Registry::with_labels(labels.into_iter());
460 registry.register(name, "", SharedMetric(metric));
461
462 Box::new(move |samples| {
463 let mut encoded = String::new();
464 encode(&mut encoded, ®istry).expect("encoding temporary metric registry failed");
465 for line in encoded.lines() {
466 if line.starts_with('#') {
467 continue;
468 }
469 samples.push_str(line);
470 samples.push('\n');
471 }
472 Ok(())
473 })
474}
475
476fn owned_attributes(attributes: Vec<(String, String)>) -> MetricAttributes {
477 attributes
478 .into_iter()
479 .map(|(k, v)| (Cow::Owned(k), Cow::Owned(v)))
480 .collect()
481}
482
483fn normalize_help(help: String) -> String {
488 help + "."
489}
490
491struct MetricEntry {
492 family_name: String,
493 attributes: MetricAttributes,
494 encode_samples: Box<SampleEncoder>,
495 metric_any: Arc<dyn Any + Send + Sync>,
496 claims: usize,
497 family_index: usize,
498}
499
500#[derive(Debug)]
501struct MetricFamily {
502 help: String,
503 metric_type: MetricType,
504 descriptor: String,
505 metric_ids: Vec<usize>,
506}
507
508#[derive(Clone)]
510pub struct Registry {
511 inner: Arc<Mutex<RegistryInner>>,
512}
513
514struct RegistryInner {
515 metrics: Vec<Option<MetricEntry>>,
517 free_metric_ids: Vec<usize>,
519 families: BTreeMap<String, MetricFamily>,
521 keys: HashMap<MetricKey, usize>,
523 next_metric_id: usize,
525}
526
527impl Default for Registry {
528 fn default() -> Self {
529 Self::new()
530 }
531}
532
533impl Registry {
534 pub fn new() -> Self {
535 Self {
536 inner: Arc::new(Mutex::new(RegistryInner::new())),
537 }
538 }
539
540 pub(crate) fn register<M>(
541 &self,
542 name: String,
543 help: String,
544 attributes: Vec<(String, String)>,
545 metric: Arc<M>,
546 ) -> Registered<M>
547 where
548 M: Metric,
549 {
550 let mut inner = self.inner.lock();
551 inner.register(Arc::downgrade(&self.inner), name, help, attributes, metric)
552 }
553
554 pub fn encode(&self) -> String {
555 self.inner.lock().encode()
556 }
557}
558
559impl RegistryInner {
560 fn new() -> Self {
561 Self {
562 metrics: Vec::new(),
563 free_metric_ids: Vec::new(),
564 families: BTreeMap::new(),
565 keys: HashMap::new(),
566 next_metric_id: 0,
567 }
568 }
569
570 fn register<M>(
571 &mut self,
572 registry: Weak<Mutex<Self>>,
573 name: String,
574 help: String,
575 attributes: Vec<(String, String)>,
576 metric: Arc<M>,
577 ) -> Registered<M>
578 where
579 M: Metric,
580 {
581 let attributes = owned_attributes(attributes);
582 let help = normalize_help(help);
583 let metric_type = metric.metric_type();
584 let encode_samples =
585 create_sample_encoder(name.clone(), attributes.clone(), metric.clone());
586 let key = (name.clone(), attributes.clone());
587 if let Some(existing_id) = self.keys.get(&key).copied() {
588 let entry = self.metric_ref(existing_id);
589 if let Some(family) = self.families.get(&name) {
590 assert_eq!(
591 family.help, help,
592 "metric family `{}` registered with inconsistent help text",
593 name
594 );
595 }
596 let existing_metric = Arc::clone(&entry.metric_any)
597 .downcast::<M>()
598 .unwrap_or_else(|_| {
599 panic!(
600 "duplicate metric `{}` with attributes {:?} registered with different type",
601 key.0, key.1
602 )
603 });
604 self.claim_registration(existing_id);
605 return Registered {
606 metric: existing_metric,
607 registration: Registration::from(RegistryGuard {
608 id: existing_id,
609 registry,
610 }),
611 };
612 }
613 self.assert_family_matches(&name, &help, metric_type);
614
615 let id = self.allocate_metric_id();
616 let registration = Registration::from(RegistryGuard { id, registry });
617 let metric_any: Arc<dyn Any + Send + Sync> = metric.clone();
618 self.insert_metric_entry(
619 id,
620 help,
621 metric_type,
622 PendingMetricEntry {
623 family_name: name,
624 attributes,
625 encode_samples,
626 metric_any,
627 },
628 );
629 Registered {
630 metric,
631 registration,
632 }
633 }
634
635 fn metric_slot_mut(&mut self, id: usize) -> &mut Option<MetricEntry> {
636 if id == self.metrics.len() {
637 self.metrics.push(None);
638 }
639 &mut self.metrics[id]
640 }
641
642 fn metric_ref(&self, id: usize) -> &MetricEntry {
643 self.metrics
644 .get(id)
645 .and_then(Option::as_ref)
646 .expect("metric id missing from registry")
647 }
648
649 fn metric_mut(&mut self, id: usize) -> &mut MetricEntry {
650 self.metrics
651 .get_mut(id)
652 .and_then(Option::as_mut)
653 .expect("metric id missing from registry")
654 }
655
656 fn allocate_metric_id(&mut self) -> usize {
657 if let Some(id) = self.free_metric_ids.pop() {
658 return id;
659 }
660 let id = self.next_metric_id;
661 self.next_metric_id = self
662 .next_metric_id
663 .checked_add(1)
664 .expect("metric id overflow");
665 id
666 }
667
668 fn assert_family_matches(&self, name: &str, help: &str, metric_type: MetricType) {
669 if let Some(family) = self.families.get(name) {
670 assert_eq!(
671 family.help, help,
672 "metric family `{}` registered with inconsistent help text",
673 name
674 );
675 assert_eq!(
676 family.metric_type.as_str(),
677 metric_type.as_str(),
678 "metric family `{}` registered with inconsistent metric type",
679 name
680 );
681 }
682 }
683
684 fn insert_metric_entry(
685 &mut self,
686 id: usize,
687 help: String,
688 metric_type: MetricType,
689 entry: PendingMetricEntry,
690 ) {
691 let PendingMetricEntry {
692 family_name,
693 attributes,
694 encode_samples,
695 metric_any,
696 } = entry;
697 self.keys
698 .insert((family_name.clone(), attributes.clone()), id);
699 let family = match self.families.entry(family_name.clone()) {
700 std::collections::btree_map::Entry::Occupied(entry) => entry.into_mut(),
701 std::collections::btree_map::Entry::Vacant(entry) => {
702 let mut descriptor = String::new();
703 encode_descriptor(&mut descriptor, &family_name, &help, metric_type)
704 .expect("encoding cached descriptor failed");
705 entry.insert(MetricFamily {
706 help,
707 metric_type,
708 descriptor,
709 metric_ids: Vec::new(),
710 })
711 }
712 };
713 let family_index = family.metric_ids.len();
714 family.metric_ids.push(id);
715 self.metric_slot_mut(id).replace(MetricEntry {
716 family_name,
717 attributes,
718 encode_samples,
719 metric_any,
720 claims: 1,
721 family_index,
722 });
723 }
724
725 fn claim_registration(&mut self, id: usize) {
726 let entry = self.metric_mut(id);
727 entry.claims = entry
728 .claims
729 .checked_add(1)
730 .expect("registration claims overflow");
731 }
732
733 fn release_registration(&mut self, id: usize) {
734 let entry = self.metric_mut(id);
735 entry.claims = entry
736 .claims
737 .checked_sub(1)
738 .expect("registration claim count underflow");
739 if entry.claims > 0 {
740 return;
741 }
742 self.drop_metric_entry(id);
743 }
744
745 fn drop_metric_entry(&mut self, id: usize) {
746 let metric = self
747 .metrics
748 .get_mut(id)
749 .and_then(Option::take)
750 .expect("metric id missing from registry");
751 let MetricEntry {
752 family_name,
753 attributes,
754 family_index,
755 ..
756 } = metric;
757 let key = (family_name, attributes);
758 if self.keys.get(&key).copied() == Some(id) {
759 self.keys.remove(&key);
760 }
761 let (family_name, _) = key;
762 let (swapped_metric_id, remove_family) = {
763 let family = self
764 .families
765 .get_mut(&family_name)
766 .expect("family missing during unregister");
767 let removed = family.metric_ids.swap_remove(family_index);
768 assert_eq!(removed, id, "family index mismatch during unregister");
769 let swapped = family.metric_ids.get(family_index).copied();
770 (swapped, family.metric_ids.is_empty())
771 };
772 if let Some(swapped_metric_id) = swapped_metric_id {
773 self.metric_mut(swapped_metric_id).family_index = family_index;
774 }
775 if remove_family {
776 self.families.remove(&family_name);
777 }
778 self.free_metric_ids.push(id);
779 }
780
781 pub fn encode(&self) -> String {
782 let mut output = String::new();
783 let mut samples = String::new();
784 for family in self.families.values() {
785 samples.clear();
786 for metric_id in &family.metric_ids {
787 let metric = self.metric_ref(*metric_id);
788 (metric.encode_samples)(&mut samples).expect("encoding live metric samples failed");
789 }
790 if samples.is_empty() {
794 continue;
795 }
796 output.push_str(&family.descriptor);
797 output.push_str(&samples);
798 }
799
800 encode_eof(&mut output).expect("encoding EOF failed");
801 output
802 }
803}
804
805pub(crate) struct Scope {
806 registry: Registry,
807 prefix: String,
808}
809
810pub(crate) trait Register {
811 fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M>;
813
814 fn sub_registry(&mut self, prefix: &str) -> Scope;
816}
817
818impl Register for Registry {
819 fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M> {
820 validate_label(name);
821 Self::register(
822 self,
823 name.to_string(),
824 help.to_string(),
825 Vec::new(),
826 Arc::new(metric),
827 )
828 }
829
830 fn sub_registry(&mut self, prefix: &str) -> Scope {
831 validate_label(prefix);
832 Scope {
833 registry: self.clone(),
834 prefix: prefix.to_string(),
835 }
836 }
837}
838
839impl Register for Scope {
840 fn register<M: Metric>(&mut self, name: &str, help: &str, metric: M) -> Registered<M> {
841 validate_label(name);
842 let name = prefixed_name(&self.prefix, name);
843 let help = help.to_string();
844 let metric = Arc::new(metric);
845 Registry::register(&self.registry, name, help, Vec::new(), metric)
846 }
847
848 fn sub_registry(&mut self, prefix: &str) -> Scope {
849 validate_label(prefix);
850 Self {
851 registry: self.registry.clone(),
852 prefix: prefixed_name(&self.prefix, prefix),
853 }
854 }
855}
856
857#[cfg(test)]
858mod tests {
859 use super::*;
860 use crate::{deterministic, Metrics as _, Runner, Spawner, Supervisor as _};
861 use commonware_macros::test_traced;
862 use futures::future;
863 use std::sync::mpsc::{self, TryRecvError};
864
865 #[test]
866 fn test_has_metric_value_unlabeled() {
867 let metrics = "# HELP storage_items_tracked items\nstorage_items_tracked 2\n";
868 assert!(has_metric_value(metrics, "items_tracked", 2));
869 assert!(has_metric_value(metrics, "storage_items_tracked", 2));
870 assert!(!has_metric_value(metrics, "items_tracked_extra", 2));
871 assert!(!has_metric_value(metrics, "items_tracked", 3));
872 }
873
874 #[test]
875 fn test_has_metric_value_labeled() {
876 let metrics = r#"storage_init_items_tracked{index="2"} 2"#;
877 assert!(has_metric_value(metrics, "items_tracked", 2));
878 assert!(has_metric_value(metrics, "storage_init_items_tracked", 2));
879 }
880
881 #[test_traced]
882 fn test_count_running_tasks() {
883 let executor = deterministic::Runner::default();
884 executor.start(|context| async move {
885 assert_eq!(
887 count_running_tasks(&context, "worker"),
888 0,
889 "no worker tasks initially"
890 );
891
892 let handle1 = context.child("worker").spawn(|_| async move {
894 future::pending::<()>().await;
895 });
896
897 let count = count_running_tasks(&context, "worker");
899 assert_eq!(count, 1, "worker task should be running");
900
901 assert_eq!(
903 count_running_tasks(&context, "other"),
904 0,
905 "no tasks with 'other' prefix"
906 );
907
908 let handle2 = context
910 .child("worker")
911 .child("child")
912 .spawn(|_| async move {
913 future::pending::<()>().await;
914 });
915
916 let count = count_running_tasks(&context, "worker");
918 assert_eq!(count, 2, "both worker and worker_child should be counted");
919
920 handle1.abort();
922 let _ = handle1.await;
923
924 let count = count_running_tasks(&context, "worker");
926 assert_eq!(count, 1, "only worker_child should remain");
927
928 handle2.abort();
930 let _ = handle2.await;
931
932 assert_eq!(
934 count_running_tasks(&context, "worker"),
935 0,
936 "all worker tasks should be stopped"
937 );
938 });
939 }
940
941 #[test_traced]
942 fn test_no_duplicate_metrics() {
943 let executor = deterministic::Runner::default();
944 executor.start(|context| async move {
945 let c1 = raw::Counter::<u64>::default();
947 let _metric_a = context.child("a").register("test", "help", c1);
948 let c2 = raw::Counter::<u64>::default();
949 let _metric_b = context.child("b").register("test", "help", c2);
950 });
951 }
953
954 #[test_traced]
955 fn test_duplicate_metrics_reuse_existing_handle() {
956 let executor = deterministic::Runner::default();
957 executor.start(|context| async move {
958 let c1 = raw::Counter::<u64>::default();
959 let metric_a = context.child("a").register("test", "help", c1);
960 let c2 = raw::Counter::<u64>::default();
961 let metric_b = context.child("a").register("test", "help", c2);
962
963 assert!(std::ptr::eq(metric_a.metric(), metric_b.metric()));
964
965 metric_a.inc();
966 metric_b.inc_by(2);
967 let encoded = context.encode();
968 assert!(encoded.contains("a_test_total 3"));
969 });
970 }
971
972 #[test]
973 fn test_claims_track_register_calls_not_handle_clones() {
974 let registry = Registry::new();
975 let key: MetricKey = ("votes".to_string(), Vec::new());
976
977 let first = registry.register(
978 key.0.clone(),
979 "vote count".to_string(),
980 Vec::new(),
981 Arc::new(raw::Counter::<u64>::default()),
982 );
983 let first_clone = first.clone();
984 let id = {
985 let registry = registry.inner.lock();
986 let id = *registry.keys.get(&key).expect("metric key missing");
987 assert_eq!(registry.metric_ref(id).claims, 1);
988 id
989 };
990
991 let second = registry.register(
992 key.0,
993 "vote count".to_string(),
994 Vec::new(),
995 Arc::new(raw::Counter::<u64>::default()),
996 );
997 let second_clone = second.clone();
998 {
999 let registry = registry.inner.lock();
1000 assert_eq!(registry.metric_ref(id).claims, 2);
1001 }
1002
1003 drop(first);
1004 drop(second);
1005 {
1006 let registry = registry.inner.lock();
1007 assert_eq!(registry.metric_ref(id).claims, 2);
1008 }
1009
1010 drop(second_clone);
1011 {
1012 let registry = registry.inner.lock();
1013 assert_eq!(registry.metric_ref(id).claims, 1);
1014 }
1015
1016 drop(first_clone);
1017 let registry = registry.inner.lock();
1018 assert!(
1019 registry.keys.is_empty(),
1020 "keys left behind: {:?}",
1021 registry.keys
1022 );
1023 assert!(
1024 registry.families.is_empty(),
1025 "families left behind: {:?}",
1026 registry.families
1027 );
1028 }
1029
1030 #[test]
1031 #[should_panic(expected = "registered with different type")]
1032 fn test_duplicate_metrics_different_type_panics() {
1033 let executor = deterministic::Runner::default();
1034 executor.start(|context| async move {
1035 let counter = raw::Counter::<u64>::default();
1036 let _metric_a = context.child("a").register("test", "help", counter);
1037 let gauge = raw::Gauge::<i64>::default();
1038 let _metric_b = context.child("a").register("test", "help", gauge);
1039 });
1040 }
1041
1042 #[test]
1043 fn test_duplicate_register_acquires_during_last_drop_window() {
1044 let registry = Registry::new();
1045 let key: MetricKey = ("votes".to_string(), Vec::new());
1046
1047 let original = registry.register(
1048 key.0.clone(),
1049 "vote count".to_string(),
1050 Vec::new(),
1051 Arc::new(raw::Counter::<u64>::default()),
1052 );
1053 let original_metric = Arc::clone(&original.metric);
1054 let _original = std::mem::ManuallyDrop::new(original);
1055 let original_id = {
1056 let registry = registry.inner.lock();
1057 *registry.keys.get(&key).expect("metric key missing")
1058 };
1059 let duplicate = registry.register(
1063 key.0,
1064 "vote count".to_string(),
1065 Vec::new(),
1066 Arc::new(raw::Counter::<u64>::default()),
1067 );
1068 assert!(Arc::ptr_eq(&original_metric, &duplicate.metric));
1069
1070 registry.inner.lock().release_registration(original_id);
1071
1072 duplicate.inc_by(7);
1073 let encoded = registry.encode();
1074 assert!(
1075 encoded.contains("votes_total 7"),
1076 "last drop removed duplicate registration: {encoded}"
1077 );
1078
1079 drop(duplicate);
1080 let registry = registry.inner.lock();
1081 assert!(
1082 registry.keys.is_empty(),
1083 "keys left behind: {:?}",
1084 registry.keys
1085 );
1086 assert!(
1087 registry.families.is_empty(),
1088 "families left behind: {:?}",
1089 registry.families
1090 );
1091 }
1092
1093 #[test]
1094 fn test_registered_with_registration_notifies_on_last_drop() {
1095 struct NotifyOnDrop(mpsc::Sender<&'static str>);
1096
1097 impl Drop for NotifyOnDrop {
1098 fn drop(&mut self) {
1099 let _ = self.0.send("dropped");
1100 }
1101 }
1102
1103 let (tx, rx) = mpsc::channel();
1104 let registered = Registered::with_registration(
1105 raw::Counter::<u64>::default(),
1106 Registration::from(NotifyOnDrop(tx)),
1107 );
1108 let clone = registered.clone();
1109
1110 drop(registered);
1111 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
1112
1113 drop(clone);
1114 assert_eq!(rx.recv().unwrap(), "dropped");
1115 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
1116 }
1117
1118 fn register_counter(registry: &Registry, name: &str, help: &str, value: u64) -> Counter {
1119 let counter = raw::Counter::<u64>::default();
1120 counter.inc_by(value);
1121 registry.register(
1122 name.to_string(),
1123 help.to_string(),
1124 Vec::new(),
1125 Arc::new(counter),
1126 )
1127 }
1128
1129 #[test]
1130 fn test_encode_is_deterministic() {
1131 let registry = Registry::default();
1132 let _beta = register_counter(®istry, "beta", "beta counter", 2);
1133 let _alpha = register_counter(®istry, "alpha", "alpha counter", 1);
1134 let first = registry.encode();
1135 let second = registry.encode();
1136 assert_eq!(first, second);
1137 let alpha = first
1138 .find("# TYPE alpha")
1139 .expect("alpha family header present");
1140 let beta = first
1141 .find("# TYPE beta")
1142 .expect("beta family header present");
1143 assert!(alpha < beta, "families emitted in sorted order: {first}");
1144 }
1145
1146 #[test]
1147 fn test_encode_emits_single_eof() {
1148 let registry = Registry::default();
1149 let _a = register_counter(®istry, "a", "help", 1);
1150 let _b = register_counter(®istry, "b", "help", 2);
1151 let encoded = registry.encode();
1152 assert_eq!(encoded.matches("# EOF").count(), 1);
1153 assert!(
1154 encoded.ends_with("# EOF\n"),
1155 "must terminate with EOF: {encoded}"
1156 );
1157 }
1158
1159 #[test]
1160 fn test_encode_type_aware_suffixes() {
1161 let registry = Registry::default();
1162 let _requests = register_counter(®istry, "requests", "request count", 3);
1163 let histogram = raw::Histogram::new([0.1, 1.0, 10.0]);
1164 histogram.observe(0.5);
1165 let _histogram = registry.register(
1166 "latency".to_string(),
1167 "latency seconds".to_string(),
1168 Vec::new(),
1169 Arc::new(histogram),
1170 );
1171 let encoded = registry.encode();
1172 assert!(
1173 encoded.contains("requests_total 3"),
1174 "counter _total suffix: {encoded}"
1175 );
1176 assert!(
1177 encoded.contains("latency_bucket"),
1178 "histogram _bucket suffix: {encoded}"
1179 );
1180 assert!(
1181 encoded.contains("latency_sum"),
1182 "histogram _sum suffix: {encoded}"
1183 );
1184 assert!(
1185 encoded.contains("latency_count"),
1186 "histogram _count suffix: {encoded}"
1187 );
1188 }
1189
1190 #[test]
1191 fn test_encode_shares_family_header_across_attributes() {
1192 let registry = Registry::default();
1193 let c1 = raw::Counter::<u64>::default();
1194 c1.inc();
1195 let _c1 = registry.register(
1196 "votes".to_string(),
1197 "vote count".to_string(),
1198 vec![("epoch".to_string(), "1".to_string())],
1199 Arc::new(c1),
1200 );
1201 let c2 = raw::Counter::<u64>::default();
1202 c2.inc_by(2);
1203 let _c2 = registry.register(
1204 "votes".to_string(),
1205 "vote count".to_string(),
1206 vec![("epoch".to_string(), "2".to_string())],
1207 Arc::new(c2),
1208 );
1209 let encoded = registry.encode();
1210 assert_eq!(
1211 encoded.matches("# HELP votes").count(),
1212 1,
1213 "single HELP: {encoded}"
1214 );
1215 assert_eq!(
1216 encoded.matches("# TYPE votes").count(),
1217 1,
1218 "single TYPE: {encoded}"
1219 );
1220 assert!(encoded.contains("votes_total{epoch=\"1\"} 1"));
1221 assert!(encoded.contains("votes_total{epoch=\"2\"} 2"));
1222 }
1223
1224 #[test]
1225 fn test_encode_registers_without_prefix() {
1226 let registry = Registry::default();
1227 let _registered = register_counter(®istry, "votes", "vote count", 1);
1228 let encoded = registry.encode();
1229 assert!(
1230 encoded.contains("votes_total 1"),
1231 "no prefix applied: {encoded}"
1232 );
1233 assert!(
1234 encoded.starts_with("# HELP votes"),
1235 "family header at start: {encoded}"
1236 );
1237 }
1238
1239 #[test]
1240 fn test_encode_suppresses_empty_family() {
1241 let registry = Registry::default();
1245 let empty_family = raw::Family::<Vec<(String, String)>, raw::Counter>::default();
1246 let _empty_family = registry.register(
1247 "votes".to_string(),
1248 "vote count".to_string(),
1249 Vec::new(),
1250 Arc::new(empty_family),
1251 );
1252 let _ticks = register_counter(®istry, "ticks", "tick count", 1);
1253 let encoded = registry.encode();
1254 assert!(!encoded.contains("votes"), "empty family leaked: {encoded}");
1255 assert!(
1256 encoded.contains("ticks_total 1"),
1257 "populated metric missing: {encoded}"
1258 );
1259 assert_eq!(encoded.matches("# EOF").count(), 1);
1260 }
1261
1262 #[test]
1263 fn test_encode_matches_upstream_registry() {
1264 let counter = raw::Counter::<u64>::default();
1273 counter.inc_by(7);
1274 let gauge = raw::Gauge::<i64>::default();
1275 gauge.set(-3);
1276 let histogram = raw::Histogram::new([0.1, 1.0]);
1277 histogram.observe(0.5);
1278
1279 let ours = Registry::default();
1280 let _latency = ours.register(
1281 "latency".to_string(),
1282 "request latency seconds".to_string(),
1283 Vec::new(),
1284 Arc::new(histogram.clone()),
1285 );
1286 let _level = ours.register(
1287 "level".to_string(),
1288 "current level".to_string(),
1289 Vec::new(),
1290 Arc::new(gauge.clone()),
1291 );
1292 let _votes = ours.register(
1293 "votes".to_string(),
1294 "number of votes".to_string(),
1295 Vec::new(),
1296 Arc::new(counter.clone()),
1297 );
1298 let ours_encoded = ours.encode();
1299
1300 let mut theirs = registry::Registry::default();
1301 theirs.register("latency", "request latency seconds", histogram);
1302 theirs.register("level", "current level", gauge);
1303 theirs.register("votes", "number of votes", counter);
1304 let mut theirs_encoded = String::new();
1305 encode(&mut theirs_encoded, &theirs).expect("upstream encode failed");
1306
1307 assert_eq!(
1308 ours_encoded, theirs_encoded,
1309 "output diverged from upstream prometheus-client registry"
1310 );
1311 }
1312
1313 #[test]
1314 fn test_shuffled_duplicate_drops_do_not_leave_registry_entries() {
1315 let registry = Registry::new();
1316 let mut handles = Vec::new();
1317
1318 for _ in 0..8 {
1319 handles.push(registry.register(
1320 "votes".to_string(),
1321 "vote count".to_string(),
1322 Vec::new(),
1323 Arc::new(raw::Counter::<u64>::default()),
1324 ));
1325 }
1326
1327 for index in [3, 0, 6, 1] {
1328 let _ = handles.swap_remove(index);
1329 handles.push(registry.register(
1330 "votes".to_string(),
1331 "vote count".to_string(),
1332 Vec::new(),
1333 Arc::new(raw::Counter::<u64>::default()),
1334 ));
1335 }
1336
1337 drop(handles);
1338 let registry = registry.inner.lock();
1339 assert!(
1340 registry.keys.is_empty(),
1341 "keys left behind: {:?}",
1342 registry.keys
1343 );
1344 assert!(
1345 registry.families.is_empty(),
1346 "families left behind: {:?}",
1347 registry.families
1348 );
1349 }
1350}