Skip to main content

iroh_metrics/
encoding.rs

1//! Functions to encode metrics into the [OpenMetrics text format].
2//!
3//! [OpenMetrics text format]: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md
4
5use std::{
6    borrow::Cow,
7    fmt::{self, Write},
8    sync::{Arc, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12
13use crate::{MetricItem, MetricType, MetricValue, MetricsGroup, MetricsSource, RwLockRegistry};
14
15pub(crate) fn write_eof(writer: &mut impl Write) -> fmt::Result {
16    writer.write_str("# EOF\n")
17}
18
19/// Helper function to encode histogram data in OpenMetrics format.
20fn encode_histogram_data<'a>(
21    writer: &mut impl Write,
22    name: &str,
23    prefixes: &[impl AsRef<str>],
24    labels: &[(&'a str, &'a str)],
25    histogram_data: &HistogramData,
26) -> fmt::Result {
27    // Write buckets
28    for (upper_bound, count) in &histogram_data.buckets {
29        write_prefix_name(writer, prefixes, name)?;
30        writer.write_str("_bucket")?;
31        writer.write_char('{')?;
32        for (i, (key, value)) in labels.iter().enumerate() {
33            if i > 0 {
34                writer.write_char(',')?;
35            }
36            writer.write_str(key)?;
37            writer.write_str("=\"")?;
38            writer.write_str(value)?;
39            writer.write_char('"')?;
40        }
41        if !labels.is_empty() {
42            writer.write_char(',')?;
43        }
44        writer.write_str("le=\"")?;
45        if *upper_bound == f64::INFINITY {
46            writer.write_str("+Inf")?;
47        } else {
48            writer.write_str(ryu::Buffer::new().format(*upper_bound))?;
49        }
50        writer.write_str("\"} ")?;
51        encode_u64(writer, *count)?;
52        writer.write_str("\n")?;
53    }
54
55    // Write sum
56    write_prefix_name(writer, prefixes, name)?;
57    writer.write_str("_sum")?;
58    if !labels.is_empty() {
59        write_labels(writer, labels.iter().copied())?;
60    }
61    writer.write_char(' ')?;
62    encode_f64(writer, histogram_data.sum)?;
63    writer.write_str("\n")?;
64
65    // Write count
66    write_prefix_name(writer, prefixes, name)?;
67    writer.write_str("_count")?;
68    if !labels.is_empty() {
69        write_labels(writer, labels.iter().copied())?;
70    }
71    writer.write_char(' ')?;
72    encode_u64(writer, histogram_data.count)?;
73    writer.write_str("\n")?;
74
75    Ok(())
76}
77
78/// Writes `# EOF\n` to `writer`.
79///
80/// This is the expected last characters of an OpenMetrics string.
81pub fn encode_openmetrics_eof(writer: &mut impl Write) -> fmt::Result {
82    write_eof(writer)
83}
84
85/// Schema information for a single metric item.
86///
87/// Contains metadata about a metric including its type, name, help text,
88/// prefixes, and labels.
89#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct ItemSchema {
91    /// The type of the metric (Counter, Gauge, etc.)
92    pub r#type: MetricType,
93    /// The name of the metric
94    pub name: String,
95    /// Prefixes to prepend to the metric name
96    pub prefixes: Vec<String>,
97    /// Labels associated with the metric as key-value pairs
98    pub labels: Vec<(String, String)>,
99}
100
101impl ItemSchema {
102    /// Returns the name prefixed with all prefixes.
103    pub fn prefixed_name(&self) -> String {
104        let mut out = String::new();
105        for prefix in &self.prefixes {
106            out.push_str(prefix);
107            out.push('_');
108        }
109        out.push_str(&self.name);
110        out
111    }
112}
113
114/// A collection of metric schemas.
115///
116/// Contains all the schema information for a set of metrics.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct Schema {
119    /// The individual metric schemas
120    pub items: Vec<ItemSchema>,
121    /// Help texts (may be omitted)
122    pub help: Option<Vec<String>>,
123}
124
125impl Schema {
126    /// Creates a new [`Schema`] that does not track help text.
127    pub fn new_without_help() -> Self {
128        Self {
129            items: Default::default(),
130            help: None,
131        }
132    }
133}
134
135impl Default for Schema {
136    fn default() -> Self {
137        Self {
138            items: Vec::new(),
139            help: Some(Vec::new()),
140        }
141    }
142}
143
144/// Histogram data wrapper
145///
146/// Contains the bucket counts, sum, and total count for a histogram metric.
147#[derive(Debug, Serialize, Clone, Deserialize)]
148pub struct HistogramData {
149    /// Bucket upper bounds and cumulative counts
150    pub buckets: Vec<(f64, u64)>,
151    /// Sum of all observed values
152    pub sum: f64,
153    /// Total count of observations
154    pub count: u64,
155}
156
157/// A collection of metric values.
158///
159/// Contains the actual values for a set of metrics.
160#[derive(Debug, Serialize, Clone, Deserialize, Default)]
161pub struct Values {
162    /// The individual metric values
163    pub items: Vec<MetricValue>,
164}
165
166/// An update containing schema and/or values for metrics.
167///
168/// Used to transfer metric information between encoders and decoders.
169/// The schema is optional and only included when it has changed.
170#[derive(Debug, Serialize, Clone, Deserialize, Default)]
171pub struct Update {
172    /// Optional schema information (included when schema changes)
173    pub schema: Option<Schema>,
174    /// The metric values
175    pub values: Values,
176}
177
178/// A metric item combining schema and value information.
179///
180/// Provides a unified view of a metric's metadata and current value.
181#[derive(Debug)]
182pub struct Item<'a> {
183    /// Reference to the metric's schema information
184    pub schema: &'a ItemSchema,
185    /// Reference to the metric's current value
186    pub value: &'a MetricValue,
187    /// Help text, if available
188    pub help: Option<&'a String>,
189}
190
191impl EncodableMetric for Item<'_> {
192    fn name(&self) -> &str {
193        &self.schema.name
194    }
195
196    fn help(&self) -> &str {
197        self.help.map(|x| x.as_str()).unwrap_or_default()
198    }
199
200    fn r#type(&self) -> MetricType {
201        self.schema.r#type
202    }
203
204    fn value(&self) -> MetricValue {
205        self.value.clone()
206    }
207}
208
209impl Item<'_> {
210    /// Encodes this metric item to OpenMetrics format.
211    ///
212    /// Writes the metric in OpenMetrics text format to the provided writer.
213    pub fn encode_openmetrics(
214        &self,
215        writer: &mut impl std::fmt::Write,
216    ) -> Result<(), crate::Error> {
217        EncodableMetric::encode_openmetrics(
218            self,
219            writer,
220            self.schema.prefixes.as_slice(),
221            self.schema
222                .labels
223                .iter()
224                .map(|(a, b)| (a.as_str(), b.as_str())),
225        )?;
226        Ok(())
227    }
228}
229
230/// Decoder for metrics received from an [`Encoder`]
231///
232/// Implements [`MetricsSource`] to export the decoded metrics to OpenMetrics.
233#[derive(Debug, Clone, Default)]
234pub struct Decoder {
235    schema: Option<Schema>,
236    values: Values,
237}
238
239impl Decoder {
240    /// Imports a metric update.
241    ///
242    /// Updates the decoder's schema (if provided) and values with the given update.
243    pub fn import(&mut self, update: Update) {
244        if let Some(schema) = update.schema {
245            self.schema = Some(schema);
246        }
247        self.values = update.values;
248    }
249
250    /// Imports a metric update from serialized bytes.
251    ///
252    /// Deserializes the bytes using postcard and imports the resulting update.
253    pub fn import_bytes(&mut self, data: &[u8]) -> Result<(), postcard::Error> {
254        let update = postcard::from_bytes(data)?;
255        self.import(update);
256        Ok(())
257    }
258
259    /// Creates an iterator over the decoded metric items.
260    ///
261    /// Returns an iterator that yields [`Item`] instances combining schema and value data.
262    pub fn iter(&self) -> DecoderIter<'_> {
263        DecoderIter {
264            pos: 0,
265            inner: self,
266        }
267    }
268}
269
270/// Iterator over decoded metric items.
271///
272/// Iterates through the metrics in a [`Decoder`], yielding [`Item`] instances.
273#[derive(Debug)]
274pub struct DecoderIter<'a> {
275    /// Current position in the iteration
276    pos: usize,
277    /// Reference to the decoder being iterated
278    inner: &'a Decoder,
279}
280
281impl<'a> Iterator for DecoderIter<'a> {
282    type Item = Item<'a>;
283
284    fn next(&mut self) -> Option<Self::Item> {
285        let schema = self.inner.schema.as_ref()?.items.get(self.pos)?;
286        let value = self.inner.values.items.get(self.pos)?;
287        let help = self
288            .inner
289            .schema
290            .as_ref()?
291            .help
292            .as_ref()
293            .and_then(|help| help.get(self.pos));
294        self.pos += 1;
295        Some(Item {
296            schema,
297            value,
298            help,
299        })
300    }
301}
302
303impl MetricsSource for Decoder {
304    fn encode_openmetrics(&self, writer: &mut impl std::fmt::Write) -> Result<(), crate::Error> {
305        for item in self.iter() {
306            item.encode_openmetrics(writer)?;
307        }
308        write_eof(writer)?;
309        Ok(())
310    }
311}
312
313impl MetricsSource for Arc<RwLock<Decoder>> {
314    fn encode_openmetrics(&self, writer: &mut impl std::fmt::Write) -> Result<(), crate::Error> {
315        self.read().expect("poisoned").encode_openmetrics(writer)
316    }
317}
318
319/// Encoder for converting metrics from a registry into serializable updates.
320///
321/// Tracks schema changes and generates [`Update`] objects that can be
322/// transmitted to a [`Decoder`].
323#[derive(Debug)]
324pub struct Encoder {
325    /// The metrics registry to encode from
326    registry: RwLockRegistry,
327    /// Version of the last schema that was exported
328    last_schema_version: u64,
329    opts: EncoderOpts,
330}
331
332/// Options for an [`Encoder`]
333#[derive(Debug)]
334#[non_exhaustive]
335pub struct EncoderOpts {
336    /// Whether to include the metric help text in the transmitted schema.
337    pub include_help: bool,
338}
339
340impl Default for EncoderOpts {
341    fn default() -> Self {
342        Self { include_help: true }
343    }
344}
345
346impl Encoder {
347    /// Creates a new encoder for the given registry.
348    ///
349    /// The encoder will track schema changes and only include schema
350    /// information in updates when it has changed.
351    pub fn new(registry: RwLockRegistry) -> Self {
352        Self::new_with_opts(registry, Default::default())
353    }
354
355    /// Creates a new encoder for the given registry with custom options.
356    pub fn new_with_opts(registry: RwLockRegistry, opts: EncoderOpts) -> Self {
357        Self {
358            registry,
359            last_schema_version: 0,
360            opts,
361        }
362    }
363
364    /// Exports the current state of the registry as an update.
365    ///
366    /// Returns an [`Update`] containing the current metric values and
367    /// optionally the schema (if it has changed since the last export).
368    pub fn export(&mut self) -> Update {
369        let registry = self.registry.read().expect("poisoned");
370        let current = registry.schema_version();
371        let schema = if current != self.last_schema_version {
372            self.last_schema_version = current;
373            let mut schema = if self.opts.include_help {
374                Schema::default()
375            } else {
376                Schema::new_without_help()
377            };
378            registry.encode_schema(&mut schema);
379            Some(schema)
380        } else {
381            None
382        };
383        let mut values = Values::default();
384        registry.encode_values(&mut values);
385        Update { schema, values }
386    }
387
388    /// Exports the current state of the registry as serialized bytes.
389    ///
390    /// Returns the serialized bytes of an [`Update`] using postcard encoding.
391    pub fn export_bytes(&mut self) -> Result<Vec<u8>, postcard::Error> {
392        postcard::to_stdvec(&self.export())
393    }
394}
395
396impl dyn MetricsGroup {
397    pub(crate) fn encode_schema<'a>(
398        &self,
399        schema: &mut Schema,
400        prefix: Option<&'a str>,
401        labels: &[(Cow<'a, str>, Cow<'a, str>)],
402    ) {
403        let name = self.name();
404        let prefixes = if let Some(prefix) = prefix {
405            &[prefix, name][..]
406        } else {
407            &[name]
408        };
409        for metric in self.iter() {
410            let labels = labels.iter().map(|(k, v)| (k.as_ref(), v.as_ref()));
411            metric.encode_schema(schema, prefixes, labels);
412        }
413    }
414
415    pub(crate) fn encode_values(&self, values: &mut Values) {
416        for metric in self.iter() {
417            metric.encode_value(values);
418        }
419    }
420
421    pub(crate) fn encode_openmetrics<'a>(
422        &self,
423        writer: &'a mut impl Write,
424        prefix: Option<&'a str>,
425        labels: &[(Cow<'a, str>, Cow<'a, str>)],
426    ) -> fmt::Result {
427        let name = self.name();
428        let prefixes = if let Some(prefix) = prefix {
429            &[prefix, name] as &[&str]
430        } else {
431            &[name]
432        };
433        for metric in self.iter() {
434            let labels = labels.iter().map(|(k, v)| (k.as_ref(), v.as_ref()));
435            metric.encode_openmetrics(writer, prefixes, labels)?;
436        }
437        Ok(())
438    }
439}
440
441/// Trait for types that can provide metric encoding information.
442pub(crate) trait EncodableMetric {
443    /// Returns the name of this metric item.
444    fn name(&self) -> &str;
445
446    /// Returns the help of this metric item.
447    fn help(&self) -> &str;
448
449    /// Returns the [`MetricType`] for this item.
450    fn r#type(&self) -> MetricType;
451
452    /// Returns the current value of this item.
453    fn value(&self) -> MetricValue;
454
455    /// Encode the metrics item in the OpenMetrics text format.
456    fn encode_openmetrics<'a>(
457        &self,
458        writer: &mut impl Write,
459        prefixes: &[impl AsRef<str>],
460        labels: impl Iterator<Item = (&'a str, &'a str)> + 'a,
461    ) -> fmt::Result {
462        writer.write_str("# HELP ")?;
463        write_prefix_name(writer, prefixes, self.name())?;
464        writer.write_str(" ")?;
465        writer.write_str(self.help())?;
466        writer.write_str(".\n")?;
467
468        writer.write_str("# TYPE ")?;
469        write_prefix_name(writer, prefixes, self.name())?;
470        writer.write_str(" ")?;
471        writer.write_str(self.r#type().as_str())?;
472        writer.write_str("\n")?;
473
474        match self.value() {
475            MetricValue::Histogram {
476                buckets,
477                sum,
478                count,
479            } => {
480                let labels_vec: Vec<_> = labels.collect();
481                let histogram_data = HistogramData {
482                    buckets,
483                    sum,
484                    count,
485                };
486                encode_histogram_data(writer, self.name(), prefixes, &labels_vec, &histogram_data)?;
487            }
488            MetricValue::Counter(value) => {
489                write_prefix_name(writer, prefixes, self.name())?;
490                writer.write_str("_total")?;
491                write_labels(writer, labels)?;
492                writer.write_char(' ')?;
493                encode_u64(writer, value)?;
494                writer.write_str("\n")?;
495            }
496            MetricValue::Gauge(value) => {
497                write_prefix_name(writer, prefixes, self.name())?;
498                write_labels(writer, labels)?;
499                writer.write_char(' ')?;
500                encode_i64(writer, value)?;
501                writer.write_str("\n")?;
502            }
503        }
504        Ok(())
505    }
506}
507
508impl MetricItem<'_> {
509    pub(crate) fn encode_schema<'a>(
510        &self,
511        schema: &mut Schema,
512        prefixes: &[&str],
513        labels: impl Iterator<Item = (&'a str, &'a str)> + 'a,
514    ) {
515        let item = crate::encoding::ItemSchema {
516            name: self.name().to_string(),
517            prefixes: prefixes.iter().map(|s| s.to_string()).collect(),
518            labels: labels
519                .map(|(k, v)| (k.to_string(), v.to_string()))
520                .collect(),
521            r#type: self.r#type(),
522        };
523        schema.items.push(item);
524        if let Some(help) = schema.help.as_mut() {
525            help.push(self.help().to_string());
526        }
527    }
528
529    fn encode_value(&self, values: &mut Values) {
530        values.items.push(self.value());
531    }
532
533    pub(crate) fn encode_openmetrics<'a>(
534        &self,
535        writer: &mut impl Write,
536        prefixes: &[impl AsRef<str>],
537        labels: impl Iterator<Item = (&'a str, &'a str)> + 'a,
538    ) -> fmt::Result {
539        EncodableMetric::encode_openmetrics(self, writer, prefixes, labels)
540    }
541}
542
543fn write_labels<'a>(
544    writer: &mut impl Write,
545    labels: impl Iterator<Item = (&'a str, &'a str)> + 'a,
546) -> fmt::Result {
547    let mut is_first = true;
548    let mut labels = labels.peekable();
549    while let Some((key, value)) = labels.next() {
550        let is_last = labels.peek().is_none();
551        if is_first {
552            writer.write_char('{')?;
553            is_first = false;
554        }
555        writer.write_str(key)?;
556        writer.write_str("=\"")?;
557        writer.write_str(value)?;
558        writer.write_str("\"")?;
559        if is_last {
560            writer.write_char('}')?;
561        } else {
562            writer.write_char(',')?;
563        }
564    }
565    Ok(())
566}
567
568fn encode_u64(writer: &mut impl Write, v: u64) -> fmt::Result {
569    writer.write_str(itoa::Buffer::new().format(v))?;
570    Ok(())
571}
572
573fn encode_i64(writer: &mut impl Write, v: i64) -> fmt::Result {
574    writer.write_str(itoa::Buffer::new().format(v))?;
575    Ok(())
576}
577
578fn encode_f64(writer: &mut impl Write, v: f64) -> fmt::Result {
579    writer.write_str(ryu::Buffer::new().format(v))?;
580    Ok(())
581}
582
583fn write_prefix_name(
584    writer: &mut impl Write,
585    prefixes: &[impl AsRef<str>],
586    name: &str,
587) -> fmt::Result {
588    for prefix in prefixes {
589        writer.write_str(prefix.as_ref())?;
590        writer.write_str("_")?;
591    }
592    writer.write_str(name)?;
593    Ok(())
594}