use std::io::Write;
use crate::payload::{Error, Serialize};
use opentelemetry_proto::tonic::metrics::v1::{self};
use prost::Message;
use rand::{distributions::Standard, prelude::Distribution, seq::SliceRandom, Rng};
use super::{common::AsciiString, Generator};
struct ExportMetricsServiceRequest(Vec<Metric>);
impl ExportMetricsServiceRequest {
fn into_prost_type(
self,
) -> opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest {
opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest {
resource_metrics: vec![v1::ResourceMetrics {
resource: None,
instrumentation_library_metrics: vec![v1::InstrumentationLibraryMetrics {
instrumentation_library: None,
metrics: self.0.into_iter().map(|metric| metric.0).collect(),
schema_url: String::new(),
}],
schema_url: String::new(),
}],
}
}
}
struct Metric(v1::Metric);
struct NumberDataPoint(v1::NumberDataPoint);
struct Gauge(v1::Gauge);
struct Sum(v1::Sum);
impl Distribution<NumberDataPoint> for Standard {
fn sample<R>(&self, rng: &mut R) -> NumberDataPoint
where
R: Rng + ?Sized,
{
let value = match rng.gen_range(0..=1) {
0 => v1::number_data_point::Value::AsDouble(rng.gen()),
1 => v1::number_data_point::Value::AsInt(rng.gen()),
_ => unreachable!(),
};
NumberDataPoint(v1::NumberDataPoint {
attributes: Vec::new(),
start_time_unix_nano: rng.gen(),
time_unix_nano: rng.gen(),
exemplars: Vec::new(),
flags: 0,
value: Some(value),
})
}
}
impl Distribution<Gauge> for Standard {
fn sample<R>(&self, rng: &mut R) -> Gauge
where
R: Rng + ?Sized,
{
let total = rng.gen_range(0..64);
let data_points = Standard
.sample_iter(rng)
.map(|ndp: NumberDataPoint| ndp.0)
.take(total)
.collect();
Gauge(v1::Gauge { data_points })
}
}
impl Distribution<Sum> for Standard {
fn sample<R>(&self, rng: &mut R) -> Sum
where
R: Rng + ?Sized,
{
let aggregation_temporality = *[1, 2].choose(rng).unwrap();
let is_monotonic = rng.gen();
let total = rng.gen_range(0..64);
let data_points = Standard
.sample_iter(rng)
.map(|ndp: NumberDataPoint| ndp.0)
.take(total)
.collect();
Sum(v1::Sum {
data_points,
aggregation_temporality,
is_monotonic,
})
}
}
impl Distribution<Metric> for Standard {
fn sample<R>(&self, rng: &mut R) -> Metric
where
R: Rng + ?Sized,
{
let data = match rng.gen_range(0..=1) {
0 => v1::metric::Data::Gauge(rng.gen::<Gauge>().0),
1 => v1::metric::Data::Sum(rng.gen::<Sum>().0),
_ => unreachable!(),
};
let data = Some(data);
Metric(v1::Metric {
name: AsciiString::default().generate(rng),
description: AsciiString::default().generate(rng),
unit: AsciiString::default().generate(rng),
data,
})
}
}
#[derive(Debug, Default, Clone, Copy)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub(crate) struct OpentelemetryMetrics;
impl Serialize for OpentelemetryMetrics {
fn to_bytes<W, R>(&self, mut rng: R, max_bytes: usize, writer: &mut W) -> Result<(), Error>
where
R: Rng + Sized,
W: Write,
{
let bytes_remaining = max_bytes.checked_sub(5 + super::div_ceil(max_bytes, 0x7F));
let Some(mut bytes_remaining) = bytes_remaining else {
return Ok(());
};
let mut acc = ExportMetricsServiceRequest(Vec::new());
loop {
let member: Metric = rng.gen();
let len = member.0.encoded_len() + 2;
match bytes_remaining.checked_sub(len) {
Some(remainder) => {
acc.0.push(member);
bytes_remaining = remainder;
}
None => break,
}
}
let buf = acc.into_prost_type().encode_to_vec();
writer.write_all(&buf)?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::OpentelemetryMetrics;
use crate::payload::Serialize;
use proptest::prelude::*;
use prost::Message;
use rand::{rngs::SmallRng, SeedableRng};
proptest! {
#[test]
fn payload_not_exceed_max_bytes(seed: u64, max_bytes: u16) {
let max_bytes = max_bytes as usize;
let rng = SmallRng::seed_from_u64(seed);
let logs = OpentelemetryMetrics::default();
let mut bytes = Vec::with_capacity(max_bytes);
logs.to_bytes(rng, max_bytes, &mut bytes).unwrap();
assert!(bytes.len() <= max_bytes, "max len: {max_bytes}, actual: {}", bytes.len());
}
}
proptest! {
#[test]
fn payload_is_at_least_half_of_max_bytes(seed: u64, max_bytes in 16u16..u16::MAX) {
let max_bytes = max_bytes as usize;
let rng = SmallRng::seed_from_u64(seed);
let logs = OpentelemetryMetrics::default();
let mut bytes = Vec::with_capacity(max_bytes);
logs.to_bytes(rng, max_bytes, &mut bytes).unwrap();
assert!(!bytes.is_empty());
}
}
proptest! {
#[test]
fn payload_deserializes(seed: u64, max_bytes: u16) {
let max_bytes = max_bytes as usize;
let rng = SmallRng::seed_from_u64(seed);
let logs = OpentelemetryMetrics::default();
let mut bytes: Vec<u8> = Vec::with_capacity(max_bytes);
logs.to_bytes(rng, max_bytes, &mut bytes).unwrap();
opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest::decode(bytes.as_slice()).unwrap();
}
}
}