1use std::{
6 borrow::Cow,
7 fmt::{self, Write},
8 sync::{Arc, RwLock},
9};
10
11use serde::{Deserialize, Serialize};
12
13use crate::{MetricItem, MetricType, MetricValue, MetricsGroup, MetricsSource, RwLockRegistry};
14
/// Writes the `# EOF` terminator line (including the trailing line feed) to `writer`.
///
/// # Errors
///
/// Propagates any error reported by `writer`.
pub(crate) fn write_eof(writer: &mut impl Write) -> fmt::Result {
    writeln!(writer, "# EOF")
}
18
19fn encode_histogram_data<'a>(
21 writer: &mut impl Write,
22 name: &str,
23 prefixes: &[impl AsRef<str>],
24 labels: &[(&'a str, &'a str)],
25 histogram_data: &HistogramData,
26) -> fmt::Result {
27 for (upper_bound, count) in &histogram_data.buckets {
29 write_prefix_name(writer, prefixes, name)?;
30 writer.write_str("_bucket")?;
31 writer.write_char('{')?;
32 for (i, (key, value)) in labels.iter().enumerate() {
33 if i > 0 {
34 writer.write_char(',')?;
35 }
36 writer.write_str(key)?;
37 writer.write_str("=\"")?;
38 writer.write_str(value)?;
39 writer.write_char('"')?;
40 }
41 if !labels.is_empty() {
42 writer.write_char(',')?;
43 }
44 writer.write_str("le=\"")?;
45 if *upper_bound == f64::INFINITY {
46 writer.write_str("+Inf")?;
47 } else {
48 writer.write_str(ryu::Buffer::new().format(*upper_bound))?;
49 }
50 writer.write_str("\"} ")?;
51 encode_u64(writer, *count)?;
52 writer.write_str("\n")?;
53 }
54
55 write_prefix_name(writer, prefixes, name)?;
57 writer.write_str("_sum")?;
58 if !labels.is_empty() {
59 write_labels(writer, labels.iter().copied())?;
60 }
61 writer.write_char(' ')?;
62 encode_f64(writer, histogram_data.sum)?;
63 writer.write_str("\n")?;
64
65 write_prefix_name(writer, prefixes, name)?;
67 writer.write_str("_count")?;
68 if !labels.is_empty() {
69 write_labels(writer, labels.iter().copied())?;
70 }
71 writer.write_char(' ')?;
72 encode_u64(writer, histogram_data.count)?;
73 writer.write_str("\n")?;
74
75 Ok(())
76}
77
78pub fn encode_openmetrics_eof(writer: &mut impl Write) -> fmt::Result {
82 write_eof(writer)
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct ItemSchema {
91 pub r#type: MetricType,
93 pub name: String,
95 pub prefixes: Vec<String>,
97 pub labels: Vec<(String, String)>,
99}
100
101impl ItemSchema {
102 pub fn prefixed_name(&self) -> String {
104 let mut out = String::new();
105 for prefix in &self.prefixes {
106 out.push_str(prefix);
107 out.push('_');
108 }
109 out.push_str(&self.name);
110 out
111 }
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct Schema {
119 pub items: Vec<ItemSchema>,
121 pub help: Option<Vec<String>>,
123}
124
125impl Schema {
126 pub fn new_without_help() -> Self {
128 Self {
129 items: Default::default(),
130 help: None,
131 }
132 }
133}
134
135impl Default for Schema {
136 fn default() -> Self {
137 Self {
138 items: Vec::new(),
139 help: Some(Vec::new()),
140 }
141 }
142}
143
144#[derive(Debug, Serialize, Clone, Deserialize)]
148pub struct HistogramData {
149 pub buckets: Vec<(f64, u64)>,
151 pub sum: f64,
153 pub count: u64,
155}
156
157#[derive(Debug, Serialize, Clone, Deserialize, Default)]
161pub struct Values {
162 pub items: Vec<MetricValue>,
164}
165
166#[derive(Debug, Serialize, Clone, Deserialize, Default)]
171pub struct Update {
172 pub schema: Option<Schema>,
174 pub values: Values,
176}
177
178#[derive(Debug)]
182pub struct Item<'a> {
183 pub schema: &'a ItemSchema,
185 pub value: &'a MetricValue,
187 pub help: Option<&'a String>,
189}
190
191impl EncodableMetric for Item<'_> {
192 fn name(&self) -> &str {
193 &self.schema.name
194 }
195
196 fn help(&self) -> &str {
197 self.help.map(|x| x.as_str()).unwrap_or_default()
198 }
199
200 fn r#type(&self) -> MetricType {
201 self.schema.r#type
202 }
203
204 fn value(&self) -> MetricValue {
205 self.value.clone()
206 }
207}
208
209impl Item<'_> {
210 pub fn encode_openmetrics(
214 &self,
215 writer: &mut impl std::fmt::Write,
216 ) -> Result<(), crate::Error> {
217 EncodableMetric::encode_openmetrics(
218 self,
219 writer,
220 self.schema.prefixes.as_slice(),
221 self.schema
222 .labels
223 .iter()
224 .map(|(a, b)| (a.as_str(), b.as_str())),
225 )?;
226 Ok(())
227 }
228}
229
230#[derive(Debug, Clone, Default)]
234pub struct Decoder {
235 schema: Option<Schema>,
236 values: Values,
237}
238
239impl Decoder {
240 pub fn import(&mut self, update: Update) {
244 if let Some(schema) = update.schema {
245 self.schema = Some(schema);
246 }
247 self.values = update.values;
248 }
249
250 pub fn import_bytes(&mut self, data: &[u8]) -> Result<(), postcard::Error> {
254 let update = postcard::from_bytes(data)?;
255 self.import(update);
256 Ok(())
257 }
258
259 pub fn iter(&self) -> DecoderIter<'_> {
263 DecoderIter {
264 pos: 0,
265 inner: self,
266 }
267 }
268}
269
270#[derive(Debug)]
274pub struct DecoderIter<'a> {
275 pos: usize,
277 inner: &'a Decoder,
279}
280
281impl<'a> Iterator for DecoderIter<'a> {
282 type Item = Item<'a>;
283
284 fn next(&mut self) -> Option<Self::Item> {
285 let schema = self.inner.schema.as_ref()?.items.get(self.pos)?;
286 let value = self.inner.values.items.get(self.pos)?;
287 let help = self
288 .inner
289 .schema
290 .as_ref()?
291 .help
292 .as_ref()
293 .and_then(|help| help.get(self.pos));
294 self.pos += 1;
295 Some(Item {
296 schema,
297 value,
298 help,
299 })
300 }
301}
302
303impl MetricsSource for Decoder {
304 fn encode_openmetrics(&self, writer: &mut impl std::fmt::Write) -> Result<(), crate::Error> {
305 for item in self.iter() {
306 item.encode_openmetrics(writer)?;
307 }
308 write_eof(writer)?;
309 Ok(())
310 }
311}
312
313impl MetricsSource for Arc<RwLock<Decoder>> {
314 fn encode_openmetrics(&self, writer: &mut impl std::fmt::Write) -> Result<(), crate::Error> {
315 self.read().expect("poisoned").encode_openmetrics(writer)
316 }
317}
318
319#[derive(Debug)]
324pub struct Encoder {
325 registry: RwLockRegistry,
327 last_schema_version: u64,
329 opts: EncoderOpts,
330}
331
332#[derive(Debug)]
334#[non_exhaustive]
335pub struct EncoderOpts {
336 pub include_help: bool,
338}
339
340impl Default for EncoderOpts {
341 fn default() -> Self {
342 Self { include_help: true }
343 }
344}
345
346impl Encoder {
347 pub fn new(registry: RwLockRegistry) -> Self {
352 Self::new_with_opts(registry, Default::default())
353 }
354
355 pub fn new_with_opts(registry: RwLockRegistry, opts: EncoderOpts) -> Self {
357 Self {
358 registry,
359 last_schema_version: 0,
360 opts,
361 }
362 }
363
364 pub fn export(&mut self) -> Update {
369 let registry = self.registry.read().expect("poisoned");
370 let current = registry.schema_version();
371 let schema = if current != self.last_schema_version {
372 self.last_schema_version = current;
373 let mut schema = if self.opts.include_help {
374 Schema::default()
375 } else {
376 Schema::new_without_help()
377 };
378 registry.encode_schema(&mut schema);
379 Some(schema)
380 } else {
381 None
382 };
383 let mut values = Values::default();
384 registry.encode_values(&mut values);
385 Update { schema, values }
386 }
387
388 pub fn export_bytes(&mut self) -> Result<Vec<u8>, postcard::Error> {
392 postcard::to_stdvec(&self.export())
393 }
394}
395
396impl dyn MetricsGroup {
397 pub(crate) fn encode_schema<'a>(
398 &self,
399 schema: &mut Schema,
400 prefix: Option<&'a str>,
401 labels: &[(Cow<'a, str>, Cow<'a, str>)],
402 ) {
403 let name = self.name();
404 let prefixes = if let Some(prefix) = prefix {
405 &[prefix, name][..]
406 } else {
407 &[name]
408 };
409 for metric in self.iter() {
410 let labels = labels.iter().map(|(k, v)| (k.as_ref(), v.as_ref()));
411 metric.encode_schema(schema, prefixes, labels);
412 }
413 }
414
415 pub(crate) fn encode_values(&self, values: &mut Values) {
416 for metric in self.iter() {
417 metric.encode_value(values);
418 }
419 }
420
421 pub(crate) fn encode_openmetrics<'a>(
422 &self,
423 writer: &'a mut impl Write,
424 prefix: Option<&'a str>,
425 labels: &[(Cow<'a, str>, Cow<'a, str>)],
426 ) -> fmt::Result {
427 let name = self.name();
428 let prefixes = if let Some(prefix) = prefix {
429 &[prefix, name] as &[&str]
430 } else {
431 &[name]
432 };
433 for metric in self.iter() {
434 let labels = labels.iter().map(|(k, v)| (k.as_ref(), v.as_ref()));
435 metric.encode_openmetrics(writer, prefixes, labels)?;
436 }
437 Ok(())
438 }
439}
440
441pub(crate) trait EncodableMetric {
443 fn name(&self) -> &str;
445
446 fn help(&self) -> &str;
448
449 fn r#type(&self) -> MetricType;
451
452 fn value(&self) -> MetricValue;
454
455 fn encode_openmetrics<'a>(
457 &self,
458 writer: &mut impl Write,
459 prefixes: &[impl AsRef<str>],
460 labels: impl Iterator<Item = (&'a str, &'a str)> + 'a,
461 ) -> fmt::Result {
462 writer.write_str("# HELP ")?;
463 write_prefix_name(writer, prefixes, self.name())?;
464 writer.write_str(" ")?;
465 writer.write_str(self.help())?;
466 writer.write_str(".\n")?;
467
468 writer.write_str("# TYPE ")?;
469 write_prefix_name(writer, prefixes, self.name())?;
470 writer.write_str(" ")?;
471 writer.write_str(self.r#type().as_str())?;
472 writer.write_str("\n")?;
473
474 match self.value() {
475 MetricValue::Histogram {
476 buckets,
477 sum,
478 count,
479 } => {
480 let labels_vec: Vec<_> = labels.collect();
481 let histogram_data = HistogramData {
482 buckets,
483 sum,
484 count,
485 };
486 encode_histogram_data(writer, self.name(), prefixes, &labels_vec, &histogram_data)?;
487 }
488 MetricValue::Counter(value) => {
489 write_prefix_name(writer, prefixes, self.name())?;
490 writer.write_str("_total")?;
491 write_labels(writer, labels)?;
492 writer.write_char(' ')?;
493 encode_u64(writer, value)?;
494 writer.write_str("\n")?;
495 }
496 MetricValue::Gauge(value) => {
497 write_prefix_name(writer, prefixes, self.name())?;
498 write_labels(writer, labels)?;
499 writer.write_char(' ')?;
500 encode_i64(writer, value)?;
501 writer.write_str("\n")?;
502 }
503 }
504 Ok(())
505 }
506}
507
508impl MetricItem<'_> {
509 pub(crate) fn encode_schema<'a>(
510 &self,
511 schema: &mut Schema,
512 prefixes: &[&str],
513 labels: impl Iterator<Item = (&'a str, &'a str)> + 'a,
514 ) {
515 let item = crate::encoding::ItemSchema {
516 name: self.name().to_string(),
517 prefixes: prefixes.iter().map(|s| s.to_string()).collect(),
518 labels: labels
519 .map(|(k, v)| (k.to_string(), v.to_string()))
520 .collect(),
521 r#type: self.r#type(),
522 };
523 schema.items.push(item);
524 if let Some(help) = schema.help.as_mut() {
525 help.push(self.help().to_string());
526 }
527 }
528
529 fn encode_value(&self, values: &mut Values) {
530 values.items.push(self.value());
531 }
532
533 pub(crate) fn encode_openmetrics<'a>(
534 &self,
535 writer: &mut impl Write,
536 prefixes: &[impl AsRef<str>],
537 labels: impl Iterator<Item = (&'a str, &'a str)> + 'a,
538 ) -> fmt::Result {
539 EncodableMetric::encode_openmetrics(self, writer, prefixes, labels)
540 }
541}
542
/// Writes a label set as `{key="value",key="value"}`.
///
/// Nothing is written when `labels` yields no pairs. Label values are
/// backslash-escaped (`\` becomes `\\`, `"` becomes `\"`, a line feed becomes
/// `\n`) so that a value containing one of those characters cannot terminate
/// the quoted string or the line early. Keys are written verbatim.
///
/// # Errors
///
/// Propagates any error reported by `writer`.
fn write_labels<'a>(
    writer: &mut impl Write,
    labels: impl Iterator<Item = (&'a str, &'a str)> + 'a,
) -> fmt::Result {
    /// Writes `value` with `\`, `"` and line feeds backslash-escaped.
    fn write_escaped_value(writer: &mut impl Write, value: &str) -> fmt::Result {
        let mut rest = value;
        while let Some(at) = rest.find(|c: char| matches!(c, '\\' | '"' | '\n')) {
            writer.write_str(&rest[..at])?;
            // All three special characters are single-byte ASCII, so byte
            // indexing and `at + 1` stay on character boundaries.
            writer.write_str(match rest.as_bytes()[at] {
                b'\\' => "\\\\",
                b'"' => "\\\"",
                _ => "\\n",
            })?;
            rest = &rest[at + 1..];
        }
        writer.write_str(rest)
    }

    let mut wrote_any = false;
    for (key, value) in labels {
        // The first pair opens the set, every later pair is comma-separated.
        writer.write_char(if wrote_any { ',' } else { '{' })?;
        wrote_any = true;

        writer.write_str(key)?;
        writer.write_str("=\"")?;
        write_escaped_value(writer, value)?;
        writer.write_char('"')?;
    }
    if wrote_any {
        writer.write_char('}')?;
    }
    Ok(())
}
567
568fn encode_u64(writer: &mut impl Write, v: u64) -> fmt::Result {
569 writer.write_str(itoa::Buffer::new().format(v))?;
570 Ok(())
571}
572
573fn encode_i64(writer: &mut impl Write, v: i64) -> fmt::Result {
574 writer.write_str(itoa::Buffer::new().format(v))?;
575 Ok(())
576}
577
578fn encode_f64(writer: &mut impl Write, v: f64) -> fmt::Result {
579 writer.write_str(ryu::Buffer::new().format(v))?;
580 Ok(())
581}
582
/// Writes `name` preceded by every entry of `prefixes`, each followed by `_`.
///
/// For prefixes `["a", "b"]` and name `c` this writes `a_b_c`; with no
/// prefixes it writes just `c`.
///
/// # Errors
///
/// Propagates any error reported by `writer`.
fn write_prefix_name(
    writer: &mut impl Write,
    prefixes: &[impl AsRef<str>],
    name: &str,
) -> fmt::Result {
    prefixes.iter().try_for_each(|segment| {
        writer.write_str(segment.as_ref())?;
        writer.write_str("_")
    })?;
    writer.write_str(name)
}