mcap_rs/
read.rs

1//! Read MCAP files
2//!
3//! MCAPs are read from a byte slice instead of a [`Read`] trait object.
4//! This helps us avoid unnecessary copies, since [`Schema`]s and [`Message`]s
5//! can refer directly to their data.
6//!
7//! Consider [memory-mapping](https://docs.rs/memmap/0.7.0/memmap/struct.Mmap.html)
8//! the file - the OS will load (and cache!) it on-demand, without any
9//! further system calls.
10use std::{
11    borrow::Cow,
12    collections::{BTreeMap, HashMap},
13    fmt,
14    io::{self, prelude::*, Cursor},
15    sync::Arc,
16};
17
18use binrw::prelude::*;
19use crc32fast::hash as crc32;
20use enumset::{enum_set, EnumSet, EnumSetType};
21use log::*;
22
23use crate::{
24    io_utils::CountingCrcReader,
25    records::{self, op, Record},
26    Attachment, Channel, McapError, McapResult, Message, Schema, MAGIC,
27};
28
/// Nonstandard reading options, e.g.,
/// to be more lenient when trying to recover incomplete/damaged files.
///
/// Options are handed to the readers' `new_with_options()` constructors
/// as an [`EnumSet`], so several of them can be combined.
///
/// More may be added in future releases.
#[derive(EnumSetType, Debug)]
pub enum Options {
    /// Don't require the MCAP file to end with its magic bytes.
    /// (The magic bytes at the start of the file are still required.)
    IgnoreEndMagic,
}
38
/// Scans a mapped MCAP file from start to end, returning each record.
///
/// You probably want a [MessageStream] instead - this yields the raw records
/// from the file without any postprocessing (decompressing chunks, etc.)
/// and is mostly meant as a building block for higher-level readers.
pub struct LinearReader<'a> {
    /// The bytes that haven't been read yet.
    /// Each record that is read advances this slice past that record.
    buf: &'a [u8],
    /// Set once reading a record has failed;
    /// after that the iterator doesn't yield anything else.
    malformed: bool,
}
48
49impl<'a> LinearReader<'a> {
50    /// Create a reader for the given file,
51    /// checking [`MAGIC`] bytes on both ends.
52    pub fn new(buf: &'a [u8]) -> McapResult<Self> {
53        Self::new_with_options(buf, enum_set!())
54    }
55
56    /// Create a reader for the given file with special options.
57    pub fn new_with_options(buf: &'a [u8], options: EnumSet<Options>) -> McapResult<Self> {
58        if !buf.starts_with(MAGIC)
59            || (!options.contains(Options::IgnoreEndMagic)
60                && (!buf.ends_with(MAGIC) || buf.len() < 2 * MAGIC.len()))
61        {
62            return Err(McapError::BadMagic);
63        }
64        let buf = &buf[MAGIC.len()..];
65        if buf.ends_with(MAGIC) {
66            Ok(Self::sans_magic(&buf[0..buf.len() - MAGIC.len()]))
67        } else {
68            Ok(Self::sans_magic(buf))
69        }
70    }
71
72    /// Like [`new()`](Self::new), but assumes `buf` has the magic bytes sliced off.
73    ///
74    /// Useful for iterating through slices of an MCAP file instead of the whole thing.
75    pub fn sans_magic(buf: &'a [u8]) -> Self {
76        Self {
77            buf,
78            malformed: false,
79        }
80    }
81
82    /// Returns the number of unprocessed bytes
83    /// (sans the file's starting and ending magic)
84    ///
85    /// Used to calculate offsets for the data section et al.
86    fn bytes_remaining(&self) -> usize {
87        self.buf.len()
88    }
89}
90
91impl<'a> Iterator for LinearReader<'a> {
92    type Item = McapResult<records::Record<'a>>;
93
94    fn next(&mut self) -> Option<Self::Item> {
95        if self.buf.is_empty() {
96            return None;
97        }
98
99        // After an unrecoverable error (due to something wonky in the file),
100        // don't keep trying to walk it.
101        if self.malformed {
102            return None;
103        }
104
105        let record = match read_record_from_slice(&mut self.buf) {
106            Ok(k) => k,
107            Err(e) => {
108                self.malformed = true;
109                return Some(Err(e));
110            }
111        };
112
113        Some(Ok(record))
114    }
115}
116
117/// Read a record and advance the slice
118fn read_record_from_slice<'a>(buf: &mut &'a [u8]) -> McapResult<records::Record<'a>> {
119    if buf.len() < 5 {
120        warn!("Malformed MCAP - not enough space for record + length!");
121        return Err(McapError::UnexpectedEof);
122    }
123
124    let op = read_u8(buf);
125    let len = read_u64(buf);
126
127    if buf.len() < len as usize {
128        warn!(
129            "Malformed MCAP - record with length {len}, but only {} bytes remain",
130            buf.len()
131        );
132        return Err(McapError::UnexpectedEof);
133    }
134
135    let body = &buf[..len as usize];
136    debug!("slice: opcode {op:02X}, length {len}");
137    let record = read_record(op, body)?;
138    trace!("       {:?}", record);
139
140    *buf = &buf[len as usize..];
141    Ok(record)
142}
143
144/// Given a record's opcode and its slice, read it into a [Record]
145fn read_record(op: u8, body: &[u8]) -> McapResult<records::Record<'_>> {
146    macro_rules! record {
147        ($b:ident) => {{
148            let mut cur = Cursor::new($b);
149            let res = cur.read_le()?;
150            assert_eq!($b.len() as u64, cur.position());
151            res
152        }};
153    }
154
155    Ok(match op {
156        op::HEADER => Record::Header(record!(body)),
157        op::FOOTER => Record::Footer(record!(body)),
158        op::SCHEMA => {
159            let mut c = Cursor::new(body);
160            let header: records::SchemaHeader = c.read_le()?;
161            let data = Cow::Borrowed(&body[c.position() as usize..]);
162            if header.data_len != data.len() as u32 {
163                warn!(
164                    "Schema {}'s data length doesn't match the total schema length",
165                    header.name
166                );
167            }
168            Record::Schema { header, data }
169        }
170        op::CHANNEL => Record::Channel(record!(body)),
171        op::MESSAGE => {
172            let mut c = Cursor::new(body);
173            let header = c.read_le()?;
174            let data = Cow::Borrowed(&body[c.position() as usize..]);
175            Record::Message { header, data }
176        }
177        op::CHUNK => {
178            let mut c = Cursor::new(body);
179            let header: records::ChunkHeader = c.read_le()?;
180            let data = &body[c.position() as usize..];
181            if header.compressed_size != data.len() as u64 {
182                warn!("Chunk's compressed length doesn't match its header");
183            }
184            Record::Chunk { header, data }
185        }
186        op::MESSAGE_INDEX => Record::MessageIndex(record!(body)),
187        op::CHUNK_INDEX => Record::ChunkIndex(record!(body)),
188        op::ATTACHMENT => {
189            let mut c = Cursor::new(body);
190            let header: records::AttachmentHeader = c.read_le()?;
191            let data = &body[c.position() as usize..body.len() - 4];
192            if header.data_len != data.len() as u64 {
193                warn!(
194                    "Attachment {}'s data length doesn't match the total schema length",
195                    header.name
196                );
197            }
198            let crc = Cursor::new(&body[body.len() - 4..]).read_le()?;
199
200            // We usually leave CRCs to higher-level readers -
201            // (ChunkReader, read_summary(), etc.) - but
202            //
203            // 1. We can trivially check it here without checking other records,
204            //    decompressing anything, or doing any other non-trivial work
205            //
206            // 2. Since the CRC depends on the serialized header, it doesn't make
207            //    much sense to have users check it.
208            //    (What would they do? lol reserialize the header?)
209            if crc != 0 {
210                let calculated = crc32(&body[..body.len() - 4]);
211                if crc != calculated {
212                    return Err(McapError::BadAttachmentCrc {
213                        saved: crc,
214                        calculated,
215                    });
216                }
217            }
218
219            Record::Attachment { header, data }
220        }
221        op::ATTACHMENT_INDEX => Record::AttachmentIndex(record!(body)),
222        op::STATISTICS => Record::Statistics(record!(body)),
223        op::METADATA => Record::Metadata(record!(body)),
224        op::METADATA_INDEX => Record::MetadataIndex(record!(body)),
225        op::SUMMARY_OFFSET => Record::SummaryOffset(record!(body)),
226        op::END_OF_DATA => Record::EndOfData(record!(body)),
227        opcode => Record::Unknown {
228            opcode,
229            data: Cow::Borrowed(body),
230        },
231    })
232}
233
/// How a [`ChunkReader`] gets at the records inside its chunk.
enum ChunkDecompressor<'a> {
    /// The chunk isn't compressed: records are read straight out of its slice.
    Null(LinearReader<'a>),
    /// The chunk is compressed: records are read from a decompression stream
    /// that is wrapped in a [`CountingCrcReader`].
    /// Becomes `None` once the stream has been finished off or has failed.
    Compressed(Option<CountingCrcReader<Box<dyn Read + Send + 'a>>>),
}
238
/// Streams records out of a [Chunk](Record::Chunk), decompressing as needed.
pub struct ChunkReader<'a> {
    /// The chunk's header; its uncompressed size and CRC are checked
    /// against the decompressed stream.
    header: records::ChunkHeader,
    /// Where the chunk's records come from.
    decompressor: ChunkDecompressor<'a>,
}
244
245impl<'a> ChunkReader<'a> {
246    pub fn new(header: records::ChunkHeader, data: &'a [u8]) -> McapResult<Self> {
247        let decompressor = match header.compression.as_str() {
248            "zstd" => ChunkDecompressor::Compressed(Some(CountingCrcReader::new(Box::new(
249                zstd::Decoder::new(data)?,
250            )))),
251            "lz4" => ChunkDecompressor::Compressed(Some(CountingCrcReader::new(Box::new(
252                lz4::Decoder::new(data)?,
253            )))),
254            "" => {
255                if header.uncompressed_size != header.compressed_size {
256                    warn!(
257                        "Chunk is uncompressed, but claims different compress/uncompressed lengths"
258                    );
259                }
260
261                if header.uncompressed_crc != 0 {
262                    let calculated = crc32(data);
263                    if header.uncompressed_crc != calculated {
264                        return Err(McapError::BadChunkCrc {
265                            saved: header.uncompressed_crc,
266                            calculated,
267                        });
268                    }
269                }
270
271                ChunkDecompressor::Null(LinearReader::sans_magic(data))
272            }
273            wat => return Err(McapError::UnsupportedCompression(wat.to_string())),
274        };
275
276        Ok(Self {
277            header,
278            decompressor,
279        })
280    }
281}
282
283impl<'a> Iterator for ChunkReader<'a> {
284    type Item = McapResult<records::Record<'a>>;
285
286    fn next(&mut self) -> Option<Self::Item> {
287        match &mut self.decompressor {
288            ChunkDecompressor::Null(r) => r.next(),
289            ChunkDecompressor::Compressed(stream) => {
290                // If we consumed the stream last time to get the CRC,
291                // or because of an error, we're done.
292                if stream.is_none() {
293                    return None;
294                }
295
296                let s = stream.as_mut().unwrap();
297
298                let record = match read_record_from_chunk_stream(s) {
299                    Ok(k) => k,
300                    Err(e) => {
301                        *stream = None; // Don't try to recover.
302                        return Some(Err(e));
303                    }
304                };
305
306                // If we've read all there is to read...
307                if s.position() >= self.header.uncompressed_size {
308                    // Get the CRC.
309                    let calculated = stream.take().unwrap().finalize();
310
311                    // If the header stored a CRC
312                    // and it doesn't match what we have, complain.
313                    if self.header.uncompressed_crc != 0
314                        && self.header.uncompressed_crc != calculated
315                    {
316                        return Some(Err(McapError::BadChunkCrc {
317                            saved: self.header.uncompressed_crc,
318                            calculated,
319                        }));
320                    }
321                    // All good!
322                }
323
324                Some(Ok(record))
325            }
326        }
327    }
328}
329
330/// Like [read_record_from_slice], but for a decompression stream
331fn read_record_from_chunk_stream<'a, R: Read>(r: &mut R) -> McapResult<records::Record<'a>> {
332    // We can't use binrw because compressions streams aren't seekable.
333    // byteorder time!
334    use byteorder::{ReadBytesExt, LE};
335
336    let op = r.read_u8()?;
337    let len = r.read_u64::<LE>()?;
338
339    debug!("chunk: opcode {op:02X}, length {len}");
340    let record = match op {
341        op::SCHEMA => {
342            let mut record = Vec::new();
343            r.take(len).read_to_end(&mut record)?;
344            if len as usize != record.len() {
345                return Err(McapError::UnexpectedEoc);
346            }
347
348            let mut c = Cursor::new(&record);
349            let header: records::SchemaHeader = c.read_le()?;
350
351            let header_end = c.position();
352
353            // Should we rotate and shrink instead?
354            let data = record.split_off(header_end as usize);
355
356            if header.data_len as usize != data.len() {
357                warn!(
358                    "Schema {}'s data length doesn't match the total schema length",
359                    header.name
360                );
361            }
362            Record::Schema {
363                header,
364                data: Cow::Owned(data),
365            }
366        }
367        op::CHANNEL => {
368            let mut record = Vec::new();
369            r.take(len).read_to_end(&mut record)?;
370            if len as usize != record.len() {
371                return Err(McapError::UnexpectedEoc);
372            }
373
374            let mut c = Cursor::new(&record);
375            let channel: records::Channel = c.read_le()?;
376
377            if c.position() != record.len() as u64 {
378                warn!(
379                    "Channel {}'s length doesn't match its record length",
380                    channel.topic
381                );
382            }
383
384            Record::Channel(channel)
385        }
386        op::MESSAGE => {
387            // Optimization: messages are the mainstay of the file,
388            // so allocate the header and the data separately to avoid having
389            // to split them up or move them around later.
390            // Fortunately, message headers are fixed length.
391            const HEADER_LEN: u64 = 22;
392
393            let mut header_buf = Vec::new();
394            r.take(HEADER_LEN).read_to_end(&mut header_buf)?;
395            if header_buf.len() as u64 != HEADER_LEN {
396                return Err(McapError::UnexpectedEoc);
397            }
398            let header: records::MessageHeader = Cursor::new(header_buf).read_le()?;
399
400            let mut data = Vec::new();
401            r.take(len - HEADER_LEN).read_to_end(&mut data)?;
402            if data.len() as u64 != len - HEADER_LEN {
403                return Err(McapError::UnexpectedEoc);
404            }
405
406            Record::Message {
407                header,
408                data: Cow::Owned(data),
409            }
410        }
411        wut => return Err(McapError::UnexpectedChunkRecord(wut)),
412    };
413    trace!("       {:?}", record);
414    Ok(record)
415}
416
/// Like [`LinearReader`], but unpacks chunks' records into its stream
pub struct ChunkFlattener<'a> {
    /// Reads the file's top-level records.
    top_level: LinearReader<'a>,
    /// The reader for the chunk we're currently in the middle of, if any.
    dechunk: Option<ChunkReader<'a>>,
    /// Set once an error has been yielded;
    /// after that the iterator doesn't yield anything else.
    malformed: bool,
}
423
424impl<'a> ChunkFlattener<'a> {
425    pub fn new(buf: &'a [u8]) -> McapResult<Self> {
426        Self::new_with_options(buf, enum_set!())
427    }
428
429    pub fn new_with_options(buf: &'a [u8], options: EnumSet<Options>) -> McapResult<Self> {
430        let top_level = LinearReader::new_with_options(buf, options)?;
431        Ok(Self {
432            top_level,
433            dechunk: None,
434            malformed: false,
435        })
436    }
437
438    fn bytes_remaining(&self) -> usize {
439        self.top_level.bytes_remaining()
440    }
441}
442
443impl<'a> Iterator for ChunkFlattener<'a> {
444    type Item = McapResult<records::Record<'a>>;
445
446    fn next(&mut self) -> Option<Self::Item> {
447        if self.malformed {
448            return None;
449        }
450
451        let n: Option<Self::Item> = loop {
452            // If we're reading from a chunk, do that until it returns None.
453            if let Some(d) = &mut self.dechunk {
454                match d.next() {
455                    Some(d) => break Some(d),
456                    None => self.dechunk = None,
457                }
458            }
459            // Fall through - if we didn't extract a record from a chunk
460            // (or that chunk ended), move on to the next top-level record.
461            match self.top_level.next() {
462                // If it's a chunk, get a new chunk reader going...
463                Some(Ok(Record::Chunk { header, data })) => {
464                    self.dechunk = match ChunkReader::new(header, data) {
465                        Ok(d) => Some(d),
466                        Err(e) => break Some(Err(e)),
467                    };
468                    // ...then continue the loop to get the first item from the chunk.
469                }
470                // If it's not a chunk, just yield it.
471                not_a_chunk => break not_a_chunk,
472            }
473        };
474
475        // Give up on errors
476        if matches!(n, Some(Err(_))) {
477            self.malformed = true;
478        }
479        n
480    }
481}
482
/// Parses schemas and channels and wires them together
#[derive(Debug, Default)]
struct ChannelAccumulator<'a> {
    /// The schemas seen so far, by schema ID.
    schemas: HashMap<u16, Arc<Schema<'a>>>,
    /// The channels seen so far, by channel ID.
    /// Each channel holds a reference to its schema (if it has one).
    channels: HashMap<u16, Arc<Channel<'a>>>,
}
489
490impl<'a> ChannelAccumulator<'a> {
491    fn add_schema(&mut self, header: records::SchemaHeader, data: Cow<'a, [u8]>) -> McapResult<()> {
492        if header.id == 0 {
493            return Err(McapError::InvalidSchemaId);
494        }
495
496        let schema = Arc::new(Schema {
497            name: header.name.clone(),
498            encoding: header.encoding,
499            data,
500        });
501
502        if let Some(preexisting) = self.schemas.insert(header.id, schema.clone()) {
503            // Oh boy, we have this schema already.
504            // It had better be identital.
505            if schema != preexisting {
506                return Err(McapError::ConflictingSchemas(header.name));
507            }
508        }
509        Ok(())
510    }
511
512    fn add_channel(&mut self, chan: records::Channel) -> McapResult<()> {
513        // The schema ID can be 0 for "no schema",
514        // Or must reference some previously-read schema.
515        let schema = if chan.schema_id == 0 {
516            None
517        } else {
518            match self.schemas.get(&chan.schema_id) {
519                Some(s) => Some(s.clone()),
520                None => {
521                    return Err(McapError::UnknownSchema(chan.topic, chan.schema_id));
522                }
523            }
524        };
525
526        let channel = Arc::new(Channel {
527            topic: chan.topic.clone(),
528            schema,
529            message_encoding: chan.message_encoding,
530            metadata: chan.metadata,
531        });
532        if let Some(preexisting) = self.channels.insert(chan.id, channel.clone()) {
533            // Oh boy, we have this channel already.
534            // It had better be identital.
535            if preexisting != channel {
536                return Err(McapError::ConflictingChannels(chan.topic));
537            }
538        }
539        Ok(())
540    }
541
542    fn get(&self, chan_id: u16) -> Option<Arc<Channel<'a>>> {
543        self.channels.get(&chan_id).cloned()
544    }
545}
546
/// Reads all messages from the MCAP file---in the order they were written---and
/// perform needed validation (CRCs, etc.) as we go.
///
/// This stops at the end of the data section and does not read the summary.
///
/// Because tying the lifetime of each message to the underlying MCAP memory map
/// makes it very difficult to send between threads or use in async land,
/// and because we assume _most_ MCAP files have _most_ messages in compressed chunks,
/// yielded [`Message`](crate::Message)s have unbounded lifetimes.
/// For messages we've decompressed into their own buffers, this is free!
/// For uncompressed messages, we take a copy of the message's data.
pub struct MessageStream<'a> {
    /// The whole file, magic bytes included.
    /// Kept around to calculate the data section's CRC.
    full_file: &'a [u8],
    /// The file's records, with the contents of chunks unpacked.
    records: ChunkFlattener<'a>,
    /// Set once the stream has ended (or errored out);
    /// after that the iterator doesn't yield anything else.
    done: bool,
    /// The schemas and channels read so far.
    /// It owns its data (`'static`), so yielded messages don't borrow from the file.
    channeler: ChannelAccumulator<'static>,
}
564
565impl<'a> MessageStream<'a> {
566    pub fn new(buf: &'a [u8]) -> McapResult<Self> {
567        Self::new_with_options(buf, enum_set!())
568    }
569
570    pub fn new_with_options(buf: &'a [u8], options: EnumSet<Options>) -> McapResult<Self> {
571        let full_file = buf;
572        let records = ChunkFlattener::new_with_options(buf, options)?;
573
574        Ok(Self {
575            full_file,
576            records,
577            done: false,
578            channeler: ChannelAccumulator::default(),
579        })
580    }
581}
582
583impl<'a> Iterator for MessageStream<'a> {
584    type Item = McapResult<Message<'static>>;
585
586    fn next(&mut self) -> Option<Self::Item> {
587        if self.done {
588            return None;
589        }
590
591        let n = loop {
592            // Let's start with a working record.
593            let record = match self.records.next() {
594                Some(Ok(rec)) => rec,
595                Some(Err(e)) => break Some(Err(e)),
596                None => break None,
597            };
598
599            match record {
600                // Insert schemas into self so we know when subsequent channels reference them.
601                Record::Schema { header, data } => {
602                    let data = Cow::Owned(data.into_owned());
603                    if let Err(e) = self.channeler.add_schema(header, data) {
604                        break Some(Err(e));
605                    }
606                }
607
608                // Insert channels into self so we know when subsequent messages reference them.
609                Record::Channel(chan) => {
610                    if let Err(e) = self.channeler.add_channel(chan) {
611                        break Some(Err(e));
612                    }
613                }
614
615                Record::Message { header, data } => {
616                    // Messages must have a previously-read channel.
617                    let channel = match self.channeler.get(header.channel_id) {
618                        Some(c) => c,
619                        None => {
620                            break Some(Err(McapError::UnknownChannel(
621                                header.sequence,
622                                header.channel_id,
623                            )))
624                        }
625                    };
626
627                    let m = Message {
628                        channel,
629                        sequence: header.sequence,
630                        log_time: header.log_time,
631                        publish_time: header.publish_time,
632                        data: Cow::Owned(data.into_owned()),
633                    };
634                    break Some(Ok(m));
635                }
636
637                // If it's EOD, do unholy things to calculate the CRC.
638                Record::EndOfData(end) => {
639                    if end.data_section_crc != 0 {
640                        // This is terrible. Less math with less magic numbers, please.
641                        let data_section_len = (self.full_file.len() - MAGIC.len() * 2) // Actual working area
642                            - self.records.bytes_remaining();
643
644                        let data_section =
645                            &self.full_file[MAGIC.len()..MAGIC.len() + data_section_len];
646                        let calculated = crc32(data_section);
647                        if end.data_section_crc != calculated {
648                            break Some(Err(McapError::BadDataCrc {
649                                saved: end.data_section_crc,
650                                calculated,
651                            }));
652                        }
653                    }
654                    break None; // We're done at any rate.
655                }
656                _skip => {}
657            };
658        };
659
660        if !matches!(n, Some(Ok(_))) {
661            self.done = true;
662        }
663        n
664    }
665}
666
/// The size in bytes of a complete footer record, including its opcode and length.
const FOOTER_LEN: usize = 20 + 8 + 1; // 20 bytes + 8 byte len + 1 byte opcode
668
669/// Read the MCAP footer.
670///
671/// You'd probably prefer to use [`Summary::read`] to parse the whole summary,
672/// then index into the rest of the file with
673/// [`Summary::stream_chunk`], [`attachment`], [`metadata`], etc.
674pub fn footer(mcap: &[u8]) -> McapResult<records::Footer> {
675    if mcap.len() < MAGIC.len() * 2 + FOOTER_LEN {
676        return Err(McapError::UnexpectedEof);
677    }
678
679    if !mcap.starts_with(MAGIC) || !mcap.ends_with(MAGIC) {
680        return Err(McapError::BadMagic);
681    }
682
683    let footer_buf = &mcap[mcap.len() - MAGIC.len() - FOOTER_LEN..];
684
685    match LinearReader::sans_magic(footer_buf).next() {
686        Some(Ok(Record::Footer(f))) => Ok(f),
687        _ => Err(McapError::BadFooter),
688    }
689}
690
/// Indexes of an MCAP file parsed from its (optional) summary section
#[derive(Default, Eq, PartialEq)]
pub struct Summary<'a> {
    /// The statistics record from the summary, if it had one.
    pub stats: Option<records::Statistics>,
    /// Maps channel IDs to their channel
    pub channels: HashMap<u16, Arc<Channel<'a>>>,
    /// Maps schema IDs to their schema
    pub schemas: HashMap<u16, Arc<Schema<'a>>>,
    /// The chunk index records from the summary, in the order they were read.
    pub chunk_indexes: Vec<records::ChunkIndex>,
    /// The attachment index records from the summary, in the order they were read.
    pub attachment_indexes: Vec<records::AttachmentIndex>,
    /// The metadata index records from the summary, in the order they were read.
    pub metadata_indexes: Vec<records::MetadataIndex>,
}
703
704impl fmt::Debug for Summary<'_> {
705    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706        // Keep the actual maps as HashMaps for constant-time lookups,
707        // but order everything up before debug printing it here.
708        let channels = self.channels.iter().collect::<BTreeMap<_, _>>();
709        let schemas = self.schemas.iter().collect::<BTreeMap<_, _>>();
710
711        f.debug_struct("Summary")
712            .field("stats", &self.stats)
713            .field("channels", &channels)
714            .field("schemas", &schemas)
715            .field("chunk_indexes", &self.chunk_indexes)
716            .field("attachment_indexes", &self.attachment_indexes)
717            .field("metadata_indexes", &self.metadata_indexes)
718            .finish()
719    }
720}
721
722impl<'a> Summary<'a> {
723    /// Read the summary section of the given mapped MCAP file, if it has one.
724    pub fn read(mcap: &'a [u8]) -> McapResult<Option<Self>> {
725        let foot = footer(mcap)?;
726
727        // A summary start offset of 0 means there's no summary.
728        if foot.summary_start == 0 {
729            return Ok(None);
730        }
731
732        if foot.summary_crc != 0 {
733            // The checksum covers the entire summary _except_ itself, including other footer bytes.
734            let calculated =
735                crc32(&mcap[foot.summary_start as usize..mcap.len() - MAGIC.len() - 4]);
736            if foot.summary_crc != calculated {
737                return Err(McapError::BadSummaryCrc {
738                    saved: foot.summary_crc,
739                    calculated,
740                });
741            }
742        }
743
744        let mut summary = Summary::default();
745        let mut channeler = ChannelAccumulator::default();
746
747        let summary_end = match foot.summary_offset_start {
748            0 => MAGIC.len() - FOOTER_LEN,
749            sos => sos as usize,
750        };
751        let summary_buf = &mcap[foot.summary_start as usize..summary_end];
752
753        for record in LinearReader::sans_magic(summary_buf) {
754            match record? {
755                Record::Statistics(s) => {
756                    if summary.stats.is_some() {
757                        warn!("Multiple statistics records found in summary");
758                    }
759                    summary.stats = Some(s);
760                }
761                Record::Schema { header, data } => channeler.add_schema(header, data)?,
762                Record::Channel(c) => channeler.add_channel(c)?,
763                Record::ChunkIndex(c) => summary.chunk_indexes.push(c),
764                Record::AttachmentIndex(a) => summary.attachment_indexes.push(a),
765                Record::MetadataIndex(i) => summary.metadata_indexes.push(i),
766                _ => {}
767            };
768        }
769
770        summary.schemas = channeler.schemas;
771        summary.channels = channeler.channels;
772
773        Ok(Some(summary))
774    }
775
776    /// Stream messages from the chunk with the given index.
777    ///
778    /// To avoid having to read all preceding chunks first,
779    /// channels and their schemas are pulled from this summary.
780    pub fn stream_chunk(
781        &self,
782        mcap: &'a [u8],
783        index: &records::ChunkIndex,
784    ) -> McapResult<impl Iterator<Item = McapResult<Message<'a>>> + '_> {
785        let end = (index.chunk_start_offset + index.chunk_length) as usize;
786        if mcap.len() < end {
787            return Err(McapError::BadIndex);
788        }
789
790        // Get the chunk (as a header and its data) out of the file at the given offset.
791        let mut reader = LinearReader::sans_magic(&mcap[index.chunk_start_offset as usize..end]);
792        let (h, d) = match reader.next().ok_or(McapError::BadIndex)? {
793            Ok(records::Record::Chunk { header, data }) => (header, data),
794            Ok(_other_record) => return Err(McapError::BadIndex),
795            Err(e) => return Err(e),
796        };
797
798        if reader.next().is_some() {
799            // Wut - multiple records in the given slice?
800            return Err(McapError::BadIndex);
801        }
802
803        // Now let's stream messages out of the chunk.
804        let messages = ChunkReader::new(h, d)?.filter_map(|record| match record {
805            Ok(records::Record::Message { header, data }) => {
806                // Correlate the message to its channel from this summary.
807                let channel = match self.channels.get(&header.channel_id) {
808                    Some(c) => c.clone(),
809                    None => {
810                        return Some(Err(McapError::UnknownChannel(
811                            header.sequence,
812                            header.channel_id,
813                        )));
814                    }
815                };
816
817                let m = Message {
818                    channel,
819                    sequence: header.sequence,
820                    log_time: header.log_time,
821                    publish_time: header.publish_time,
822                    data,
823                };
824
825                Some(Ok(m))
826            }
827            // We don't care about other chunk records (channels, schemas) -
828            // we should have them from &self already.
829            Ok(_other_record) => None,
830            // We do care about errors, though.
831            Err(e) => Some(Err(e)),
832        });
833
834        Ok(messages)
835    }
836
    /// Read the message indexes for the given indexed chunk.
838    ///
839    /// Channels and their schemas are pulled from this summary.
840    /// The offsets in each [`MessageIndexEntry`](records::MessageIndexEntry)
841    /// is relative to the decompressed contents of the given chunk.
842    pub fn read_message_indexes(
843        &self,
844        mcap: &[u8],
845        index: &records::ChunkIndex,
846    ) -> McapResult<HashMap<Arc<Channel>, Vec<records::MessageIndexEntry>>> {
847        if index.message_index_offsets.is_empty() {
848            // Message indexing is optional... should we be more descriptive here?
849            return Err(McapError::BadIndex);
850        }
851
852        let mut indexes = HashMap::new();
853
854        for (channel_id, offset) in &index.message_index_offsets {
855            let offset = *offset as usize;
856
857            // Message indexes are at least 15 bytes:
858            // 1 byte opcode, 8 byte length, 2 byte channel ID, 4 byte array len
859            if mcap.len() < offset + 15 {
860                return Err(McapError::BadIndex);
861            }
862
863            // Get the MessageIndex out of the file at the given offset.
864            let mut reader = LinearReader::sans_magic(&mcap[offset..]);
865            let index = match reader.next().ok_or(McapError::BadIndex)? {
866                Ok(records::Record::MessageIndex(i)) => i,
867                Ok(_other_record) => return Err(McapError::BadIndex),
868                Err(e) => return Err(e),
869            };
870
871            // The channel ID from the chunk index and the message index should match
872            if *channel_id != index.channel_id {
873                return Err(McapError::BadIndex);
874            }
875
876            let channel = match self.channels.get(&index.channel_id) {
877                Some(c) => c,
878                None => {
879                    return Err(McapError::UnknownChannel(
880                        0, // We don't have a message sequence num yet.
881                        index.channel_id,
882                    ));
883                }
884            };
885
886            if indexes.insert(channel.clone(), index.records).is_some() {
887                return Err(McapError::ConflictingChannels(channel.topic.clone()));
888            }
889        }
890
891        Ok(indexes)
892    }
893
894    /// Seek to the given message in the given indexed chunk.
895    ///
896    /// If you're interested in more than a single message from the chunk,
897    /// filtering [`Summary::stream_chunk`] is probably a better bet.
898    /// Compressed chunks aren't random access -
899    /// this decompresses everything in the chunk before
900    /// [`message.offset`](records::MessageIndexEntry::offset) and throws it away.
901    pub fn seek_message(
902        &self,
903        mcap: &'a [u8],
904        index: &records::ChunkIndex,
905        message: &records::MessageIndexEntry,
906    ) -> McapResult<Message> {
907        // Get the chunk (as a header and its data) out of the file at the given offset.
908        let end = (index.chunk_start_offset + index.chunk_length) as usize;
909        if mcap.len() < end {
910            return Err(McapError::BadIndex);
911        }
912
913        let mut reader = LinearReader::sans_magic(&mcap[index.chunk_start_offset as usize..end]);
914        let (h, d) = match reader.next().ok_or(McapError::BadIndex)? {
915            Ok(records::Record::Chunk { header, data }) => (header, data),
916            Ok(_other_record) => return Err(McapError::BadIndex),
917            Err(e) => return Err(e),
918        };
919
920        if reader.next().is_some() {
921            // Wut - multiple records in the given slice?
922            return Err(McapError::BadIndex);
923        }
924
925        let mut chunk_reader = ChunkReader::new(h, d)?;
926
927        // Do unspeakable things to seek to the message.
928        match &mut chunk_reader.decompressor {
929            ChunkDecompressor::Null(reader) => {
930                // Skip messages until we're at the offset.
931                while reader.bytes_remaining() as u64 > index.uncompressed_size - message.offset {
932                    match reader.next() {
933                        Some(Ok(_)) => {}
934                        Some(Err(e)) => return Err(e),
935                        None => return Err(McapError::BadIndex),
936                    };
937                }
938                // Be exact!
939                if reader.bytes_remaining() as u64 != index.uncompressed_size - message.offset {
940                    return Err(McapError::BadIndex);
941                }
942            }
943            ChunkDecompressor::Compressed(maybe_read) => {
944                let reader = maybe_read.as_mut().unwrap();
945                // Decompress offset bytes, which should put us at the message we want.
946                io::copy(&mut reader.take(message.offset), &mut io::sink())?;
947            }
948        }
949
950        // Now let's get our message.
951        match chunk_reader.next() {
952            Some(Ok(records::Record::Message { header, data })) => {
953                // Correlate the message to its channel from this summary.
954                let channel = match self.channels.get(&header.channel_id) {
955                    Some(c) => c.clone(),
956                    None => {
957                        return Err(McapError::UnknownChannel(
958                            header.sequence,
959                            header.channel_id,
960                        ));
961                    }
962                };
963
964                let m = Message {
965                    channel,
966                    sequence: header.sequence,
967                    log_time: header.log_time,
968                    publish_time: header.publish_time,
969                    data,
970                };
971
972                Ok(m)
973            }
974            // The index told us this was a message...
975            Some(Ok(_other_record)) => Err(McapError::BadIndex),
976            Some(Err(e)) => Err(e),
977            None => Err(McapError::BadIndex),
978        }
979    }
980}
981
982/// Read the attachment with the given index.
983pub fn attachment<'a>(
984    mcap: &'a [u8],
985    index: &records::AttachmentIndex,
986) -> McapResult<Attachment<'a>> {
987    let end = (index.offset + index.length) as usize;
988    if mcap.len() < end {
989        return Err(McapError::BadIndex);
990    }
991
992    let mut reader = LinearReader::sans_magic(&mcap[index.offset as usize..end]);
993    let (h, d) = match reader.next().ok_or(McapError::BadIndex)? {
994        Ok(records::Record::Attachment { header, data }) => (header, data),
995        Ok(_other_record) => return Err(McapError::BadIndex),
996        Err(e) => return Err(e),
997    };
998
999    if reader.next().is_some() {
1000        // Wut - multiple records in the given slice?
1001        return Err(McapError::BadIndex);
1002    }
1003
1004    Ok(Attachment {
1005        log_time: h.log_time,
1006        create_time: h.create_time,
1007        name: h.name,
1008        content_type: h.content_type,
1009        data: Cow::Borrowed(d),
1010    })
1011}
1012
1013/// Read the metadata with the given index.
1014pub fn metadata(mcap: &[u8], index: &records::MetadataIndex) -> McapResult<records::Metadata> {
1015    let end = (index.offset + index.length) as usize;
1016    if mcap.len() < end {
1017        return Err(McapError::BadIndex);
1018    }
1019
1020    let mut reader = LinearReader::sans_magic(&mcap[index.offset as usize..end]);
1021    let m = match reader.next().ok_or(McapError::BadIndex)? {
1022        Ok(records::Record::Metadata(m)) => m,
1023        Ok(_other_record) => return Err(McapError::BadIndex),
1024        Err(e) => return Err(e),
1025    };
1026
1027    if reader.next().is_some() {
1028        // Wut - multiple records in the given slice?
1029        return Err(McapError::BadIndex);
1030    }
1031
1032    Ok(m)
1033}
1034
// All of the following panic if they walk off the back of the data block;
// callers are assumed to have made sure they got enough bytes back with
// `validate_response()`
1038
1039/// Builds a `read_<type>(&mut buf)` function that reads a given type
1040/// off the buffer and advances it the appropriate number of bytes.
1041macro_rules! reader {
1042    ($type:ty) => {
1043        paste::paste! {
1044            #[inline]
1045            fn [<read_ $type>](block: &mut &[u8]) -> $type {
1046                const SIZE: usize = std::mem::size_of::<$type>();
1047                let res = $type::from_le_bytes(
1048                    block[0..SIZE].try_into().unwrap()
1049                );
1050                *block = &block[SIZE..];
1051                res
1052            }
1053        }
1054    };
1055}
1056
1057reader!(u8);
1058reader!(u64);
1059
#[cfg(test)]
mod test {
    use super::*;

    // Files holding nothing but magic bytes are probably malformed per the spec,
    // but they're still user input - reading them must never panic.

    /// Asserts that the given reader has no records to hand out.
    fn assert_yields_nothing(mut reader: LinearReader<'_>) {
        assert!(reader.next().is_none());
    }

    /// Start and end magic with nothing in between reads as an empty file.
    #[test]
    fn only_two_magics() {
        let bytes = MAGIC.repeat(2);
        assert_yields_nothing(LinearReader::new(&bytes).unwrap());
    }

    /// A lone magic can't be both the start and the end of a file.
    #[test]
    fn only_one_magic() {
        let outcome = LinearReader::new(MAGIC);
        assert!(matches!(outcome, Err(McapError::BadMagic)));
    }

    /// Skipping the end-magic check doesn't change how two magics are read.
    #[test]
    fn only_two_magic_with_ignore_end_magic() {
        let bytes = MAGIC.repeat(2);
        let lenient = enum_set!(Options::IgnoreEndMagic);
        assert_yields_nothing(LinearReader::new_with_options(&bytes, lenient).unwrap());
    }

    /// With the end-magic check skipped, a lone magic is an empty file.
    #[test]
    fn only_one_magic_with_ignore_end_magic() {
        let lenient = enum_set!(Options::IgnoreEndMagic);
        assert_yields_nothing(LinearReader::new_with_options(MAGIC, lenient).unwrap());
    }
}