use std::collections::VecDeque;
use std::io::{Cursor, Read as _};
use itertools::Itertools as _;
use re_build_info::CrateVersion;
use crate::rrd::MessageHeader;
use crate::{
CachingApplicationIdInjector, CodecError, Decodable as _, DecodeError, DecoderEntrypoint,
EncodingOptions, RawRrdManifest, Serializer, StreamFooter, StreamFooterEntry, StreamHeader,
ToApplication as _,
};
/// Decoder yielding transport-level (protobuf) messages.
pub type DecoderTransport = Decoder<re_protos::log_msg::v1alpha1::log_msg::Msg>;

/// Decoder yielding application-level [`re_log_types::LogMsg`]s.
pub type DecoderApp = Decoder<re_log_types::LogMsg>;
/// Streaming decoder for the rrd byte format.
///
/// Feed raw bytes in with [`Self::push_byte_chunk`], pull decoded messages out with
/// [`Self::try_read`]. `T` selects the decoding target (transport- or application-level).
pub struct Decoder<T> {
    /// Crate version parsed from the most recent [`StreamHeader`], if one was seen yet.
    pub(crate) version: Option<CrateVersion>,
    /// Encoding options parsed from the most recent [`StreamHeader`].
    pub(crate) options: EncodingOptions,
    /// Buffered, not-yet-consumed input bytes.
    pub(crate) byte_chunks: ByteChunkBuffer,
    /// Current position in the decoding state machine.
    pub(crate) state: DecoderState,
    /// Passed to `T::decode`; presumably patches application ids into messages that
    /// arrive without one — see the warning in `try_read`. TODO(review): confirm semantics.
    pub(crate) app_id_cache: CachingApplicationIdInjector,
    /// Manifests collected from `RrdFooter` payloads encountered at end-of-stream.
    rrd_manifests: Vec<re_protos::log_msg::v1alpha1::RrdManifest>,
    // Marker tying the decoder to its output message type.
    _decodable: std::marker::PhantomData<T>,
}
/// The decoding state machine. See `Decoder::try_read_impl` for the transitions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DecoderState {
    /// Expecting a [`StreamHeader`] next (initial state; also re-entered after a
    /// stream footer, since streams may be concatenated).
    WaitingForStreamHeader,
    /// Expecting a [`MessageHeader`] next.
    WaitingForMessageHeader,
    /// Expecting the payload described by the contained [`MessageHeader`] next.
    WaitingForMessagePayload(MessageHeader),
    /// Expecting a [`StreamFooter`] next (after an end-of-stream payload).
    WaitingForStreamFooter,
    /// Decoding stopped permanently (corrupt trailing bytes or footer).
    Aborted,
}
impl<T: DecoderEntrypoint> Decoder<T> {
#[expect(clippy::new_without_default)]
pub fn new() -> Self {
Self {
version: None,
options: EncodingOptions::PROTOBUF_UNCOMPRESSED,
byte_chunks: ByteChunkBuffer::new(),
state: DecoderState::WaitingForStreamHeader,
app_id_cache: CachingApplicationIdInjector::default(),
rrd_manifests: Vec::new(),
_decodable: std::marker::PhantomData::<T>,
}
}
pub fn push_byte_chunk(&mut self, byte_chunk: Vec<u8>) {
self.byte_chunks.push(byte_chunk);
}
pub fn rrd_manifests(&self) -> Result<Vec<RawRrdManifest>, DecodeError> {
re_tracing::profile_function!();
self.rrd_manifests
.iter()
.map(|m| m.to_application(()).map_err(Into::into))
.collect()
}
pub fn try_read(&mut self) -> Result<Option<T>, DecodeError> {
loop {
let result = self.try_read_impl();
if let Err(DecodeError::Codec(CodecError::StoreIdMissingApplicationId {
store_kind,
recording_id,
})) = result
{
re_log::warn_once!(
"dropping message without application id which arrived before `SetStoreInfo` \
(kind: {store_kind}, recording id: {recording_id}."
);
} else {
return result;
}
}
}
fn try_read_impl(&mut self) -> Result<Option<T>, DecodeError> {
if false {
use bytes::Buf as _;
let num_bytes = self
.byte_chunks
.queue
.iter()
.map(|v| v.remaining())
.sum::<usize>();
eprintln!("state: {:?} (bytes available: {num_bytes})", self.state);
let mut peeked = [0u8; 32];
self.byte_chunks.try_peek(&mut peeked);
let peeked = peeked
.into_iter()
.map(|b| match b {
b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' => String::from(b as char),
v => v.to_string(),
})
.join(", ");
eprintln!("upcoming 32 bytes: [{peeked}]");
}
match self.state {
DecoderState::WaitingForStreamHeader => {
let is_first_header = self.byte_chunks.num_read() == 0;
let position = self.byte_chunks.num_read();
if let Some(header_data) =
self.byte_chunks.try_read(StreamHeader::ENCODED_SIZE_BYTES)
{
let version_and_options = StreamHeader::from_rrd_bytes(&header_data)
.and_then(|h| h.to_version_and_options());
let (version, options) = match version_and_options {
Ok(ok) => ok,
Err(err) => {
if is_first_header {
return Err(err.into());
} else {
re_log::error!(
is_first_header,
position,
"trailing bytes in rrd stream: {header_data:?} ({err})"
);
self.state = DecoderState::Aborted;
return Ok(None);
}
}
};
re_log::trace!(
version = version.to_string(),
?options,
"found StreamHeader"
);
self.version = Some(version);
self.options = options;
match self.options.serializer {
Serializer::Protobuf => self.state = DecoderState::WaitingForMessageHeader,
}
return self.try_read();
}
}
DecoderState::WaitingForMessageHeader => {
let mut peeked = [0u8; MessageHeader::ENCODED_SIZE_BYTES];
if self.byte_chunks.try_peek(&mut peeked) == peeked.len() {
let header = match MessageHeader::from_rrd_bytes(&peeked) {
Ok(header) => header,
Err(crate::rrd::CodecError::FrameDecoding(_)) => {
self.state = DecoderState::WaitingForStreamHeader;
return self.try_read();
}
err @ Err(_) => err?,
};
self.byte_chunks
.try_read(MessageHeader::ENCODED_SIZE_BYTES)
.expect("reading cannot fail if peeking worked");
re_log::trace!(?header, "found MessageHeader");
self.state = DecoderState::WaitingForMessagePayload(header);
return self.try_read();
}
}
DecoderState::WaitingForMessagePayload(header) => {
let start_offset = self.byte_chunks.num_read() as u64;
if let Some(bytes) = self.byte_chunks.try_read(header.len as usize) {
let bytes_len = bytes.len() as u64;
let byte_span = re_chunk::Span {
start: start_offset,
len: bytes_len,
};
let message = match T::decode(
bytes.clone(),
byte_span,
header.kind,
&mut self.app_id_cache,
self.version,
) {
Ok(msg) => msg,
Err(err) => {
self.state = DecoderState::WaitingForMessageHeader;
return Err(err.into());
}
};
if let Some(message) = message {
self.state = DecoderState::WaitingForMessageHeader;
return Ok(Some(message));
} else {
re_log::trace!(
"End of stream - expecting either a StreamFooter or a new Streamheader"
);
if !bytes.is_empty() {
let rrd_footer =
re_protos::log_msg::v1alpha1::RrdFooter::from_rrd_bytes(&bytes)?;
self.rrd_manifests.extend(rrd_footer.manifests);
self.state = DecoderState::WaitingForStreamFooter;
} else {
self.state = DecoderState::WaitingForStreamFooter;
}
return self.try_read();
}
}
}
DecoderState::WaitingForStreamFooter => {
let position = self.byte_chunks.num_read();
if let Some(bytes) = self
.byte_chunks
.try_read(crate::rrd::StreamFooter::ENCODED_SIZE_BYTES)
{
match crate::rrd::StreamFooter::from_rrd_bytes(&bytes) {
Ok(footer) => {
re_log::trace!(?footer, "found StreamFooter");
let StreamFooter {
fourcc: _,
identifier: _,
entries,
} = &footer;
for entry in entries {
let StreamFooterEntry {
rrd_footer_byte_span_from_start_excluding_header,
crc_excluding_header: _,
} = entry;
let rrd_footer_end =
rrd_footer_byte_span_from_start_excluding_header.end();
if rrd_footer_end > position as u64 {
re_log::error!(
position,
bytes = ?bytes,
?footer,
err = "offsets are invalid",
"corrupt footer in rrd stream"
);
}
}
self.state = DecoderState::WaitingForStreamHeader;
return self.try_read();
}
Err(err) => {
re_log::error!(
position,
bytes = ?bytes,
%err,
"corrupt footer in rrd stream"
);
self.state = DecoderState::Aborted;
return Ok(None);
}
}
}
}
DecoderState::Aborted => return Ok(None),
}
Ok(None) }
}
/// A single chunk of input bytes, with its read position tracked by the cursor.
type ByteChunk = Cursor<Vec<u8>>;

/// FIFO buffer over multiple byte chunks, supporting exact-size reads and peeks
/// that may span chunk boundaries.
pub struct ByteChunkBuffer {
    /// Chunks that still contain unread bytes, in arrival order.
    queue: VecDeque<ByteChunk>,
    /// Scratch buffer accumulating an in-flight (possibly partial) `try_read`.
    buffer: Vec<u8>,
    /// Number of valid bytes currently accumulated in `buffer`.
    buffer_fill: usize,
    /// Total number of bytes successfully consumed via `try_read` so far.
    num_read: usize,
}
impl ByteChunkBuffer {
    /// Creates an empty buffer with some capacity pre-reserved.
    fn new() -> Self {
        Self {
            queue: VecDeque::with_capacity(16),
            buffer: Vec::with_capacity(1024),
            buffer_fill: 0,
            num_read: 0,
        }
    }

    /// Appends a chunk of bytes to the back of the queue; empty chunks are dropped.
    fn push(&mut self, byte_chunk: Vec<u8>) {
        if !byte_chunk.is_empty() {
            self.queue.push_back(ByteChunk::new(byte_chunk));
        }
    }

    /// Total number of bytes successfully consumed via [`Self::try_read`] so far.
    pub fn num_read(&self) -> usize {
        self.num_read
    }

    /// Attempts to read exactly `n` bytes, possibly spanning several chunks.
    ///
    /// Returns `None` when fewer than `n` bytes are available; the partial progress
    /// is kept in `self.buffer` and the *same* `n` must be requested again later.
    fn try_read(&mut self, n: usize) -> Option<bytes::Bytes> {
        if self.buffer.len() != n {
            // A size change is only legal when no partial read is in flight.
            assert_eq!(
                self.buffer_fill, 0,
                "`try_read` called with different `n` for incomplete read"
            );
            self.buffer.resize(n, 0);
            self.buffer_fill = 0;
        }

        // Drain chunks from the front of the queue until the scratch buffer is full.
        while self.buffer_fill != n {
            let Some(byte_chunk) = self.queue.front_mut() else {
                break; // out of input
            };
            let dst = &mut self.buffer[self.buffer_fill..];
            self.buffer_fill += byte_chunk
                .read(dst)
                .expect("failed to read from byte chunk");
            if is_byte_chunk_empty(byte_chunk) {
                self.queue.pop_front();
            }
        }

        (self.buffer_fill == n).then(|| {
            self.buffer_fill = 0;
            self.num_read += n;
            std::mem::take(&mut self.buffer).into()
        })
    }

    /// Copies upcoming bytes into `out` without consuming them.
    ///
    /// Returns how many bytes were actually written (may be less than `out.len()`).
    fn try_peek(&self, out: &mut [u8]) -> usize {
        use std::io::Write as _;

        let target_len = out.len();
        let mut writer = std::io::Cursor::new(out);
        let mut written = 0;

        // Include the partially-read scratch bytes, but only when the caller asks
        // for the same size as the in-flight read.
        if target_len == self.buffer.len() {
            written += writer
                .write(&self.buffer[..self.buffer_fill])
                .expect("memcpy, cannot fail");
        }

        for chunk in &self.queue {
            if written == target_len {
                break;
            }
            let start = chunk.position() as usize;
            written += writer
                .write(&chunk.get_ref()[start..])
                .expect("memcpy, cannot fail");
        }

        written
    }
}
/// Whether the chunk's cursor has advanced past its last byte.
/// (`Cursor<Vec<u8>>` is the underlying type of the `ByteChunk` alias.)
fn is_byte_chunk_empty(byte_chunk: &Cursor<Vec<u8>>) -> bool {
    let len = byte_chunk.get_ref().len() as u64;
    byte_chunk.position() >= len
}
#[cfg(test)]
mod tests {
    use re_chunk::RowId;
    use re_log_types::{LogMsg, SetStoreInfo, StoreInfo};

    use super::*;
    use crate::Encoder;
    use crate::rrd::EncodingOptions;

    /// A deterministic `SetStoreInfo` message used as the payload for all tests.
    fn fake_log_msg() -> LogMsg {
        LogMsg::SetStoreInfo(SetStoreInfo {
            row_id: *RowId::ZERO,
            info: StoreInfo {
                store_version: Some(CrateVersion::LOCAL),
                ..StoreInfo::testing_with_recording_id("test_recording")
            },
        })
    }

    /// Encodes `n` copies of `fake_log_msg()` with the given options,
    /// returning both the messages and the encoded bytes.
    fn test_data(options: EncodingOptions, n: usize) -> (Vec<LogMsg>, Vec<u8>) {
        let messages: Vec<_> = (0..n).map(|_| fake_log_msg()).collect();
        let mut data = Vec::new();
        Encoder::encode_into(
            CrateVersion::LOCAL,
            options,
            messages.clone().into_iter().map(Ok),
            &mut data,
        )
        .unwrap();
        (messages, data)
    }

    /// Asserts that `$message` decoded to exactly `fake_log_msg()` and yields it.
    macro_rules! assert_message_ok {
        ($message:expr) => {{
            match $message {
                Ok(Some(message)) => {
                    assert_eq!(&fake_log_msg(), &message);
                    message
                }
                Ok(None) => {
                    panic!("failed to read message: message could not be read in full");
                }
                Err(err) => {
                    panic!("failed to read message: {err}");
                }
            }
        }};
    }

    /// Asserts that `$message` is `Ok(None)`, i.e. the decoder needs more bytes.
    macro_rules! assert_message_incomplete {
        ($message:expr) => {{
            match $message {
                Ok(None) => {}
                Ok(Some(message)) => {
                    panic!("expected message to be incomplete, instead received: {message:?}");
                }
                Err(err) => {
                    panic!("failed to read message: {err}");
                }
            }
        }};
    }

    /// The whole stream arrives as a single chunk (uncompressed).
    #[test]
    fn stream_whole_chunks_uncompressed_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
        let mut decoder = DecoderApp::new();
        assert_message_incomplete!(decoder.try_read());
        decoder.push_byte_chunk(data);
        let decoded_messages: Vec<_> = (0..16)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();
        assert_eq!(input, decoded_messages);
    }

    /// The stream arrives one byte at a time (uncompressed).
    #[test]
    fn stream_byte_chunks_uncompressed_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
        let mut decoder = DecoderApp::new();
        assert_message_incomplete!(decoder.try_read());
        for byte_chunk in data.chunks(1) {
            decoder.push_byte_chunk(byte_chunk.to_vec());
        }
        let decoded_messages: Vec<_> = (0..16)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();
        assert_eq!(input, decoded_messages);
    }

    /// Two complete streams concatenated back-to-back decode seamlessly.
    #[test]
    fn two_concatenated_streams_protobuf() {
        let (input1, data1) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
        let (input2, data2) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
        let input = input1.into_iter().chain(input2).collect::<Vec<_>>();
        let mut decoder = DecoderApp::new();
        assert_message_incomplete!(decoder.try_read());
        decoder.push_byte_chunk(data1);
        decoder.push_byte_chunk(data2);
        let decoded_messages: Vec<_> = (0..32)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();
        assert_eq!(input, decoded_messages);
    }

    /// The whole stream arrives as a single chunk (compressed).
    #[test]
    fn stream_whole_chunks_compressed_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
        let mut decoder = DecoderApp::new();
        assert_message_incomplete!(decoder.try_read());
        decoder.push_byte_chunk(data);
        let decoded_messages: Vec<_> = (0..16)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();
        assert_eq!(input, decoded_messages);
    }

    /// The stream arrives one byte at a time (compressed).
    #[test]
    fn stream_byte_chunks_compressed_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
        let mut decoder = DecoderApp::new();
        assert_message_incomplete!(decoder.try_read());
        for byte_chunk in data.chunks(1) {
            decoder.push_byte_chunk(byte_chunk.to_vec());
        }
        let decoded_messages: Vec<_> = (0..16)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();
        assert_eq!(input, decoded_messages);
    }

    /// Pushes three 16-byte chunks at a time, interleaved with reads.
    #[test]
    fn stream_3x16_chunks_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
        let mut decoder = DecoderApp::new();
        let mut decoded_messages = vec![];
        let mut byte_chunks = data.chunks(16).peekable();
        while byte_chunks.peek().is_some() {
            for _ in 0..3 {
                if let Some(byte_chunk) = byte_chunks.next() {
                    decoder.push_byte_chunk(byte_chunk.to_vec());
                } else {
                    break;
                }
            }
            if let Some(message) = decoder.try_read().unwrap() {
                decoded_messages.push(message);
            }
        }
        assert_eq!(input, decoded_messages);
    }

    /// Pushes irregularly-sized chunks (including empty ones) following a fixed
    /// pattern, interleaved with reads.
    #[test]
    fn stream_irregular_chunks_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
        let mut data = Cursor::new(data);
        let mut decoder = DecoderApp::new();
        let mut decoded_messages = vec![];
        let pattern = [0, 3, 4, 70, 31];
        let mut pattern_index = 0;
        let mut temp = [0_u8; 71];
        while data.position() < data.get_ref().len() as u64 {
            for _ in 0..2 {
                let n = data.read(&mut temp[..pattern[pattern_index]]).unwrap();
                pattern_index = (pattern_index + 1) % pattern.len();
                decoder.push_byte_chunk(temp[..n].to_vec());
            }
            while let Some(message) = decoder.try_read().unwrap() {
                decoded_messages.push(message);
            }
        }
        assert_eq!(input, decoded_messages);
    }

    /// `ByteChunkBuffer` supports multiple exact-size reads out of one chunk.
    #[test]
    fn chunk_buffer_read_single_chunk() {
        let mut buffer = ByteChunkBuffer::new();
        let data = &[0, 1, 2, 3, 4];
        assert_eq!(None, buffer.try_read(1));
        buffer.push(data.to_vec());
        assert_eq!(Some(&data[..3]), buffer.try_read(3).as_deref());
        assert_eq!(Some(&data[3..]), buffer.try_read(2).as_deref());
        assert_eq!(None, buffer.try_read(1));
    }

    /// `ByteChunkBuffer` reads spanning multiple chunks, with partial progress kept.
    #[test]
    fn chunk_buffer_read_multi_chunk() {
        let mut buffer = ByteChunkBuffer::new();
        let byte_chunks: &[&[u8]] = &[&[0, 1, 2], &[3, 4]];
        assert_eq!(None, buffer.try_read(1));
        buffer.push(byte_chunks[0].to_vec());
        assert_eq!(None, buffer.try_read(5));
        buffer.push(byte_chunks[1].to_vec());
        assert_eq!(Some(&[0, 1, 2, 3, 4][..]), buffer.try_read(5).as_deref());
        assert_eq!(None, buffer.try_read(1));
    }

    /// Repeated reads with the same `n` work across the scratch-buffer reset.
    #[test]
    fn chunk_buffer_read_same_n() {
        let mut buffer = ByteChunkBuffer::new();
        let data = &[0, 1, 2, 3];
        buffer.push(data.to_vec());
        assert_eq!(data, buffer.try_read(4).as_deref().unwrap());
        assert_eq!(None, buffer.try_read(4));
        let data = &[4, 5, 6, 7];
        buffer.push(data.to_vec());
        assert_eq!(data, buffer.try_read(4).as_deref().unwrap());
        assert_eq!(None, buffer.try_read(4));
    }
}