use super::super::LabelSetId;
use super::super::Sample;
use super::super::StackTraceId;
use crate::collections::identifiable::Id;
use crate::internal::Timestamp;
use crate::profiles::{DefaultObservationCodec as DefaultCodec, ObservationCodec};
use byteorder::{NativeEndian, ReadBytesExt};
use std::io::{self, Write};
/// Timestamped observation store specialized to the default observation codec
/// (`DefaultObservationCodec`, imported here as `DefaultCodec`).
pub type TimestampedObservations = TimestampedObservationsImpl<DefaultCodec>;
/// Write side of the timestamped-observation store, generic over the codec `C`.
///
/// Each call to `add` serializes one observation into
/// `compressed_timestamped_data`; `try_into_iter` later turns that encoder into
/// a decoder so the observations can be read back.
pub struct TimestampedObservationsImpl<C: ObservationCodec> {
/// Codec-provided encoder that receives the serialized observation bytes.
/// NOTE(review): the field name suggests the encoder compresses its input;
/// that depends on the codec and is not visible here.
compressed_timestamped_data: C::Encoder,
/// Number of `i64` values per observation. It is handed to the iterator,
/// which reads exactly this many values for every record.
sample_types_len: usize,
}
/// Read side of the timestamped-observation store, generic over the codec `C`.
///
/// Produced by `TimestampedObservationsImpl::try_into_iter`; its `Iterator`
/// implementation yields `(Sample, Timestamp, Vec<i64>)` tuples.
pub struct TimestampedObservationsIterImpl<C: ObservationCodec> {
/// Codec-provided decoder from which the serialized records are read.
decoder: C::Decoder,
/// Number of `i64` values read for every record.
sample_types_len: usize,
}
impl<C: ObservationCodec> TimestampedObservationsImpl<C> {
    /// First argument passed to `C::new_encoder` (1 MiB); going by the name,
    /// the initial buffer size of the encoder.
    const DEFAULT_BUFFER_SIZE: usize = 1024 * 1024;

    /// Second argument passed to `C::new_encoder` (`i32::MAX`); going by the
    /// name, the maximum capacity the encoder is allowed to reach.
    const MAX_CAPACITY: usize = i32::MAX as usize;

    /// Creates an empty store whose observations each carry `sample_types_len`
    /// values.
    ///
    /// # Errors
    ///
    /// Returns whatever error `C::new_encoder` reports.
    pub fn try_new(sample_types_len: usize) -> io::Result<Self> {
        Ok(Self {
            compressed_timestamped_data: C::new_encoder(
                Self::DEFAULT_BUFFER_SIZE,
                Self::MAX_CAPACITY,
            )?,
            sample_types_len,
        })
    }

    /// Appends one observation to the encoded stream.
    ///
    /// The record layout, all in native endianness, is: stack-trace id (`u32`),
    /// label-set id (`u32`), timestamp (`i64`), followed by every entry of
    /// `values` (`i64` each).
    ///
    /// # Errors
    ///
    /// * `values.len()` differs from the `sample_types_len` this store was
    ///   created with. The reader consumes exactly `sample_types_len` values
    ///   per record, so a record of any other length would misalign every
    ///   record after it. Nothing is written in this case.
    /// * Writing to the underlying encoder fails.
    pub fn add(&mut self, sample: Sample, ts: Timestamp, values: &[i64]) -> anyhow::Result<()> {
        if values.len() != self.sample_types_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "expected {} values per observation, got {}",
                    self.sample_types_len,
                    values.len()
                ),
            )
            .into());
        }

        let stack_trace_id: u32 = sample.stacktrace.into();
        let labels_id: u32 = sample.labels.into();
        let timestamp = i64::from(ts);

        let out = &mut self.compressed_timestamped_data;
        out.write_all(&stack_trace_id.to_ne_bytes())?;
        out.write_all(&labels_id.to_ne_bytes())?;
        out.write_all(&timestamp.to_ne_bytes())?;
        for v in values {
            out.write_all(&v.to_ne_bytes())?;
        }
        Ok(())
    }

    /// Consumes the store and returns an iterator over the recorded
    /// observations.
    ///
    /// # Errors
    ///
    /// Returns whatever error `C::encoder_into_decoder` reports while
    /// converting the encoder into a decoder.
    pub fn try_into_iter(self) -> io::Result<TimestampedObservationsIterImpl<C>> {
        Ok(TimestampedObservationsIterImpl {
            decoder: C::encoder_into_decoder(self.compressed_timestamped_data)?,
            sample_types_len: self.sample_types_len,
        })
    }
}
impl<C: ObservationCodec> Iterator for TimestampedObservationsIterImpl<C> {
    type Item = (Sample, Timestamp, Vec<i64>);

    /// Decodes the next record from the stream.
    ///
    /// A record is read in this order, all native-endian: stack-trace id
    /// (`u32`), label-set id (`u32`), timestamp (`i64`), then
    /// `sample_types_len` values (`i64` each). Returns `None` as soon as any
    /// read fails, or when the decoded timestamp is zero.
    fn next(&mut self) -> Option<Self::Item> {
        let value_count = self.sample_types_len;
        let reader = &mut self.decoder;

        let stacktrace_offset = reader.read_u32::<NativeEndian>().ok()?;
        let labels_offset = reader.read_u32::<NativeEndian>().ok()?;
        let raw_timestamp = reader.read_i64::<NativeEndian>().ok()?;

        // Collecting into `Option<Vec<_>>` stops at the first failed read.
        let values = (0..value_count)
            .map(|_| reader.read_i64::<NativeEndian>().ok())
            .collect::<Option<Vec<i64>>>()?;

        // The zero check comes only after the whole record has been consumed.
        let timestamp = std::num::NonZeroI64::new(raw_timestamp)?;

        let sample = Sample {
            stacktrace: StackTraceId::from_offset(stacktrace_offset as usize),
            labels: LabelSetId::from_offset(labels_offset as usize),
        };
        Some((sample, timestamp, values))
    }
}