re_log_encoding/decoder/
stream.rs

1use std::collections::VecDeque;
2use std::io::Cursor;
3use std::io::Read as _;
4
5use re_build_info::CrateVersion;
6
7use crate::codec::Serializer;
8use crate::codec::file::EncodingOptions;
9use crate::codec::file::FileHeader;
10use crate::decoder::{ApplicationIdInjector, CachingApplicationIdInjector, DecodeError};
11
12// ---
13
14// TODO(cmc): This trait will be vastly improved and documented in a follow up that completely
15// revamps how codecs are defined and organized. For now, it's just undocumented bare minimum to
16// make `Decoder` work everywhere.
17
pub trait FileEncoded: Sized {
    /// Decodes a message of kind `message_kind` from the raw bytes in `buf`.
    ///
    /// Returns `Ok(None)` when the bytes decode to an end-of-stream marker rather than an actual
    /// message (see how [`Decoder`] reacts to a `None` by going back to looking for a stream
    /// header).
    fn decode(
        app_id_injector: &mut impl ApplicationIdInjector,
        message_kind: crate::codec::file::MessageKind,
        buf: &[u8],
    ) -> Result<Option<Self>, DecodeError>;

    /// Gives the decoded message a chance to record the stream's encoding version.
    ///
    /// Defaults to a no-op; implementers override this when some part of the decoded message
    /// should carry the version around (see the `re_log_types::LogMsg` implementation).
    fn propagate_version(&mut self, version: Option<CrateVersion>) {
        _ = self;
        _ = version;
    }
}
30
31impl FileEncoded for re_log_types::LogMsg {
32    fn decode(
33        app_id_injector: &mut impl ApplicationIdInjector,
34        message_kind: crate::codec::file::MessageKind,
35        buf: &[u8],
36    ) -> Result<Option<Self>, DecodeError> {
37        crate::codec::file::decoder::decode_bytes_to_app(app_id_injector, message_kind, buf)
38    }
39
40    fn propagate_version(&mut self, version: Option<CrateVersion>) {
41        if let Self::SetStoreInfo(msg) = self {
42            // Propagate the protocol version from the header into the `StoreInfo` so that all
43            // parts of the app can easily access it.
44            msg.info.store_version = version;
45        }
46    }
47}
48
impl FileEncoded for re_protos::log_msg::v1alpha1::log_msg::Msg {
    fn decode(
        _app_id_injector: &mut impl ApplicationIdInjector,
        message_kind: crate::codec::file::MessageKind,
        buf: &[u8],
    ) -> Result<Option<Self>, DecodeError> {
        // Transport-level decoding only: the Protobuf payload is parsed but Arrow data is never
        // touched, hence no use for the application id injector here.
        crate::codec::file::decoder::decode_bytes_to_transport(message_kind, buf)
    }

    // `propagate_version` keeps its default no-op implementation.
}
58
59// ---
60
/// A type alias for a [`Decoder`] that only decodes from raw bytes up to transport-level
/// types (i.e. Protobuf payloads are decoded, but Arrow data is never touched).
///
/// See also [`DecoderApp`].
pub type DecoderTransport = Decoder<re_protos::log_msg::v1alpha1::log_msg::Msg>;
66
/// A type alias for a [`Decoder`] that decodes all the way from raw bytes to
/// application-level types (i.e. even Arrow layers are decoded).
///
/// See also [`DecoderTransport`].
pub type DecoderApp = Decoder<re_log_types::LogMsg>;
72
/// The stream decoder is a state machine which ingests byte chunks and outputs messages once it
/// has enough data to deserialize one.
///
/// Byte chunks are given to the stream via [`DecoderApp::push_byte_chunk`], and messages are read
/// back via [`DecoderApp::try_read`].
//
// TODO(cmc): explain when you'd use this over StreamingDecoder and vice-versa.
pub struct Decoder<T: FileEncoded> {
    /// The Rerun version used to encode the RRD data.
    ///
    /// `None` until a Rerun header has been processed.
    version: Option<CrateVersion>,

    /// The encoding options parsed from the stream's header (e.g. which serializer is in use).
    ///
    /// Holds a placeholder value until a Rerun header has been processed.
    options: EncodingOptions,

    /// Incoming byte chunks are stored here.
    byte_chunks: ByteChunkBuffer,

    /// The stream state.
    state: State,

    /// The application id cache used for migrating old data.
    app_id_cache: CachingApplicationIdInjector,

    /// The decoder never stores a `T`, it only produces them.
    _decodable: std::marker::PhantomData<T>,
}
99
/// The stream decoder's state machine.
///
/// ```text,ignore
/// StreamHeader
///      |
///      v
/// MessageHeader
/// ^           |
/// |           |
/// ---Message<--
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    /// The beginning of the stream.
    ///
    /// The stream header contains the magic bytes (e.g. `RRF2`), the encoded version, and the
    /// encoding options.
    ///
    /// After the stream header is read once, the state machine mostly alternates between
    /// `MessageHeader` and `Message`; it only comes back here when an end-of-stream marker (or an
    /// unrecognizable message header) suggests that another concatenated stream might follow.
    StreamHeader,

    /// The beginning of a Protobuf message.
    MessageHeader,

    /// The message content, serialized using `Protobuf`.
    ///
    /// Compression is only applied to individual `ArrowMsg`s, instead of the entire stream.
    Message(crate::codec::file::MessageHeader),

    /// Stop reading.
    ///
    /// Entered when trailing garbage is found where a stream header was expected; from then on
    /// the decoder only ever returns `Ok(None)`.
    Aborted,
}
132
133impl<T: FileEncoded> Decoder<T> {
134    /// Instantiates a new lazy decoding iterator on top of the given buffered reader.
135    ///
136    /// This does not perform any IO until the returned iterator is polled. I.e. this will not
137    /// fail if the reader doesn't even contain valid RRD data.
138    ///
139    /// This takes a `BufRead` instead of a `Read` because:
140    /// * This guarantees this will never run on non-buffered input.
141    /// * This lets the end-user in control of the buffering, which prevents unfortunately stacked
142    ///   buffers (and thus exploding memory usage and copies).
143    ///
144    /// See also [`Self::decode_lazy_with_opts`].
145    pub fn decode_lazy<R: std::io::BufRead>(reader: R) -> DecoderIterator<T, R> {
146        let wait_for_eos = false;
147        Self::decode_lazy_with_opts(reader, wait_for_eos)
148    }
149
150    /// Same as [`Self::decode_lazy`], with extra options.
151    ///
152    /// * `wait_for_eos`: if true, the decoder will always wait for an end-of-stream marker before
153    ///   calling it a day, even if the underlying reader has already reached its EOF state (…for now).
154    ///   This only really makes sense when running in tail mode (see `RetryableFileReader`), otherwise
155    ///   we'd rather terminate early when a potentially short-circuited (and therefore lacking a proper
156    ///   end-of-stream marker) RRD stream indicates EOF.
157    pub fn decode_lazy_with_opts<R: std::io::BufRead>(
158        reader: R,
159        wait_for_eos: bool,
160    ) -> DecoderIterator<T, R> {
161        let decoder = Self::new();
162        DecoderIterator {
163            decoder,
164            reader,
165            wait_for_eos,
166            first_msg: None,
167        }
168    }
169
170    /// Instantiates a new eager decoding iterator on top of the given buffered reader.
171    ///
172    /// This will perform a first decoding pass immediately. This allows this constructor to fail
173    /// synchronously if the underlying reader doesn't even contain valid RRD data at all (e.g. magic
174    /// bytes are not present).
175    ///
176    /// This takes a `BufRead` instead of a `Read` because:
177    /// * This guarantees this will never run on non-buffered input.
178    /// * This lets the end-user in control of the buffering, which prevents unfortunately stacked
179    ///   buffers (and thus exploding memory usage and copies).
180    ///
181    /// See also [`Self::decode_eager_with_opts`].
182    pub fn decode_eager<R: std::io::BufRead>(
183        reader: R,
184    ) -> Result<DecoderIterator<T, R>, DecodeError> {
185        let wait_for_eos = false;
186        Self::decode_eager_with_opts(reader, wait_for_eos)
187    }
188
189    /// Same as [`Self::decode_eager`], with extra options.
190    ///
191    /// * `wait_for_eos`: if true, the decoder will always wait for an end-of-stream marker before
192    ///   calling it a day, even if the underlying reader has already reached its EOF state (…for now).
193    ///   This only really makes sense when running in tail mode (see `RetryableFileReader`), otherwise
194    ///   we'd rather terminate early when a potentially short-circuited (and therefore lacking a proper
195    ///   end-of-stream marker) RRD stream indicates EOF.
196    pub fn decode_eager_with_opts<R: std::io::BufRead>(
197        reader: R,
198        wait_for_eos: bool,
199    ) -> Result<DecoderIterator<T, R>, DecodeError> {
200        let decoder = Self::new();
201        let mut it = DecoderIterator {
202            decoder,
203            reader,
204            wait_for_eos,
205            first_msg: None,
206        };
207
208        it.first_msg = it.next().transpose()?;
209
210        Ok(it)
211    }
212}
213
214impl<T: FileEncoded> Decoder<T> {
215    #[expect(clippy::new_without_default)]
216    pub fn new() -> Self {
217        Self {
218            version: None,
219            // Note: `options` are filled in once we read `FileHeader`, so this value does not matter.
220            options: EncodingOptions::PROTOBUF_UNCOMPRESSED,
221            byte_chunks: ByteChunkBuffer::new(),
222            state: State::StreamHeader,
223            app_id_cache: CachingApplicationIdInjector::default(),
224            _decodable: std::marker::PhantomData::<T>,
225        }
226    }
227
228    /// Feed a bunch of bytes to the decoding state machine.
229    pub fn push_byte_chunk(&mut self, byte_chunk: Vec<u8>) {
230        self.byte_chunks.push(byte_chunk);
231    }
232
233    /// Read the next message in the stream, dropping messages missing application id that cannot
234    /// be migrated (because they arrived before `SetStoreInfo`).
235    pub fn try_read(&mut self) -> Result<Option<T>, DecodeError> {
236        //TODO(#10730): remove this if/when we remove the legacy `StoreId` migration.
237        loop {
238            let result = self.try_read_impl();
239            if let Err(DecodeError::StoreIdMissingApplicationId {
240                store_kind,
241                recording_id,
242            }) = result
243            {
244                re_log::warn_once!(
245                    "Dropping message without application id which arrived before `SetStoreInfo` \
246                    (kind: {store_kind}, recording id: {recording_id}."
247                );
248            } else {
249                return result;
250            }
251        }
252    }
253
254    /// Read the next message in the stream.
255    fn try_read_impl(&mut self) -> Result<Option<T>, DecodeError> {
256        match self.state {
257            State::StreamHeader => {
258                let is_first_header = self.byte_chunks.num_read() == 0;
259                let position = self.byte_chunks.num_read();
260                if let Some(header) = self.byte_chunks.try_read(FileHeader::SIZE) {
261                    re_log::trace!(?header, "Decoding StreamHeader");
262
263                    // header contains version and compression options
264                    let (version, options) = match FileHeader::options_from_bytes(header) {
265                        Ok(ok) => ok,
266                        Err(err) => {
267                            // We expected a header, but didn't find one!
268                            if is_first_header {
269                                return Err(err);
270                            } else {
271                                re_log::error!(
272                                    is_first_header,
273                                    position,
274                                    "Trailing bytes in rrd stream: {header:?} ({err})"
275                                );
276                                self.state = State::Aborted;
277                                return Ok(None);
278                            }
279                        }
280                    };
281
282                    re_log::trace!(
283                        version = version.to_string(),
284                        ?options,
285                        "Found Stream Header"
286                    );
287
288                    self.version = Some(version);
289                    self.options = options;
290
291                    match self.options.serializer {
292                        Serializer::Protobuf => self.state = State::MessageHeader,
293                    }
294
295                    // we might have data left in the current byte chunk, immediately try to read
296                    // length of the next message.
297                    return self.try_read();
298                }
299            }
300
301            State::MessageHeader => {
302                let mut peeked = [0u8; crate::codec::file::MessageHeader::SIZE_BYTES];
303                if self.byte_chunks.try_peek(&mut peeked) == peeked.len() {
304                    let header = match crate::codec::file::MessageHeader::from_bytes(&peeked) {
305                        Ok(header) => header,
306
307                        Err(DecodeError::Codec(crate::codec::CodecError::UnknownMessageHeader)) => {
308                            // We failed to decode a `MessageHeader`: it might be because the
309                            // stream is corrupt, or it might be because it just switched to a
310                            // different, concatenated recording without having the courtesy of
311                            // announcing it via an EOS marker.
312                            self.state = State::StreamHeader;
313                            return self.try_read();
314                        }
315
316                        err @ Err(_) => err?,
317                    };
318
319                    self.byte_chunks
320                        .try_read(crate::codec::file::MessageHeader::SIZE_BYTES)
321                        .expect("reading cannot fail if peeking worked");
322
323                    re_log::trace!(?header, "MessageHeader");
324
325                    self.state = State::Message(header);
326                    // we might have data left in the current byte chunk, immediately try to read
327                    // the message content.
328                    return self.try_read();
329                }
330            }
331
332            State::Message(header) => {
333                if let Some(bytes) = self.byte_chunks.try_read(header.len as usize) {
334                    re_log::trace!(?header, "Read message");
335
336                    let message = match T::decode(&mut self.app_id_cache, header.kind, bytes) {
337                        Ok(msg) => msg,
338                        Err(err) => {
339                            // We successfully parsed a header, but decided to drop the message altogether.
340                            // We must go back to looking for headers, or the decoder will just be stuck in a dead
341                            // state forever.
342                            self.state = State::MessageHeader;
343                            return Err(err);
344                        }
345                    };
346
347                    if let Some(mut message) = message {
348                        re_log::trace!("Decoded new message");
349
350                        message.propagate_version(self.version);
351                        self.state = State::MessageHeader;
352                        return Ok(Some(message));
353                    } else {
354                        re_log::trace!("End of stream - expecting a new Streamheader");
355
356                        // `None` means an end-of-stream marker was hit, but there might be another concatenated
357                        // stream behind, so try to start all over again.
358                        self.state = State::StreamHeader;
359                        return self.try_read();
360                    }
361                }
362            }
363
364            State::Aborted => {
365                return Ok(None);
366            }
367        }
368
369        Ok(None)
370    }
371}
372
373// ---
374
/// Iteratively decodes the contents of an arbitrary buffered reader.
pub struct DecoderIterator<T: FileEncoded, R: std::io::BufRead> {
    /// The underlying decoding state machine that buffered bytes get fed into.
    decoder: Decoder<T>,

    /// Where the raw RRD bytes come from.
    reader: R,

    /// If true, the decoder will always wait for an end-of-stream marker before calling it a day,
    /// even if the underlying reader has already reached its EOF state (…for now).
    ///
    /// This only really makes sense when running in tail mode (see `RetryableFileReader`),
    /// otherwise we'd rather terminate early when a potentially short-circuited (and therefore
    /// lacking a proper end-of-stream marker) RRD stream indicates EOF.
    wait_for_eos: bool,

    /// See [`Decoder::decode_eager`] for more information.
    first_msg: Option<T>,
}
391
392impl<T: FileEncoded, R: std::io::BufRead> DecoderIterator<T, R> {
393    pub fn num_bytes_processed(&self) -> u64 {
394        self.decoder.byte_chunks.num_read() as _
395    }
396}
397
impl<T: FileEncoded, R: std::io::BufRead> std::iter::Iterator for DecoderIterator<T, R> {
    type Item = Result<T, DecodeError>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(first_msg) = self.first_msg.take() {
            // The iterator was eagerly initialized so make sure to return the first message if there's any.
            return Some(Ok(first_msg));
        }

        loop {
            // First, try decoding from whatever is already buffered in the decoder.
            match self.decoder.try_read() {
                Ok(Some(msg)) => return Some(Ok(msg)),
                Ok(None) => {}
                Err(err) => return Some(Err(err)),
            }

            match self.reader.fill_buf() {
                // EOF
                Ok([]) => {
                    // There's nothing more to read…
                    match self.decoder.try_read() {
                        // …but we still have enough buffered that we can still manage to decode
                        // more messages, so go on for now.
                        Ok(Some(msg)) => return Some(Ok(msg)),

                        // …and we don't want to explicitly wait around for more to come, so just leave.
                        Ok(None) if !self.wait_for_eos => return None,

                        // …and the underlying decoder already considers that it's done (i.e. it's
                        // waiting for a whole new stream to begin): time to stop.
                        Ok(None) if self.decoder.state == State::StreamHeader => {
                            return None;
                        }

                        // …but the underlying decoder doesn't believe it's done yet (i.e. it's still
                        // waiting for an EOS marker to show up): we continue.
                        Ok(None) => {}

                        Err(err) => return Some(Err(err)),
                    }
                }

                Ok(buf) => {
                    // Hand the freshly-read bytes over to the decoder; the next loop iteration
                    // will attempt to decode them.
                    self.decoder.push_byte_chunk(buf.to_vec());
                    let len = buf.len(); // borrowck limitation
                    self.reader.consume(len);
                }

                // Spurious interruption: just retry, per `fill_buf`'s contract.
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {}

                Err(err) => return Some(Err(err.into())),
            }
        }
    }
}
453
454// ---
455
/// A bunch of contiguous bytes, with a cursor tracking how many of them were consumed already.
type ByteChunk = Cursor<Vec<u8>>;
458
/// Queues incoming byte chunks and serves contiguous reads/peeks of exact sizes, even across
/// chunk boundaries.
struct ByteChunkBuffer {
    /// Any incoming byte chunks are queued until they are emptied.
    queue: VecDeque<ByteChunk>,

    /// This buffer is used as scratch space for any read bytes, so that we can return a contiguous
    /// slice from `try_read`.
    buffer: Vec<u8>,

    /// How many bytes of valid data are currently in `self.buffer`.
    buffer_fill: usize,

    /// How many bytes have been read with [`Self::try_read`] so far?
    num_read: usize,
}
473
impl ByteChunkBuffer {
    /// Creates an empty buffer with a bit of preallocated capacity.
    fn new() -> Self {
        Self {
            queue: VecDeque::with_capacity(16),
            buffer: Vec::with_capacity(1024),
            buffer_fill: 0,
            num_read: 0,
        }
    }

    /// Queues a new chunk of bytes; empty chunks are discarded outright.
    fn push(&mut self, byte_chunk: Vec<u8>) {
        if byte_chunk.is_empty() {
            return;
        }
        self.queue.push_back(ByteChunk::new(byte_chunk));
    }

    /// How many bytes have been read with [`Self::try_read`] so far?
    fn num_read(&self) -> usize {
        self.num_read
    }

    /// Attempt to read exactly `n` bytes out of the queued byte chunks.
    ///
    /// Returns `None` if there is not enough data to return a slice of `n` bytes.
    ///
    /// NOTE: `try_read` *must* be called with the same `n` until it returns `Some`,
    /// otherwise this will discard any previously buffered data.
    fn try_read(&mut self, n: usize) -> Option<&[u8]> {
        // resize the buffer if the target has changed
        if self.buffer.len() != n {
            // Changing `n` midway through an incomplete read would silently drop the
            // partially-buffered bytes, hence the hard assert.
            assert_eq!(
                self.buffer_fill, 0,
                "`try_read` called with different `n` for incomplete read"
            );
            self.buffer.resize(n, 0);
            self.buffer_fill = 0;
        }

        // try to read some bytes from the front of the queue,
        // until either:
        // - we've read enough to return a slice of `n` bytes
        // - we run out of byte chunks to read
        // while also discarding any empty byte chunks
        while self.buffer_fill != n {
            if let Some(byte_chunk) = self.queue.front_mut() {
                let remainder = &mut self.buffer[self.buffer_fill..];
                self.buffer_fill += byte_chunk
                    .read(remainder)
                    .expect("failed to read from byte chunk");
                if is_byte_chunk_empty(byte_chunk) {
                    self.queue.pop_front();
                }
            } else {
                break;
            }
        }

        if self.buffer_fill == n {
            // ensure that a successful call to `try_read(N)`
            // followed by another call to `try_read(N)` with the same `N`
            // won't erroneously return the same bytes
            self.buffer_fill = 0;
            self.num_read += n;
            Some(&self.buffer[..])
        } else {
            None
        }
    }

    /// Attempt to peek exactly `n` bytes from of the queued byte chunks.
    ///
    /// Returns the number of bytes that could successfully be peeked, and therefore copied into `out`.
    /// The returned value is guaranteed to never exceed `out.len`.
    fn try_peek(&self, out: &mut [u8]) -> usize {
        use std::io::Write as _;

        let target_len = out.len();

        // Writes into a `Cursor<&mut [u8]>` never go past the end of the slice, which is what
        // guarantees the returned count can never exceed `out.len()`.
        let mut out = std::io::Cursor::new(out);
        let mut n = 0;

        // `try_read` will never read from the active buffer if `n` changes, so we must emulate the
        // same behavior.
        if target_len == self.buffer.len() {
            n += out
                .write(&self.buffer[..self.buffer_fill])
                .expect("memcpy, cannot fail");
        }

        // Then copy from the queued chunks — without consuming them — until `out` is full or we
        // run out of data.
        for byte_chunk in &self.queue {
            if n == target_len {
                return n;
            }

            let pos = byte_chunk.position() as usize;
            n += out
                .write(&byte_chunk.get_ref()[pos..])
                .expect("memcpy, cannot fail");
        }

        n
    }
}
578
579fn is_byte_chunk_empty(byte_chunk: &ByteChunk) -> bool {
580    byte_chunk.position() >= byte_chunk.get_ref().len() as u64
581}
582
583// ---
584
#[cfg(test)]
mod tests {
    use re_chunk::RowId;
    use re_log_types::{LogMsg, SetStoreInfo, StoreInfo};

    use crate::Encoder;
    use crate::EncodingOptions;

    use super::*;

    /// A deterministic `SetStoreInfo` message, used as the payload for every test below.
    fn fake_log_msg() -> LogMsg {
        LogMsg::SetStoreInfo(SetStoreInfo {
            row_id: *RowId::ZERO,
            info: StoreInfo {
                store_version: Some(CrateVersion::LOCAL), // Encoder sets the crate version
                ..StoreInfo::testing()
            },
        })
    }

    /// Encodes `n` copies of [`fake_log_msg`] with the given `options`, returning both the
    /// original messages and the encoded bytes.
    fn test_data(options: EncodingOptions, n: usize) -> (Vec<LogMsg>, Vec<u8>) {
        let messages: Vec<_> = (0..n).map(|_| fake_log_msg()).collect();

        let mut data = Vec::new();
        Encoder::encode_into(
            CrateVersion::LOCAL,
            options,
            messages.clone().into_iter().map(Ok),
            &mut data,
        )
        .unwrap();

        (messages, data)
    }

    /// Asserts that `$message` decoded to a complete message equal to [`fake_log_msg`],
    /// and evaluates to that message.
    macro_rules! assert_message_ok {
        ($message:expr) => {{
            match $message {
                Ok(Some(message)) => {
                    assert_eq!(&fake_log_msg(), &message);
                    message
                }
                Ok(None) => {
                    panic!("failed to read message: message could not be read in full");
                }
                Err(err) => {
                    panic!("failed to read message: {err}");
                }
            }
        }};
    }

    /// Asserts that `$message` could not be decoded yet (i.e. not enough data buffered).
    macro_rules! assert_message_incomplete {
        ($message:expr) => {{
            match $message {
                Ok(None) => {}
                Ok(Some(message)) => {
                    panic!("expected message to be incomplete, instead received: {message:?}");
                }
                Err(err) => {
                    panic!("failed to read message: {err}");
                }
            }
        }};
    }

    /// The entire stream is pushed as a single chunk, then decoded message by message.
    #[test]
    fn stream_whole_chunks_uncompressed_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);

        let mut decoder = DecoderApp::new();

        assert_message_incomplete!(decoder.try_read());

        decoder.push_byte_chunk(data);

        let decoded_messages: Vec<_> = (0..16)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();

        assert_eq!(input, decoded_messages);
    }

    /// The stream is pushed one byte at a time, exercising reads across chunk boundaries.
    #[test]
    fn stream_byte_chunks_uncompressed_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);

        let mut decoder = DecoderApp::new();

        assert_message_incomplete!(decoder.try_read());

        for byte_chunk in data.chunks(1) {
            decoder.push_byte_chunk(byte_chunk.to_vec());
        }

        let decoded_messages: Vec<_> = (0..16)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();

        assert_eq!(input, decoded_messages);
    }

    /// Two complete encoded streams back-to-back: the decoder must restart on the second header.
    #[test]
    fn two_concatenated_streams_protobuf() {
        let (input1, data1) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
        let (input2, data2) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
        let input = input1.into_iter().chain(input2).collect::<Vec<_>>();

        let mut decoder = DecoderApp::new();

        assert_message_incomplete!(decoder.try_read());

        decoder.push_byte_chunk(data1);
        decoder.push_byte_chunk(data2);

        let decoded_messages: Vec<_> = (0..32)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();

        assert_eq!(input, decoded_messages);
    }

    /// Same as the uncompressed variant, but with per-message compression enabled.
    #[test]
    fn stream_whole_chunks_compressed_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);

        let mut decoder = DecoderApp::new();

        assert_message_incomplete!(decoder.try_read());

        decoder.push_byte_chunk(data);

        let decoded_messages: Vec<_> = (0..16)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();

        assert_eq!(input, decoded_messages);
    }

    /// Compressed stream pushed one byte at a time.
    #[test]
    fn stream_byte_chunks_compressed_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);

        let mut decoder = DecoderApp::new();

        assert_message_incomplete!(decoder.try_read());

        for byte_chunk in data.chunks(1) {
            decoder.push_byte_chunk(byte_chunk.to_vec());
        }

        let decoded_messages: Vec<_> = (0..16)
            .map(|_| assert_message_ok!(decoder.try_read()))
            .collect();

        assert_eq!(input, decoded_messages);
    }

    /// Interleaves pushing chunks and reading messages, instead of pushing everything up front.
    #[test]
    fn stream_3x16_chunks_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);

        let mut decoder = DecoderApp::new();
        let mut decoded_messages = vec![];

        // keep pushing 3 chunks of 16 bytes at a time, and attempting to read messages
        // until there are no more chunks
        let mut byte_chunks = data.chunks(16).peekable();
        while byte_chunks.peek().is_some() {
            for _ in 0..3 {
                if let Some(byte_chunk) = byte_chunks.next() {
                    decoder.push_byte_chunk(byte_chunk.to_vec());
                } else {
                    break;
                }
            }

            if let Some(message) = decoder.try_read().unwrap() {
                decoded_messages.push(message);
            }
        }

        assert_eq!(input, decoded_messages);
    }

    #[test]
    fn stream_irregular_chunks_protobuf() {
        // this attempts to stress-test `try_read` with byte chunks of various sizes

        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
        let mut data = Cursor::new(data);

        let mut decoder = DecoderApp::new();
        let mut decoded_messages = vec![];

        // read byte chunks 2xN bytes at a time, where `N` comes from a regular pattern
        // this is slightly closer to using random numbers while still being
        // fully deterministic

        let pattern = [0, 3, 4, 70, 31];
        let mut pattern_index = 0;
        let mut temp = [0_u8; 71];

        while data.position() < data.get_ref().len() as u64 {
            for _ in 0..2 {
                let n = data.read(&mut temp[..pattern[pattern_index]]).unwrap();
                pattern_index = (pattern_index + 1) % pattern.len();
                decoder.push_byte_chunk(temp[..n].to_vec());
            }

            while let Some(message) = decoder.try_read().unwrap() {
                decoded_messages.push(message);
            }
        }

        assert_eq!(input, decoded_messages);
    }

    #[test]
    fn chunk_buffer_read_single_chunk() {
        // reading smaller `n` from multiple larger byte chunks

        let mut buffer = ByteChunkBuffer::new();

        let data = &[0, 1, 2, 3, 4];
        assert_eq!(None, buffer.try_read(1));
        buffer.push(data.to_vec());
        assert_eq!(Some(&data[..3]), buffer.try_read(3));
        assert_eq!(Some(&data[3..]), buffer.try_read(2));
        assert_eq!(None, buffer.try_read(1));
    }

    #[test]
    fn chunk_buffer_read_multi_chunk() {
        // reading a large `n` from multiple smaller byte chunks

        let mut buffer = ByteChunkBuffer::new();

        let byte_chunks: &[&[u8]] = &[&[0, 1, 2], &[3, 4]];

        assert_eq!(None, buffer.try_read(1));
        buffer.push(byte_chunks[0].to_vec());
        assert_eq!(None, buffer.try_read(5));
        buffer.push(byte_chunks[1].to_vec());
        assert_eq!(Some(&[0, 1, 2, 3, 4][..]), buffer.try_read(5));
        assert_eq!(None, buffer.try_read(1));
    }

    #[test]
    fn chunk_buffer_read_same_n() {
        // reading the same `n` multiple times should not return the same bytes

        let mut buffer = ByteChunkBuffer::new();

        let data = &[0, 1, 2, 3];
        buffer.push(data.to_vec());
        assert_eq!(data, buffer.try_read(4).unwrap());
        assert_eq!(None, buffer.try_read(4));
        let data = &[4, 5, 6, 7];
        buffer.push(data.to_vec());
        assert_eq!(data, buffer.try_read(4).unwrap());
        assert_eq!(None, buffer.try_read(4));
    }
}
849
850// Legacy tests from the old decoder implementation.
851#[cfg(all(test, feature = "decoder", feature = "encoder"))]
852mod tests_legacy {
853    #![expect(clippy::unwrap_used)] // acceptable for tests
854
855    use re_build_info::CrateVersion;
856    use re_chunk::RowId;
857    use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};
858    use re_protos::log_msg::v1alpha1 as proto;
859    use re_protos::log_msg::v1alpha1::LogMsg as LogMsgProto;
860
861    use crate::Encoder;
862    use crate::codec::Compression;
863
864    use super::*;
865
866    fn fake_log_messages() -> Vec<LogMsg> {
867        let store_id = StoreId::random(StoreKind::Blueprint, "test_app");
868
869        let arrow_msg = re_chunk::Chunk::builder("test_entity")
870            .with_archetype(
871                re_chunk::RowId::new(),
872                re_log_types::TimePoint::default().with(
873                    re_log_types::Timeline::new_sequence("blueprint"),
874                    re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
875                ),
876                &re_types::blueprint::archetypes::Background::new(
877                    re_types::blueprint::components::BackgroundKind::SolidColor,
878                )
879                .with_color([255, 0, 0]),
880            )
881            .build()
882            .unwrap()
883            .to_arrow_msg()
884            .unwrap();
885
886        vec![
887            LogMsg::SetStoreInfo(SetStoreInfo {
888                row_id: *RowId::new(),
889                info: StoreInfo::new(
890                    store_id.clone(),
891                    StoreSource::RustSdk {
892                        rustc_version: String::new(),
893                        llvm_version: String::new(),
894                    },
895                ),
896            }),
897            LogMsg::ArrowMsg(store_id.clone(), arrow_msg),
898            LogMsg::BlueprintActivationCommand(re_log_types::BlueprintActivationCommand {
899                blueprint_id: store_id,
900                make_active: true,
901                make_default: true,
902            }),
903        ]
904    }
905
906    /// Convert the test log message to their proto version and tweak them so that:
907    /// - `StoreId` do not have an `ApplicationId`
908    /// - `StoreInfo` does have an `ApplicationId`
909    #[expect(deprecated)]
910    fn legacy_fake_log_messages() -> Vec<LogMsgProto> {
911        fake_log_messages()
912            .into_iter()
913            .map(|msg| {
914                crate::protobuf_conversions::log_msg_to_proto(msg, Compression::Off).unwrap()
915            })
916            .map(|mut log_msg| {
917                match &mut log_msg.msg {
918                    None => panic!("Unexpected `LogMsg` without payload"),
919
920                    Some(proto::log_msg::Msg::SetStoreInfo(set_store_info)) => {
921                        if let Some(store_info) = &mut set_store_info.info {
922                            let Some(mut store_id) = store_info.store_id.clone() else {
923                                panic!("Unexpected missing `StoreId`");
924                            };
925
926                            // this should be a non-legacy proto
927                            assert_eq!(store_info.application_id, None);
928                            assert!(store_id.application_id.is_some());
929
930                            // turn this into a legacy proto
931                            store_info.application_id = store_id.application_id;
932                            store_id.application_id = None;
933                            store_info.store_id = Some(store_id);
934                        } else {
935                            panic!("Unexpected missing `store_info`")
936                        }
937                    }
938                    Some(
939                        proto::log_msg::Msg::ArrowMsg(proto::ArrowMsg { store_id, .. })
940                        | proto::log_msg::Msg::BlueprintActivationCommand(
941                            proto::BlueprintActivationCommand {
942                                blueprint_id: store_id,
943                                ..
944                            },
945                        ),
946                    ) => {
947                        let mut legacy_store_id =
948                            store_id.clone().expect("messages should have store ids");
949                        assert!(legacy_store_id.application_id.is_some());
950
951                        // make legacy
952                        legacy_store_id.application_id = None;
953                        *store_id = Some(legacy_store_id);
954                    }
955                }
956
957                log_msg
958            })
959            .collect()
960    }
961
962    impl<W: std::io::Write> Encoder<W> {
963        /// Like [`Self::encode_into`], but intentionally omits the end-of-stream marker, for
964        /// testing purposes.
965        fn encode_into_without_eos(
966            version: CrateVersion,
967            options: EncodingOptions,
968            messages: impl IntoIterator<Item = re_chunk::ChunkResult<impl std::borrow::Borrow<LogMsg>>>,
969            write: &mut W,
970        ) -> Result<u64, crate::EncodeError> {
971            re_tracing::profile_function!();
972            let mut encoder = Encoder::new(version, options, write)?;
973            let mut size_bytes = 0;
974            for message in messages {
975                size_bytes += encoder.append(message?.borrow())?;
976            }
977
978            {
979                encoder.flush_blocking()?;
980
981                // Intentionally leak it so we don't include the EOS marker on drop.
982                #[expect(clippy::mem_forget)]
983                std::mem::forget(encoder);
984            }
985
986            Ok(size_bytes)
987        }
988    }
989
990    #[test]
991    fn test_encode_decode() {
992        let rrd_version = CrateVersion::LOCAL;
993
994        let messages = fake_log_messages();
995
996        let options = [
997            EncodingOptions {
998                compression: Compression::Off,
999                serializer: Serializer::Protobuf,
1000            },
1001            EncodingOptions {
1002                compression: Compression::LZ4,
1003                serializer: Serializer::Protobuf,
1004            },
1005        ];
1006
1007        // Low-level
1008        for options in options {
1009            let mut file = vec![];
1010            crate::Encoder::encode_into(rrd_version, options, messages.iter().map(Ok), &mut file)
1011                .unwrap();
1012
1013            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(file.as_slice())
1014                .map(Result::unwrap)
1015                .collect();
1016            similar_asserts::assert_eq!(decoded_messages, messages);
1017        }
1018
1019        // Iterator
1020        for options in options {
1021            let mut file = vec![];
1022            crate::Encoder::encode_into(rrd_version, options, messages.iter().map(Ok), &mut file)
1023                .unwrap();
1024
1025            let reader = std::io::BufReader::new(file.as_slice());
1026            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(reader)
1027                .map(Result::unwrap)
1028                .collect();
1029            similar_asserts::assert_eq!(decoded_messages, messages);
1030        }
1031
1032        // Iterator: no EOS marker
1033        for options in options {
1034            let mut file = vec![];
1035            crate::Encoder::encode_into_without_eos(
1036                rrd_version,
1037                options,
1038                messages.iter().map(Ok),
1039                &mut file,
1040            )
1041            .unwrap();
1042
1043            let reader = std::io::BufReader::new(file.as_slice());
1044            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(reader)
1045                .map(Result::unwrap)
1046                .collect();
1047            similar_asserts::assert_eq!(decoded_messages, messages);
1048        }
1049    }
1050
1051    /// Test that legacy messages (aka `StoreId` without an application id) are properly decoded.
1052    #[test]
1053    fn test_decode_legacy() {
1054        let rrd_version = CrateVersion::LOCAL;
1055
1056        let messages = legacy_fake_log_messages();
1057
1058        let options = [
1059            EncodingOptions {
1060                compression: Compression::Off,
1061                serializer: Serializer::Protobuf,
1062            },
1063            EncodingOptions {
1064                compression: Compression::LZ4,
1065                serializer: Serializer::Protobuf,
1066            },
1067        ];
1068
1069        for options in options {
1070            let mut file = vec![];
1071
1072            let mut encoder = Encoder::new(rrd_version, options, &mut file).unwrap();
1073            for message in messages.clone() {
1074                encoder
1075                    .append_proto(message)
1076                    .expect("encoding should succeed");
1077            }
1078            drop(encoder);
1079
1080            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(file.as_slice())
1081                .map(Result::unwrap)
1082                .collect();
1083            assert_eq!(decoded_messages.len(), messages.len());
1084        }
1085    }
1086
1087    /// Test that legacy messages (aka `StoreId` without an application id) that arrive _before_
1088    /// a `SetStoreInfo` are dropped without failing.
1089    #[test]
1090    fn test_decode_legacy_out_of_order() {
1091        let rrd_version = CrateVersion::LOCAL;
1092
1093        let messages = legacy_fake_log_messages();
1094
1095        // ensure the test data is as we expect
1096        let orig_message_count = messages.len();
1097        assert_eq!(orig_message_count, 3);
1098        assert!(matches!(
1099            messages[0].msg,
1100            Some(proto::log_msg::Msg::SetStoreInfo(..))
1101        ));
1102        assert!(matches!(
1103            messages[1].msg,
1104            Some(proto::log_msg::Msg::ArrowMsg(..))
1105        ));
1106        assert!(matches!(
1107            messages[2].msg,
1108            Some(proto::log_msg::Msg::BlueprintActivationCommand(..))
1109        ));
1110
1111        let options = [
1112            EncodingOptions {
1113                compression: Compression::Off,
1114                serializer: Serializer::Protobuf,
1115            },
1116            EncodingOptions {
1117                compression: Compression::LZ4,
1118                serializer: Serializer::Protobuf,
1119            },
1120        ];
1121
1122        // make out-of-order messages
1123        let mut out_of_order_messages = vec![messages[1].clone(), messages[2].clone()];
1124        out_of_order_messages.extend(messages);
1125
1126        for options in options {
1127            let mut file = vec![];
1128
1129            let mut encoder = Encoder::new(rrd_version, options, &mut file).unwrap();
1130            for message in out_of_order_messages.clone() {
1131                encoder
1132                    .append_proto(message)
1133                    .expect("encoding should succeed");
1134            }
1135            drop(encoder);
1136
1137            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(file.as_slice())
1138                .map(Result::unwrap)
1139                .collect();
1140            assert_eq!(decoded_messages.len(), orig_message_count);
1141        }
1142    }
1143
1144    /// Test that non-legacy message streams do not rely on the `SetStoreInfo` message to arrive first.
1145    #[test]
1146    fn test_decode_out_of_order() {
1147        let rrd_version = CrateVersion::LOCAL;
1148
1149        let messages = fake_log_messages();
1150
1151        // ensure the test data is as we expect
1152        let orig_message_count = messages.len();
1153        assert_eq!(orig_message_count, 3);
1154        assert!(matches!(messages[0], LogMsg::SetStoreInfo { .. }));
1155        assert!(matches!(messages[1], LogMsg::ArrowMsg { .. }));
1156        assert!(matches!(
1157            messages[2],
1158            LogMsg::BlueprintActivationCommand { .. }
1159        ));
1160
1161        let options = [
1162            EncodingOptions {
1163                compression: Compression::Off,
1164                serializer: Serializer::Protobuf,
1165            },
1166            EncodingOptions {
1167                compression: Compression::LZ4,
1168                serializer: Serializer::Protobuf,
1169            },
1170        ];
1171
1172        // make out-of-order messages
1173        let mut out_of_order_messages = vec![messages[1].clone(), messages[2].clone()];
1174        out_of_order_messages.extend(messages);
1175
1176        for options in options {
1177            let mut file = vec![];
1178            crate::Encoder::encode_into(
1179                rrd_version,
1180                options,
1181                out_of_order_messages.iter().map(Ok),
1182                &mut file,
1183            )
1184            .unwrap();
1185
1186            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(file.as_slice())
1187                .map(Result::unwrap)
1188                .collect();
1189            similar_asserts::assert_eq!(decoded_messages, out_of_order_messages);
1190        }
1191    }
1192
1193    #[test]
1194    fn test_concatenated_streams() {
1195        let options = [
1196            EncodingOptions {
1197                compression: Compression::Off,
1198                serializer: Serializer::Protobuf,
1199            },
1200            EncodingOptions {
1201                compression: Compression::LZ4,
1202                serializer: Serializer::Protobuf,
1203            },
1204        ];
1205
1206        for options in options {
1207            let mut data = vec![];
1208
1209            // write "2 files" i.e. 2 streams that end with end-of-stream markers
1210            let messages = fake_log_messages();
1211
1212            // (2 encoders as each encoder writes a file header)
1213            {
1214                let writer = std::io::Cursor::new(&mut data);
1215                let mut encoder1 =
1216                    crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
1217                for message in &messages {
1218                    encoder1.append(message).unwrap();
1219                }
1220                encoder1.finish().unwrap();
1221            }
1222
1223            let written = data.len() as u64;
1224
1225            {
1226                let mut writer = std::io::Cursor::new(&mut data);
1227                writer.set_position(written);
1228                let mut encoder2 =
1229                    crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
1230                for message in &messages {
1231                    encoder2.append(message).unwrap();
1232                }
1233                encoder2.finish().unwrap();
1234            }
1235
1236            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(data.as_slice())
1237                .map(Result::unwrap)
1238                .collect();
1239            similar_asserts::assert_eq!(decoded_messages, [messages.clone(), messages].concat());
1240        }
1241
1242        // Same thing, but this time without EOS markers.
1243        for options in options {
1244            let mut data = vec![];
1245
1246            // write "2 files" i.e. 2 streams that do not end with end-of-stream markers
1247            let messages = fake_log_messages();
1248
1249            // (2 encoders as each encoder writes a file header)
1250            {
1251                let writer = std::io::Cursor::new(&mut data);
1252                let mut encoder1 =
1253                    crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
1254                for message in &messages {
1255                    encoder1.append(message).unwrap();
1256                }
1257
1258                // Intentionally leak it so we don't include the EOS marker on drop.
1259                #[expect(clippy::mem_forget)]
1260                std::mem::forget(encoder1);
1261            }
1262
1263            let written = data.len() as u64;
1264
1265            {
1266                let mut writer = std::io::Cursor::new(&mut data);
1267                writer.set_position(written);
1268                let mut encoder2 =
1269                    crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
1270                for message in &messages {
1271                    encoder2.append(message).unwrap();
1272                }
1273
1274                // Intentionally leak it so we don't include the EOS marker on drop.
1275                #[expect(clippy::mem_forget)]
1276                std::mem::forget(encoder2);
1277            }
1278
1279            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(data.as_slice())
1280                .map(Result::unwrap)
1281                .collect();
1282            assert_eq!(messages.len() * 2, decoded_messages.len());
1283            similar_asserts::assert_eq!(decoded_messages, [messages.clone(), messages].concat());
1284        }
1285    }
1286}