pub use re_build_info::CrateVersion; pub use re_protos::common::v1alpha1::ext::Compression;
use crate::rrd::{Decodable, Encodable, OptionsError};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum Serializer {
Protobuf = 2,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EncodingOptions {
pub compression: Compression,
pub serializer: Serializer,
}
impl EncodingOptions {
pub const ENCODED_SIZE_BYTES: usize = 4;
}
impl EncodingOptions {
pub const PROTOBUF_COMPRESSED: Self = Self {
compression: Compression::LZ4,
serializer: Serializer::Protobuf,
};
pub const PROTOBUF_UNCOMPRESSED: Self = Self {
compression: Compression::Off,
serializer: Serializer::Protobuf,
};
}
impl Encodable for EncodingOptions {
fn to_rrd_bytes(&self, out: &mut Vec<u8>) -> Result<u64, crate::rrd::CodecError> {
let Self {
compression,
serializer,
} = *self;
let before = out.len() as u64;
out.extend_from_slice(&[
compression as u8,
serializer as u8,
0, 0, ]);
let n = out.len() as u64 - before;
assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n);
Ok(n)
}
}
impl Decodable for EncodingOptions {
fn from_rrd_bytes(data: &[u8]) -> Result<Self, crate::rrd::CodecError> {
match data {
&[compression, serializer, 0, 0] => {
let compression = match compression {
0 => Compression::Off,
1 => Compression::LZ4,
_ => return Err(OptionsError::UnknownCompression(compression).into()),
};
let serializer = match serializer {
1 => return Err(OptionsError::RemovedMsgPackSerializer.into()),
2 => Serializer::Protobuf,
_ => return Err(OptionsError::UnknownSerializer(serializer).into()),
};
Ok(Self {
compression,
serializer,
})
}
_ => Err(OptionsError::UnknownReservedBytes.into()),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct StreamHeader {
pub fourcc: [u8; 4],
pub version: [u8; 4],
pub options: EncodingOptions,
}
impl StreamHeader {
pub const ENCODED_SIZE_BYTES: usize = 12;
}
impl StreamHeader {
pub fn to_version_and_options(
self,
) -> Result<(CrateVersion, EncodingOptions), crate::rrd::CodecError> {
{
let encoded_version = if self.version == [0, 0, 0, 0] {
CrateVersion::new(0, 2, 0)
} else {
CrateVersion::from_bytes(self.version)
};
if encoded_version.major == 0 && encoded_version.minor < 23 {
return Err(crate::rrd::CodecError::IncompatibleRerunVersion {
file: Box::new(encoded_version),
local: Box::new(CrateVersion::LOCAL),
});
} else if encoded_version <= CrateVersion::LOCAL {
} else {
re_log::warn_once!(
"Found data stream with Rerun version {encoded_version} which is newer than the local Rerun version ({}). This file may contain data that is not compatible with this version of Rerun. Consider updating Rerun.",
CrateVersion::LOCAL
);
}
}
Ok((CrateVersion::from_bytes(self.version), self.options))
}
}
impl Encodable for StreamHeader {
fn to_rrd_bytes(&self, out: &mut Vec<u8>) -> Result<u64, crate::rrd::CodecError> {
let Self {
fourcc,
version,
options,
} = self;
let before = out.len() as u64;
out.extend_from_slice(fourcc);
out.extend_from_slice(version);
{
let mut options_out = Vec::new();
options.to_rrd_bytes(&mut options_out)?;
out.extend_from_slice(&options_out);
}
let n = out.len() as u64 - before;
assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n);
Ok(n)
}
}
impl Decodable for StreamHeader {
fn from_rrd_bytes(data: &[u8]) -> Result<Self, crate::rrd::CodecError> {
if data.len() != Self::ENCODED_SIZE_BYTES {
return Err(crate::rrd::CodecError::FrameDecoding(format!(
"invalid StreamHeader length (expected {} but got {})",
Self::ENCODED_SIZE_BYTES,
data.len()
)));
}
let to_array_4b = |slice: &[u8]| slice.try_into().expect("always returns an Ok() variant");
let fourcc = to_array_4b(&data[0..4]);
if crate::rrd::OLD_RRD_FOURCC.contains(&fourcc) {
return Err(crate::rrd::CodecError::OldRrdVersion);
} else if fourcc != crate::rrd::RRD_FOURCC {
return Err(crate::rrd::CodecError::NotAnRrd(
crate::rrd::NotAnRrdError {
expected_fourcc: crate::rrd::RRD_FOURCC,
actual_fourcc: fourcc,
},
));
}
let version = to_array_4b(&data[4..8]);
let options = EncodingOptions::from_rrd_bytes(&data[8..])?;
Ok(Self {
fourcc,
version,
options,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamFooter {
pub fourcc: [u8; 4],
pub identifier: [u8; 4],
pub entries: Vec<StreamFooterEntry>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamFooterEntry {
pub rrd_footer_byte_span_from_start_excluding_header: re_span::Span<u64>,
pub crc_excluding_header: u32,
}
impl StreamFooter {
pub const ENCODED_SIZE_BYTES: usize =
Self::ENCODED_SIZE_BYTES_IGNORING_ENTRIES + Self::ENCODED_SIZE_BYTES_SINGLE_ENTRY;
const ENCODED_SIZE_BYTES_IGNORING_ENTRIES: usize = 12;
const ENCODED_SIZE_BYTES_SINGLE_ENTRY: usize = 20;
pub const CRC_SEED: u32 = 7850921; pub const RRD_IDENTIFIER: [u8; 4] = *b"FOOT";
pub fn new(
rrd_footer_byte_span_from_start_excluding_header: re_span::Span<u64>,
crc_excluding_header: u32,
) -> Self {
Self {
fourcc: crate::RRD_FOURCC,
identifier: Self::RRD_IDENTIFIER,
entries: vec![StreamFooterEntry {
rrd_footer_byte_span_from_start_excluding_header,
crc_excluding_header,
}],
}
}
pub fn from_rrd_footer_bytes(
rrd_footer_byte_offset_from_start_excluding_header: u64,
rrd_footer_bytes: &[u8],
) -> Self {
let crc_excluding_header = Self::compute_crc(rrd_footer_bytes);
let rrd_footer_byte_span_from_start_excluding_header = re_span::Span {
start: rrd_footer_byte_offset_from_start_excluding_header,
len: rrd_footer_bytes.len() as u64,
};
Self {
fourcc: crate::RRD_FOURCC,
identifier: Self::RRD_IDENTIFIER,
entries: vec![StreamFooterEntry {
rrd_footer_byte_span_from_start_excluding_header,
crc_excluding_header,
}],
}
}
pub fn compute_crc(rrd_footer_bytes_excluding_header: &[u8]) -> u32 {
xxhash_rust::xxh32::xxh32(rrd_footer_bytes_excluding_header, Self::CRC_SEED)
}
}
impl Encodable for StreamFooter {
fn to_rrd_bytes(&self, out: &mut Vec<u8>) -> Result<u64, crate::rrd::CodecError> {
let Self {
fourcc,
identifier,
entries,
} = self;
let num_rrd_footers = entries.len() as u32;
let before = out.len() as u64;
for entry in entries {
out.extend_from_slice(
&entry
.rrd_footer_byte_span_from_start_excluding_header
.start
.to_le_bytes(),
);
out.extend_from_slice(
&entry
.rrd_footer_byte_span_from_start_excluding_header
.len
.to_le_bytes(),
);
out.extend_from_slice(&entry.crc_excluding_header.to_le_bytes());
}
out.extend_from_slice(fourcc);
out.extend_from_slice(identifier);
out.extend_from_slice(&num_rrd_footers.to_le_bytes());
let n = out.len() as u64 - before;
assert_eq!(
Self::ENCODED_SIZE_BYTES as u64,
n,
"Stream footers always point to a single RRD footer at the moment"
);
Ok(n)
}
}
impl Decodable for StreamFooter {
fn from_rrd_bytes(data: &[u8]) -> Result<Self, crate::rrd::CodecError> {
if data.len() < Self::ENCODED_SIZE_BYTES {
return Err(crate::rrd::CodecError::FrameDecoding(format!(
"invalid StreamFooter length (expected {} but got {})",
Self::ENCODED_SIZE_BYTES,
data.len()
)));
}
let fixed_data = &data[data.len() - Self::ENCODED_SIZE_BYTES_IGNORING_ENTRIES..];
let to_array_4b = |slice: &[u8]| slice.try_into().expect("always returns an Ok() variant");
let fourcc: [u8; 4] = to_array_4b(&fixed_data[0..4]);
if fourcc != crate::RRD_FOURCC {
return Err(crate::rrd::CodecError::FrameDecoding(format!(
"invalid StreamFooter FourCC (expected {:?} but got {:?})",
crate::RRD_FOURCC,
fourcc,
)));
}
let identifier: [u8; 4] = to_array_4b(&fixed_data[4..8]);
if identifier != Self::RRD_IDENTIFIER {
return Err(crate::rrd::CodecError::FrameDecoding(format!(
"invalid StreamFooter identifier (expected {:?} but got {:?})",
Self::RRD_IDENTIFIER,
identifier,
)));
}
let num_rrd_footers = u32::from_le_bytes(
fixed_data[8..12]
.try_into()
.expect("cannot fail, checked above"),
);
let dynamic_data = &data[data.len() - Self::ENCODED_SIZE_BYTES..];
let mut pos = 0;
let entries = (0..num_rrd_footers)
.map(|_| {
let rrd_footer_byte_span_from_start_excluding_header = re_span::Span {
start: u64::from_le_bytes(
dynamic_data[pos..pos + 8]
.try_into()
.expect("cannot fail, checked above"),
),
len: u64::from_le_bytes(
dynamic_data[pos + 8..pos + 16]
.try_into()
.expect("cannot fail, checked above"),
),
};
let crc_excluding_header = u32::from_le_bytes(
dynamic_data[pos + 16..pos + 20]
.try_into()
.expect("cannot fail, checked above"),
);
pos += Self::ENCODED_SIZE_BYTES_SINGLE_ENTRY;
StreamFooterEntry {
rrd_footer_byte_span_from_start_excluding_header,
crc_excluding_header,
}
})
.collect();
Ok(Self {
fourcc,
identifier,
entries,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageKind {
End = Self::END,
SetStoreInfo = Self::SET_STORE_INFO,
ArrowMsg = Self::ARROW_MSG,
BlueprintActivationCommand = Self::BLUEPRINT_ACTIVATION_COMMAND,
}
impl MessageKind {
const END: u64 = 0;
const SET_STORE_INFO: u64 = 1;
const ARROW_MSG: u64 = 2;
const BLUEPRINT_ACTIVATION_COMMAND: u64 = 3;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageHeader {
pub kind: MessageKind,
pub len: u64,
}
impl MessageHeader {
pub const ENCODED_SIZE_BYTES: usize = 16;
}
impl Encodable for MessageHeader {
fn to_rrd_bytes(&self, out: &mut Vec<u8>) -> Result<u64, crate::rrd::CodecError> {
let Self { kind, len } = *self;
let before = out.len() as u64;
out.extend_from_slice(&(kind as u64).to_le_bytes());
out.extend_from_slice(&len.to_le_bytes());
let n = out.len() as u64 - before;
assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n);
Ok(n)
}
}
impl Decodable for MessageHeader {
fn from_rrd_bytes(data: &[u8]) -> Result<Self, crate::rrd::CodecError> {
if data.len() != Self::ENCODED_SIZE_BYTES {
return Err(crate::rrd::CodecError::FrameDecoding(format!(
"invalid MessageHeader length (expected {} but got {})",
Self::ENCODED_SIZE_BYTES,
data.len()
)));
}
let kind = u64::from_le_bytes(data[0..8].try_into().expect("cannot fail, checked above"));
let kind = match kind {
MessageKind::END => MessageKind::End,
MessageKind::SET_STORE_INFO => MessageKind::SetStoreInfo,
MessageKind::ARROW_MSG => MessageKind::ArrowMsg,
MessageKind::BLUEPRINT_ACTIVATION_COMMAND => MessageKind::BlueprintActivationCommand,
_ => {
return Err(crate::rrd::CodecError::FrameDecoding(format!(
"unknown MessageHeader kind: {kind:?}"
)));
}
};
let len = u64::from_le_bytes(data[8..16].try_into().expect("cannot fail, checked above"));
Ok(Self { kind, len })
}
}