1use binrw::io::{Read, Seek, Write};
10use binrw::*;
11
12use std::{
13 borrow::Cow,
14 collections::BTreeMap,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17
/// One-byte opcodes, one per record kind.
///
/// `Record::opcode` maps each `Record` variant to one of these values.
pub mod op {
    /// Opcode of a `Header` record.
    pub const HEADER: u8 = 0x01;
    /// Opcode of a `Footer` record.
    pub const FOOTER: u8 = 0x02;
    /// Opcode of a `Schema` record.
    pub const SCHEMA: u8 = 0x03;
    /// Opcode of a `Channel` record.
    pub const CHANNEL: u8 = 0x04;
    /// Opcode of a `Message` record.
    pub const MESSAGE: u8 = 0x05;
    /// Opcode of a `Chunk` record.
    pub const CHUNK: u8 = 0x06;
    /// Opcode of a `MessageIndex` record.
    pub const MESSAGE_INDEX: u8 = 0x07;
    /// Opcode of a `ChunkIndex` record.
    pub const CHUNK_INDEX: u8 = 0x08;
    /// Opcode of an `Attachment` record.
    pub const ATTACHMENT: u8 = 0x09;
    /// Opcode of an `AttachmentIndex` record.
    pub const ATTACHMENT_INDEX: u8 = 0x0A;
    /// Opcode of a `Statistics` record.
    pub const STATISTICS: u8 = 0x0B;
    /// Opcode of a `Metadata` record.
    pub const METADATA: u8 = 0x0C;
    /// Opcode of a `MetadataIndex` record.
    pub const METADATA_INDEX: u8 = 0x0D;
    /// Opcode of a `SummaryOffset` record.
    pub const SUMMARY_OFFSET: u8 = 0x0E;
    /// Opcode of an `EndOfData` record.
    pub const END_OF_DATA: u8 = 0x0F;
}
40
41#[derive(Debug)]
47pub enum Record<'a> {
48 Header(Header),
49 Footer(Footer),
50 Schema {
51 header: SchemaHeader,
52 data: Cow<'a, [u8]>,
53 },
54 Channel(Channel),
55 Message {
56 header: MessageHeader,
57 data: Cow<'a, [u8]>,
58 },
59 Chunk {
60 header: ChunkHeader,
61 data: &'a [u8],
62 },
63 MessageIndex(MessageIndex),
64 ChunkIndex(ChunkIndex),
65 Attachment {
66 header: AttachmentHeader,
67 data: &'a [u8],
68 },
69 AttachmentIndex(AttachmentIndex),
70 Statistics(Statistics),
71 Metadata(Metadata),
72 MetadataIndex(MetadataIndex),
73 SummaryOffset(SummaryOffset),
74 EndOfData(EndOfData),
75 Unknown {
77 opcode: u8,
78 data: Cow<'a, [u8]>,
79 },
80}
81
82impl Record<'_> {
83 pub fn opcode(&self) -> u8 {
84 match &self {
85 Record::Header(_) => op::HEADER,
86 Record::Footer(_) => op::FOOTER,
87 Record::Schema { .. } => op::SCHEMA,
88 Record::Channel(_) => op::CHANNEL,
89 Record::Message { .. } => op::MESSAGE,
90 Record::Chunk { .. } => op::CHUNK,
91 Record::MessageIndex(_) => op::MESSAGE_INDEX,
92 Record::ChunkIndex(_) => op::CHUNK_INDEX,
93 Record::Attachment { .. } => op::ATTACHMENT,
94 Record::AttachmentIndex(_) => op::ATTACHMENT_INDEX,
95 Record::Statistics(_) => op::STATISTICS,
96 Record::Metadata(_) => op::METADATA,
97 Record::MetadataIndex(_) => op::METADATA_INDEX,
98 Record::SummaryOffset(_) => op::SUMMARY_OFFSET,
99 Record::EndOfData(_) => op::END_OF_DATA,
100 Record::Unknown { opcode, .. } => *opcode,
101 }
102 }
103}
104
105#[binrw]
106#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
107struct McapString {
108 #[br(temp)]
109 #[bw(calc = inner.len() as u32)]
110 pub len: u32,
111
112 #[br(count = len, try_map = String::from_utf8)]
113 #[bw(map = |s| s.as_bytes())]
114 pub inner: String,
115}
116
117fn write_string<W: binrw::io::Write + binrw::io::Seek>(
119 s: &String,
120 w: &mut W,
121 opts: &WriteOptions,
122 args: (),
123) -> BinResult<()> {
124 (s.len() as u32).write_options(w, opts, args)?;
125 (s.as_bytes()).write_options(w, opts, args)?;
126 Ok(())
127}
128
129fn parse_vec<T: binrw::BinRead<Args = ()>, R: Read + Seek>(
130 reader: &mut R,
131 ro: &ReadOptions,
132 args: (),
133) -> BinResult<Vec<T>> {
134 let mut parsed = Vec::new();
135
136 let byte_len: u32 = BinRead::read_options(reader, ro, args)?;
138 let pos = reader.stream_position()?;
139
140 while (reader.stream_position()? - pos) < byte_len as u64 {
141 parsed.push(T::read_options(reader, ro, args)?);
142 }
143
144 Ok(parsed)
145}
146
147#[allow(clippy::ptr_arg)] fn write_vec<W: binrw::io::Write + binrw::io::Seek, T: binrw::BinWrite<Args = ()>>(
149 v: &Vec<T>,
150 w: &mut W,
151 opts: &WriteOptions,
152 args: (),
153) -> BinResult<()> {
154 use std::io::SeekFrom;
155
156 let start = w.stream_position()?;
157 (!0u32).write_options(w, opts, args)?; for e in v.iter() {
159 e.write_options(w, opts, args)?;
160 }
161 let end = w.stream_position()?;
162 let data_len = end - start - 4;
163 w.seek(SeekFrom::Start(start))?;
164 (data_len as u32).write_options(w, opts, args)?;
165 assert_eq!(w.seek(SeekFrom::End(0))?, end);
166 Ok(())
167}
168
169#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
170pub struct Header {
171 #[br(map = |s: McapString| s.inner )]
172 #[bw(write_with = write_string)]
173 pub profile: String,
174
175 #[br(map = |s: McapString| s.inner )]
176 #[bw(write_with = write_string)]
177 pub library: String,
178}
179
180#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, BinRead, BinWrite)]
181pub struct Footer {
182 pub summary_start: u64,
183 pub summary_offset_start: u64,
184 pub summary_crc: u32,
185}
186
187#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
188pub struct SchemaHeader {
189 pub id: u16,
190
191 #[br(map = |s: McapString| s.inner )]
192 #[bw(write_with = write_string)]
193 pub name: String,
194
195 #[br(map = |s: McapString| s.inner )]
196 #[bw(write_with = write_string)]
197 pub encoding: String,
198
199 pub data_len: u32,
200}
201
202fn parse_string_map<R: Read + Seek>(
203 reader: &mut R,
204 ro: &ReadOptions,
205 args: (),
206) -> BinResult<BTreeMap<String, String>> {
207 let mut parsed = BTreeMap::new();
208
209 let byte_len: u32 = BinRead::read_options(reader, ro, args)?;
211 let pos = reader.stream_position()?;
212
213 while (reader.stream_position()? - pos) < byte_len as u64 {
214 let k = McapString::read_options(reader, ro, args)?;
215 let v = McapString::read_options(reader, ro, args)?;
216 if let Some(_prev) = parsed.insert(k.inner, v.inner) {
217 return Err(binrw::Error::Custom {
218 pos,
219 err: Box::new("Duplicate keys in map"),
220 });
221 }
222 }
223
224 Ok(parsed)
225}
226
227fn write_string_map<W: Write + Seek>(
228 s: &BTreeMap<String, String>,
229 w: &mut W,
230 opts: &WriteOptions,
231 args: (),
232) -> BinResult<()> {
233 let mut byte_len = 0;
235 for (k, v) in s {
236 byte_len += 8; byte_len += k.len();
238 byte_len += v.len();
239 }
240
241 (byte_len as u32).write_options(w, opts, args)?;
242 let pos = w.stream_position()?;
243
244 for (k, v) in s {
245 write_string(k, w, opts, args)?;
246 write_string(v, w, opts, args)?;
247 }
248 assert_eq!(w.stream_position()?, pos + byte_len as u64);
249 Ok(())
250}
251
252fn write_int_map<K: BinWrite<Args = ()>, V: BinWrite<Args = ()>, W: Write + Seek>(
253 s: &BTreeMap<K, V>,
254 w: &mut W,
255 opts: &WriteOptions,
256 args: (),
257) -> BinResult<()> {
258 let mut byte_len = 0;
260 for _ in s.values() {
261 byte_len += core::mem::size_of::<K>();
264 byte_len += core::mem::size_of::<V>();
265 }
266
267 (byte_len as u32).write_options(w, opts, args)?;
268 let pos = w.stream_position()?;
269
270 for (k, v) in s {
271 k.write_options(w, opts, args)?;
272 v.write_options(w, opts, args)?;
273 }
274 assert_eq!(w.stream_position()?, pos + byte_len as u64);
275 Ok(())
276}
277
278fn parse_int_map<K, V, R>(reader: &mut R, ro: &ReadOptions, args: ()) -> BinResult<BTreeMap<K, V>>
279where
280 K: BinRead<Args = ()> + std::cmp::Ord,
281 V: BinRead<Args = ()>,
282 R: Read + Seek,
283{
284 let mut parsed = BTreeMap::new();
285
286 let byte_len: u32 = BinRead::read_options(reader, ro, args)?;
288 let pos = reader.stream_position()?;
289
290 while (reader.stream_position()? - pos) < byte_len as u64 {
291 let k = K::read_options(reader, ro, args)?;
292 let v = V::read_options(reader, ro, args)?;
293 if let Some(_prev) = parsed.insert(k, v) {
294 return Err(binrw::Error::Custom {
295 pos,
296 err: Box::new("Duplicate keys in map"),
297 });
298 }
299 }
300
301 Ok(parsed)
302}
303
304#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
305pub struct Channel {
306 pub id: u16,
307 pub schema_id: u16,
308
309 #[br(map = |s: McapString| s.inner )]
310 #[bw(write_with = write_string)]
311 pub topic: String,
312
313 #[br(map = |s: McapString| s.inner )]
314 #[bw(write_with = write_string)]
315 pub message_encoding: String,
316
317 #[br(parse_with = parse_string_map)]
318 #[bw(write_with = write_string_map)]
319 pub metadata: BTreeMap<String, String>,
320}
321
/// Converts a [`SystemTime`] to whole nanoseconds elapsed since [`UNIX_EPOCH`].
///
/// # Panics
///
/// Panics if `d` is earlier than the Unix epoch, or if the nanosecond count
/// does not fit in a `u64`.
pub fn system_time_to_nanos(d: &SystemTime) -> u64 {
    let since_epoch: Duration = d.duration_since(UNIX_EPOCH).unwrap();
    let ns = since_epoch.as_nanos();
    // `as_nanos` yields a u128; refuse to truncate silently.
    assert!(ns <= u64::MAX as u128);
    ns as u64
}
327
/// Converts a count of nanoseconds since [`UNIX_EPOCH`] into a [`SystemTime`].
pub fn nanos_to_system_time(n: u64) -> SystemTime {
    let offset = Duration::from_nanos(n);
    UNIX_EPOCH + offset
}
331
332#[derive(Debug, Copy, Clone, Eq, PartialEq, BinRead, BinWrite)]
333pub struct MessageHeader {
334 pub channel_id: u16,
335 pub sequence: u32,
336
337 pub log_time: u64,
338
339 pub publish_time: u64,
340}
341
342#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
343pub struct ChunkHeader {
344 pub message_start_time: u64,
345
346 pub message_end_time: u64,
347
348 pub uncompressed_size: u64,
349
350 pub uncompressed_crc: u32,
351
352 #[br(map = |s: McapString| s.inner )]
353 #[bw(write_with = write_string)]
354 pub compression: String,
355
356 pub compressed_size: u64,
357}
358
359#[derive(Debug, Clone, Copy, Eq, PartialEq, BinRead, BinWrite)]
360pub struct MessageIndexEntry {
361 pub log_time: u64,
362
363 pub offset: u64,
364}
365
366#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
367pub struct MessageIndex {
368 pub channel_id: u16,
369
370 #[br(parse_with = parse_vec)]
371 #[bw(write_with = write_vec)]
372 pub records: Vec<MessageIndexEntry>,
373}
374
375#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
376pub struct ChunkIndex {
377 pub message_start_time: u64,
378
379 pub message_end_time: u64,
380
381 pub chunk_start_offset: u64,
382
383 pub chunk_length: u64,
384
385 #[br(parse_with = parse_int_map)]
386 #[bw(write_with = write_int_map)]
387 pub message_index_offsets: BTreeMap<u16, u64>,
388
389 pub message_index_length: u64,
390
391 #[br(map = |s: McapString| s.inner )]
392 #[bw(write_with = write_string)]
393 pub compression: String,
394
395 pub compressed_size: u64,
396
397 pub uncompressed_size: u64,
398}
399
400#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
401pub struct AttachmentHeader {
402 pub log_time: u64,
403
404 pub create_time: u64,
405
406 #[br(map = |s: McapString| s.inner )]
407 #[bw(write_with = write_string)]
408 pub name: String,
409
410 #[br(map = |s: McapString| s.inner )]
411 #[bw(write_with = write_string)]
412 pub content_type: String,
413
414 pub data_len: u64,
415}
416
417#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
418pub struct AttachmentIndex {
419 pub offset: u64,
420
421 pub length: u64,
422
423 pub log_time: u64,
424
425 pub create_time: u64,
426
427 pub data_size: u64,
428
429 #[br(map = |s: McapString| s.inner )]
430 #[bw(write_with = write_string)]
431 pub name: String,
432
433 #[br(map = |s: McapString| s.inner )]
434 #[bw(write_with = write_string)]
435 pub content_type: String,
436}
437
438#[derive(Debug, Default, Clone, Eq, PartialEq, BinRead, BinWrite)]
439pub struct Statistics {
440 pub message_count: u64,
441 pub schema_count: u16,
442 pub channel_count: u32,
443 pub attachment_count: u32,
444 pub metadata_count: u32,
445 pub chunk_count: u32,
446
447 pub message_start_time: u64,
448
449 pub message_end_time: u64,
450
451 #[br(parse_with = parse_int_map)]
452 #[bw(write_with = write_int_map)]
453 pub channel_message_counts: BTreeMap<u16, u64>,
454}
455
456#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
457pub struct Metadata {
458 #[br(map = |s: McapString| s.inner )]
459 #[bw(write_with = write_string)]
460 pub name: String,
461
462 #[br(parse_with = parse_string_map)]
463 #[bw(write_with = write_string_map)]
464 pub metadata: BTreeMap<String, String>,
465}
466
467#[derive(Debug, Clone, Eq, PartialEq, BinRead, BinWrite)]
468pub struct MetadataIndex {
469 pub offset: u64,
470
471 pub length: u64,
472
473 #[br(map = |s: McapString| s.inner )]
474 #[bw(write_with = write_string)]
475 pub name: String,
476}
477
478#[derive(Debug, Clone, Copy, Eq, PartialEq, BinRead, BinWrite)]
479pub struct SummaryOffset {
480 pub group_opcode: u8,
481 pub group_start: u64,
482 pub group_length: u64,
483}
484
485#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, BinRead, BinWrite)]
486pub struct EndOfData {
487 pub data_section_crc: u32,
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// `McapString` reads a little-endian length prefix plus bytes, rejects a
    /// prefix longer than the available data, and writes the same layout back.
    #[test]
    fn string_parse() {
        // Well-formed: prefix of 4, followed by four bytes.
        let mut well_formed = Cursor::new(b"\x04\0\0\0abcd");
        let ms: McapString = well_formed.read_le().unwrap();
        assert_eq!(
            ms,
            McapString {
                inner: String::from("abcd")
            }
        );

        // Prefix claims five bytes but only four follow.
        let mut too_short = Cursor::new(b"\x05\0\0\0abcd");
        assert!(too_short.read_le::<McapString>().is_err());

        // Writing emits the prefix followed by the UTF-8 bytes.
        let mut written = Vec::new();
        let hullo = McapString {
            inner: String::from("hullo"),
        };
        Cursor::new(&mut written).write_le(&hullo).unwrap();
        assert_eq!(&written, b"\x05\0\0\0hullo");
    }

    /// `Header` round-trips: two length-prefixed strings in, same bytes out.
    #[test]
    fn header_parse() {
        let expected = b"\x04\0\0\0abcd\x03\0\0\x00123";

        let mut input = Cursor::new(expected);
        let h: Header = input.read_le().unwrap();
        assert_eq!(h.profile, "abcd");
        assert_eq!(h.library, "123");

        let mut written = Vec::new();
        Cursor::new(&mut written).write_le(&h).unwrap();
        assert_eq!(written, expected);
    }
}