mcap_rs/
records.rs

//! Raw records parsed from an MCAP file
//!
//! See <https://github.com/foxglove/mcap/tree/main/docs/specification>
//!
//! You probably want to use higher-level interfaces, like
//! [`Message`](crate::Message), [`Channel`](crate::Channel), and [`Schema`](crate::Schema),
//! read from iterators like [`MessageStream`](crate::MessageStream).
8
9use binrw::io::{Read, Seek, Write};
10use binrw::*;
11
12use std::{
13    borrow::Cow,
14    collections::BTreeMap,
15    time::{Duration, SystemTime, UNIX_EPOCH},
16};
17
/// Opcodes for MCAP file records.
///
/// "Records are identified by a single-byte opcode.
/// Record opcodes in the range 0x01-0x7F are reserved for future MCAP format usage.
/// 0x80-0xFF are reserved for application extensions and user proposals."
pub mod op {
    /// Opcode of a Header record.
    pub const HEADER: u8 = 0x01;
    /// Opcode of a Footer record.
    pub const FOOTER: u8 = 0x02;
    /// Opcode of a Schema record.
    pub const SCHEMA: u8 = 0x03;
    /// Opcode of a Channel record.
    pub const CHANNEL: u8 = 0x04;
    /// Opcode of a Message record.
    pub const MESSAGE: u8 = 0x05;
    /// Opcode of a Chunk record.
    pub const CHUNK: u8 = 0x06;
    /// Opcode of a Message Index record.
    pub const MESSAGE_INDEX: u8 = 0x07;
    /// Opcode of a Chunk Index record.
    pub const CHUNK_INDEX: u8 = 0x08;
    /// Opcode of an Attachment record.
    pub const ATTACHMENT: u8 = 0x09;
    /// Opcode of an Attachment Index record.
    pub const ATTACHMENT_INDEX: u8 = 0x0A;
    /// Opcode of a Statistics record.
    pub const STATISTICS: u8 = 0x0B;
    /// Opcode of a Metadata record.
    pub const METADATA: u8 = 0x0C;
    /// Opcode of a Metadata Index record.
    pub const METADATA_INDEX: u8 = 0x0D;
    /// Opcode of a Summary Offset record.
    pub const SUMMARY_OFFSET: u8 = 0x0E;
    /// Opcode of an End Of Data record.
    pub const END_OF_DATA: u8 = 0x0F;
}
40
/// A raw record from an MCAP file.
///
/// Each variant corresponds to one record opcode (see [`Record::opcode`]).
/// A record whose opcode is not one of the constants in [`op`] is kept as
/// [`Record::Unknown`] together with its raw bytes.
///
/// Schema, message, and unknown records carry their payload as a
/// [`Cow`](std::borrow::Cow), which can either borrow directly from the mapped file
/// or hold its own buffer if it was decompressed from a chunk.
/// Chunk and attachment payloads are always borrowed (`&'a [u8]`).
#[derive(Debug)]
pub enum Record<'a> {
    /// Header record ([`op::HEADER`]).
    Header(Header),
    /// Footer record ([`op::FOOTER`]).
    Footer(Footer),
    /// Schema record ([`op::SCHEMA`]): its fixed fields plus the schema bytes.
    Schema {
        header: SchemaHeader,
        data: Cow<'a, [u8]>,
    },
    /// Channel record ([`op::CHANNEL`]).
    Channel(Channel),
    /// Message record ([`op::MESSAGE`]): its fixed fields plus the payload bytes.
    Message {
        header: MessageHeader,
        data: Cow<'a, [u8]>,
    },
    /// Chunk record ([`op::CHUNK`]): its fixed fields plus the chunk's record
    /// bytes, always borrowed. Presumably still compressed as described by
    /// `header.compression` — confirm against the reader.
    Chunk {
        header: ChunkHeader,
        data: &'a [u8],
    },
    /// Message Index record ([`op::MESSAGE_INDEX`]).
    MessageIndex(MessageIndex),
    /// Chunk Index record ([`op::CHUNK_INDEX`]).
    ChunkIndex(ChunkIndex),
    /// Attachment record ([`op::ATTACHMENT`]): its fixed fields plus the
    /// attachment bytes, always borrowed.
    Attachment {
        header: AttachmentHeader,
        data: &'a [u8],
    },
    /// Attachment Index record ([`op::ATTACHMENT_INDEX`]).
    AttachmentIndex(AttachmentIndex),
    /// Statistics record ([`op::STATISTICS`]).
    Statistics(Statistics),
    /// Metadata record ([`op::METADATA`]).
    Metadata(Metadata),
    /// Metadata Index record ([`op::METADATA_INDEX`]).
    MetadataIndex(MetadataIndex),
    /// Summary Offset record ([`op::SUMMARY_OFFSET`]).
    SummaryOffset(SummaryOffset),
    /// Data End record ([`op::END_OF_DATA`]).
    EndOfData(EndOfData),
    /// A record of unknown type
    Unknown {
        /// The opcode the record was read with.
        opcode: u8,
        /// The record's content bytes, uninterpreted.
        data: Cow<'a, [u8]>,
    },
}
81
82impl Record<'_> {
83    pub fn opcode(&self) -> u8 {
84        match &self {
85            Record::Header(_) => op::HEADER,
86            Record::Footer(_) => op::FOOTER,
87            Record::Schema { .. } => op::SCHEMA,
88            Record::Channel(_) => op::CHANNEL,
89            Record::Message { .. } => op::MESSAGE,
90            Record::Chunk { .. } => op::CHUNK,
91            Record::MessageIndex(_) => op::MESSAGE_INDEX,
92            Record::ChunkIndex(_) => op::CHUNK_INDEX,
93            Record::Attachment { .. } => op::ATTACHMENT,
94            Record::AttachmentIndex(_) => op::ATTACHMENT_INDEX,
95            Record::Statistics(_) => op::STATISTICS,
96            Record::Metadata(_) => op::METADATA,
97            Record::MetadataIndex(_) => op::METADATA_INDEX,
98            Record::SummaryOffset(_) => op::SUMMARY_OFFSET,
99            Record::EndOfData(_) => op::END_OF_DATA,
100            Record::Unknown { opcode, .. } => *opcode,
101        }
102    }
103}
104
/// A length-prefixed UTF-8 string as it appears on the wire: a `u32` byte
/// length followed by that many bytes.
///
/// Reading fails if the bytes are not valid UTF-8 (`String::from_utf8`).
/// Writing recomputes the length prefix from `inner`, so the two cannot disagree.
#[binrw]
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct McapString {
    // Only exists while parsing (`temp`); on write it is calculated from `inner`.
    // NOTE(review): `as u32` silently truncates strings longer than u32::MAX bytes.
    #[br(temp)]
    #[bw(calc = inner.len() as u32)]
    pub len: u32,

    // Exactly `len` bytes, validated as UTF-8 on read.
    #[br(count = len, try_map = String::from_utf8)]
    #[bw(map = |s| s.as_bytes())]
    pub inner: String,
}
116
117/// Avoids taking a copy to turn a String to an McapString for serialization
118fn write_string<W: binrw::io::Write + binrw::io::Seek>(
119    s: &String,
120    w: &mut W,
121    opts: &WriteOptions,
122    args: (),
123) -> BinResult<()> {
124    (s.len() as u32).write_options(w, opts, args)?;
125    (s.as_bytes()).write_options(w, opts, args)?;
126    Ok(())
127}
128
129fn parse_vec<T: binrw::BinRead<Args = ()>, R: Read + Seek>(
130    reader: &mut R,
131    ro: &ReadOptions,
132    args: (),
133) -> BinResult<Vec<T>> {
134    let mut parsed = Vec::new();
135
136    // Length of the map in BYTES, not records.
137    let byte_len: u32 = BinRead::read_options(reader, ro, args)?;
138    let pos = reader.stream_position()?;
139
140    while (reader.stream_position()? - pos) < byte_len as u64 {
141        parsed.push(T::read_options(reader, ro, args)?);
142    }
143
144    Ok(parsed)
145}
146
147#[allow(clippy::ptr_arg)] // needed to match binrw macros
148fn write_vec<W: binrw::io::Write + binrw::io::Seek, T: binrw::BinWrite<Args = ()>>(
149    v: &Vec<T>,
150    w: &mut W,
151    opts: &WriteOptions,
152    args: (),
153) -> BinResult<()> {
154    use std::io::SeekFrom;
155
156    let start = w.stream_position()?;
157    (!0u32).write_options(w, opts, args)?; // Revisit...
158    for e in v.iter() {
159        e.write_options(w, opts, args)?;
160    }
161    let end = w.stream_position()?;
162    let data_len = end - start - 4;
163    w.seek(SeekFrom::Start(start))?;
164    (data_len as u32).write_options(w, opts, args)?;
165    assert_eq!(w.seek(SeekFrom::End(0))?, end);
166    Ok(())
167}
168
/// Header record ([`op::HEADER`]).
///
/// Both fields are encoded as `u32`-length-prefixed UTF-8 strings; see the
/// MCAP specification linked in the module docs for their meaning.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct Header {
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub profile: String,

    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub library: String,
}
179
/// Footer record ([`op::FOOTER`]).
///
/// Fixed-size: two `u64`s followed by a `u32`, in declaration order. See the
/// MCAP specification linked in the module docs for field semantics.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, BinRead, BinWrite)]
pub struct Footer {
    pub summary_start: u64,
    pub summary_offset_start: u64,
    pub summary_crc: u32,
}
186
/// The fixed fields of a Schema record ([`op::SCHEMA`]). The schema bytes
/// themselves are carried separately in `Record::Schema::data`.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct SchemaHeader {
    pub id: u16,

    /// `u32`-length-prefixed UTF-8 string.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub name: String,

    /// `u32`-length-prefixed UTF-8 string.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub encoding: String,

    /// Presumably the byte length of the schema data that follows this header
    /// (`Record::Schema::data`) — confirm against the reader.
    pub data_len: u32,
}
201
202fn parse_string_map<R: Read + Seek>(
203    reader: &mut R,
204    ro: &ReadOptions,
205    args: (),
206) -> BinResult<BTreeMap<String, String>> {
207    let mut parsed = BTreeMap::new();
208
209    // Length of the map in BYTES, not records.
210    let byte_len: u32 = BinRead::read_options(reader, ro, args)?;
211    let pos = reader.stream_position()?;
212
213    while (reader.stream_position()? - pos) < byte_len as u64 {
214        let k = McapString::read_options(reader, ro, args)?;
215        let v = McapString::read_options(reader, ro, args)?;
216        if let Some(_prev) = parsed.insert(k.inner, v.inner) {
217            return Err(binrw::Error::Custom {
218                pos,
219                err: Box::new("Duplicate keys in map"),
220            });
221        }
222    }
223
224    Ok(parsed)
225}
226
227fn write_string_map<W: Write + Seek>(
228    s: &BTreeMap<String, String>,
229    w: &mut W,
230    opts: &WriteOptions,
231    args: (),
232) -> BinResult<()> {
233    // Ugh: figure out total number of bytes to write:
234    let mut byte_len = 0;
235    for (k, v) in s {
236        byte_len += 8; // Four bytes each for lengths of key and value
237        byte_len += k.len();
238        byte_len += v.len();
239    }
240
241    (byte_len as u32).write_options(w, opts, args)?;
242    let pos = w.stream_position()?;
243
244    for (k, v) in s {
245        write_string(k, w, opts, args)?;
246        write_string(v, w, opts, args)?;
247    }
248    assert_eq!(w.stream_position()?, pos + byte_len as u64);
249    Ok(())
250}
251
252fn write_int_map<K: BinWrite<Args = ()>, V: BinWrite<Args = ()>, W: Write + Seek>(
253    s: &BTreeMap<K, V>,
254    w: &mut W,
255    opts: &WriteOptions,
256    args: (),
257) -> BinResult<()> {
258    // Ugh: figure out total number of bytes to write:
259    let mut byte_len = 0;
260    for _ in s.values() {
261        // Hack: We're assuming serialized size of the value is its in-memory size.
262        // For ints of all flavors, this should be true.
263        byte_len += core::mem::size_of::<K>();
264        byte_len += core::mem::size_of::<V>();
265    }
266
267    (byte_len as u32).write_options(w, opts, args)?;
268    let pos = w.stream_position()?;
269
270    for (k, v) in s {
271        k.write_options(w, opts, args)?;
272        v.write_options(w, opts, args)?;
273    }
274    assert_eq!(w.stream_position()?, pos + byte_len as u64);
275    Ok(())
276}
277
278fn parse_int_map<K, V, R>(reader: &mut R, ro: &ReadOptions, args: ()) -> BinResult<BTreeMap<K, V>>
279where
280    K: BinRead<Args = ()> + std::cmp::Ord,
281    V: BinRead<Args = ()>,
282    R: Read + Seek,
283{
284    let mut parsed = BTreeMap::new();
285
286    // Length of the map in BYTES, not records.
287    let byte_len: u32 = BinRead::read_options(reader, ro, args)?;
288    let pos = reader.stream_position()?;
289
290    while (reader.stream_position()? - pos) < byte_len as u64 {
291        let k = K::read_options(reader, ro, args)?;
292        let v = V::read_options(reader, ro, args)?;
293        if let Some(_prev) = parsed.insert(k, v) {
294            return Err(binrw::Error::Custom {
295                pos,
296                err: Box::new("Duplicate keys in map"),
297            });
298        }
299    }
300
301    Ok(parsed)
302}
303
/// Channel record ([`op::CHANNEL`]).
///
/// See the MCAP specification linked in the module docs for field semantics.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct Channel {
    pub id: u16,
    pub schema_id: u16,

    /// `u32`-length-prefixed UTF-8 string.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub topic: String,

    /// `u32`-length-prefixed UTF-8 string.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub message_encoding: String,

    /// Encoded as a `u32` byte length followed by key/value string pairs;
    /// duplicate keys are rejected on read.
    #[br(parse_with = parse_string_map)]
    #[bw(write_with = write_string_map)]
    pub metadata: BTreeMap<String, String>,
}
321
/// Converts a [`SystemTime`] to whole nanoseconds since the Unix epoch.
///
/// # Panics
/// Panics if `d` is earlier than the Unix epoch. Also panics if `d` is so far
/// in the future that its nanosecond count doesn't fit in a `u64` (roughly the
/// year 2554).
pub fn system_time_to_nanos(d: &SystemTime) -> u64 {
    use std::convert::TryFrom;

    let since_epoch = d
        .duration_since(UNIX_EPOCH)
        .expect("timestamps before the Unix epoch can't be represented");
    u64::try_from(since_epoch.as_nanos()).expect("timestamp overflows u64 nanoseconds")
}
327
/// Converts a count of nanoseconds since the Unix epoch back into a [`SystemTime`].
///
/// This is the inverse of `system_time_to_nanos` for in-range values.
pub fn nanos_to_system_time(n: u64) -> SystemTime {
    let offset = Duration::from_nanos(n);
    UNIX_EPOCH + offset
}
331
/// The fixed fields of a Message record ([`op::MESSAGE`]). The payload bytes
/// are carried separately in `Record::Message::data`.
///
/// Fixed-size: a `u16`, a `u32`, then two `u64`s, in declaration order.
#[derive(Debug, Copy, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct MessageHeader {
    pub channel_id: u16,
    pub sequence: u32,

    /// Presumably nanoseconds since the Unix epoch (compare
    /// `nanos_to_system_time`) — confirm against the MCAP spec.
    pub log_time: u64,

    /// Same representation as `log_time`.
    pub publish_time: u64,
}
341
/// The fixed fields of a Chunk record ([`op::CHUNK`]). The chunk's record
/// bytes are carried separately in `Record::Chunk::data`.
///
/// See the MCAP specification linked in the module docs for field semantics.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct ChunkHeader {
    pub message_start_time: u64,

    pub message_end_time: u64,

    pub uncompressed_size: u64,

    pub uncompressed_crc: u32,

    /// `u32`-length-prefixed UTF-8 string naming the compression format.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub compression: String,

    /// Presumably the byte length of the (compressed) records that follow
    /// this header — confirm against the reader.
    pub compressed_size: u64,
}
358
/// One entry of a [`MessageIndex`]: a log time and an offset, each a `u64`.
///
/// NOTE(review): the offset is presumably relative to the chunk's
/// uncompressed records — confirm against the MCAP spec.
#[derive(Debug, Clone, Copy, Eq, PartialEq, BinRead, BinWrite)]
pub struct MessageIndexEntry {
    pub log_time: u64,

    pub offset: u64,
}
365
/// Message Index record ([`op::MESSAGE_INDEX`]).
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct MessageIndex {
    pub channel_id: u16,

    /// Encoded as a `u32` length in BYTES (not entries) followed by the
    /// entries themselves.
    #[br(parse_with = parse_vec)]
    #[bw(write_with = write_vec)]
    pub records: Vec<MessageIndexEntry>,
}
374
/// Chunk Index record ([`op::CHUNK_INDEX`]).
///
/// See the MCAP specification linked in the module docs for field semantics.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct ChunkIndex {
    pub message_start_time: u64,

    pub message_end_time: u64,

    pub chunk_start_offset: u64,

    pub chunk_length: u64,

    /// Encoded as a `u32` byte length followed by `(u16, u64)` pairs;
    /// duplicate keys are rejected on read. Presumably keyed by channel ID —
    /// confirm against the MCAP spec.
    #[br(parse_with = parse_int_map)]
    #[bw(write_with = write_int_map)]
    pub message_index_offsets: BTreeMap<u16, u64>,

    pub message_index_length: u64,

    /// `u32`-length-prefixed UTF-8 string.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub compression: String,

    pub compressed_size: u64,

    pub uncompressed_size: u64,
}
399
/// The fixed fields of an Attachment record ([`op::ATTACHMENT`]). The
/// attachment bytes are carried separately in `Record::Attachment::data`.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct AttachmentHeader {
    pub log_time: u64,

    pub create_time: u64,

    /// `u32`-length-prefixed UTF-8 string.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub name: String,

    /// `u32`-length-prefixed UTF-8 string.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub content_type: String,

    /// Presumably the byte length of the attachment data that follows this
    /// header — confirm against the reader.
    pub data_len: u64,
}
416
/// Attachment Index record ([`op::ATTACHMENT_INDEX`]).
///
/// Five `u64`s followed by two `u32`-length-prefixed strings, in declaration
/// order. See the MCAP specification linked in the module docs for semantics.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct AttachmentIndex {
    pub offset: u64,

    pub length: u64,

    pub log_time: u64,

    pub create_time: u64,

    pub data_size: u64,

    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub name: String,

    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub content_type: String,
}
437
/// Statistics record ([`op::STATISTICS`]).
///
/// Note the mixed integer widths (`schema_count` is a `u16`, the other counts
/// are `u32`/`u64`); they must match the wire format exactly.
#[derive(Debug, Default, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct Statistics {
    pub message_count: u64,
    pub schema_count: u16,
    pub channel_count: u32,
    pub attachment_count: u32,
    pub metadata_count: u32,
    pub chunk_count: u32,

    pub message_start_time: u64,

    pub message_end_time: u64,

    /// Encoded as a `u32` byte length followed by `(u16, u64)` pairs;
    /// duplicate keys are rejected on read. Presumably keyed by channel ID —
    /// confirm against the MCAP spec.
    #[br(parse_with = parse_int_map)]
    #[bw(write_with = write_int_map)]
    pub channel_message_counts: BTreeMap<u16, u64>,
}
455
/// Metadata record ([`op::METADATA`]): a name plus a string-to-string map.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct Metadata {
    /// `u32`-length-prefixed UTF-8 string.
    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub name: String,

    /// Encoded as a `u32` byte length followed by key/value string pairs;
    /// duplicate keys are rejected on read.
    #[br(parse_with = parse_string_map)]
    #[bw(write_with = write_string_map)]
    pub metadata: BTreeMap<String, String>,
}
466
/// Metadata Index record ([`op::METADATA_INDEX`]).
///
/// Two `u64`s followed by a `u32`-length-prefixed string. Presumably locates
/// a [`Metadata`] record by offset and length — confirm against the MCAP spec.
#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
pub struct MetadataIndex {
    pub offset: u64,

    pub length: u64,

    #[br(map = |s: McapString| s.inner )]
    #[bw(write_with = write_string)]
    pub name: String,
}
477
/// Summary Offset record ([`op::SUMMARY_OFFSET`]).
///
/// Fixed-size: a `u8` opcode followed by two `u64`s. `group_opcode` is
/// presumably one of the constants in [`op`] — confirm against the MCAP spec.
#[derive(Debug, Clone, Copy, Eq, PartialEq, BinRead, BinWrite)]
pub struct SummaryOffset {
    pub group_opcode: u8,
    pub group_start: u64,
    pub group_length: u64,
}
484
/// Data End record ([`op::END_OF_DATA`]): a single `u32` CRC.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, BinRead, BinWrite)]
pub struct EndOfData {
    pub data_section_crc: u32,
}
489
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// `McapString` parses a length-prefixed string, rejects a prefix that
    /// overruns the data, and writes the prefix back out.
    #[test]
    fn string_parse() {
        // Well-formed: 4-byte little-endian length, then exactly that many bytes.
        let parsed: McapString = Cursor::new(b"\x04\0\0\0abcd").read_le().unwrap();
        let expected = McapString {
            inner: "abcd".to_string(),
        };
        assert_eq!(parsed, expected);

        // The prefix claims 5 bytes but only 4 follow, so parsing must fail.
        let truncated = Cursor::new(b"\x05\0\0\0abcd").read_le::<McapString>();
        assert!(truncated.is_err());

        // Writing emits the length prefix followed by the raw bytes.
        let hullo = McapString {
            inner: "hullo".to_string(),
        };
        let mut buf = Vec::new();
        Cursor::new(&mut buf).write_le(&hullo).unwrap();
        assert_eq!(&buf, b"\x05\0\0\0hullo");
    }

    /// A `Header` round-trips through its binary encoding unchanged.
    #[test]
    fn header_parse() {
        // profile = "abcd", library = "123", each as a length-prefixed string.
        let bytes = b"\x04\0\0\0abcd\x03\0\0\x00123";

        let header: Header = Cursor::new(bytes).read_le().unwrap();
        assert_eq!(header.profile, "abcd");
        assert_eq!(header.library, "123");

        let mut round_trip = Vec::new();
        Cursor::new(&mut round_trip).write_le(&header).unwrap();
        assert_eq!(round_trip, bytes);
    }
}
528        assert_eq!(written, expected);
529    }
530}