use crate::payload::{Error, Serialize};
use opentelemetry_proto::tonic::trace::v1;
use prost::Message;
use rand::{distributions::Standard, prelude::Distribution, Rng};
use std::io::Write;
use super::{common::AsciiString, Generator};
/// A batch of generated `Span`s that is serialized as one OTLP
/// `ExportTraceServiceRequest` message.
struct ExportTraceServiceRequest(Vec<Span>);

impl ExportTraceServiceRequest {
    /// Consume the batch and build the prost-generated request message.
    ///
    /// Every span is placed under a single `ResourceSpans` entry that holds a
    /// single `InstrumentationLibrarySpans` entry. The resource and the
    /// instrumentation library are `None` and both schema URLs are empty.
    fn into_prost_type(
        self,
    ) -> opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest {
        // `vec![..]` moves its single element into the vector. Building a
        // one-element array and calling `.to_vec()` clones that element
        // instead, which deep-copies the whole span list once per nesting
        // level before the original is dropped.
        opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest {
            resource_spans: vec![v1::ResourceSpans {
                resource: None,
                instrumentation_library_spans: vec![v1::InstrumentationLibrarySpans {
                    instrumentation_library: None,
                    // Unwrap the local newtype; the spans are moved, not copied.
                    spans: self.0.into_iter().map(|span| span.0).collect(),
                    schema_url: String::new(),
                }],
                schema_url: String::new(),
            }],
        }
    }
}
/// Newtype around the prost-generated `v1::Span`; `Standard` is taught how to
/// sample it below.
struct Span(v1::Span);

impl Distribution<Span> for Standard {
    /// Draw a random span from `rng`.
    ///
    /// The trace id is 16 random bytes and the span id 8 random bytes. The end
    /// timestamp is sampled from `start..=u64::MAX`, so it is never earlier
    /// than the start timestamp. `parent_span_id` is always eight zero bytes
    /// and `kind` is an integer in `0..=5`. Every other field is left at its
    /// default (empty / zero / `None`) value.
    fn sample<R>(&self, mut rng: &mut R) -> Span
    where
        R: Rng + ?Sized,
    {
        // The draws below are kept in a fixed order so that a given RNG state
        // always yields the same span.
        let trace_id: Vec<u8> = Standard.sample_iter(&mut rng).take(16).collect();
        let span_id: Vec<u8> = Standard.sample_iter(&mut rng).take(8).collect();

        let start_time_unix_nano: u64 = rng.gen();
        let end_time_unix_nano: u64 = rng.gen_range(start_time_unix_nano..=u64::MAX);

        let name: String = AsciiString::default().generate(rng);
        let kind: i32 = rng.gen_range(0..=5);

        let inner = v1::Span {
            trace_id,
            span_id,
            parent_span_id: vec![0; 8],
            name,
            kind,
            start_time_unix_nano,
            end_time_unix_nano,
            // trace_state, attributes, events, links, the dropped_* counters
            // and status all keep their default values.
            ..v1::Span::default()
        };
        Span(inner)
    }
}
/// Payload generator that emits a protobuf-encoded OTLP
/// `ExportTraceServiceRequest` filled with random spans.
#[derive(Debug, Default, Clone, Copy)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub(crate) struct OpentelemetryTraces;

impl Serialize for OpentelemetryTraces {
    /// Write one encoded export request to `writer`.
    ///
    /// Random spans are accumulated for as long as they fit into a byte budget
    /// derived from `max_bytes`; the first span that does not fit is discarded
    /// and the request is encoded and written in a single `write_all` call.
    /// If the up-front reserve alone exceeds `max_bytes`, nothing is written
    /// and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `writer.write_all`.
    fn to_bytes<W, R>(&self, mut rng: R, max_bytes: usize, writer: &mut W) -> Result<(), Error>
    where
        R: Rng + Sized,
        W: Write,
    {
        // Reserve room for the wrapping messages before any span is added:
        // a fixed 5 bytes plus `super::div_ceil(max_bytes, 0x7F)`.
        // NOTE(review): presumably `div_ceil` is ceiling division and this is
        // a conservative bound on framing/length-prefix bytes -- confirm.
        let reserve = 5 + super::div_ceil(max_bytes, 0x7F);
        let Some(mut budget) = max_bytes.checked_sub(reserve) else {
            return Ok(());
        };

        let mut request = ExportTraceServiceRequest(Vec::new());
        loop {
            let candidate: Span = rng.gen();
            // Encoded size of the span plus 2 extra bytes per span.
            // NOTE(review): the 2 presumably stands for the field tag and
            // length prefix -- confirm.
            let cost = candidate.0.encoded_len() + 2;
            let Some(left) = budget.checked_sub(cost) else {
                break;
            };
            request.0.push(candidate);
            budget = left;
        }

        let encoded = request.into_prost_type().encode_to_vec();
        writer.write_all(&encoded)?;
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use super::OpentelemetryTraces;
    use crate::payload::Serialize;
    use proptest::prelude::*;
    use prost::Message;
    use rand::{rngs::SmallRng, SeedableRng};

    /// Serialize one payload with a `SmallRng` seeded from `seed` and return
    /// the bytes that were written. Panics if serialization fails.
    fn generate_payload(seed: u64, max_bytes: usize) -> Vec<u8> {
        let mut out = Vec::with_capacity(max_bytes);
        OpentelemetryTraces::default()
            .to_bytes(SmallRng::seed_from_u64(seed), max_bytes, &mut out)
            .unwrap();
        out
    }

    proptest! {
        // The serialized payload never exceeds the requested limit.
        #[test]
        fn payload_not_exceed_max_bytes(seed: u64, max_bytes: u16) {
            let max_bytes = usize::from(max_bytes);
            let bytes = generate_payload(seed, max_bytes);
            assert!(bytes.len() <= max_bytes, "max len: {max_bytes}, actual: {}", bytes.len());
        }

        // NOTE: despite its name this property only checks that a limit of at
        // least 16 bytes produces a non-empty payload.
        #[test]
        fn payload_is_at_least_half_of_max_bytes(seed: u64, max_bytes in 16u16..u16::MAX) {
            let bytes = generate_payload(seed, usize::from(max_bytes));
            assert!(!bytes.is_empty());
        }

        // Whatever is written must decode as an `ExportTraceServiceRequest`.
        #[test]
        fn payload_deserializes(seed: u64, max_bytes: u16) {
            let bytes = generate_payload(seed, usize::from(max_bytes));
            opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest::decode(bytes.as_slice()).unwrap();
        }
    }
}