re_log_encoding/
encoder.rs

1//! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.
2
3use crate::codec;
4use crate::codec::file::{self, encoder};
5use crate::FileHeader;
6use crate::Serializer;
7use crate::{Compression, EncodingOptions};
8use re_build_info::CrateVersion;
9use re_chunk::{ChunkError, ChunkResult};
10use re_log_types::LogMsg;
11
12// ----------------------------------------------------------------------------
13
14/// On failure to encode or serialize a [`LogMsg`].
15#[derive(thiserror::Error, Debug)]
16pub enum EncodeError {
17    #[error("Failed to write: {0}")]
18    Write(#[from] std::io::Error),
19
20    #[error("lz4 error: {0}")]
21    Lz4(#[from] lz4_flex::block::CompressError),
22
23    #[error("Protobuf error: {0}")]
24    Protobuf(#[from] re_protos::external::prost::EncodeError),
25
26    #[error("Arrow error: {0}")]
27    Arrow(#[from] arrow::error::ArrowError),
28
29    #[error("{0}")]
30    Codec(#[from] codec::CodecError),
31
32    #[error("Chunk error: {0}")]
33    Chunk(#[from] ChunkError),
34
35    #[error("Called append on already finished encoder")]
36    AlreadyFinished,
37}
38
39// ----------------------------------------------------------------------------
40
41pub fn encode_to_bytes<'a>(
42    version: CrateVersion,
43    options: EncodingOptions,
44    msgs: impl IntoIterator<Item = &'a LogMsg>,
45) -> Result<Vec<u8>, EncodeError> {
46    let mut bytes: Vec<u8> = vec![];
47    {
48        let mut encoder = Encoder::new(version, options, std::io::Cursor::new(&mut bytes))?;
49        for msg in msgs {
50            encoder.append(msg)?;
51        }
52    }
53    Ok(bytes)
54}
55
56// ----------------------------------------------------------------------------
57
58/// An [`Encoder`] that properly closes the stream on drop.
59///
60/// When dropped, it will automatically insert an end-of-stream marker, if that wasn't already done manually.
61pub struct DroppableEncoder<W: std::io::Write> {
62    encoder: Encoder<W>,
63
64    /// Tracks whether the end-of-stream marker has been written out already.
65    is_finished: bool,
66}
67
68impl<W: std::io::Write> DroppableEncoder<W> {
69    #[inline]
70    pub fn new(
71        version: CrateVersion,
72        options: EncodingOptions,
73        write: W,
74    ) -> Result<Self, EncodeError> {
75        Ok(Self {
76            encoder: Encoder::new(version, options, write)?,
77            is_finished: false,
78        })
79    }
80
81    /// Returns the size in bytes of the encoded data.
82    #[inline]
83    pub fn append(&mut self, message: &LogMsg) -> Result<u64, EncodeError> {
84        self.encoder.append(message)
85    }
86
87    #[inline]
88    pub fn finish(&mut self) -> Result<(), EncodeError> {
89        if !self.is_finished {
90            self.encoder.finish()?;
91        }
92
93        self.is_finished = true;
94
95        Ok(())
96    }
97
98    #[inline]
99    pub fn flush_blocking(&mut self) -> std::io::Result<()> {
100        self.encoder.flush_blocking()
101    }
102}
103
104impl<W: std::io::Write> std::ops::Drop for DroppableEncoder<W> {
105    fn drop(&mut self) {
106        if !self.is_finished {
107            if let Err(err) = self.finish() {
108                re_log::warn!("encoder couldn't be finished: {err}");
109            }
110        }
111    }
112}
113
114/// Encode a stream of [`LogMsg`] into an `.rrd` file.
115///
116/// Prefer [`DroppableEncoder`] if possible, make sure to call [`Encoder::finish`] when appropriate
117/// otherwise.
118pub struct Encoder<W: std::io::Write> {
119    serializer: Serializer,
120    compression: Compression,
121    write: W,
122    scratch: Vec<u8>,
123}
124
125impl<W: std::io::Write> Encoder<W> {
126    pub fn new(
127        version: CrateVersion,
128        options: EncodingOptions,
129        mut write: W,
130    ) -> Result<Self, EncodeError> {
131        FileHeader {
132            magic: *crate::RRD_HEADER,
133            version: version.to_bytes(),
134            options,
135        }
136        .encode(&mut write)?;
137
138        Ok(Self {
139            serializer: options.serializer,
140            compression: options.compression,
141            write,
142            scratch: Vec::new(),
143        })
144    }
145
146    /// Returns the size in bytes of the encoded data.
147    pub fn append(&mut self, message: &LogMsg) -> Result<u64, EncodeError> {
148        re_tracing::profile_function!();
149
150        self.scratch.clear();
151        match self.serializer {
152            Serializer::Protobuf => {
153                encoder::encode(&mut self.scratch, message, self.compression)?;
154
155                self.write
156                    .write_all(&self.scratch)
157                    .map(|_| self.scratch.len() as _)
158                    .map_err(EncodeError::Write)
159            }
160        }
161    }
162
163    // NOTE: This cannot be done in a `Drop` implementation because of `Self::into_inner` which
164    // does a partial move.
165    #[inline]
166    pub fn finish(&mut self) -> Result<(), EncodeError> {
167        match self.serializer {
168            Serializer::Protobuf => {
169                file::MessageHeader {
170                    kind: file::MessageKind::End,
171                    len: 0,
172                }
173                .encode(&mut self.write)?;
174            }
175        }
176        Ok(())
177    }
178
179    #[inline]
180    pub fn flush_blocking(&mut self) -> std::io::Result<()> {
181        self.write.flush()
182    }
183
184    #[inline]
185    pub fn into_inner(self) -> W {
186        self.write
187    }
188}
189
190/// Returns the size in bytes of the encoded data.
191pub fn encode(
192    version: CrateVersion,
193    options: EncodingOptions,
194    messages: impl Iterator<Item = ChunkResult<LogMsg>>,
195    write: &mut impl std::io::Write,
196) -> Result<u64, EncodeError> {
197    re_tracing::profile_function!();
198    let mut encoder = DroppableEncoder::new(version, options, write)?;
199    let mut size_bytes = 0;
200    for message in messages {
201        size_bytes += encoder.append(&message?)?;
202    }
203    Ok(size_bytes)
204}
205
206/// Returns the size in bytes of the encoded data.
207pub fn encode_ref<'a>(
208    version: CrateVersion,
209    options: EncodingOptions,
210    messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
211    write: &mut impl std::io::Write,
212) -> Result<u64, EncodeError> {
213    re_tracing::profile_function!();
214    let mut encoder = DroppableEncoder::new(version, options, write)?;
215    let mut size_bytes = 0;
216    for message in messages {
217        size_bytes += encoder.append(message?)?;
218    }
219    Ok(size_bytes)
220}
221
222pub fn encode_as_bytes(
223    version: CrateVersion,
224    options: EncodingOptions,
225    messages: impl Iterator<Item = ChunkResult<LogMsg>>,
226) -> Result<Vec<u8>, EncodeError> {
227    re_tracing::profile_function!();
228    let mut bytes: Vec<u8> = vec![];
229    let mut encoder = Encoder::new(version, options, &mut bytes)?;
230    for message in messages {
231        encoder.append(&message?)?;
232    }
233    encoder.finish()?;
234    Ok(bytes)
235}
236
237#[inline]
238pub fn local_encoder() -> Result<DroppableEncoder<Vec<u8>>, EncodeError> {
239    DroppableEncoder::new(
240        CrateVersion::LOCAL,
241        EncodingOptions::PROTOBUF_COMPRESSED,
242        Vec::new(),
243    )
244}
245
246#[inline]
247pub fn local_raw_encoder() -> Result<Encoder<Vec<u8>>, EncodeError> {
248    Encoder::new(
249        CrateVersion::LOCAL,
250        EncodingOptions::PROTOBUF_COMPRESSED,
251        Vec::new(),
252    )
253}
254
255#[inline]
256pub fn encode_as_bytes_local(
257    messages: impl Iterator<Item = ChunkResult<LogMsg>>,
258) -> Result<Vec<u8>, EncodeError> {
259    let mut encoder = local_raw_encoder()?;
260    for message in messages {
261        encoder.append(&message?)?;
262    }
263    encoder.finish()?;
264    Ok(encoder.into_inner())
265}
266
267#[inline]
268pub fn encode_ref_as_bytes_local<'a>(
269    messages: impl Iterator<Item = ChunkResult<&'a LogMsg>>,
270) -> Result<Vec<u8>, EncodeError> {
271    let mut encoder = local_raw_encoder()?;
272    for message in messages {
273        encoder.append(message?)?;
274    }
275    encoder.finish()?;
276    Ok(encoder.into_inner())
277}