1use std::collections::VecDeque;
2use std::io::Cursor;
3use std::io::Read as _;
4
5use re_build_info::CrateVersion;
6
7use crate::codec::Serializer;
8use crate::codec::file::EncodingOptions;
9use crate::codec::file::FileHeader;
10use crate::decoder::{ApplicationIdInjector, CachingApplicationIdInjector, DecodeError};
11
/// A message type that can be decoded from the file/stream encoding of an RRD byte stream.
///
/// Implemented both for fully-decoded application-level messages ([`re_log_types::LogMsg`])
/// and for raw transport-level protobuf messages.
pub trait FileEncoded: Sized {
    /// Decodes a single message of the given `message_kind` from `buf`.
    ///
    /// Returns `Ok(None)` when the message marks the end of the stream
    /// (the decoder then expects a new stream header).
    ///
    /// `app_id_injector` is used to patch missing application ids into legacy
    /// messages — implementations that don't need it may ignore it.
    fn decode(
        app_id_injector: &mut impl ApplicationIdInjector,
        message_kind: crate::codec::file::MessageKind,
        buf: &[u8],
    ) -> Result<Option<Self>, DecodeError>;

    /// Propagates the stream's [`CrateVersion`] into the message, if applicable.
    ///
    /// The default implementation is a no-op.
    fn propagate_version(&mut self, version: Option<CrateVersion>) {
        _ = self;
        _ = version;
    }
}
30
31impl FileEncoded for re_log_types::LogMsg {
32 fn decode(
33 app_id_injector: &mut impl ApplicationIdInjector,
34 message_kind: crate::codec::file::MessageKind,
35 buf: &[u8],
36 ) -> Result<Option<Self>, DecodeError> {
37 crate::codec::file::decoder::decode_bytes_to_app(app_id_injector, message_kind, buf)
38 }
39
40 fn propagate_version(&mut self, version: Option<CrateVersion>) {
41 if let Self::SetStoreInfo(msg) = self {
42 msg.info.store_version = version;
45 }
46 }
47}
48
impl FileEncoded for re_protos::log_msg::v1alpha1::log_msg::Msg {
    /// Decodes the payload only to the transport-level protobuf message,
    /// without converting to an application-level `LogMsg`.
    ///
    /// The application-id injector is unused here since no conversion takes place.
    fn decode(
        _app_id_injector: &mut impl ApplicationIdInjector,
        message_kind: crate::codec::file::MessageKind,
        buf: &[u8],
    ) -> Result<Option<Self>, DecodeError> {
        crate::codec::file::decoder::decode_bytes_to_transport(message_kind, buf)
    }
}
58
/// Decoder that yields raw transport-level protobuf messages.
pub type DecoderTransport = Decoder<re_protos::log_msg::v1alpha1::log_msg::Msg>;

/// Decoder that yields fully-decoded application-level [`re_log_types::LogMsg`]s.
pub type DecoderApp = Decoder<re_log_types::LogMsg>;
72
/// Streaming decoder for RRD byte streams.
///
/// Bytes are fed in incrementally via [`Decoder::push_byte_chunk`] and decoded
/// messages are pulled out via [`Decoder::try_read`].
pub struct Decoder<T: FileEncoded> {
    /// Stream version parsed from the most recent stream header (`None` until one is seen).
    version: Option<CrateVersion>,

    /// Encoding options from the most recent stream header.
    options: EncodingOptions,

    /// Buffered, not-yet-decoded input bytes.
    byte_chunks: ByteChunkBuffer,

    /// What the decoder expects to read next.
    state: State,

    /// Caches application ids so legacy messages that lack one can be patched up.
    app_id_cache: CachingApplicationIdInjector,

    _decodable: std::marker::PhantomData<T>,
}
99
/// What the decoder expects to encounter next in the byte stream.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    /// Expecting a stream (file) header — either at the very start, or after an
    /// end-of-stream marker when multiple streams are concatenated.
    StreamHeader,

    /// Expecting a message header.
    MessageHeader,

    /// Expecting a message body described by this header.
    Message(crate::codec::file::MessageHeader),

    /// Decoding stopped for good (e.g. trailing garbage after a stream);
    /// `try_read` always yields `Ok(None)` from here on.
    Aborted,
}
132
impl<T: FileEncoded> Decoder<T> {
    /// Lazily decodes messages from `reader`: no bytes are read until the returned
    /// iterator is polled, so stream-header errors surface on the first `next()` call.
    pub fn decode_lazy<R: std::io::BufRead>(reader: R) -> DecoderIterator<T, R> {
        let wait_for_eos = false;
        Self::decode_lazy_with_opts(reader, wait_for_eos)
    }

    /// Like [`Self::decode_lazy`], but lets the caller decide whether the iterator
    /// should keep polling the reader for more data (`wait_for_eos`) instead of
    /// stopping as soon as the underlying reader runs dry mid-stream.
    pub fn decode_lazy_with_opts<R: std::io::BufRead>(
        reader: R,
        wait_for_eos: bool,
    ) -> DecoderIterator<T, R> {
        let decoder = Self::new();
        DecoderIterator {
            decoder,
            reader,
            wait_for_eos,
            first_msg: None,
        }
    }

    /// Eagerly decodes the first message so that stream-level errors (e.g. a bad
    /// header) are reported here rather than on the first iteration.
    pub fn decode_eager<R: std::io::BufRead>(
        reader: R,
    ) -> Result<DecoderIterator<T, R>, DecodeError> {
        let wait_for_eos = false;
        Self::decode_eager_with_opts(reader, wait_for_eos)
    }

    /// Like [`Self::decode_eager`], but with an explicit `wait_for_eos`
    /// (see [`Self::decode_lazy_with_opts`]).
    pub fn decode_eager_with_opts<R: std::io::BufRead>(
        reader: R,
        wait_for_eos: bool,
    ) -> Result<DecoderIterator<T, R>, DecodeError> {
        let decoder = Self::new();
        let mut it = DecoderIterator {
            decoder,
            reader,
            wait_for_eos,
            first_msg: None,
        };

        // Pull the first message now; it is stashed away and handed back on the
        // first call to `next()`.
        it.first_msg = it.next().transpose()?;

        Ok(it)
    }
}
213
214impl<T: FileEncoded> Decoder<T> {
215 #[expect(clippy::new_without_default)]
216 pub fn new() -> Self {
217 Self {
218 version: None,
219 options: EncodingOptions::PROTOBUF_UNCOMPRESSED,
221 byte_chunks: ByteChunkBuffer::new(),
222 state: State::StreamHeader,
223 app_id_cache: CachingApplicationIdInjector::default(),
224 _decodable: std::marker::PhantomData::<T>,
225 }
226 }
227
228 pub fn push_byte_chunk(&mut self, byte_chunk: Vec<u8>) {
230 self.byte_chunks.push(byte_chunk);
231 }
232
233 pub fn try_read(&mut self) -> Result<Option<T>, DecodeError> {
236 loop {
238 let result = self.try_read_impl();
239 if let Err(DecodeError::StoreIdMissingApplicationId {
240 store_kind,
241 recording_id,
242 }) = result
243 {
244 re_log::warn_once!(
245 "Dropping message without application id which arrived before `SetStoreInfo` \
246 (kind: {store_kind}, recording id: {recording_id}."
247 );
248 } else {
249 return result;
250 }
251 }
252 }
253
254 fn try_read_impl(&mut self) -> Result<Option<T>, DecodeError> {
256 match self.state {
257 State::StreamHeader => {
258 let is_first_header = self.byte_chunks.num_read() == 0;
259 let position = self.byte_chunks.num_read();
260 if let Some(header) = self.byte_chunks.try_read(FileHeader::SIZE) {
261 re_log::trace!(?header, "Decoding StreamHeader");
262
263 let (version, options) = match FileHeader::options_from_bytes(header) {
265 Ok(ok) => ok,
266 Err(err) => {
267 if is_first_header {
269 return Err(err);
270 } else {
271 re_log::error!(
272 is_first_header,
273 position,
274 "Trailing bytes in rrd stream: {header:?} ({err})"
275 );
276 self.state = State::Aborted;
277 return Ok(None);
278 }
279 }
280 };
281
282 re_log::trace!(
283 version = version.to_string(),
284 ?options,
285 "Found Stream Header"
286 );
287
288 self.version = Some(version);
289 self.options = options;
290
291 match self.options.serializer {
292 Serializer::Protobuf => self.state = State::MessageHeader,
293 }
294
295 return self.try_read();
298 }
299 }
300
301 State::MessageHeader => {
302 let mut peeked = [0u8; crate::codec::file::MessageHeader::SIZE_BYTES];
303 if self.byte_chunks.try_peek(&mut peeked) == peeked.len() {
304 let header = match crate::codec::file::MessageHeader::from_bytes(&peeked) {
305 Ok(header) => header,
306
307 Err(DecodeError::Codec(crate::codec::CodecError::UnknownMessageHeader)) => {
308 self.state = State::StreamHeader;
313 return self.try_read();
314 }
315
316 err @ Err(_) => err?,
317 };
318
319 self.byte_chunks
320 .try_read(crate::codec::file::MessageHeader::SIZE_BYTES)
321 .expect("reading cannot fail if peeking worked");
322
323 re_log::trace!(?header, "MessageHeader");
324
325 self.state = State::Message(header);
326 return self.try_read();
329 }
330 }
331
332 State::Message(header) => {
333 if let Some(bytes) = self.byte_chunks.try_read(header.len as usize) {
334 re_log::trace!(?header, "Read message");
335
336 let message = match T::decode(&mut self.app_id_cache, header.kind, bytes) {
337 Ok(msg) => msg,
338 Err(err) => {
339 self.state = State::MessageHeader;
343 return Err(err);
344 }
345 };
346
347 if let Some(mut message) = message {
348 re_log::trace!("Decoded new message");
349
350 message.propagate_version(self.version);
351 self.state = State::MessageHeader;
352 return Ok(Some(message));
353 } else {
354 re_log::trace!("End of stream - expecting a new Streamheader");
355
356 self.state = State::StreamHeader;
359 return self.try_read();
360 }
361 }
362 }
363
364 State::Aborted => {
365 return Ok(None);
366 }
367 }
368
369 Ok(None)
370 }
371}
372
/// Streaming iterator over decoded messages read from a [`std::io::BufRead`].
pub struct DecoderIterator<T: FileEncoded, R: std::io::BufRead> {
    decoder: Decoder<T>,
    reader: R,

    /// If `true`, an empty read from `reader` does not end iteration unless the
    /// decoder is back at a stream-header boundary — the iterator keeps polling
    /// the reader for more data while mid-stream.
    wait_for_eos: bool,

    /// Message decoded eagerly by [`Decoder::decode_eager_with_opts`], handed
    /// back on the first call to `next()`.
    first_msg: Option<T>,
}
391
392impl<T: FileEncoded, R: std::io::BufRead> DecoderIterator<T, R> {
393 pub fn num_bytes_processed(&self) -> u64 {
394 self.decoder.byte_chunks.num_read() as _
395 }
396}
397
impl<T: FileEncoded, R: std::io::BufRead> std::iter::Iterator for DecoderIterator<T, R> {
    type Item = Result<T, DecodeError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Hand back the message that `decode_eager` may have pulled ahead of time.
        if let Some(first_msg) = self.first_msg.take() {
            return Some(Ok(first_msg));
        }

        loop {
            // First try to decode from bytes that are already buffered.
            match self.decoder.try_read() {
                Ok(Some(msg)) => return Some(Ok(msg)),
                Ok(None) => {} // need more bytes
                Err(err) => return Some(Err(err)),
            }

            match self.reader.fill_buf() {
                // The reader is (currently) exhausted.
                Ok([]) => {
                    match self.decoder.try_read() {
                        Ok(Some(msg)) => return Some(Ok(msg)),

                        // Not waiting for an explicit end-of-stream: stop as soon
                        // as the reader runs dry.
                        Ok(None) if !self.wait_for_eos => return None,

                        // Waiting for end-of-stream, but the decoder sits at a
                        // stream boundary — nothing is truncated, stop cleanly.
                        Ok(None) if self.decoder.state == State::StreamHeader => {
                            return None;
                        }

                        // Mid-stream: keep polling the reader for more data.
                        Ok(None) => {}

                        Err(err) => return Some(Err(err)),
                    }
                }

                // Forward everything the reader handed us to the decoder.
                Ok(buf) => {
                    self.decoder.push_byte_chunk(buf.to_vec());
                    let len = buf.len();
                    self.reader.consume(len);
                }

                // Transient error — retry the read.
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {}

                Err(err) => return Some(Err(err.into())),
            }
        }
    }
}
453
/// A chunk of bytes with a cursor tracking how much of it has been consumed.
type ByteChunk = Cursor<Vec<u8>>;
458
/// FIFO buffer of byte chunks that supports reads spanning chunk boundaries.
struct ByteChunkBuffer {
    /// Chunks not yet fully consumed, in arrival order.
    queue: VecDeque<ByteChunk>,

    /// Scratch buffer holding the bytes of an in-progress `try_read`;
    /// its length is the `n` of that read.
    buffer: Vec<u8>,

    /// How many bytes of `buffer` have been filled so far.
    buffer_fill: usize,

    /// Total number of bytes successfully handed out by `try_read`.
    num_read: usize,
}
473
impl ByteChunkBuffer {
    fn new() -> Self {
        Self {
            queue: VecDeque::with_capacity(16),
            buffer: Vec::with_capacity(1024),
            buffer_fill: 0,
            num_read: 0,
        }
    }

    /// Appends a chunk of bytes to the buffer. Empty chunks are ignored.
    fn push(&mut self, byte_chunk: Vec<u8>) {
        if byte_chunk.is_empty() {
            return;
        }
        self.queue.push_back(ByteChunk::new(byte_chunk));
    }

    /// Total number of bytes successfully returned by [`Self::try_read`] so far.
    fn num_read(&self) -> usize {
        self.num_read
    }

    /// Tries to read exactly `n` bytes, spanning chunk boundaries if needed.
    ///
    /// Returns `None` if fewer than `n` bytes are buffered. Partial progress is
    /// kept, so a later call with the *same* `n` resumes where this one left off;
    /// calling with a different `n` while a partial read is pending asserts.
    fn try_read(&mut self, n: usize) -> Option<&[u8]> {
        if self.buffer.len() != n {
            assert_eq!(
                self.buffer_fill, 0,
                "`try_read` called with different `n` for incomplete read"
            );
            self.buffer.resize(n, 0);
            self.buffer_fill = 0;
        }

        // Drain chunks from the front of the queue until the request is
        // satisfied or we run out of buffered bytes.
        while self.buffer_fill != n {
            if let Some(byte_chunk) = self.queue.front_mut() {
                let remainder = &mut self.buffer[self.buffer_fill..];
                self.buffer_fill += byte_chunk
                    .read(remainder)
                    .expect("failed to read from byte chunk");
                if is_byte_chunk_empty(byte_chunk) {
                    self.queue.pop_front();
                }
            } else {
                break;
            }
        }

        if self.buffer_fill == n {
            // Reset the fill so the next `try_read` starts a fresh request;
            // `buffer` keeps its length until that call resizes it.
            self.buffer_fill = 0;
            self.num_read += n;
            Some(&self.buffer[..])
        } else {
            None
        }
    }

    /// Copies up to `out.len()` buffered bytes into `out` without consuming them.
    ///
    /// Returns the number of bytes written.
    fn try_peek(&self, out: &mut [u8]) -> usize {
        use std::io::Write as _;

        let target_len = out.len();

        let mut out = std::io::Cursor::new(out);
        let mut n = 0;

        // Include bytes already moved into the scratch buffer, but only when this
        // peek targets the same request size as the pending `try_read`.
        // NOTE(review): a peek with a different length than a pending partial read
        // would silently skip those scratch bytes — assumed not to happen in
        // practice; confirm against callers.
        if target_len == self.buffer.len() {
            n += out
                .write(&self.buffer[..self.buffer_fill])
                .expect("memcpy, cannot fail");
        }

        for byte_chunk in &self.queue {
            if n == target_len {
                return n;
            }

            // Only peek the unconsumed tail of each chunk; `Cursor::write` caps
            // at the remaining space in `out`.
            let pos = byte_chunk.position() as usize;
            n += out
                .write(&byte_chunk.get_ref()[pos..])
                .expect("memcpy, cannot fail");
        }

        n
    }
}
578
579fn is_byte_chunk_empty(byte_chunk: &ByteChunk) -> bool {
580 byte_chunk.position() >= byte_chunk.get_ref().len() as u64
581}
582
583#[cfg(test)]
586mod tests {
587 use re_chunk::RowId;
588 use re_log_types::{LogMsg, SetStoreInfo, StoreInfo};
589
590 use crate::Encoder;
591 use crate::EncodingOptions;
592
593 use super::*;
594
595 fn fake_log_msg() -> LogMsg {
596 LogMsg::SetStoreInfo(SetStoreInfo {
597 row_id: *RowId::ZERO,
598 info: StoreInfo {
599 store_version: Some(CrateVersion::LOCAL), ..StoreInfo::testing()
601 },
602 })
603 }
604
605 fn test_data(options: EncodingOptions, n: usize) -> (Vec<LogMsg>, Vec<u8>) {
606 let messages: Vec<_> = (0..n).map(|_| fake_log_msg()).collect();
607
608 let mut data = Vec::new();
609 Encoder::encode_into(
610 CrateVersion::LOCAL,
611 options,
612 messages.clone().into_iter().map(Ok),
613 &mut data,
614 )
615 .unwrap();
616
617 (messages, data)
618 }
619
    /// Asserts that a `try_read` result is a complete message equal to
    /// [`fake_log_msg`], and evaluates to that message.
    macro_rules! assert_message_ok {
        ($message:expr) => {{
            match $message {
                Ok(Some(message)) => {
                    assert_eq!(&fake_log_msg(), &message);
                    message
                }
                Ok(None) => {
                    panic!("failed to read message: message could not be read in full");
                }
                Err(err) => {
                    panic!("failed to read message: {err}");
                }
            }
        }};
    }
636
    /// Asserts that a `try_read` result is `Ok(None)`, i.e. the decoder needs
    /// more bytes before it can produce a message.
    macro_rules! assert_message_incomplete {
        ($message:expr) => {{
            match $message {
                Ok(None) => {}
                Ok(Some(message)) => {
                    panic!("expected message to be incomplete, instead received: {message:?}");
                }
                Err(err) => {
                    panic!("failed to read message: {err}");
                }
            }
        }};
    }
650
651 #[test]
652 fn stream_whole_chunks_uncompressed_protobuf() {
653 let (input, data) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
654
655 let mut decoder = DecoderApp::new();
656
657 assert_message_incomplete!(decoder.try_read());
658
659 decoder.push_byte_chunk(data);
660
661 let decoded_messages: Vec<_> = (0..16)
662 .map(|_| assert_message_ok!(decoder.try_read()))
663 .collect();
664
665 assert_eq!(input, decoded_messages);
666 }
667
668 #[test]
669 fn stream_byte_chunks_uncompressed_protobuf() {
670 let (input, data) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
671
672 let mut decoder = DecoderApp::new();
673
674 assert_message_incomplete!(decoder.try_read());
675
676 for byte_chunk in data.chunks(1) {
677 decoder.push_byte_chunk(byte_chunk.to_vec());
678 }
679
680 let decoded_messages: Vec<_> = (0..16)
681 .map(|_| assert_message_ok!(decoder.try_read()))
682 .collect();
683
684 assert_eq!(input, decoded_messages);
685 }
686
687 #[test]
688 fn two_concatenated_streams_protobuf() {
689 let (input1, data1) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
690 let (input2, data2) = test_data(EncodingOptions::PROTOBUF_UNCOMPRESSED, 16);
691 let input = input1.into_iter().chain(input2).collect::<Vec<_>>();
692
693 let mut decoder = DecoderApp::new();
694
695 assert_message_incomplete!(decoder.try_read());
696
697 decoder.push_byte_chunk(data1);
698 decoder.push_byte_chunk(data2);
699
700 let decoded_messages: Vec<_> = (0..32)
701 .map(|_| assert_message_ok!(decoder.try_read()))
702 .collect();
703
704 assert_eq!(input, decoded_messages);
705 }
706
707 #[test]
708 fn stream_whole_chunks_compressed_protobuf() {
709 let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
710
711 let mut decoder = DecoderApp::new();
712
713 assert_message_incomplete!(decoder.try_read());
714
715 decoder.push_byte_chunk(data);
716
717 let decoded_messages: Vec<_> = (0..16)
718 .map(|_| assert_message_ok!(decoder.try_read()))
719 .collect();
720
721 assert_eq!(input, decoded_messages);
722 }
723
724 #[test]
725 fn stream_byte_chunks_compressed_protobuf() {
726 let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
727
728 let mut decoder = DecoderApp::new();
729
730 assert_message_incomplete!(decoder.try_read());
731
732 for byte_chunk in data.chunks(1) {
733 decoder.push_byte_chunk(byte_chunk.to_vec());
734 }
735
736 let decoded_messages: Vec<_> = (0..16)
737 .map(|_| assert_message_ok!(decoder.try_read()))
738 .collect();
739
740 assert_eq!(input, decoded_messages);
741 }
742
743 #[test]
744 fn stream_3x16_chunks_protobuf() {
745 let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
746
747 let mut decoder = DecoderApp::new();
748 let mut decoded_messages = vec![];
749
750 let mut byte_chunks = data.chunks(16).peekable();
753 while byte_chunks.peek().is_some() {
754 for _ in 0..3 {
755 if let Some(byte_chunk) = byte_chunks.next() {
756 decoder.push_byte_chunk(byte_chunk.to_vec());
757 } else {
758 break;
759 }
760 }
761
762 if let Some(message) = decoder.try_read().unwrap() {
763 decoded_messages.push(message);
764 }
765 }
766
767 assert_eq!(input, decoded_messages);
768 }
769
    /// Feeds the stream in irregular, repeating chunk sizes (including deliberately
    /// empty chunks) to exercise partial-read resumption in the decoder.
    #[test]
    fn stream_irregular_chunks_protobuf() {
        let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
        let mut data = Cursor::new(data);

        let mut decoder = DecoderApp::new();
        let mut decoded_messages = vec![];

        // Chunk sizes cycle through this pattern; `0` produces an empty push on purpose.
        let pattern = [0, 3, 4, 70, 31];
        let mut pattern_index = 0;
        let mut temp = [0_u8; 71];

        while data.position() < data.get_ref().len() as u64 {
            // Push two (possibly empty) chunks…
            for _ in 0..2 {
                let n = data.read(&mut temp[..pattern[pattern_index]]).unwrap();
                pattern_index = (pattern_index + 1) % pattern.len();
                decoder.push_byte_chunk(temp[..n].to_vec());
            }

            // …then drain every message that became decodable.
            while let Some(message) = decoder.try_read().unwrap() {
                decoded_messages.push(message);
            }
        }

        assert_eq!(input, decoded_messages);
    }
802
803 #[test]
804 fn chunk_buffer_read_single_chunk() {
805 let mut buffer = ByteChunkBuffer::new();
808
809 let data = &[0, 1, 2, 3, 4];
810 assert_eq!(None, buffer.try_read(1));
811 buffer.push(data.to_vec());
812 assert_eq!(Some(&data[..3]), buffer.try_read(3));
813 assert_eq!(Some(&data[3..]), buffer.try_read(2));
814 assert_eq!(None, buffer.try_read(1));
815 }
816
817 #[test]
818 fn chunk_buffer_read_multi_chunk() {
819 let mut buffer = ByteChunkBuffer::new();
822
823 let byte_chunks: &[&[u8]] = &[&[0, 1, 2], &[3, 4]];
824
825 assert_eq!(None, buffer.try_read(1));
826 buffer.push(byte_chunks[0].to_vec());
827 assert_eq!(None, buffer.try_read(5));
828 buffer.push(byte_chunks[1].to_vec());
829 assert_eq!(Some(&[0, 1, 2, 3, 4][..]), buffer.try_read(5));
830 assert_eq!(None, buffer.try_read(1));
831 }
832
833 #[test]
834 fn chunk_buffer_read_same_n() {
835 let mut buffer = ByteChunkBuffer::new();
838
839 let data = &[0, 1, 2, 3];
840 buffer.push(data.to_vec());
841 assert_eq!(data, buffer.try_read(4).unwrap());
842 assert_eq!(None, buffer.try_read(4));
843 let data = &[4, 5, 6, 7];
844 buffer.push(data.to_vec());
845 assert_eq!(data, buffer.try_read(4).unwrap());
846 assert_eq!(None, buffer.try_read(4));
847 }
848}
849
850#[cfg(all(test, feature = "decoder", feature = "encoder"))]
852mod tests_legacy {
853 #![expect(clippy::unwrap_used)] use re_build_info::CrateVersion;
856 use re_chunk::RowId;
857 use re_log_types::{LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};
858 use re_protos::log_msg::v1alpha1 as proto;
859 use re_protos::log_msg::v1alpha1::LogMsg as LogMsgProto;
860
861 use crate::Encoder;
862 use crate::codec::Compression;
863
864 use super::*;
865
    /// Builds a small, representative set of messages: a `SetStoreInfo`, an
    /// `ArrowMsg` with one chunk, and a `BlueprintActivationCommand` — all
    /// sharing the same blueprint store id.
    fn fake_log_messages() -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Blueprint, "test_app");

        // A single chunk logging a blueprint background archetype.
        let arrow_msg = re_chunk::Chunk::builder("test_entity")
            .with_archetype(
                re_chunk::RowId::new(),
                re_log_types::TimePoint::default().with(
                    re_log_types::Timeline::new_sequence("blueprint"),
                    re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                ),
                &re_types::blueprint::archetypes::Background::new(
                    re_types::blueprint::components::BackgroundKind::SolidColor,
                )
                .with_color([255, 0, 0]),
            )
            .build()
            .unwrap()
            .to_arrow_msg()
            .unwrap();

        vec![
            LogMsg::SetStoreInfo(SetStoreInfo {
                row_id: *RowId::new(),
                info: StoreInfo::new(
                    store_id.clone(),
                    StoreSource::RustSdk {
                        rustc_version: String::new(),
                        llvm_version: String::new(),
                    },
                ),
            }),
            LogMsg::ArrowMsg(store_id.clone(), arrow_msg),
            LogMsg::BlueprintActivationCommand(re_log_types::BlueprintActivationCommand {
                blueprint_id: store_id,
                make_active: true,
                make_default: true,
            }),
        ]
    }
905
    /// Converts [`fake_log_messages`] into transport protos and rewrites them into
    /// the *legacy* wire format, where the application id lives on `StoreInfo`
    /// instead of inside the `StoreId`.
    #[expect(deprecated)]
    fn legacy_fake_log_messages() -> Vec<LogMsgProto> {
        fake_log_messages()
            .into_iter()
            .map(|msg| {
                crate::protobuf_conversions::log_msg_to_proto(msg, Compression::Off).unwrap()
            })
            .map(|mut log_msg| {
                match &mut log_msg.msg {
                    None => panic!("Unexpected `LogMsg` without payload"),

                    Some(proto::log_msg::Msg::SetStoreInfo(set_store_info)) => {
                        if let Some(store_info) = &mut set_store_info.info {
                            let Some(mut store_id) = store_info.store_id.clone() else {
                                panic!("Unexpected missing `StoreId`");
                            };

                            // Modern encoding: the application id lives on the store id only.
                            assert_eq!(store_info.application_id, None);
                            assert!(store_id.application_id.is_some());

                            // Move it to the legacy location on `StoreInfo`.
                            store_info.application_id = store_id.application_id;
                            store_id.application_id = None;
                            store_info.store_id = Some(store_id);
                        } else {
                            panic!("Unexpected missing `store_info`")
                        }
                    }
                    Some(
                        proto::log_msg::Msg::ArrowMsg(proto::ArrowMsg { store_id, .. })
                        | proto::log_msg::Msg::BlueprintActivationCommand(
                            proto::BlueprintActivationCommand {
                                blueprint_id: store_id,
                                ..
                            },
                        ),
                    ) => {
                        let mut legacy_store_id =
                            store_id.clone().expect("messages should have store ids");
                        assert!(legacy_store_id.application_id.is_some());

                        // Legacy messages carry no application id here at all; the
                        // decoder must recover it from the preceding `SetStoreInfo`.
                        legacy_store_id.application_id = None;
                        *store_id = Some(legacy_store_id);
                    }
                }

                log_msg
            })
            .collect()
    }
961
    impl<W: std::io::Write> Encoder<W> {
        /// Like `encode_into`, but deliberately skips writing the end-of-stream
        /// marker — simulating e.g. a truncated or still-growing recording.
        fn encode_into_without_eos(
            version: CrateVersion,
            options: EncodingOptions,
            messages: impl IntoIterator<Item = re_chunk::ChunkResult<impl std::borrow::Borrow<LogMsg>>>,
            write: &mut W,
        ) -> Result<u64, crate::EncodeError> {
            re_tracing::profile_function!();
            let mut encoder = Encoder::new(version, options, write)?;
            let mut size_bytes = 0;
            for message in messages {
                size_bytes += encoder.append(message?.borrow())?;
            }

            {
                encoder.flush_blocking()?;

                // Leak the encoder so its `Drop` never runs — presumably `Drop` is
                // what emits the end-of-stream marker (TODO: confirm in `Encoder`).
                #[expect(clippy::mem_forget)]
                std::mem::forget(encoder);
            }

            Ok(size_bytes)
        }
    }
989
    /// Round-trips [`fake_log_messages`] through encode → decode for every
    /// encoding option: from a byte slice, through a `BufReader`, and for a
    /// stream missing its end-of-stream marker.
    #[test]
    fn test_encode_decode() {
        let rrd_version = CrateVersion::LOCAL;

        let messages = fake_log_messages();

        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        // Decode straight from a byte slice.
        for options in options {
            let mut file = vec![];
            crate::Encoder::encode_into(rrd_version, options, messages.iter().map(Ok), &mut file)
                .unwrap();

            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(file.as_slice())
                .map(Result::unwrap)
                .collect();
            similar_asserts::assert_eq!(decoded_messages, messages);
        }

        // Decode through a `BufReader`.
        for options in options {
            let mut file = vec![];
            crate::Encoder::encode_into(rrd_version, options, messages.iter().map(Ok), &mut file)
                .unwrap();

            let reader = std::io::BufReader::new(file.as_slice());
            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(reader)
                .map(Result::unwrap)
                .collect();
            similar_asserts::assert_eq!(decoded_messages, messages);
        }

        // Decode a stream that was never terminated with an end-of-stream marker.
        for options in options {
            let mut file = vec![];
            crate::Encoder::encode_into_without_eos(
                rrd_version,
                options,
                messages.iter().map(Ok),
                &mut file,
            )
            .unwrap();

            let reader = std::io::BufReader::new(file.as_slice());
            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(reader)
                .map(Result::unwrap)
                .collect();
            similar_asserts::assert_eq!(decoded_messages, messages);
        }
    }
1050
    /// Legacy-format messages (application id on `StoreInfo` rather than in the
    /// `StoreId`) must still decode successfully.
    #[test]
    fn test_decode_legacy() {
        let rrd_version = CrateVersion::LOCAL;

        let messages = legacy_fake_log_messages();

        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        for options in options {
            let mut file = vec![];

            let mut encoder = Encoder::new(rrd_version, options, &mut file).unwrap();
            for message in messages.clone() {
                encoder
                    .append_proto(message)
                    .expect("encoding should succeed");
            }
            drop(encoder);

            // Only count messages: the decoded application-level messages are not
            // directly comparable to the legacy transport-level input.
            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(file.as_slice())
                .map(Result::unwrap)
                .collect();
            assert_eq!(decoded_messages.len(), messages.len());
        }
    }
1086
    /// Legacy messages that arrive *before* any `SetStoreInfo` carry no
    /// application id anywhere, so the decoder must drop them (with a warning)
    /// rather than fail; only the correctly-ordered messages survive.
    #[test]
    fn test_decode_legacy_out_of_order() {
        let rrd_version = CrateVersion::LOCAL;

        let messages = legacy_fake_log_messages();

        // Sanity-check the shape of the fixture before reordering it.
        let orig_message_count = messages.len();
        assert_eq!(orig_message_count, 3);
        assert!(matches!(
            messages[0].msg,
            Some(proto::log_msg::Msg::SetStoreInfo(..))
        ));
        assert!(matches!(
            messages[1].msg,
            Some(proto::log_msg::Msg::ArrowMsg(..))
        ));
        assert!(matches!(
            messages[2].msg,
            Some(proto::log_msg::Msg::BlueprintActivationCommand(..))
        ));

        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        // Prepend two messages so they precede their `SetStoreInfo`.
        let mut out_of_order_messages = vec![messages[1].clone(), messages[2].clone()];
        out_of_order_messages.extend(messages);

        for options in options {
            let mut file = vec![];

            let mut encoder = Encoder::new(rrd_version, options, &mut file).unwrap();
            for message in out_of_order_messages.clone() {
                encoder
                    .append_proto(message)
                    .expect("encoding should succeed");
            }
            drop(encoder);

            // The two prepended messages must have been dropped.
            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(file.as_slice())
                .map(Result::unwrap)
                .collect();
            assert_eq!(decoded_messages.len(), orig_message_count);
        }
    }
1143
    /// With the current encoding every message is self-contained, so
    /// out-of-order messages must all survive decoding unchanged.
    #[test]
    fn test_decode_out_of_order() {
        let rrd_version = CrateVersion::LOCAL;

        let messages = fake_log_messages();

        // Sanity-check the shape of the fixture before reordering it.
        let orig_message_count = messages.len();
        assert_eq!(orig_message_count, 3);
        assert!(matches!(messages[0], LogMsg::SetStoreInfo { .. }));
        assert!(matches!(messages[1], LogMsg::ArrowMsg { .. }));
        assert!(matches!(
            messages[2],
            LogMsg::BlueprintActivationCommand { .. }
        ));

        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        // Prepend two messages so they precede their `SetStoreInfo`.
        let mut out_of_order_messages = vec![messages[1].clone(), messages[2].clone()];
        out_of_order_messages.extend(messages);

        for options in options {
            let mut file = vec![];
            crate::Encoder::encode_into(
                rrd_version,
                options,
                out_of_order_messages.iter().map(Ok),
                &mut file,
            )
            .unwrap();

            // Nothing is dropped: the full reordered sequence round-trips.
            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(file.as_slice())
                .map(Result::unwrap)
                .collect();
            similar_asserts::assert_eq!(decoded_messages, out_of_order_messages);
        }
    }
1192
    /// Two streams written back-to-back into one buffer decode into the
    /// concatenation of their messages — both when each stream is properly
    /// terminated and when the end-of-stream markers are omitted.
    #[test]
    fn test_concatenated_streams() {
        let options = [
            EncodingOptions {
                compression: Compression::Off,
                serializer: Serializer::Protobuf,
            },
            EncodingOptions {
                compression: Compression::LZ4,
                serializer: Serializer::Protobuf,
            },
        ];

        // Case 1: both streams are properly terminated via `finish`.
        for options in options {
            let mut data = vec![];

            let messages = fake_log_messages();

            {
                let writer = std::io::Cursor::new(&mut data);
                let mut encoder1 =
                    crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
                for message in &messages {
                    encoder1.append(message).unwrap();
                }
                encoder1.finish().unwrap();
            }

            let written = data.len() as u64;

            // Append the second stream right after the first one.
            {
                let mut writer = std::io::Cursor::new(&mut data);
                writer.set_position(written);
                let mut encoder2 =
                    crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
                for message in &messages {
                    encoder2.append(message).unwrap();
                }
                encoder2.finish().unwrap();
            }

            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(data.as_slice())
                .map(Result::unwrap)
                .collect();
            similar_asserts::assert_eq!(decoded_messages, [messages.clone(), messages].concat());
        }

        // Case 2: the encoders are deliberately leaked instead of finished, so no
        // end-of-stream marker gets written (mirrors `encode_into_without_eos`).
        for options in options {
            let mut data = vec![];

            let messages = fake_log_messages();

            {
                let writer = std::io::Cursor::new(&mut data);
                let mut encoder1 =
                    crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
                for message in &messages {
                    encoder1.append(message).unwrap();
                }

                #[expect(clippy::mem_forget)]
                std::mem::forget(encoder1);
            }

            let written = data.len() as u64;

            {
                let mut writer = std::io::Cursor::new(&mut data);
                writer.set_position(written);
                let mut encoder2 =
                    crate::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
                for message in &messages {
                    encoder2.append(message).unwrap();
                }

                #[expect(clippy::mem_forget)]
                std::mem::forget(encoder2);
            }

            let decoded_messages: Vec<_> = DecoderApp::decode_lazy(data.as_slice())
                .map(Result::unwrap)
                .collect();
            assert_eq!(messages.len() * 2, decoded_messages.len());
            similar_asserts::assert_eq!(decoded_messages, [messages.clone(), messages].concat());
        }
    }
1286}