mcap/
lib.rs

1//! A library for manipulating [Foxglove MCAP](https://github.com/foxglove/mcap) files,
2//! both reading:
3//!
4//! ```no_run
5//! use std::fs;
6//!
7//! use anyhow::{Context, Result};
8//! use camino::Utf8Path;
9//! use memmap2::Mmap;
10//!
11//! fn map_mcap<P: AsRef<Utf8Path>>(p: P) -> Result<Mmap> {
12//!     let fd = fs::File::open(p.as_ref()).context("Couldn't open MCAP file")?;
13//!     unsafe { Mmap::map(&fd) }.context("Couldn't map MCAP file")
14//! }
15//!
16//! fn read_it() -> Result<()> {
17//!     let mapped = map_mcap("in.mcap")?;
18//!
19//!     for message in mcap::MessageStream::new(&mapped)? {
20//!         println!("{:?}", message?);
21//!         // Or whatever else you'd like to do...
22//!     }
23//!     Ok(())
24//! }
25//! ```
26//! or writing:
27//! ```no_run
28//! use std::{collections::BTreeMap, fs, io::BufWriter};
29//!
30//! use anyhow::Result;
31//!
32//! use mcap::{Channel, records::MessageHeader, Writer};
33//!
34//! fn write_it() -> Result<()> {
35//!     // To set the profile or compression options, see mcap::WriteOptions.
36//!     let mut out = Writer::new(
37//!         BufWriter::new(fs::File::create("out.mcap")?)
38//!     )?;
39//!
//!     // Channels and schemas are automatically assigned IDs as they're serialized,
//!     // and automatically deduplicated with `Arc` when deserialized.
42//!     let channel_id = out.add_channel(0, "cool stuff", "application/octet-stream", &BTreeMap::new())?;
43//!
44//!     out.write_to_known_channel(
45//!         &MessageHeader {
46//!             channel_id,
47//!             sequence: 25,
48//!             log_time: 6,
49//!             publish_time: 24
50//!         },
51//!         &[1, 2, 3]
52//!     )?;
53//!     out.write_to_known_channel(
54//!         &MessageHeader {
55//!             channel_id,
56//!             sequence: 32,
57//!             log_time: 23,
58//!             publish_time: 25
59//!         },
60//!         &[3, 4, 5]
61//!     )?;
62//!
63//!     out.finish()?;
64//!
65//!     Ok(())
66//! }
67//! ```
68
69pub mod read;
70pub mod records;
71#[cfg(feature = "tokio")]
72pub mod tokio;
73pub mod write;
74
75mod chunk_sink;
76mod io_utils;
77pub mod sans_io;
78
79use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc};
80
81use thiserror::Error;
82
83#[derive(Debug, Error)]
84pub enum McapError {
85    #[error("tried to write to output while attachment is in progress")]
86    AttachmentInProgress,
87    #[error("tried to write bytes to an attachment but no attachment was in progress")]
88    AttachmentNotInProgress,
89    #[error("tried to write {excess} more bytes to attachment than the requested attachment length {attachment_length}")]
90    AttachmentTooLarge { excess: u64, attachment_length: u64 },
91    #[error("tried to finish writing attachment but current length {current} was not expected length {expected}")]
92    AttachmentIncomplete { current: u64, expected: u64 },
93    #[error("Bad magic number")]
94    BadMagic,
95    #[error("Footer record couldn't be found at the end of the file, before the magic bytes")]
96    BadFooter,
97    #[error("Attachment CRC failed (expeted {saved:08X}, got {calculated:08X}")]
98    BadAttachmentCrc { saved: u32, calculated: u32 },
99    #[error("Chunk CRC failed (expected {saved:08X}, got {calculated:08X}")]
100    BadChunkCrc { saved: u32, calculated: u32 },
101    #[error("Data section CRC failed (expected {saved:08X}, got {calculated:08X})")]
102    BadDataCrc { saved: u32, calculated: u32 },
103    #[error("Summary section CRC failed (expected {saved:08X}, got {calculated:08X})")]
104    BadSummaryCrc { saved: u32, calculated: u32 },
105    #[error("Index offset and length didn't point to the expected record type")]
106    BadIndex,
107    #[error("Attachment length ({header}) exceeds space in record ({available})")]
108    BadAttachmentLength { header: u64, available: u64 },
109    #[error("Chunk length ({header}) exceeds space in record ({available})")]
110    BadChunkLength { header: u64, available: u64 },
111    #[error("Schema length ({header}) exceeds space in record ({available})")]
112    BadSchemaLength { header: u32, available: u32 },
113    #[error("Private records must have an opcode >= 0x80, got {opcode:#04x}")]
114    PrivateRecordOpcodeIsReserved { opcode: u8 },
115    #[error("Channel `{0}` has mulitple records that don't match.")]
116    ConflictingChannels(String),
117    #[error("Schema `{0}` has mulitple records that don't match.")]
118    ConflictingSchemas(String),
119    #[error("Record parse failed")]
120    Parse(#[from] binrw::Error),
121    #[error("I/O error from writing, or reading a compression stream")]
122    Io(#[from] std::io::Error),
123    #[error("Schema has an ID of 0")]
124    InvalidSchemaId,
125    #[error("MCAP file ended in the middle of a record")]
126    UnexpectedEof,
127    #[error("Chunk ended in the middle of a record")]
128    UnexpectedEoc,
129    #[error("Record with opcode {opcode:02X} has length {len}, need at least {expected} to parse")]
130    RecordTooShort { opcode: u8, len: u64, expected: u64 },
131    #[error("Message {0} referenced unknown channel {1}")]
132    UnknownChannel(u32, u16),
133    #[error("Channel `{0}` referenced unknown schema {1}")]
134    UnknownSchema(String, u16),
135    #[error("Found record with opcode {0:02X} in a chunk")]
136    UnexpectedChunkRecord(u8),
137    #[error("Unsupported compression format `{0}`")]
138    UnsupportedCompression(String),
139    #[error("Error during decompression: `{0}`")]
140    DecompressionError(String),
141    #[error("chunk size option exceeds usize max: `{0}`")]
142    ChunkBufferTooLarge(u64),
143    #[error("record with opcode {opcode:02x} length exceeds limit: `{len}`")]
144    RecordTooLarge { opcode: u8, len: u64 },
145    #[error("chunk (de)compressed length exceeds limit: `{0}`")]
146    ChunkTooLarge(u64),
147    #[error("chunk start offset is out of file range: {0}")]
148    BadChunkStartOffset(u64),
149    #[error("cannot write more than 65536 channels to one MCAP")]
150    TooManyChannels,
151    #[error("cannot write more than 65535 schemas to one MCAP")]
152    TooManySchemas,
153    #[error("indexed reader received chunk data with unexpected offset or length")]
154    UnexpectedChunkDataInserted,
155    #[error("attempted another write after a write method failed")]
156    AttemptedWriteAfterFailure,
157    #[error("file has more bytes after end magic")]
158    BytesAfterEndMagic,
159}
160
161pub type McapResult<T> = Result<T, McapError>;
162
/// Magic bytes for the MCAP format: `0x89`, the ASCII text `MCAP0`, then `\r\n`.
pub const MAGIC: &[u8] = b"\x89MCAP0\r\n";
165
/// Compression options for chunks of channels, schemas, and messages in an MCAP file
///
/// Each variant only exists when the matching Cargo feature (`zstd` or `lz4`) is enabled.
/// The type is comparable and hashable so a chosen option can be checked with `==`
/// or used as a map/set key.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Compression {
    #[cfg(feature = "zstd")]
    Zstd,
    #[cfg(feature = "lz4")]
    Lz4,
}
174
/// Describes a schema used by one or more [Channel]s in an MCAP file
///
/// The [`Cow`](std::borrow::Cow) can either borrow directly from the mapped file,
/// or hold its own buffer if it was decompressed from a chunk.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct Schema<'a> {
    /// Numeric ID of this schema.
    pub id: u16,
    /// Name of the schema.
    pub name: String,
    /// Name of the encoding used for `data`.
    pub encoding: String,
    /// Raw schema bytes, borrowed or owned.
    pub data: Cow<'a, [u8]>,
}

impl fmt::Debug for Schema<'_> {
    /// Formats the schema's `id`, `name` and `encoding`.
    ///
    /// The raw `data` bytes are deliberately left out; the trailing `..` produced by
    /// `finish_non_exhaustive` signals that a field was omitted. The `id` is included
    /// so that schemas sharing a name and encoding can be told apart in debug output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Schema")
            .field("id", &self.id)
            .field("name", &self.name)
            .field("encoding", &self.encoding)
            .finish_non_exhaustive()
    }
}
195
196/// Describes a channel which [Message]s are published to in an MCAP file
197#[derive(Debug, Clone, PartialEq, Eq, Hash)]
198pub struct Channel<'a> {
199    pub id: u16,
200    pub topic: String,
201    pub schema: Option<Arc<Schema<'a>>>,
202
203    pub message_encoding: String,
204    pub metadata: BTreeMap<String, String>,
205}
206
207/// An event in an MCAP file, published to a [Channel]
208///
209/// The [`CoW`](std::borrow::Cow) can either borrow directly from the mapped file,
210/// or hold its own buffer if it was decompressed from a chunk.
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub struct Message<'a> {
213    pub channel: Arc<Channel<'a>>,
214    pub sequence: u32,
215    pub log_time: u64,
216    pub publish_time: u64,
217    pub data: Cow<'a, [u8]>,
218}
219
/// An attachment and its metadata in an MCAP file
///
/// Derives `Clone` like the other record types in this module, so an attachment can be
/// duplicated; cloning a borrowed `data` copies only the reference, not the bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Attachment<'a> {
    /// Log time of the attachment.
    pub log_time: u64,
    /// Creation time of the attachment.
    pub create_time: u64,
    /// Name of the attachment.
    pub name: String,
    /// Media type of the attachment's contents.
    pub media_type: String,
    /// Attachment bytes, borrowed or owned.
    pub data: Cow<'a, [u8]>,
}
229
230pub use read::{parse_record, MessageStream, Summary};
231pub use write::{WriteOptions, Writer};
232
233// The following assertions ensure that the MCAP components can be sent between threads.
234mod assertions {
235    use super::*;
236    use static_assertions::assert_impl_all;
237    use std::io::Cursor;
238
239    assert_impl_all!(Writer<Cursor<Vec<u8>>>: Send);
240    assert_impl_all!(MessageStream: Send);
241    assert_impl_all!(sans_io::LinearReader: Send);
242    #[cfg(feature = "tokio")]
243    assert_impl_all!(tokio::linear_reader::LinearReader<Cursor<Vec<u8>>>: Send);
244}