use std::num::{NonZeroU32, NonZeroUsize};
use bytes::{buf::Writer, BufMut, Bytes, BytesMut};
use metrics::gauge;
use rand::{prelude::SliceRandom, Rng};
use crate::payload::{self, Serialize, TraceAgent};
/// Top-level error type for this module.
///
/// Currently it only wraps [`ChunkError`], which is what `chunk_bytes` returns
/// when it rejects its inputs.
#[derive(Debug, PartialEq, Eq, Clone, Copy, thiserror::Error)]
pub enum Error {
    /// The byte-chunking step failed; the inner value says why.
    #[error("Chunk error: {0}")]
    Chunk(ChunkError),
}
impl From<ChunkError> for Error {
fn from(error: ChunkError) -> Self {
Error::Chunk(error)
}
}
/// A single serialized payload block held in the block cache.
#[derive(Debug)]
pub(crate) struct Block {
    /// Size of `bytes` in bytes. When built by `construct_block_cache_inner`
    /// this is `bytes.len()`, and that function skips empty blocks, so it is
    /// never zero.
    pub(crate) total_bytes: NonZeroU32,
    /// The serialized payload contents.
    pub(crate) bytes: Bytes,
}
/// Reasons `chunk_bytes` refuses to split a byte budget into blocks.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ChunkError {
    /// The slice of candidate block sizes was empty.
    EmptyBlockBytes,
    /// At least one candidate block size is larger than the total budget.
    InsufficientTotalBytes,
}

impl std::fmt::Display for ChunkError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the fixed message first, then emit it in a single write.
        let message = match self {
            Self::EmptyBlockBytes => "the slice of byte sizes given to `chunk_bytes` was empty",
            Self::InsufficientTotalBytes => "the `total_bytes` parameter is insufficient",
        };
        f.write_str(message)
    }
}

/// `ChunkError` has no underlying cause, so the default `source()` (None) applies.
impl std::error::Error for ChunkError {}
/// Split a `total_bytes` budget into a random sequence of block sizes.
///
/// Sizes are drawn uniformly from `block_byte_sizes`. A draw larger than the
/// bytes still remaining is discarded and redrawn (rejection sampling). Drawing
/// stops once fewer bytes remain than the smallest allowed block size. As a
/// result the returned sizes never sum to more than `total_bytes`, and any
/// unused remainder is smaller than the minimum block size.
///
/// # Errors
///
/// Returns [`ChunkError::EmptyBlockBytes`] (wrapped in [`Error`]) if
/// `block_byte_sizes` is empty. Returns [`ChunkError::InsufficientTotalBytes`]
/// if any entry is larger than `total_bytes`.
pub(crate) fn chunk_bytes<R>(
    rng: &mut R,
    total_bytes: NonZeroUsize,
    block_byte_sizes: &[NonZeroUsize],
) -> Result<Vec<usize>, Error>
where
    R: Rng + Sized,
{
    if block_byte_sizes.is_empty() {
        return Err(ChunkError::EmptyBlockBytes.into());
    }
    if block_byte_sizes.iter().any(|bb| *bb > total_bytes) {
        return Err(ChunkError::InsufficientTotalBytes.into());
    }

    let minimum = block_byte_sizes
        .iter()
        .min()
        .expect("block_byte_sizes checked non-empty above")
        .get();

    let mut chunks = Vec::new();
    let mut bytes_remaining = total_bytes.get();
    // `>=`, not `>`: when exactly `minimum` bytes remain, a block of that size
    // still fits. With `>`, an input whose block sizes all equal
    // `total_bytes` (which the validation above accepts) yielded no chunks.
    while bytes_remaining >= minimum {
        let block_bytes = block_byte_sizes
            .choose(rng)
            .expect("block_byte_sizes checked non-empty above")
            .get();
        // Every size is <= the largest size, so "fits" only depends on what
        // is left. `minimum` always fits here, so each draw has a positive
        // chance of success and the loop terminates with probability 1.
        if block_bytes > bytes_remaining {
            continue;
        }
        chunks.push(block_bytes);
        bytes_remaining -= block_bytes;
    }
    Ok(chunks)
}
/// Build the cache of serialized payload blocks for the configured `payload`.
///
/// Picks the serializer matching the `payload::Config` variant and hands it to
/// `construct_block_cache_inner`. That function serializes one block per entry
/// of `block_chunks`, using the entry as the byte budget, and skips any block
/// that comes out empty. `labels` are forwarded to the completion gauge
/// emitted there.
///
/// # Panics
///
/// Panics if serialization fails, if a block exceeds `u32::MAX` bytes, or if
/// no non-empty block is produced. All three checks happen inside
/// `construct_block_cache_inner`.
pub(crate) fn construct_block_cache<R>(
    mut rng: R,
    payload: &payload::Config,
    block_chunks: &[usize],
    labels: &Vec<(String, String)>,
) -> Vec<Block>
where
    R: Rng,
{
    // Every arm lends the generator as `&mut rng`. `&mut R` is itself an `Rng`
    // and produces the same draws as moving `rng` would.
    match payload {
        payload::Config::TraceAgent(enc) => {
            let ta = match enc {
                payload::Encoding::Json => TraceAgent::json(),
                payload::Encoding::MsgPack => TraceAgent::msg_pack(),
            };
            construct_block_cache_inner(&mut rng, &ta, block_chunks, labels)
        }
        payload::Config::Syslog5424 => construct_block_cache_inner(
            &mut rng,
            &payload::Syslog5424::default(),
            block_chunks,
            labels,
        ),
        payload::Config::DogStatsD(payload::dogstatsd::Config {
            metric_names_minimum,
            metric_names_maximum,
            tag_keys_minimum,
            tag_keys_maximum,
            kind_weights,
            metric_weights,
            metric_multivalue,
        }) => {
            let mn_range = *metric_names_minimum..*metric_names_maximum;
            let tg_range = *tag_keys_minimum..*tag_keys_maximum;
            // The serializer itself consumes randomness at construction time.
            let serializer = payload::DogStatsD::new(
                mn_range,
                tg_range,
                *kind_weights,
                *metric_weights,
                metric_multivalue,
                &mut rng,
            );
            construct_block_cache_inner(&mut rng, &serializer, block_chunks, labels)
        }
        payload::Config::Fluent => construct_block_cache_inner(
            &mut rng,
            &payload::Fluent::default(),
            block_chunks,
            labels,
        ),
        payload::Config::SplunkHec { encoding } => construct_block_cache_inner(
            &mut rng,
            &payload::SplunkHec::new(*encoding),
            block_chunks,
            labels,
        ),
        payload::Config::ApacheCommon => construct_block_cache_inner(
            &mut rng,
            &payload::ApacheCommon::default(),
            block_chunks,
            labels,
        ),
        payload::Config::Ascii => construct_block_cache_inner(
            &mut rng,
            &payload::Ascii::default(),
            block_chunks,
            labels,
        ),
        payload::Config::DatadogLog => {
            let serializer = payload::DatadogLog::new(&mut rng);
            construct_block_cache_inner(&mut rng, &serializer, block_chunks, labels)
        }
        payload::Config::Json => {
            construct_block_cache_inner(&mut rng, &payload::Json, block_chunks, labels)
        }
        payload::Config::Static { static_path } => construct_block_cache_inner(
            &mut rng,
            &payload::Static::new(static_path),
            block_chunks,
            labels,
        ),
        payload::Config::OpentelemetryTraces => construct_block_cache_inner(
            &mut rng,
            &payload::OpentelemetryTraces,
            block_chunks,
            labels,
        ),
        payload::Config::OpentelemetryLogs => construct_block_cache_inner(
            &mut rng,
            &payload::OpentelemetryLogs,
            block_chunks,
            labels,
        ),
        payload::Config::OpentelemetryMetrics => construct_block_cache_inner(
            &mut rng,
            &payload::OpentelemetryMetrics,
            block_chunks,
            labels,
        ),
    }
}
/// Serialize one block per entry of `block_chunks` using `serializer`.
///
/// Each entry is the byte budget passed to `Serialize::to_bytes`. Blocks for
/// which the serializer writes nothing are skipped rather than stored. After
/// all blocks are built, the `block_construction_complete` gauge is set to
/// `1.0` with `labels`.
///
/// # Panics
///
/// Panics if the serializer returns an error, if a block is larger than
/// `u32::MAX` bytes, or if no non-empty block was produced.
// NOTE(review): `labels` stays `&Vec` (hence `ptr_arg`), presumably because
// the `gauge!` label conversion needs a sized container; confirm before
// switching to a slice.
#[allow(clippy::ptr_arg)]
// NOTE(review): no `as` cast is visible in this body; this allow may be stale
// or may cover the `gauge!` expansion. Confirm before removing.
#[allow(clippy::cast_precision_loss)]
#[inline]
fn construct_block_cache_inner<R, S>(
    mut rng: R,
    serializer: &S,
    block_chunks: &[usize],
    labels: &Vec<(String, String)>,
) -> Vec<Block>
where
    S: Serialize,
    R: Rng,
{
    let mut block_cache: Vec<Block> = Vec::with_capacity(block_chunks.len());
    for &block_size in block_chunks {
        let mut block: Writer<BytesMut> = BytesMut::with_capacity(block_size).writer();
        serializer
            .to_bytes(&mut rng, block_size, &mut block)
            .expect("serializer failed to produce a payload block");
        let bytes: Bytes = block.into_inner().freeze();
        if bytes.is_empty() {
            continue;
        }
        let len: u32 = bytes
            .len()
            .try_into()
            .expect("payload block larger than u32::MAX bytes");
        let total_bytes =
            NonZeroU32::new(len).expect("empty blocks are skipped, so length is non-zero");
        block_cache.push(Block { total_bytes, bytes });
    }
    assert!(
        !block_cache.is_empty(),
        "block cache is empty: no non-empty payload block was generated"
    );
    gauge!("block_construction_complete", 1.0, labels);
    block_cache
}
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use proptest::{collection, prelude::*};
    use rand::{rngs::SmallRng, SeedableRng};

    use crate::block::{chunk_bytes, ChunkError, Error};

    /// Strategy producing a total byte budget plus 1 to 999 block sizes, each
    /// strictly smaller than that budget.
    ///
    /// The budget starts at 2, not 1. Block sizes are drawn from
    /// `1..total_bytes`, and for a budget of 1 that is the empty range `1..1`,
    /// which cannot be sampled. Proptest would then panic instead of reporting
    /// a result. A budget of 1 is reachable when shrinking a failing case
    /// toward the lower bound.
    fn total_bytes_and_block_bytes() -> impl Strategy<Value = (NonZeroUsize, Vec<NonZeroUsize>)> {
        (2..usize::MAX).prop_flat_map(|total_bytes| {
            (
                Just(NonZeroUsize::new(total_bytes).unwrap()),
                collection::vec(
                    (1..total_bytes).prop_map(|i| NonZeroUsize::new(i).unwrap()),
                    1..1_000,
                ),
            )
        })
    }

    proptest! {
        // No chunk returned by `chunk_bytes` is zero bytes long.
        #[test]
        fn chunk_never_size_zero(seed: u64, (total_bytes, block_bytes_sizes) in total_bytes_and_block_bytes()) {
            let mut rng = SmallRng::seed_from_u64(seed);
            let chunks = chunk_bytes(&mut rng, total_bytes, &block_bytes_sizes).unwrap();
            for chunk in chunks {
                prop_assert!(chunk > 0);
            }
        }
    }

    proptest! {
        // With every block size strictly below the budget, at least one chunk
        // is always produced.
        #[test]
        fn chunks_never_empty(seed: u64, (total_bytes, block_bytes_sizes) in total_bytes_and_block_bytes()) {
            let mut rng = SmallRng::seed_from_u64(seed);
            let chunks = chunk_bytes(&mut rng, total_bytes, &block_bytes_sizes).unwrap();
            prop_assert!(!chunks.is_empty());
        }
    }

    proptest! {
        // An empty slice of block sizes is rejected regardless of the budget.
        #[test]
        fn chunks_empty_trigger_error(seed: u64, total_bytes in (1..usize::MAX).prop_map(|i| NonZeroUsize::new(i).unwrap())) {
            let mut rng = SmallRng::seed_from_u64(seed);
            prop_assert_eq!(Err(Error::Chunk(ChunkError::EmptyBlockBytes)), chunk_bytes(&mut rng, total_bytes, &[]));
        }
    }
}