use crate::payload::{Error, Serialize};
use opentelemetry_proto::tonic::{
common::v1::{any_value, AnyValue},
logs::v1,
};
use prost::Message;
use rand::{distributions::Standard, prelude::Distribution, Rng};
use std::io::Write;
use super::{common::AsciiString, Generator};
struct ExportLogsServiceRequest(Vec<LogRecord>);
impl ExportLogsServiceRequest {
fn into_prost_type(
self,
) -> opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest {
opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest {
resource_logs: vec![v1::ResourceLogs {
resource: None,
instrumentation_library_logs: vec![v1::InstrumentationLibraryLogs {
instrumentation_library: None,
log_records: self.0.into_iter().map(|log| log.0).collect(),
schema_url: String::new(),
}],
schema_url: String::new(),
}],
}
}
}
/// Local newtype over the prost-generated `v1::LogRecord`, giving this module
/// a type it can implement random sampling for.
struct LogRecord(v1::LogRecord);

impl Distribution<LogRecord> for Standard {
    /// Draw one random log record from `rng`.
    ///
    /// The body is a string produced by the default `AsciiString` generator,
    /// both timestamps are random, and the severity number is drawn from
    /// `1..=24`. All remaining fields are left empty or zero.
    fn sample<R>(&self, rng: &mut R) -> LogRecord
    where
        R: Rng + ?Sized,
    {
        // NOTE: the order of the draws below (body text, timestamp, observed
        // timestamp, severity) determines which values a given seed yields;
        // keep it stable.
        let text = AsciiString::default().generate(rng);
        let body = AnyValue {
            value: Some(any_value::Value::StringValue(text)),
        };
        let time_unix_nano = rng.gen();
        let observed_time_unix_nano = rng.gen();
        // The range starts at 1, so 0 is never produced.
        let severity_number = rng.gen_range(1..=24);

        // NOTE(review): presumably required because one of the fields set
        // below is marked deprecated in the generated type -- confirm which.
        #[allow(deprecated)]
        let record = v1::LogRecord {
            time_unix_nano,
            observed_time_unix_nano,
            severity_number,
            severity_text: String::new(),
            name: String::new(),
            body: Some(body),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            flags: 0,
            trace_id: Vec::new(),
            span_id: Vec::new(),
        };

        LogRecord(record)
    }
}
/// Payload generator that emits protobuf-encoded OpenTelemetry log export
/// requests.
#[derive(Debug, Default, Clone, Copy)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub(crate) struct OpentelemetryLogs;

impl Serialize for OpentelemetryLogs {
    /// Write one encoded export request to `writer`, using `max_bytes` as the
    /// size budget.
    ///
    /// Random log records are accumulated for as long as their estimated
    /// encoded size fits in the budget; the first record that does not fit is
    /// discarded and generation stops. If `max_bytes` is smaller than the
    /// reserved framing overhead, nothing is written at all. Otherwise the
    /// request is encoded and written even when no record fit.
    ///
    /// # Errors
    ///
    /// Returns an error if writing to `writer` fails.
    fn to_bytes<W, R>(&self, mut rng: R, max_bytes: usize, writer: &mut W) -> Result<(), Error>
    where
        R: Rng + Sized,
        W: Write,
    {
        // Hold back a fixed 5 bytes plus `div_ceil(max_bytes, 0x7F)` bytes.
        // NOTE(review): presumably this covers the fixed framing of the
        // wrapper messages plus a conservative bound on length prefixes --
        // confirm.
        let reserved = 5 + super::div_ceil(max_bytes, 0x7F);
        let Some(mut budget) = max_bytes.checked_sub(reserved) else {
            return Ok(());
        };

        let mut records = Vec::new();
        loop {
            let candidate: LogRecord = rng.gen();
            // The extra 2 bytes are a per-record allowance on top of the
            // record's own encoding.
            // NOTE(review): presumably for the field tag and length prefix
            // when nested in the parent message -- confirm.
            let cost = candidate.0.encoded_len() + 2;
            let Some(left) = budget.checked_sub(cost) else {
                // Candidate does not fit: drop it and stop generating.
                break;
            };
            records.push(candidate);
            budget = left;
        }

        let encoded = ExportLogsServiceRequest(records)
            .into_prost_type()
            .encode_to_vec();
        writer.write_all(&encoded)?;
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use super::OpentelemetryLogs;
    use crate::payload::Serialize;
    use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
    use proptest::prelude::*;
    use prost::Message;
    use rand::{rngs::SmallRng, SeedableRng};

    /// Serialize a single payload with an RNG seeded from `seed` and a size
    /// budget of `max_bytes`, returning the bytes that were written.
    fn generate(seed: u64, max_bytes: usize) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(max_bytes);
        OpentelemetryLogs
            .to_bytes(SmallRng::seed_from_u64(seed), max_bytes, &mut bytes)
            .unwrap();
        bytes
    }

    proptest! {
        // The serialized payload must never be larger than the requested
        // budget.
        #[test]
        fn payload_not_exceed_max_bytes(seed: u64, max_bytes: u16) {
            let max_bytes = usize::from(max_bytes);
            let bytes = generate(seed, max_bytes);
            assert!(bytes.len() <= max_bytes, "max len: {max_bytes}, actual: {}", bytes.len());
        }

        // Despite its name, this only checks that a budget of at least 16
        // bytes yields a non-empty payload.
        #[test]
        fn payload_is_at_least_half_of_max_bytes(seed: u64, max_bytes in 16u16..u16::MAX) {
            let bytes = generate(seed, usize::from(max_bytes));
            assert!(!bytes.is_empty());
        }

        // Whatever was written must decode as an export request.
        #[test]
        fn payload_deserializes(seed: u64, max_bytes: u16) {
            let bytes: Vec<u8> = generate(seed, usize::from(max_bytes));
            ExportLogsServiceRequest::decode(bytes.as_slice()).unwrap();
        }
    }
}