use std::borrow::Borrow;
use std::collections::HashMap;
use re_build_info::CrateVersion;
use re_chunk::{ChunkError, ChunkResult};
use re_log_types::{LogMsg, StoreId};
use re_sorbet::SorbetError;
use crate::{
CodecError, Compression, Encodable as _, EncodingOptions, MessageHeader, MessageKind,
RrdManifestBuilder, Serializer, StreamFooter, StreamHeader, ToTransport as _,
};
/// Errors that can be returned by the [`Encoder`] while writing a stream.
///
/// The `Codec`, `Chunk` and `Sorbet` payloads are boxed, which keeps the enum itself small.
#[derive(thiserror::Error, Debug)]
pub enum EncodeError {
/// The encoder was already finished; returned by the append methods.
#[error("Called append on already finished encoder")]
AlreadyFinished,
/// The encoder no longer holds its writer.
///
/// Despite the wording of the message, this is also returned by the finish, flush and
/// `into_inner` methods, not only by the append methods.
#[error("Called append on already unwrapped encoder")]
AlreadyUnwrapped,
/// Writing to (or flushing) the underlying writer failed.
#[error("Failed to write: {0}")]
Write(#[from] std::io::Error),
/// A codec-level failure, e.g. while serializing a header or a message.
#[error("{0}")]
Codec(Box<crate::rrd::CodecError>),
/// A chunk-level failure, e.g. an `Err` item yielded by the input message iterator.
#[error("Chunk error: {0}")]
Chunk(Box<ChunkError>),
/// A sorbet-level failure, e.g. an Arrow batch that could not be read as a chunk batch.
#[error("Sorbet error: {0}")]
Sorbet(Box<SorbetError>),
}
// Compile-time guard: the build fails if `EncodeError` ever grows beyond 48 bytes.
const _: () = assert!(
std::mem::size_of::<EncodeError>() <= 48,
"Error type is too large. Try to reduce its size by boxing some of its variants.",
);
impl From<CodecError> for EncodeError {
fn from(err: CodecError) -> Self {
Self::Codec(Box::new(err))
}
}
impl From<ChunkError> for EncodeError {
fn from(err: ChunkError) -> Self {
Self::Chunk(Box::new(err))
}
}
impl From<SorbetError> for EncodeError {
fn from(err: SorbetError) -> Self {
Self::Sorbet(Box::new(err))
}
}
/// Encodes a stream of messages into a writer `W`.
///
/// The stream header is written as soon as the encoder is constructed, each appended message
/// is written immediately, and the end-of-stream footer is written when the encoder is
/// finished (explicitly, or on drop).
pub struct Encoder<W: std::io::Write> {
/// Serialization format, taken from the `EncodingOptions` given at construction.
serializer: Serializer,
/// Compression, taken from the `EncodingOptions` given at construction and applied when
/// converting a `LogMsg` to its transport form.
compression: Compression,
/// The destination writer. `None` once it has been handed back through `into_inner`.
write: Option<W>,
/// Total number of bytes written to `write` so far, stream header included.
num_written: u64,
/// Reusable buffer each message is serialized into before being written out.
scratch: Vec<u8>,
/// Bookkeeping needed to build the footer. `None` once footer emission has been disabled
/// or once `finish` has consumed it.
footer_state: Option<FooterState>,
/// Set by the finish methods; once set, further appends are rejected.
is_finished: bool,
}
/// State accumulated across appended messages, used to build the footer when finishing.
#[derive(Default)]
struct FooterState {
/// Store ID of the most recent `SetStoreInfo` message seen, if any.
///
/// When set, it is used in place of an `ArrowMsg`'s own store ID to pick the manifest the
/// message is recorded in.
recording_id_scope: Option<re_log_types::StoreId>,
/// One manifest-in-progress per effective recording ID.
manifests: HashMap<re_log_types::StoreId, ManifestState>,
}
/// Per-recording state tracked while the stream is being written.
#[derive(Default)]
struct ManifestState {
/// Receives one entry per `ArrowMsg` recorded for this recording.
// NOTE(review): nothing in this part of the file reads this field back — confirm it is
// still needed.
recording_ids: Vec<re_log_types::StoreId>,
/// Builder that accumulates the chunks of this recording into a manifest.
manifest: RrdManifestBuilder,
}
impl FooterState {
    /// Accounts for one message that has just been written to the stream.
    ///
    /// * `SetStoreInfo` replaces the current recording-ID scope with the message's store ID.
    /// * `ArrowMsg` is added to the manifest of its effective recording: the current scope if
    ///   there is one, the message's own store ID otherwise.
    /// * `BlueprintActivationCommand` is ignored.
    ///
    /// `byte_span_excluding_header` and `byte_size_uncompressed` are forwarded untouched to
    /// the manifest builder.
    ///
    /// # Errors
    ///
    /// Fails if the Arrow batch cannot be converted into a `re_sorbet::ChunkBatch`, or if the
    /// manifest builder refuses the chunk.
    fn append(
        &mut self,
        msg: &re_log_types::LogMsg,
        byte_span_excluding_header: re_span::Span<u64>,
        byte_size_uncompressed: u64,
    ) -> Result<(), EncodeError> {
        match msg {
            LogMsg::SetStoreInfo(set_store_info) => {
                self.recording_id_scope = Some(set_store_info.info.store_id.clone());
            }

            LogMsg::ArrowMsg(store_id, arrow_msg) => {
                let chunk_batch = re_sorbet::ChunkBatch::try_from(&arrow_msg.batch)?;

                // An active scope takes precedence over the message's own store ID.
                let recording_id = self
                    .recording_id_scope
                    .as_ref()
                    .unwrap_or(store_id)
                    .clone();

                let state = self.manifests.entry(recording_id.clone()).or_default();

                // NOTE(review): this pushes the effective ID, i.e. the very value used as the
                // map key above — confirm whether the message's own `store_id` was intended.
                state.recording_ids.push(recording_id);

                state.manifest.append(
                    &chunk_batch,
                    byte_span_excluding_header,
                    byte_size_uncompressed,
                )?;
            }

            LogMsg::BlueprintActivationCommand(_) => {}
        }

        Ok(())
    }

    /// Builds every accumulated manifest and bundles them into an `RrdFooter`.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by a manifest builder; no footer is produced then.
    fn finish(self) -> Result<crate::RrdFooter, EncodeError> {
        let mut manifests: HashMap<StoreId, crate::RawRrdManifest> =
            HashMap::with_capacity(self.manifests.len());

        for (store_id, state) in self.manifests {
            let manifest = state.manifest.build(store_id.clone())?;
            manifests.insert(store_id, manifest);
        }

        Ok(crate::RrdFooter { manifests })
    }
}
impl Encoder<Vec<u8>> {
    /// Creates an encoder that writes into a fresh in-memory buffer.
    ///
    /// Uses `CrateVersion::LOCAL` and `EncodingOptions::PROTOBUF_COMPRESSED`.
    ///
    /// # Errors
    ///
    /// Fails if the stream header cannot be encoded or written.
    pub fn local() -> Result<Self, EncodeError> {
        let buffer = Vec::new();
        Self::new_eager(
            CrateVersion::LOCAL,
            EncodingOptions::PROTOBUF_COMPRESSED,
            buffer,
        )
    }

    /// Encodes all `messages` into a new byte buffer and returns it.
    ///
    /// The encoder is finished explicitly before the buffer is returned.
    ///
    /// # Errors
    ///
    /// Stops at the first `Err` item in `messages`, or at the first failure to append a
    /// message or to finish the stream.
    pub fn encode(
        messages: impl IntoIterator<Item = ChunkResult<impl Borrow<LogMsg>>>,
    ) -> Result<Vec<u8>, EncodeError> {
        re_tracing::profile_function!();

        let mut encoder = Self::local()?;

        messages
            .into_iter()
            .try_for_each(|message| -> Result<(), EncodeError> {
                let message = message?;
                encoder.append(message.borrow())?;
                Ok(())
            })?;

        encoder.finish()?;
        encoder.into_inner()
    }
}
impl<W: std::io::Write> Encoder<W> {
/// Creates a new encoder and immediately writes the stream header to `write`.
///
/// The header carries the RRD FourCC, the given `version` and the encoding `options`.
/// The number of header bytes written is used as the starting value of the encoder's
/// byte counter.
///
/// # Errors
///
/// Fails if the header cannot be encoded, or if writing it to `write` fails.
pub fn new_eager(
    version: CrateVersion,
    options: EncodingOptions,
    mut write: W,
) -> Result<Self, EncodeError> {
    let stream_header = StreamHeader {
        fourcc: crate::rrd::RRD_FOURCC,
        version: version.to_bytes(),
        options,
    };

    let mut header_bytes = Vec::new();
    stream_header.to_rrd_bytes(&mut header_bytes)?;
    write.write_all(&header_bytes)?;

    let num_written = header_bytes.len() as u64;

    Ok(Self {
        serializer: options.serializer,
        compression: options.compression,
        write: Some(write),
        num_written,
        scratch: Vec::new(),
        footer_state: Some(FooterState::default()),
        is_finished: false,
    })
}
/// Encodes `message` and writes it to the underlying writer.
///
/// Unless footer emission has been disabled, the message is also recorded in the footer
/// state, together with the byte span of its payload (header excluded) and its uncompressed
/// size.
///
/// Returns the number of bytes written for this message, header included.
///
/// # Errors
///
/// * [`EncodeError::AlreadyFinished`] if the encoder was already finished.
/// * [`EncodeError::AlreadyUnwrapped`] if the encoder no longer holds its writer.
/// * Any failure to convert, serialize or write the message, or to record it in the footer
///   state. In the latter case the message bytes have already been written.
pub fn append(&mut self, message: &re_log_types::LogMsg) -> Result<u64, EncodeError> {
    if self.is_finished {
        return Err(EncodeError::AlreadyFinished);
    }
    let writer = match self.write.as_mut() {
        Some(writer) => writer,
        None => return Err(EncodeError::AlreadyUnwrapped),
    };

    re_tracing::profile_function!();

    let transport = message.to_transport(self.compression)?;

    let header_len = crate::MessageHeader::ENCODED_SIZE_BYTES as u64;

    // Where the payload of this message will start, measured from the start of the stream.
    let payload_start = self.num_written + header_len;

    self.scratch.clear();
    let bytes_written = match self.serializer {
        Serializer::Protobuf => {
            transport.to_rrd_bytes(&mut self.scratch)?;
            writer
                .write_all(&self.scratch)
                .map_err(EncodeError::Write)?;
            let len = self.scratch.len() as u64;
            self.num_written += len;
            len
        }
    };

    let payload_span = re_span::Span {
        start: payload_start,
        len: bytes_written - header_len,
    };

    if let Some(footer_state) = self.footer_state.as_mut() {
        let uncompressed_size = transport.byte_size_uncompressed();
        footer_state.append(message, payload_span, uncompressed_size)?;
    }

    Ok(bytes_written)
}
/// Disables the built-in footer by discarding all footer state accumulated so far.
///
/// After this call, appended messages are no longer recorded, and `finish` writes no footer.
pub fn do_not_emit_footer(&mut self) {
    drop(self.footer_state.take());
}
/// Writes an already transport-encoded `message` to the underlying writer.
///
/// Calling this disables the built-in footer (see `do_not_emit_footer`), because the
/// message is not recorded in the footer state.
///
/// Returns the byte span of the message payload (header excluded) within the stream, and
/// the uncompressed size reported by the message.
///
/// # Safety
///
/// NOTE(review): the contract the caller must uphold is not visible in this block.
/// Presumably the caller becomes responsible for the footer (see
/// `finish_with_custom_footer`) — confirm.
///
/// # Errors
///
/// * [`EncodeError::AlreadyFinished`] if the encoder was already finished.
/// * [`EncodeError::AlreadyUnwrapped`] if the encoder no longer holds its writer.
/// * Any failure to serialize or write the message.
#[expect(unsafe_code)]
pub unsafe fn append_transport(
    &mut self,
    message: &re_protos::log_msg::v1alpha1::log_msg::Msg,
) -> Result<(re_span::Span<u64>, u64), EncodeError> {
    if self.is_finished {
        return Err(EncodeError::AlreadyFinished);
    }

    re_tracing::profile_function!();

    // The built-in footer is dropped before the writer is even looked at.
    self.do_not_emit_footer();

    let writer = match self.write.as_mut() {
        Some(writer) => writer,
        None => return Err(EncodeError::AlreadyUnwrapped),
    };

    let header_len = crate::MessageHeader::ENCODED_SIZE_BYTES as u64;

    // Where the payload of this message will start, measured from the start of the stream.
    let payload_start = self.num_written + header_len;

    self.scratch.clear();
    let bytes_written = match self.serializer {
        Serializer::Protobuf => {
            message.to_rrd_bytes(&mut self.scratch)?;
            writer
                .write_all(&self.scratch)
                .map_err(EncodeError::Write)?;
            let len = self.scratch.len() as u64;
            self.num_written += len;
            len
        }
    };

    let payload_span = re_span::Span {
        start: payload_start,
        len: bytes_written - header_len,
    };

    Ok((payload_span, message.byte_size_uncompressed()))
}
/// Finishes the stream by writing the caller-provided `rrd_footer`.
///
/// Does nothing and returns `Ok(())` if the encoder was already finished. The encoder is
/// marked as finished before anything is written, so a failed write is not retried by a
/// later call.
///
/// In debug builds this asserts that the built-in footer has been disabled.
///
/// # Safety
///
/// NOTE(review): the contract the caller must uphold is not visible in this block.
/// Presumably `rrd_footer` has to describe the messages actually written — confirm.
///
/// # Errors
///
/// * [`EncodeError::AlreadyUnwrapped`] if the encoder no longer holds its writer.
/// * Any failure to encode or write the footer.
#[inline]
#[expect(unsafe_code)]
pub unsafe fn finish_with_custom_footer(
    &mut self,
    rrd_footer: &crate::RrdFooter,
) -> Result<(), EncodeError> {
    if self.is_finished {
        return Ok(());
    }

    let writer = self
        .write
        .as_mut()
        .ok_or(EncodeError::AlreadyUnwrapped)?;

    self.is_finished = true;

    re_log::debug_assert!(
        self.footer_state.is_none(),
        "using a custom footer in addition to a builtin one is a very bad idea",
    );

    Self::finish_impl(writer, &mut self.num_written, rrd_footer)
}
/// Finishes the stream, writing the built-in footer if footer emission is still enabled.
///
/// Does nothing and returns `Ok(())` if the encoder was already finished. The encoder is
/// marked as finished before anything is written. If footer emission has been disabled,
/// nothing is written at all.
///
/// # Errors
///
/// * [`EncodeError::AlreadyUnwrapped`] if the encoder no longer holds its writer.
/// * Any failure to build, encode or write the footer.
#[inline]
pub fn finish(&mut self) -> Result<(), EncodeError> {
    if self.is_finished {
        return Ok(());
    }

    let writer = self
        .write
        .as_mut()
        .ok_or(EncodeError::AlreadyUnwrapped)?;

    self.is_finished = true;

    match self.footer_state.take() {
        Some(footer_state) => {
            let rrd_footer = footer_state.finish()?;
            Self::finish_impl(writer, &mut self.num_written, &rrd_footer)
        }
        None => Ok(()),
    }
}
/// Writes the end-of-stream section to `w`, adding every written byte to `num_written`.
///
/// Three parts are encoded and written, in this order:
/// 1. a message header of kind `End`, whose length is the encoded length of the footer;
/// 2. the encoded footer itself;
/// 3. a stream footer built from the offset at which the footer starts and the footer bytes.
///
/// Each part is encoded right before it is written, so a failure part-way through leaves
/// the earlier parts in the output.
///
/// # Errors
///
/// Fails if any part cannot be encoded or written.
fn finish_impl(
    w: &mut W,
    num_written: &mut u64,
    rrd_footer: &crate::RrdFooter,
) -> Result<(), EncodeError> {
    use re_protos::external::prost::Message as _;

    let footer_transport = rrd_footer.to_transport(())?;

    // 1. The `End` message header.
    let end_header = MessageHeader {
        kind: MessageKind::End,
        len: footer_transport.encoded_len() as u64,
    };
    let mut end_header_bytes = Vec::new();
    end_header.to_rrd_bytes(&mut end_header_bytes)?;
    w.write_all(&end_header_bytes)
        .map_err(EncodeError::Write)?;
    *num_written += end_header_bytes.len() as u64;

    // The footer starts right after the header that was just written.
    let footer_start = *num_written;

    // 2. The footer.
    let mut footer_bytes = Vec::new();
    footer_transport.to_rrd_bytes(&mut footer_bytes)?;
    w.write_all(&footer_bytes).map_err(EncodeError::Write)?;
    *num_written += footer_bytes.len() as u64;

    // 3. The stream footer.
    let stream_footer = StreamFooter::from_rrd_footer_bytes(footer_start, &footer_bytes);
    let mut stream_footer_bytes = Vec::new();
    stream_footer.to_rrd_bytes(&mut stream_footer_bytes)?;
    w.write_all(&stream_footer_bytes)
        .map_err(EncodeError::Write)?;
    *num_written += stream_footer_bytes.len() as u64;

    Ok(())
}
/// Flushes the underlying writer.
///
/// # Errors
///
/// * [`EncodeError::AlreadyUnwrapped`] if the encoder no longer holds its writer.
/// * [`EncodeError::Write`] if the flush itself fails.
#[inline]
pub fn flush_blocking(&mut self) -> Result<(), EncodeError> {
    match self.write.as_mut() {
        Some(writer) => writer.flush().map_err(EncodeError::Write),
        None => Err(EncodeError::AlreadyUnwrapped),
    }
}
/// Consumes the encoder and returns the underlying writer.
///
/// The writer is taken out of the encoder, so dropping the encoder afterwards neither
/// finishes nor flushes the stream; call `finish` first if a footer is wanted.
///
/// # Errors
///
/// [`EncodeError::AlreadyUnwrapped`] if the encoder no longer holds its writer.
#[inline]
pub fn into_inner(mut self) -> Result<W, EncodeError> {
    match self.write.take() {
        Some(writer) => Ok(writer),
        None => Err(EncodeError::AlreadyUnwrapped),
    }
}
}
impl<W: std::io::Write> Encoder<W> {
    /// Encodes all `messages` into `write`, as a complete stream.
    ///
    /// Returns the sum of the sizes reported by each append, i.e. the bytes written for the
    /// messages themselves; the stream header and the footer are not counted.
    ///
    /// # Errors
    ///
    /// Stops at the first `Err` item in `messages`, or at the first failure to write the
    /// stream header, to append a message, to finish the stream or to flush `write`.
    pub fn encode_into(
        version: CrateVersion,
        options: EncodingOptions,
        messages: impl IntoIterator<Item = ChunkResult<impl Borrow<LogMsg>>>,
        write: &mut W,
    ) -> Result<u64, EncodeError> {
        re_tracing::profile_function!();

        let mut encoder = Encoder::new_eager(version, options, write)?;

        let mut size_bytes = 0;
        for message in messages {
            size_bytes += encoder.append(message?.borrow())?;
        }

        // Finish and flush explicitly rather than leaving it to `Drop`: `Drop` can only log
        // failures, which would let this function report success for a truncated stream.
        // The drop that follows finds the encoder already finished and merely flushes again.
        encoder.finish()?;
        encoder.flush_blocking()?;

        Ok(size_bytes)
    }
}
/// Best-effort finalization: an encoder that still holds its writer is finished and flushed
/// when dropped. Failures cannot be returned from `drop`, so they are logged as warnings.
impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
    fn drop(&mut self) {
        // Without a writer (it was handed back through `into_inner`) there is nothing to do.
        if self.write.is_some() {
            if let Err(err) = self.finish() {
                re_log::warn!("encoder couldn't be finished: {err}");
            }

            if let Err(err) = self.flush_blocking() {
                re_log::warn!("encoder couldn't be flushed: {err}");
            }
        }
    }
}