re_log_encoding/decoder/
mod.rs

1//! Decoding [`LogMsg`]:es from `.rrd` files/streams.
2
3pub mod stream;
4
5#[cfg(feature = "decoder")]
6pub mod streaming;
7
8use std::io::BufRead as _;
9use std::io::Read as _;
10
11use re_build_info::CrateVersion;
12use re_log_types::LogMsg;
13
14use crate::FileHeader;
15use crate::OLD_RRD_HEADERS;
16use crate::codec;
17use crate::codec::file::decoder;
18use crate::{EncodingOptions, Serializer};
19
20// ----------------------------------------------------------------------------
21
22fn warn_on_version_mismatch(encoded_version: [u8; 4]) -> Result<(), DecodeError> {
23    // We used 0000 for all .rrd files up until 2023-02-27, post 0.2.0 release:
24    let encoded_version = if encoded_version == [0, 0, 0, 0] {
25        CrateVersion::new(0, 2, 0)
26    } else {
27        CrateVersion::from_bytes(encoded_version)
28    };
29
30    if encoded_version.major == 0 && encoded_version.minor < 23 {
31        // We broke compatibility for 0.23 for (hopefully) the last time.
32        Err(DecodeError::IncompatibleRerunVersion {
33            file: encoded_version,
34            local: CrateVersion::LOCAL,
35        })
36    } else if encoded_version <= CrateVersion::LOCAL {
37        // Loading old files should be fine, and if it is not, the chunk migration in re_sorbet should already log a warning.
38        Ok(())
39    } else {
40        re_log::warn_once!(
41            "Found data stream with Rerun version {encoded_version} which is newer than the local Rerun version ({}). This file may contain data that is not compatible with this version of Rerun. Consider updating Rerun.",
42            CrateVersion::LOCAL
43        );
44        Ok(())
45    }
46}
47
48// ----------------------------------------------------------------------------
49
/// On failure to decode or deserialize a [`LogMsg`].
#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
    #[error("Not an .rrd file")]
    NotAnRrd,

    // The magic bytes matched one of the pre-protobuf header formats (see `OLD_RRD_HEADERS`).
    #[error("Data was from an old, incompatible Rerun version")]
    OldRrdVersion,

    // Raised by `warn_on_version_mismatch` for streams written before the 0.23 compatibility break.
    #[error(
        "Data from Rerun version {file}, which is incompatible with the local Rerun version {local}"
    )]
    IncompatibleRerunVersion {
        file: CrateVersion,
        local: CrateVersion,
    },

    #[error("Failed to decode the options: {0}")]
    Options(#[from] crate::OptionsError),

    #[error("Failed to read: {0}")]
    Read(#[from] std::io::Error),

    #[error("lz4 error: {0}")]
    Lz4(#[from] lz4_flex::block::DecompressError),

    #[error("Protobuf error: {0}")]
    Protobuf(#[from] re_protos::external::prost::DecodeError),

    #[error("Could not convert type from protobuf: {0}")]
    TypeConversion(#[from] re_protos::TypeConversionError),

    #[error("Sorbet error: {0}")]
    SorbetError(#[from] re_sorbet::SorbetError),

    #[error("Failed to read chunk: {0}")]
    Chunk(#[from] re_chunk::ChunkError),

    #[error("Arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    #[error("Codec error: {0}")]
    Codec(#[from] codec::CodecError),
}
94
95// ----------------------------------------------------------------------------
96
97pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
98    re_tracing::profile_function!();
99    let decoder = Decoder::new(std::io::Cursor::new(bytes))?;
100    let mut msgs = vec![];
101    for msg in decoder {
102        msgs.push(msg?);
103    }
104    Ok(msgs)
105}
106
107// ----------------------------------------------------------------------------
108
109/// Read encoding options from the beginning of the stream.
110pub fn read_options(
111    reader: &mut impl std::io::Read,
112) -> Result<(CrateVersion, EncodingOptions), DecodeError> {
113    let mut data = [0_u8; FileHeader::SIZE];
114    reader.read_exact(&mut data).map_err(DecodeError::Read)?;
115
116    options_from_bytes(&data)
117}
118
119/// Read encoding options from the beginning of the stream asynchronously.
120pub async fn read_options_async(
121    reader: &mut (impl tokio::io::AsyncRead + Unpin),
122) -> Result<(CrateVersion, EncodingOptions), DecodeError> {
123    let mut data = [0_u8; FileHeader::SIZE];
124
125    use tokio::io::AsyncReadExt as _;
126    reader
127        .read_exact(&mut data)
128        .await
129        .map_err(DecodeError::Read)?;
130
131    options_from_bytes(&data)
132}
133
134pub fn options_from_bytes(bytes: &[u8]) -> Result<(CrateVersion, EncodingOptions), DecodeError> {
135    let mut read = std::io::Cursor::new(bytes);
136
137    let FileHeader {
138        magic,
139        version,
140        options,
141    } = FileHeader::decode(&mut read)?;
142
143    if OLD_RRD_HEADERS.contains(&magic) {
144        return Err(DecodeError::OldRrdVersion);
145    } else if &magic != crate::RRD_HEADER {
146        return Err(DecodeError::NotAnRrd);
147    }
148
149    warn_on_version_mismatch(version)?;
150
151    match options.serializer {
152        Serializer::Protobuf => {}
153    }
154
155    Ok((CrateVersion::from_bytes(version), options))
156}
157
/// The underlying byte source of a [`Decoder`].
///
/// `Buffered` is required for the look-ahead in [`Decoder::peek_file_header`], which can only
/// peek into a [`std::io::BufReader`]'s internal buffer; `Raw` decoders never look ahead.
enum Reader<R: std::io::Read> {
    Raw(R),
    Buffered(std::io::BufReader<R>),
}
162
163impl<R: std::io::Read> std::io::Read for Reader<R> {
164    #[inline]
165    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
166        match self {
167            Self::Raw(read) => read.read(buf),
168            Self::Buffered(read) => read.read(buf),
169        }
170    }
171}
172
/// Streaming decoder of [`LogMsg`]s read from an `.rrd` stream.
pub struct Decoder<R: std::io::Read> {
    /// Rerun crate version that encoded the stream, as read from the file header.
    version: CrateVersion,

    /// Serialization/compression options, as read from the file header.
    options: EncodingOptions,

    /// Underlying byte source; `Buffered` iff created via [`Decoder::new_concatenated`].
    read: Reader<R>,

    /// The size in bytes of the data that has been decoded up to now.
    size_bytes: u64,
}
181
impl<R: std::io::Read> Decoder<R> {
    /// Instantiates a new decoder.
    ///
    /// This does not support concatenated streams (i.e. streams of bytes where multiple RRD files
    /// -- not recordings, RRD files! -- follow each other).
    ///
    /// If you're not familiar with concatenated RRD streams, then this is probably the function
    /// that you want to be using.
    ///
    /// See also:
    /// * [`Decoder::new_concatenated`]
    pub fn new(mut read: R) -> Result<Self, DecodeError> {
        re_tracing::profile_function!();

        // Consume and validate the file header up front; the rest of the stream is messages.
        let mut data = [0_u8; FileHeader::SIZE];
        read.read_exact(&mut data).map_err(DecodeError::Read)?;

        let (version, options) = options_from_bytes(&data)?;

        Ok(Self {
            version,
            options,
            read: Reader::Raw(read),
            size_bytes: FileHeader::SIZE as _,
        })
    }

    /// Instantiates a decoder with already-known options and version, without reading a header.
    ///
    /// NOTE(review): `size_bytes` still starts at `FileHeader::SIZE` even though nothing is read
    /// from `read` here — presumably the caller has already consumed the header out-of-band;
    /// confirm against call sites.
    pub fn new_with_options(options: EncodingOptions, version: CrateVersion, read: R) -> Self {
        Self {
            version,
            options,
            read: Reader::Raw(read),
            size_bytes: FileHeader::SIZE as _,
        }
    }

    /// Instantiates a new concatenated decoder.
    ///
    /// This will gracefully handle concatenated RRD streams (i.e. streams of bytes where multiple
    /// RRD files -- not recordings, RRD files! -- follow each other), at the cost of extra
    /// performance overhead, by looking ahead for potential `FileHeader`s in the stream.
    ///
    /// The [`CrateVersion`] of the final, deconcatenated stream will correspond to the most recent
    /// version among all the versions found in the stream.
    ///
    /// This is particularly useful when working with stdio streams.
    ///
    /// If you're not familiar with concatenated RRD streams, then you probably want to use
    /// [`Decoder::new`] instead.
    ///
    /// See also:
    /// * [`Decoder::new`]
    pub fn new_concatenated(mut read: std::io::BufReader<R>) -> Result<Self, DecodeError> {
        re_tracing::profile_function!();

        // Same header handling as `new`, but we keep the `BufReader` around so that
        // `peek_file_header` can look ahead into its buffer later on.
        let mut data = [0_u8; FileHeader::SIZE];
        read.read_exact(&mut data).map_err(DecodeError::Read)?;

        let (version, options) = options_from_bytes(&data)?;

        Ok(Self {
            version,
            options,
            read: Reader::Buffered(read),
            size_bytes: FileHeader::SIZE as _,
        })
    }

    /// Returns the Rerun version that was used to encode the data in the first place.
    #[inline]
    pub fn version(&self) -> CrateVersion {
        self.version
    }

    // TODO(jan): stop returning number of read bytes, use cursors wrapping readers instead.
    /// Returns the size in bytes of the data that has been decoded up to now.
    #[inline]
    pub fn size_bytes(&self) -> u64 {
        self.size_bytes
    }

    /// Returns the next message in the stream.
    ///
    /// Generic over the per-message decoding function so the same machinery serves both
    /// application-level ([`LogMsg`]) and transport-level (Protobuf) iteration.
    /// `decoder` returns `(bytes_consumed, message)`, where `None` marks an end-of-stream marker.
    fn next<F, T>(&mut self, mut decoder: F) -> Option<Result<T, DecodeError>>
    where
        F: FnMut(&mut Reader<R>) -> Result<(u64, Option<T>), DecodeError>,
    {
        re_tracing::profile_function!();

        if self.peek_file_header() {
            // We've found another file header in the middle of the stream, it's time to switch
            // gears and start over on this new file.

            let mut data = [0_u8; FileHeader::SIZE];
            if let Err(err) = self.read.read_exact(&mut data).map_err(DecodeError::Read) {
                return Some(Err(err));
            }

            let (version, options) = match options_from_bytes(&data) {
                Ok(opts) => opts,
                Err(err) => return Some(Err(err)),
            };

            // Keep the most recent version seen across all concatenated files (see
            // `new_concatenated`'s docs), but always adopt the newest file's options.
            self.version = CrateVersion::max(self.version, version);
            self.options = options;
            self.size_bytes += FileHeader::SIZE as u64;
        }

        let msg = match self.options.serializer {
            Serializer::Protobuf => match decoder(&mut self.read) {
                Ok((read_bytes, msg)) => {
                    self.size_bytes += read_bytes;
                    msg
                }

                Err(err) => match err {
                    // A clean EOF here means the stream simply ended without an explicit
                    // end-of-stream marker -- treat it as iterator exhaustion, not an error.
                    DecodeError::Read(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
                        return None;
                    }
                    _ => return Some(Err(err)),
                },
            },
        };

        let Some(msg) = msg else {
            // we might have a concatenated stream, so we peek beyond end of file marker to see
            if self.peek_file_header() {
                re_log::debug!(
                    "Reached end of stream, but it seems we have a concatenated file, continuing"
                );
                // Recurse: the next call will consume the header found above and keep going.
                return self.next(decoder);
            }

            re_log::trace!("Reached end of stream, iterator complete");
            return None;
        };

        Some(Ok(msg))
    }

    /// Peeks ahead in search of additional `FileHeader`s in the stream.
    ///
    /// Returns true if a valid header was found.
    ///
    /// No-op if the decoder wasn't initialized with [`Decoder::new_concatenated`].
    fn peek_file_header(&mut self) -> bool {
        match &mut self.read {
            Reader::Raw(_) => false,
            Reader::Buffered(read) => {
                // An I/O error while filling the buffer is swallowed here (we report "no
                // header"); presumably it resurfaces on the next real read -- TODO confirm.
                if read.fill_buf().map_err(DecodeError::Read).is_err() {
                    return false;
                }

                // Decode from a cursor over the buffered bytes so nothing is consumed;
                // a short or non-matching buffer simply fails to decode.
                let mut read = std::io::Cursor::new(read.buffer());
                if FileHeader::decode(&mut read).is_err() {
                    return false;
                }

                true
            }
        }
    }

    /// Returns a [`RawIterator`] over the transport-level data (Protobuf).
    pub fn into_raw_iter(self) -> RawIterator<R> {
        RawIterator { decoder: self }
    }
}
349
/// Iterator over the transport-level data (Protobuf).
///
/// Application-level data (Arrow) is not decoded.
///
/// Created via [`Decoder::into_raw_iter`].
pub struct RawIterator<R: std::io::Read> {
    decoder: Decoder<R>,
}
356
357impl<R: std::io::Read> RawIterator<R> {
358    /// Returns the size in bytes of the data that has been decoded up to now.
359    //
360    // TODO(jan): stop returning number of read bytes, use cursors wrapping readers instead.
361    #[inline]
362    pub fn size_bytes(&self) -> u64 {
363        self.decoder.size_bytes
364    }
365}
366
impl<R: std::io::Read> Iterator for Decoder<R> {
    type Item = Result<LogMsg, DecodeError>;

    fn next(&mut self) -> Option<Self::Item> {
        // This resolves to the *inherent* `Decoder::next` (inherent methods take precedence
        // over trait methods), decoding all the way down to application-level data.
        self.next(decoder::decode_to_app)
    }
}
374
impl<R: std::io::Read> Iterator for RawIterator<R> {
    type Item = Result<re_protos::log_msg::v1alpha1::log_msg::Msg, DecodeError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Same machinery as `Decoder`'s `Iterator` impl, but stops at the transport
        // (Protobuf) level instead of decoding the Arrow payloads.
        self.decoder.next(decoder::decode_to_transport)
    }
}
382
383// ----------------------------------------------------------------------------
384
#[cfg(all(test, feature = "decoder", feature = "encoder"))]
mod tests {
    #![allow(clippy::unwrap_used)] // acceptable for tests

    use crate::Compression;

    use super::*;
    use re_build_info::CrateVersion;
    use re_chunk::RowId;
    use re_log_types::{ApplicationId, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};

    /// Builds a small, representative message set covering all three [`LogMsg`] variants:
    /// store info, one Arrow chunk, and a blueprint activation command.
    pub fn fake_log_messages() -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Blueprint);

        let arrow_msg = re_chunk::Chunk::builder("test_entity")
            .with_archetype(
                re_chunk::RowId::new(),
                re_log_types::TimePoint::default().with(
                    re_log_types::Timeline::new_sequence("blueprint"),
                    re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                ),
                &re_types::blueprint::archetypes::Background::new(
                    re_types::blueprint::components::BackgroundKind::SolidColor,
                )
                .with_color([255, 0, 0]),
            )
            .build()
            .unwrap()
            .to_arrow_msg()
            .unwrap();

        vec![
            LogMsg::SetStoreInfo(SetStoreInfo {
                row_id: *RowId::new(),
                info: StoreInfo {
                    application_id: ApplicationId("test".to_owned()),
                    store_id: store_id.clone(),
                    cloned_from: None,
                    store_source: StoreSource::RustSdk {
                        rustc_version: String::new(),
                        llvm_version: String::new(),
                    },
                    store_version: Some(CrateVersion::LOCAL),
                },
            }),
            LogMsg::ArrowMsg(store_id.clone(), arrow_msg),
            LogMsg::BlueprintActivationCommand(re_log_types::BlueprintActivationCommand {
                blueprint_id: store_id,
                make_active: true,
                make_default: true,
            }),
        ]
    }

    /// Round-trips the fake messages through encode → decode, with and without LZ4 compression,
    /// and checks the result is identical to the input.
    #[test]
    fn test_encode_decode() {
        let rrd_version = CrateVersion::LOCAL;

        let messages = fake_log_messages();

        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        for options in options {
            let mut file = vec![];
            crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut file)
                .unwrap();

            let decoded_messages = Decoder::new(&mut file.as_slice())
                .unwrap()
                .collect::<Result<Vec<LogMsg>, DecodeError>>()
                .unwrap();

            similar_asserts::assert_eq!(decoded_messages, messages);
        }
    }

    /// Checks that `Decoder::new_concatenated` transparently decodes two back-to-back RRD
    /// streams (each with its own file header and end-of-stream marker) as one message stream.
    #[test]
    fn test_concatenated_streams() {
        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        for options in options {
            let mut data = vec![];

            // write "2 files" i.e. 2 streams that end with end-of-stream marker
            let messages = fake_log_messages();

            // (2 encoders as each encoder writes a file header)
            let writer = std::io::Cursor::new(&mut data);
            let mut encoder1 =
                crate::encoder::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
            for message in &messages {
                encoder1.append(message).unwrap();
            }
            encoder1.finish().unwrap();

            // Position a fresh cursor at the end of the first file so the second encoder
            // appends rather than overwrites.
            let written = data.len() as u64;
            let mut writer = std::io::Cursor::new(&mut data);
            writer.set_position(written);
            let mut encoder2 =
                crate::encoder::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
            for message in &messages {
                encoder2.append(message).unwrap();
            }
            encoder2.finish().unwrap();

            let decoder =
                Decoder::new_concatenated(std::io::BufReader::new(data.as_slice())).unwrap();

            let decoded_messages = decoder.into_iter().collect::<Result<Vec<_>, _>>().unwrap();

            // Both files' contents should come out, in order.
            similar_asserts::assert_eq!(decoded_messages, [messages.clone(), messages].concat());
        }
    }
}