1pub mod stream;
4
5#[cfg(feature = "decoder")]
6pub mod streaming;
7
8use std::io::BufRead as _;
9use std::io::Read as _;
10
11use re_build_info::CrateVersion;
12use re_log_types::LogMsg;
13
14use crate::FileHeader;
15use crate::OLD_RRD_HEADERS;
16use crate::codec;
17use crate::codec::file::decoder;
18use crate::{EncodingOptions, Serializer};
19
20fn warn_on_version_mismatch(encoded_version: [u8; 4]) -> Result<(), DecodeError> {
23 let encoded_version = if encoded_version == [0, 0, 0, 0] {
25 CrateVersion::new(0, 2, 0)
26 } else {
27 CrateVersion::from_bytes(encoded_version)
28 };
29
30 if encoded_version.major == 0 && encoded_version.minor < 23 {
31 Err(DecodeError::IncompatibleRerunVersion {
33 file: encoded_version,
34 local: CrateVersion::LOCAL,
35 })
36 } else if encoded_version <= CrateVersion::LOCAL {
37 Ok(())
39 } else {
40 re_log::warn_once!(
41 "Found data stream with Rerun version {encoded_version} which is newer than the local Rerun version ({}). This file may contain data that is not compatible with this version of Rerun. Consider updating Rerun.",
42 CrateVersion::LOCAL
43 );
44 Ok(())
45 }
46}
47
48#[derive(thiserror::Error, Debug)]
52pub enum DecodeError {
53 #[error("Not an .rrd file")]
54 NotAnRrd,
55
56 #[error("Data was from an old, incompatible Rerun version")]
57 OldRrdVersion,
58
59 #[error(
60 "Data from Rerun version {file}, which is incompatible with the local Rerun version {local}"
61 )]
62 IncompatibleRerunVersion {
63 file: CrateVersion,
64 local: CrateVersion,
65 },
66
67 #[error("Failed to decode the options: {0}")]
68 Options(#[from] crate::OptionsError),
69
70 #[error("Failed to read: {0}")]
71 Read(#[from] std::io::Error),
72
73 #[error("lz4 error: {0}")]
74 Lz4(#[from] lz4_flex::block::DecompressError),
75
76 #[error("Protobuf error: {0}")]
77 Protobuf(#[from] re_protos::external::prost::DecodeError),
78
79 #[error("Could not convert type from protobuf: {0}")]
80 TypeConversion(#[from] re_protos::TypeConversionError),
81
82 #[error("Sorbet error: {0}")]
83 SorbetError(#[from] re_sorbet::SorbetError),
84
85 #[error("Failed to read chunk: {0}")]
86 Chunk(#[from] re_chunk::ChunkError),
87
88 #[error("Arrow error: {0}")]
89 Arrow(#[from] arrow::error::ArrowError),
90
91 #[error("Codec error: {0}")]
92 Codec(#[from] codec::CodecError),
93}
94
95pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
98 re_tracing::profile_function!();
99 let decoder = Decoder::new(std::io::Cursor::new(bytes))?;
100 let mut msgs = vec![];
101 for msg in decoder {
102 msgs.push(msg?);
103 }
104 Ok(msgs)
105}
106
107pub fn read_options(
111 reader: &mut impl std::io::Read,
112) -> Result<(CrateVersion, EncodingOptions), DecodeError> {
113 let mut data = [0_u8; FileHeader::SIZE];
114 reader.read_exact(&mut data).map_err(DecodeError::Read)?;
115
116 options_from_bytes(&data)
117}
118
119pub async fn read_options_async(
121 reader: &mut (impl tokio::io::AsyncRead + Unpin),
122) -> Result<(CrateVersion, EncodingOptions), DecodeError> {
123 let mut data = [0_u8; FileHeader::SIZE];
124
125 use tokio::io::AsyncReadExt as _;
126 reader
127 .read_exact(&mut data)
128 .await
129 .map_err(DecodeError::Read)?;
130
131 options_from_bytes(&data)
132}
133
134pub fn options_from_bytes(bytes: &[u8]) -> Result<(CrateVersion, EncodingOptions), DecodeError> {
135 let mut read = std::io::Cursor::new(bytes);
136
137 let FileHeader {
138 magic,
139 version,
140 options,
141 } = FileHeader::decode(&mut read)?;
142
143 if OLD_RRD_HEADERS.contains(&magic) {
144 return Err(DecodeError::OldRrdVersion);
145 } else if &magic != crate::RRD_HEADER {
146 return Err(DecodeError::NotAnRrd);
147 }
148
149 warn_on_version_mismatch(version)?;
150
151 match options.serializer {
152 Serializer::Protobuf => {}
153 }
154
155 Ok((CrateVersion::from_bytes(version), options))
156}
157
/// The byte source behind a decoder: either the caller's reader as-is, or the same
/// kind of reader wrapped in a [`std::io::BufReader`].
enum Reader<R: std::io::Read> {
    /// The reader exactly as it was handed over.
    Raw(R),

    /// A buffered reader, whose internal buffer can be inspected without consuming it.
    Buffered(std::io::BufReader<R>),
}

impl<R: std::io::Read> std::io::Read for Reader<R> {
    /// Forwards the read to whichever reader this value wraps.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let raw = match self {
            Self::Buffered(buffered) => return buffered.read(buf),
            Self::Raw(raw) => raw,
        };
        raw.read(buf)
    }
}
172
173pub struct Decoder<R: std::io::Read> {
174 version: CrateVersion,
175 options: EncodingOptions,
176 read: Reader<R>,
177
178 size_bytes: u64,
180}
181
182impl<R: std::io::Read> Decoder<R> {
183 pub fn new(mut read: R) -> Result<Self, DecodeError> {
194 re_tracing::profile_function!();
195
196 let mut data = [0_u8; FileHeader::SIZE];
197 read.read_exact(&mut data).map_err(DecodeError::Read)?;
198
199 let (version, options) = options_from_bytes(&data)?;
200
201 Ok(Self {
202 version,
203 options,
204 read: Reader::Raw(read),
205 size_bytes: FileHeader::SIZE as _,
206 })
207 }
208
209 pub fn new_with_options(options: EncodingOptions, version: CrateVersion, read: R) -> Self {
210 Self {
211 version,
212 options,
213 read: Reader::Raw(read),
214 size_bytes: FileHeader::SIZE as _,
215 }
216 }
217
218 pub fn new_concatenated(mut read: std::io::BufReader<R>) -> Result<Self, DecodeError> {
235 re_tracing::profile_function!();
236
237 let mut data = [0_u8; FileHeader::SIZE];
238 read.read_exact(&mut data).map_err(DecodeError::Read)?;
239
240 let (version, options) = options_from_bytes(&data)?;
241
242 Ok(Self {
243 version,
244 options,
245 read: Reader::Buffered(read),
246 size_bytes: FileHeader::SIZE as _,
247 })
248 }
249
250 #[inline]
252 pub fn version(&self) -> CrateVersion {
253 self.version
254 }
255
256 #[inline]
259 pub fn size_bytes(&self) -> u64 {
260 self.size_bytes
261 }
262
263 fn next<F, T>(&mut self, mut decoder: F) -> Option<Result<T, DecodeError>>
265 where
266 F: FnMut(&mut Reader<R>) -> Result<(u64, Option<T>), DecodeError>,
267 {
268 re_tracing::profile_function!();
269
270 if self.peek_file_header() {
271 let mut data = [0_u8; FileHeader::SIZE];
275 if let Err(err) = self.read.read_exact(&mut data).map_err(DecodeError::Read) {
276 return Some(Err(err));
277 }
278
279 let (version, options) = match options_from_bytes(&data) {
280 Ok(opts) => opts,
281 Err(err) => return Some(Err(err)),
282 };
283
284 self.version = CrateVersion::max(self.version, version);
285 self.options = options;
286 self.size_bytes += FileHeader::SIZE as u64;
287 }
288
289 let msg = match self.options.serializer {
290 Serializer::Protobuf => match decoder(&mut self.read) {
291 Ok((read_bytes, msg)) => {
292 self.size_bytes += read_bytes;
293 msg
294 }
295
296 Err(err) => match err {
297 DecodeError::Read(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
298 return None;
299 }
300 _ => return Some(Err(err)),
301 },
302 },
303 };
304
305 let Some(msg) = msg else {
306 if self.peek_file_header() {
308 re_log::debug!(
309 "Reached end of stream, but it seems we have a concatenated file, continuing"
310 );
311 return self.next(decoder);
312 }
313
314 re_log::trace!("Reached end of stream, iterator complete");
315 return None;
316 };
317
318 Some(Ok(msg))
319 }
320
321 fn peek_file_header(&mut self) -> bool {
327 match &mut self.read {
328 Reader::Raw(_) => false,
329 Reader::Buffered(read) => {
330 if read.fill_buf().map_err(DecodeError::Read).is_err() {
331 return false;
332 }
333
334 let mut read = std::io::Cursor::new(read.buffer());
335 if FileHeader::decode(&mut read).is_err() {
336 return false;
337 }
338
339 true
340 }
341 }
342 }
343
344 pub fn into_raw_iter(self) -> RawIterator<R> {
346 RawIterator { decoder: self }
347 }
348}
349
350pub struct RawIterator<R: std::io::Read> {
354 decoder: Decoder<R>,
355}
356
357impl<R: std::io::Read> RawIterator<R> {
358 #[inline]
362 pub fn size_bytes(&self) -> u64 {
363 self.decoder.size_bytes
364 }
365}
366
367impl<R: std::io::Read> Iterator for Decoder<R> {
368 type Item = Result<LogMsg, DecodeError>;
369
370 fn next(&mut self) -> Option<Self::Item> {
371 self.next(decoder::decode_to_app)
372 }
373}
374
375impl<R: std::io::Read> Iterator for RawIterator<R> {
376 type Item = Result<re_protos::log_msg::v1alpha1::log_msg::Msg, DecodeError>;
377
378 fn next(&mut self) -> Option<Self::Item> {
379 self.decoder.next(decoder::decode_to_transport)
380 }
381}
382
#[cfg(all(test, feature = "decoder", feature = "encoder"))]
mod tests {
    #![allow(clippy::unwrap_used)] // Panicking is how a test reports failure.

    use re_build_info::CrateVersion;
    use re_chunk::RowId;
    use re_log_types::{ApplicationId, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource};

    use super::*;
    use crate::Compression;

    /// Every encoding configuration the tests below are run against.
    fn encoding_options_under_test() -> [EncodingOptions; 2] {
        [Compression::Off, Compression::LZ4].map(|compression| EncodingOptions {
            compression,
            serializer: Serializer::Protobuf,
        })
    }

    /// Encodes `messages` as one complete stream, written at the current end of `data`.
    fn append_encoded_stream(data: &mut Vec<u8>, options: EncodingOptions, messages: &[LogMsg]) {
        let end = data.len() as u64;
        let mut writer = std::io::Cursor::new(data);
        writer.set_position(end);

        let mut encoder =
            crate::encoder::Encoder::new(CrateVersion::LOCAL, options, writer).unwrap();
        for message in messages {
            encoder.append(message).unwrap();
        }
        encoder.finish().unwrap();
    }

    /// Builds one message of each kind, all belonging to the same random blueprint store.
    pub fn fake_log_messages() -> Vec<LogMsg> {
        let store_id = StoreId::random(StoreKind::Blueprint);

        let arrow_msg = re_chunk::Chunk::builder("test_entity")
            .with_archetype(
                RowId::new(),
                re_log_types::TimePoint::default().with(
                    re_log_types::Timeline::new_sequence("blueprint"),
                    re_log_types::TimeInt::from_millis(re_log_types::NonMinI64::MIN),
                ),
                &re_types::blueprint::archetypes::Background::new(
                    re_types::blueprint::components::BackgroundKind::SolidColor,
                )
                .with_color([255, 0, 0]),
            )
            .build()
            .unwrap()
            .to_arrow_msg()
            .unwrap();

        let set_store_info = LogMsg::SetStoreInfo(SetStoreInfo {
            row_id: *RowId::new(),
            info: StoreInfo {
                application_id: ApplicationId("test".to_owned()),
                store_id: store_id.clone(),
                cloned_from: None,
                store_source: StoreSource::RustSdk {
                    rustc_version: String::new(),
                    llvm_version: String::new(),
                },
                store_version: Some(CrateVersion::LOCAL),
            },
        });
        let data = LogMsg::ArrowMsg(store_id.clone(), arrow_msg);
        let activation =
            LogMsg::BlueprintActivationCommand(re_log_types::BlueprintActivationCommand {
                blueprint_id: store_id,
                make_active: true,
                make_default: true,
            });

        vec![set_store_info, data, activation]
    }

    /// Encoding and then decoding must give back the original messages.
    #[test]
    fn test_encode_decode() {
        let messages = fake_log_messages();

        for options in encoding_options_under_test() {
            let mut encoded = Vec::new();
            crate::encoder::encode_ref(
                CrateVersion::LOCAL,
                options,
                messages.iter().map(Ok),
                &mut encoded,
            )
            .unwrap();

            let decoded: Vec<LogMsg> = Decoder::new(&mut encoded.as_slice())
                .unwrap()
                .collect::<Result<_, DecodeError>>()
                .unwrap();

            similar_asserts::assert_eq!(decoded, messages);
        }
    }

    /// Two streams written back to back must decode as the concatenation of their messages.
    #[test]
    fn test_concatenated_streams() {
        for options in encoding_options_under_test() {
            let messages = fake_log_messages();

            let mut data = Vec::new();
            append_encoded_stream(&mut data, options, &messages);
            append_encoded_stream(&mut data, options, &messages);

            let decoded_messages =
                Decoder::new_concatenated(std::io::BufReader::new(data.as_slice()))
                    .unwrap()
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap();

            similar_asserts::assert_eq!(decoded_messages, [messages.clone(), messages].concat());
        }
    }
}