1use std::{
11 borrow::Cow,
12 collections::{BTreeMap, HashMap},
13 fmt,
14 io::{self, prelude::*, Cursor},
15 sync::Arc,
16};
17
18use binrw::prelude::*;
19use crc32fast::hash as crc32;
20use enumset::{enum_set, EnumSet, EnumSetType};
21use log::*;
22
23use crate::{
24 io_utils::CountingCrcReader,
25 records::{self, op, Record},
26 Attachment, Channel, McapError, McapResult, Message, Schema, MAGIC,
27};
28
29#[derive(EnumSetType, Debug)]
34pub enum Options {
35 IgnoreEndMagic,
37}
38
/// Iterator over the records laid out back to back in a byte slice.
///
/// Once a record fails to parse, the reader yields that error and then
/// stops producing items.
pub struct LinearReader<'a> {
    /// The bytes that have not been consumed yet.
    buf: &'a [u8],
    /// Set after the first parse error so iteration ends for good.
    malformed: bool,
}
48
49impl<'a> LinearReader<'a> {
50 pub fn new(buf: &'a [u8]) -> McapResult<Self> {
53 Self::new_with_options(buf, enum_set!())
54 }
55
56 pub fn new_with_options(buf: &'a [u8], options: EnumSet<Options>) -> McapResult<Self> {
58 if !buf.starts_with(MAGIC)
59 || (!options.contains(Options::IgnoreEndMagic)
60 && (!buf.ends_with(MAGIC) || buf.len() < 2 * MAGIC.len()))
61 {
62 return Err(McapError::BadMagic);
63 }
64 let buf = &buf[MAGIC.len()..];
65 if buf.ends_with(MAGIC) {
66 Ok(Self::sans_magic(&buf[0..buf.len() - MAGIC.len()]))
67 } else {
68 Ok(Self::sans_magic(buf))
69 }
70 }
71
72 pub fn sans_magic(buf: &'a [u8]) -> Self {
76 Self {
77 buf,
78 malformed: false,
79 }
80 }
81
82 fn bytes_remaining(&self) -> usize {
87 self.buf.len()
88 }
89}
90
91impl<'a> Iterator for LinearReader<'a> {
92 type Item = McapResult<records::Record<'a>>;
93
94 fn next(&mut self) -> Option<Self::Item> {
95 if self.buf.is_empty() {
96 return None;
97 }
98
99 if self.malformed {
102 return None;
103 }
104
105 let record = match read_record_from_slice(&mut self.buf) {
106 Ok(k) => k,
107 Err(e) => {
108 self.malformed = true;
109 return Some(Err(e));
110 }
111 };
112
113 Some(Ok(record))
114 }
115}
116
117fn read_record_from_slice<'a>(buf: &mut &'a [u8]) -> McapResult<records::Record<'a>> {
119 if buf.len() < 5 {
120 warn!("Malformed MCAP - not enough space for record + length!");
121 return Err(McapError::UnexpectedEof);
122 }
123
124 let op = read_u8(buf);
125 let len = read_u64(buf);
126
127 if buf.len() < len as usize {
128 warn!(
129 "Malformed MCAP - record with length {len}, but only {} bytes remain",
130 buf.len()
131 );
132 return Err(McapError::UnexpectedEof);
133 }
134
135 let body = &buf[..len as usize];
136 debug!("slice: opcode {op:02X}, length {len}");
137 let record = read_record(op, body)?;
138 trace!(" {:?}", record);
139
140 *buf = &buf[len as usize..];
141 Ok(record)
142}
143
144fn read_record(op: u8, body: &[u8]) -> McapResult<records::Record<'_>> {
146 macro_rules! record {
147 ($b:ident) => {{
148 let mut cur = Cursor::new($b);
149 let res = cur.read_le()?;
150 assert_eq!($b.len() as u64, cur.position());
151 res
152 }};
153 }
154
155 Ok(match op {
156 op::HEADER => Record::Header(record!(body)),
157 op::FOOTER => Record::Footer(record!(body)),
158 op::SCHEMA => {
159 let mut c = Cursor::new(body);
160 let header: records::SchemaHeader = c.read_le()?;
161 let data = Cow::Borrowed(&body[c.position() as usize..]);
162 if header.data_len != data.len() as u32 {
163 warn!(
164 "Schema {}'s data length doesn't match the total schema length",
165 header.name
166 );
167 }
168 Record::Schema { header, data }
169 }
170 op::CHANNEL => Record::Channel(record!(body)),
171 op::MESSAGE => {
172 let mut c = Cursor::new(body);
173 let header = c.read_le()?;
174 let data = Cow::Borrowed(&body[c.position() as usize..]);
175 Record::Message { header, data }
176 }
177 op::CHUNK => {
178 let mut c = Cursor::new(body);
179 let header: records::ChunkHeader = c.read_le()?;
180 let data = &body[c.position() as usize..];
181 if header.compressed_size != data.len() as u64 {
182 warn!("Chunk's compressed length doesn't match its header");
183 }
184 Record::Chunk { header, data }
185 }
186 op::MESSAGE_INDEX => Record::MessageIndex(record!(body)),
187 op::CHUNK_INDEX => Record::ChunkIndex(record!(body)),
188 op::ATTACHMENT => {
189 let mut c = Cursor::new(body);
190 let header: records::AttachmentHeader = c.read_le()?;
191 let data = &body[c.position() as usize..body.len() - 4];
192 if header.data_len != data.len() as u64 {
193 warn!(
194 "Attachment {}'s data length doesn't match the total schema length",
195 header.name
196 );
197 }
198 let crc = Cursor::new(&body[body.len() - 4..]).read_le()?;
199
200 if crc != 0 {
210 let calculated = crc32(&body[..body.len() - 4]);
211 if crc != calculated {
212 return Err(McapError::BadAttachmentCrc {
213 saved: crc,
214 calculated,
215 });
216 }
217 }
218
219 Record::Attachment { header, data }
220 }
221 op::ATTACHMENT_INDEX => Record::AttachmentIndex(record!(body)),
222 op::STATISTICS => Record::Statistics(record!(body)),
223 op::METADATA => Record::Metadata(record!(body)),
224 op::METADATA_INDEX => Record::MetadataIndex(record!(body)),
225 op::SUMMARY_OFFSET => Record::SummaryOffset(record!(body)),
226 op::END_OF_DATA => Record::EndOfData(record!(body)),
227 opcode => Record::Unknown {
228 opcode,
229 data: Cow::Borrowed(body),
230 },
231 })
232}
233
234enum ChunkDecompressor<'a> {
235 Null(LinearReader<'a>),
236 Compressed(Option<CountingCrcReader<Box<dyn Read + Send + 'a>>>),
237}
238
239pub struct ChunkReader<'a> {
241 header: records::ChunkHeader,
242 decompressor: ChunkDecompressor<'a>,
243}
244
245impl<'a> ChunkReader<'a> {
246 pub fn new(header: records::ChunkHeader, data: &'a [u8]) -> McapResult<Self> {
247 let decompressor = match header.compression.as_str() {
248 "zstd" => ChunkDecompressor::Compressed(Some(CountingCrcReader::new(Box::new(
249 zstd::Decoder::new(data)?,
250 )))),
251 "lz4" => ChunkDecompressor::Compressed(Some(CountingCrcReader::new(Box::new(
252 lz4::Decoder::new(data)?,
253 )))),
254 "" => {
255 if header.uncompressed_size != header.compressed_size {
256 warn!(
257 "Chunk is uncompressed, but claims different compress/uncompressed lengths"
258 );
259 }
260
261 if header.uncompressed_crc != 0 {
262 let calculated = crc32(data);
263 if header.uncompressed_crc != calculated {
264 return Err(McapError::BadChunkCrc {
265 saved: header.uncompressed_crc,
266 calculated,
267 });
268 }
269 }
270
271 ChunkDecompressor::Null(LinearReader::sans_magic(data))
272 }
273 wat => return Err(McapError::UnsupportedCompression(wat.to_string())),
274 };
275
276 Ok(Self {
277 header,
278 decompressor,
279 })
280 }
281}
282
283impl<'a> Iterator for ChunkReader<'a> {
284 type Item = McapResult<records::Record<'a>>;
285
286 fn next(&mut self) -> Option<Self::Item> {
287 match &mut self.decompressor {
288 ChunkDecompressor::Null(r) => r.next(),
289 ChunkDecompressor::Compressed(stream) => {
290 if stream.is_none() {
293 return None;
294 }
295
296 let s = stream.as_mut().unwrap();
297
298 let record = match read_record_from_chunk_stream(s) {
299 Ok(k) => k,
300 Err(e) => {
301 *stream = None; return Some(Err(e));
303 }
304 };
305
306 if s.position() >= self.header.uncompressed_size {
308 let calculated = stream.take().unwrap().finalize();
310
311 if self.header.uncompressed_crc != 0
314 && self.header.uncompressed_crc != calculated
315 {
316 return Some(Err(McapError::BadChunkCrc {
317 saved: self.header.uncompressed_crc,
318 calculated,
319 }));
320 }
321 }
323
324 Some(Ok(record))
325 }
326 }
327 }
328}
329
330fn read_record_from_chunk_stream<'a, R: Read>(r: &mut R) -> McapResult<records::Record<'a>> {
332 use byteorder::{ReadBytesExt, LE};
335
336 let op = r.read_u8()?;
337 let len = r.read_u64::<LE>()?;
338
339 debug!("chunk: opcode {op:02X}, length {len}");
340 let record = match op {
341 op::SCHEMA => {
342 let mut record = Vec::new();
343 r.take(len).read_to_end(&mut record)?;
344 if len as usize != record.len() {
345 return Err(McapError::UnexpectedEoc);
346 }
347
348 let mut c = Cursor::new(&record);
349 let header: records::SchemaHeader = c.read_le()?;
350
351 let header_end = c.position();
352
353 let data = record.split_off(header_end as usize);
355
356 if header.data_len as usize != data.len() {
357 warn!(
358 "Schema {}'s data length doesn't match the total schema length",
359 header.name
360 );
361 }
362 Record::Schema {
363 header,
364 data: Cow::Owned(data),
365 }
366 }
367 op::CHANNEL => {
368 let mut record = Vec::new();
369 r.take(len).read_to_end(&mut record)?;
370 if len as usize != record.len() {
371 return Err(McapError::UnexpectedEoc);
372 }
373
374 let mut c = Cursor::new(&record);
375 let channel: records::Channel = c.read_le()?;
376
377 if c.position() != record.len() as u64 {
378 warn!(
379 "Channel {}'s length doesn't match its record length",
380 channel.topic
381 );
382 }
383
384 Record::Channel(channel)
385 }
386 op::MESSAGE => {
387 const HEADER_LEN: u64 = 22;
392
393 let mut header_buf = Vec::new();
394 r.take(HEADER_LEN).read_to_end(&mut header_buf)?;
395 if header_buf.len() as u64 != HEADER_LEN {
396 return Err(McapError::UnexpectedEoc);
397 }
398 let header: records::MessageHeader = Cursor::new(header_buf).read_le()?;
399
400 let mut data = Vec::new();
401 r.take(len - HEADER_LEN).read_to_end(&mut data)?;
402 if data.len() as u64 != len - HEADER_LEN {
403 return Err(McapError::UnexpectedEoc);
404 }
405
406 Record::Message {
407 header,
408 data: Cow::Owned(data),
409 }
410 }
411 wut => return Err(McapError::UnexpectedChunkRecord(wut)),
412 };
413 trace!(" {:?}", record);
414 Ok(record)
415}
416
417pub struct ChunkFlattener<'a> {
419 top_level: LinearReader<'a>,
420 dechunk: Option<ChunkReader<'a>>,
421 malformed: bool,
422}
423
424impl<'a> ChunkFlattener<'a> {
425 pub fn new(buf: &'a [u8]) -> McapResult<Self> {
426 Self::new_with_options(buf, enum_set!())
427 }
428
429 pub fn new_with_options(buf: &'a [u8], options: EnumSet<Options>) -> McapResult<Self> {
430 let top_level = LinearReader::new_with_options(buf, options)?;
431 Ok(Self {
432 top_level,
433 dechunk: None,
434 malformed: false,
435 })
436 }
437
438 fn bytes_remaining(&self) -> usize {
439 self.top_level.bytes_remaining()
440 }
441}
442
443impl<'a> Iterator for ChunkFlattener<'a> {
444 type Item = McapResult<records::Record<'a>>;
445
446 fn next(&mut self) -> Option<Self::Item> {
447 if self.malformed {
448 return None;
449 }
450
451 let n: Option<Self::Item> = loop {
452 if let Some(d) = &mut self.dechunk {
454 match d.next() {
455 Some(d) => break Some(d),
456 None => self.dechunk = None,
457 }
458 }
459 match self.top_level.next() {
462 Some(Ok(Record::Chunk { header, data })) => {
464 self.dechunk = match ChunkReader::new(header, data) {
465 Ok(d) => Some(d),
466 Err(e) => break Some(Err(e)),
467 };
468 }
470 not_a_chunk => break not_a_chunk,
472 }
473 };
474
475 if matches!(n, Some(Err(_))) {
477 self.malformed = true;
478 }
479 n
480 }
481}
482
483#[derive(Debug, Default)]
485struct ChannelAccumulator<'a> {
486 schemas: HashMap<u16, Arc<Schema<'a>>>,
487 channels: HashMap<u16, Arc<Channel<'a>>>,
488}
489
490impl<'a> ChannelAccumulator<'a> {
491 fn add_schema(&mut self, header: records::SchemaHeader, data: Cow<'a, [u8]>) -> McapResult<()> {
492 if header.id == 0 {
493 return Err(McapError::InvalidSchemaId);
494 }
495
496 let schema = Arc::new(Schema {
497 name: header.name.clone(),
498 encoding: header.encoding,
499 data,
500 });
501
502 if let Some(preexisting) = self.schemas.insert(header.id, schema.clone()) {
503 if schema != preexisting {
506 return Err(McapError::ConflictingSchemas(header.name));
507 }
508 }
509 Ok(())
510 }
511
512 fn add_channel(&mut self, chan: records::Channel) -> McapResult<()> {
513 let schema = if chan.schema_id == 0 {
516 None
517 } else {
518 match self.schemas.get(&chan.schema_id) {
519 Some(s) => Some(s.clone()),
520 None => {
521 return Err(McapError::UnknownSchema(chan.topic, chan.schema_id));
522 }
523 }
524 };
525
526 let channel = Arc::new(Channel {
527 topic: chan.topic.clone(),
528 schema,
529 message_encoding: chan.message_encoding,
530 metadata: chan.metadata,
531 });
532 if let Some(preexisting) = self.channels.insert(chan.id, channel.clone()) {
533 if preexisting != channel {
536 return Err(McapError::ConflictingChannels(chan.topic));
537 }
538 }
539 Ok(())
540 }
541
542 fn get(&self, chan_id: u16) -> Option<Arc<Channel<'a>>> {
543 self.channels.get(&chan_id).cloned()
544 }
545}
546
547pub struct MessageStream<'a> {
559 full_file: &'a [u8],
560 records: ChunkFlattener<'a>,
561 done: bool,
562 channeler: ChannelAccumulator<'static>,
563}
564
565impl<'a> MessageStream<'a> {
566 pub fn new(buf: &'a [u8]) -> McapResult<Self> {
567 Self::new_with_options(buf, enum_set!())
568 }
569
570 pub fn new_with_options(buf: &'a [u8], options: EnumSet<Options>) -> McapResult<Self> {
571 let full_file = buf;
572 let records = ChunkFlattener::new_with_options(buf, options)?;
573
574 Ok(Self {
575 full_file,
576 records,
577 done: false,
578 channeler: ChannelAccumulator::default(),
579 })
580 }
581}
582
583impl<'a> Iterator for MessageStream<'a> {
584 type Item = McapResult<Message<'static>>;
585
586 fn next(&mut self) -> Option<Self::Item> {
587 if self.done {
588 return None;
589 }
590
591 let n = loop {
592 let record = match self.records.next() {
594 Some(Ok(rec)) => rec,
595 Some(Err(e)) => break Some(Err(e)),
596 None => break None,
597 };
598
599 match record {
600 Record::Schema { header, data } => {
602 let data = Cow::Owned(data.into_owned());
603 if let Err(e) = self.channeler.add_schema(header, data) {
604 break Some(Err(e));
605 }
606 }
607
608 Record::Channel(chan) => {
610 if let Err(e) = self.channeler.add_channel(chan) {
611 break Some(Err(e));
612 }
613 }
614
615 Record::Message { header, data } => {
616 let channel = match self.channeler.get(header.channel_id) {
618 Some(c) => c,
619 None => {
620 break Some(Err(McapError::UnknownChannel(
621 header.sequence,
622 header.channel_id,
623 )))
624 }
625 };
626
627 let m = Message {
628 channel,
629 sequence: header.sequence,
630 log_time: header.log_time,
631 publish_time: header.publish_time,
632 data: Cow::Owned(data.into_owned()),
633 };
634 break Some(Ok(m));
635 }
636
637 Record::EndOfData(end) => {
639 if end.data_section_crc != 0 {
640 let data_section_len = (self.full_file.len() - MAGIC.len() * 2) - self.records.bytes_remaining();
643
644 let data_section =
645 &self.full_file[MAGIC.len()..MAGIC.len() + data_section_len];
646 let calculated = crc32(data_section);
647 if end.data_section_crc != calculated {
648 break Some(Err(McapError::BadDataCrc {
649 saved: end.data_section_crc,
650 calculated,
651 }));
652 }
653 }
654 break None; }
656 _skip => {}
657 };
658 };
659
660 if !matches!(n, Some(Ok(_))) {
661 self.done = true;
662 }
663 n
664 }
665}
666
667const FOOTER_LEN: usize = 20 + 8 + 1; pub fn footer(mcap: &[u8]) -> McapResult<records::Footer> {
675 if mcap.len() < MAGIC.len() * 2 + FOOTER_LEN {
676 return Err(McapError::UnexpectedEof);
677 }
678
679 if !mcap.starts_with(MAGIC) || !mcap.ends_with(MAGIC) {
680 return Err(McapError::BadMagic);
681 }
682
683 let footer_buf = &mcap[mcap.len() - MAGIC.len() - FOOTER_LEN..];
684
685 match LinearReader::sans_magic(footer_buf).next() {
686 Some(Ok(Record::Footer(f))) => Ok(f),
687 _ => Err(McapError::BadFooter),
688 }
689}
690
691#[derive(Default, Eq, PartialEq)]
693pub struct Summary<'a> {
694 pub stats: Option<records::Statistics>,
695 pub channels: HashMap<u16, Arc<Channel<'a>>>,
697 pub schemas: HashMap<u16, Arc<Schema<'a>>>,
699 pub chunk_indexes: Vec<records::ChunkIndex>,
700 pub attachment_indexes: Vec<records::AttachmentIndex>,
701 pub metadata_indexes: Vec<records::MetadataIndex>,
702}
703
704impl fmt::Debug for Summary<'_> {
705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706 let channels = self.channels.iter().collect::<BTreeMap<_, _>>();
709 let schemas = self.schemas.iter().collect::<BTreeMap<_, _>>();
710
711 f.debug_struct("Summary")
712 .field("stats", &self.stats)
713 .field("channels", &channels)
714 .field("schemas", &schemas)
715 .field("chunk_indexes", &self.chunk_indexes)
716 .field("attachment_indexes", &self.attachment_indexes)
717 .field("metadata_indexes", &self.metadata_indexes)
718 .finish()
719 }
720}
721
722impl<'a> Summary<'a> {
723 pub fn read(mcap: &'a [u8]) -> McapResult<Option<Self>> {
725 let foot = footer(mcap)?;
726
727 if foot.summary_start == 0 {
729 return Ok(None);
730 }
731
732 if foot.summary_crc != 0 {
733 let calculated =
735 crc32(&mcap[foot.summary_start as usize..mcap.len() - MAGIC.len() - 4]);
736 if foot.summary_crc != calculated {
737 return Err(McapError::BadSummaryCrc {
738 saved: foot.summary_crc,
739 calculated,
740 });
741 }
742 }
743
744 let mut summary = Summary::default();
745 let mut channeler = ChannelAccumulator::default();
746
747 let summary_end = match foot.summary_offset_start {
748 0 => MAGIC.len() - FOOTER_LEN,
749 sos => sos as usize,
750 };
751 let summary_buf = &mcap[foot.summary_start as usize..summary_end];
752
753 for record in LinearReader::sans_magic(summary_buf) {
754 match record? {
755 Record::Statistics(s) => {
756 if summary.stats.is_some() {
757 warn!("Multiple statistics records found in summary");
758 }
759 summary.stats = Some(s);
760 }
761 Record::Schema { header, data } => channeler.add_schema(header, data)?,
762 Record::Channel(c) => channeler.add_channel(c)?,
763 Record::ChunkIndex(c) => summary.chunk_indexes.push(c),
764 Record::AttachmentIndex(a) => summary.attachment_indexes.push(a),
765 Record::MetadataIndex(i) => summary.metadata_indexes.push(i),
766 _ => {}
767 };
768 }
769
770 summary.schemas = channeler.schemas;
771 summary.channels = channeler.channels;
772
773 Ok(Some(summary))
774 }
775
776 pub fn stream_chunk(
781 &self,
782 mcap: &'a [u8],
783 index: &records::ChunkIndex,
784 ) -> McapResult<impl Iterator<Item = McapResult<Message<'a>>> + '_> {
785 let end = (index.chunk_start_offset + index.chunk_length) as usize;
786 if mcap.len() < end {
787 return Err(McapError::BadIndex);
788 }
789
790 let mut reader = LinearReader::sans_magic(&mcap[index.chunk_start_offset as usize..end]);
792 let (h, d) = match reader.next().ok_or(McapError::BadIndex)? {
793 Ok(records::Record::Chunk { header, data }) => (header, data),
794 Ok(_other_record) => return Err(McapError::BadIndex),
795 Err(e) => return Err(e),
796 };
797
798 if reader.next().is_some() {
799 return Err(McapError::BadIndex);
801 }
802
803 let messages = ChunkReader::new(h, d)?.filter_map(|record| match record {
805 Ok(records::Record::Message { header, data }) => {
806 let channel = match self.channels.get(&header.channel_id) {
808 Some(c) => c.clone(),
809 None => {
810 return Some(Err(McapError::UnknownChannel(
811 header.sequence,
812 header.channel_id,
813 )));
814 }
815 };
816
817 let m = Message {
818 channel,
819 sequence: header.sequence,
820 log_time: header.log_time,
821 publish_time: header.publish_time,
822 data,
823 };
824
825 Some(Ok(m))
826 }
827 Ok(_other_record) => None,
830 Err(e) => Some(Err(e)),
832 });
833
834 Ok(messages)
835 }
836
837 pub fn read_message_indexes(
843 &self,
844 mcap: &[u8],
845 index: &records::ChunkIndex,
846 ) -> McapResult<HashMap<Arc<Channel>, Vec<records::MessageIndexEntry>>> {
847 if index.message_index_offsets.is_empty() {
848 return Err(McapError::BadIndex);
850 }
851
852 let mut indexes = HashMap::new();
853
854 for (channel_id, offset) in &index.message_index_offsets {
855 let offset = *offset as usize;
856
857 if mcap.len() < offset + 15 {
860 return Err(McapError::BadIndex);
861 }
862
863 let mut reader = LinearReader::sans_magic(&mcap[offset..]);
865 let index = match reader.next().ok_or(McapError::BadIndex)? {
866 Ok(records::Record::MessageIndex(i)) => i,
867 Ok(_other_record) => return Err(McapError::BadIndex),
868 Err(e) => return Err(e),
869 };
870
871 if *channel_id != index.channel_id {
873 return Err(McapError::BadIndex);
874 }
875
876 let channel = match self.channels.get(&index.channel_id) {
877 Some(c) => c,
878 None => {
879 return Err(McapError::UnknownChannel(
880 0, index.channel_id,
882 ));
883 }
884 };
885
886 if indexes.insert(channel.clone(), index.records).is_some() {
887 return Err(McapError::ConflictingChannels(channel.topic.clone()));
888 }
889 }
890
891 Ok(indexes)
892 }
893
894 pub fn seek_message(
902 &self,
903 mcap: &'a [u8],
904 index: &records::ChunkIndex,
905 message: &records::MessageIndexEntry,
906 ) -> McapResult<Message> {
907 let end = (index.chunk_start_offset + index.chunk_length) as usize;
909 if mcap.len() < end {
910 return Err(McapError::BadIndex);
911 }
912
913 let mut reader = LinearReader::sans_magic(&mcap[index.chunk_start_offset as usize..end]);
914 let (h, d) = match reader.next().ok_or(McapError::BadIndex)? {
915 Ok(records::Record::Chunk { header, data }) => (header, data),
916 Ok(_other_record) => return Err(McapError::BadIndex),
917 Err(e) => return Err(e),
918 };
919
920 if reader.next().is_some() {
921 return Err(McapError::BadIndex);
923 }
924
925 let mut chunk_reader = ChunkReader::new(h, d)?;
926
927 match &mut chunk_reader.decompressor {
929 ChunkDecompressor::Null(reader) => {
930 while reader.bytes_remaining() as u64 > index.uncompressed_size - message.offset {
932 match reader.next() {
933 Some(Ok(_)) => {}
934 Some(Err(e)) => return Err(e),
935 None => return Err(McapError::BadIndex),
936 };
937 }
938 if reader.bytes_remaining() as u64 != index.uncompressed_size - message.offset {
940 return Err(McapError::BadIndex);
941 }
942 }
943 ChunkDecompressor::Compressed(maybe_read) => {
944 let reader = maybe_read.as_mut().unwrap();
945 io::copy(&mut reader.take(message.offset), &mut io::sink())?;
947 }
948 }
949
950 match chunk_reader.next() {
952 Some(Ok(records::Record::Message { header, data })) => {
953 let channel = match self.channels.get(&header.channel_id) {
955 Some(c) => c.clone(),
956 None => {
957 return Err(McapError::UnknownChannel(
958 header.sequence,
959 header.channel_id,
960 ));
961 }
962 };
963
964 let m = Message {
965 channel,
966 sequence: header.sequence,
967 log_time: header.log_time,
968 publish_time: header.publish_time,
969 data,
970 };
971
972 Ok(m)
973 }
974 Some(Ok(_other_record)) => Err(McapError::BadIndex),
976 Some(Err(e)) => Err(e),
977 None => Err(McapError::BadIndex),
978 }
979 }
980}
981
982pub fn attachment<'a>(
984 mcap: &'a [u8],
985 index: &records::AttachmentIndex,
986) -> McapResult<Attachment<'a>> {
987 let end = (index.offset + index.length) as usize;
988 if mcap.len() < end {
989 return Err(McapError::BadIndex);
990 }
991
992 let mut reader = LinearReader::sans_magic(&mcap[index.offset as usize..end]);
993 let (h, d) = match reader.next().ok_or(McapError::BadIndex)? {
994 Ok(records::Record::Attachment { header, data }) => (header, data),
995 Ok(_other_record) => return Err(McapError::BadIndex),
996 Err(e) => return Err(e),
997 };
998
999 if reader.next().is_some() {
1000 return Err(McapError::BadIndex);
1002 }
1003
1004 Ok(Attachment {
1005 log_time: h.log_time,
1006 create_time: h.create_time,
1007 name: h.name,
1008 content_type: h.content_type,
1009 data: Cow::Borrowed(d),
1010 })
1011}
1012
1013pub fn metadata(mcap: &[u8], index: &records::MetadataIndex) -> McapResult<records::Metadata> {
1015 let end = (index.offset + index.length) as usize;
1016 if mcap.len() < end {
1017 return Err(McapError::BadIndex);
1018 }
1019
1020 let mut reader = LinearReader::sans_magic(&mcap[index.offset as usize..end]);
1021 let m = match reader.next().ok_or(McapError::BadIndex)? {
1022 Ok(records::Record::Metadata(m)) => m,
1023 Ok(_other_record) => return Err(McapError::BadIndex),
1024 Err(e) => return Err(e),
1025 };
1026
1027 if reader.next().is_some() {
1028 return Err(McapError::BadIndex);
1030 }
1031
1032 Ok(m)
1033}
1034
1035macro_rules! reader {
1042 ($type:ty) => {
1043 paste::paste! {
1044 #[inline]
1045 fn [<read_ $type>](block: &mut &[u8]) -> $type {
1046 const SIZE: usize = std::mem::size_of::<$type>();
1047 let res = $type::from_le_bytes(
1048 block[0..SIZE].try_into().unwrap()
1049 );
1050 *block = &block[SIZE..];
1051 res
1052 }
1053 }
1054 };
1055}
1056
1057reader!(u8);
1058reader!(u64);
1059
#[cfg(test)]
mod test {
    use super::*;

    /// A file made of nothing but the two magics holds no records.
    #[test]
    fn only_two_magics() {
        let bytes = MAGIC.repeat(2);
        let first = LinearReader::new(&bytes).unwrap().next();
        assert!(first.is_none());
    }

    /// A single magic cannot serve as both the start and the end marker.
    #[test]
    fn only_one_magic() {
        let outcome = LinearReader::new(MAGIC);
        assert!(matches!(outcome, Err(McapError::BadMagic)));
    }

    /// Ignoring the end magic still accepts a file that has one.
    #[test]
    fn only_two_magic_with_ignore_end_magic() {
        let bytes = MAGIC.repeat(2);
        let lenient = enum_set!(Options::IgnoreEndMagic);
        let first = LinearReader::new_with_options(&bytes, lenient)
            .unwrap()
            .next();
        assert!(first.is_none());
    }

    /// Ignoring the end magic accepts a file that is just the start magic.
    #[test]
    fn only_one_magic_with_ignore_end_magic() {
        let lenient = enum_set!(Options::IgnoreEndMagic);
        let first = LinearReader::new_with_options(MAGIC, lenient)
            .unwrap()
            .next();
        assert!(first.is_none());
    }
}
1094}