use std::{
    borrow::Cow,
    collections::{BTreeMap, HashMap},
    io::{self, prelude::*, Cursor, SeekFrom},
};

use binrw::prelude::*;
use byteorder::{WriteBytesExt, LE};

use crate::{
    io_utils::CountingCrcWriter,
    records::{self, op, MessageHeader, Record},
    Attachment, Channel, Compression, McapError, McapResult, Message, Schema, MAGIC,
};

pub use records::Metadata;

/// Tracks whether we are currently writing top-level (raw) records
/// or writing into an open chunk.
enum WriteMode<W: Write + Seek> {
    Raw(W),
    Chunk(ChunkWriter<W>),
}

/// Writes a record's opcode and length prefix.
fn op_and_len<W: Write>(w: &mut W, op: u8, len: usize) -> io::Result<()> {
    w.write_u8(op)?;
    w.write_u64::<LE>(len as u64)?;
    Ok(())
}

/// Serializes a record (opcode, length prefix, then body) to the writer.
fn write_record<W: Write>(w: &mut W, r: &Record) -> io::Result<()> {
    // Serialize the record body into a buffer first so we know its length
    // for the opcode/length prefix.
    macro_rules! record {
        ($op:expr, $b:ident) => {{
            let mut rec_buf = Vec::new();
            Cursor::new(&mut rec_buf).write_le($b).unwrap();

            op_and_len(w, $op, rec_buf.len())?;
            w.write_all(&rec_buf)?;
        }};
    }

    macro_rules! header_and_data {
        ($op:expr, $header:ident, $data:ident) => {{
            let mut header_buf = Vec::new();
            Cursor::new(&mut header_buf).write_le($header).unwrap();

            op_and_len(w, $op, header_buf.len() + $data.len())?;
            w.write_all(&header_buf)?;
            w.write_all($data)?;
        }};
    }

    match r {
        Record::Header(h) => record!(op::HEADER, h),
        Record::Footer(_) => {
            unreachable!("Footer handles its own serialization because its CRC is self-referencing")
        }
        Record::Schema { header, data } => header_and_data!(op::SCHEMA, header, data),
        Record::Channel(c) => record!(op::CHANNEL, c),
        Record::Message { header, data } => header_and_data!(op::MESSAGE, header, data),
        Record::Chunk { .. } => {
            unreachable!("Chunks handle their own serialization due to seeking shenanigans")
        }
        Record::MessageIndex(_) => {
            unreachable!("MessageIndexes handle their own serialization to recycle the buffer between indexes")
        }
        Record::ChunkIndex(c) => record!(op::CHUNK_INDEX, c),
        Record::Attachment { header, data } => {
            assert_eq!(header.data_len, data.len() as u64);

            // The attachment's CRC covers the header and the data, so
            // serialize the header, then stream both through a checksummer.
            let mut header_buf = Vec::new();
            Cursor::new(&mut header_buf).write_le(header).unwrap();

            // The record length includes the header, the data,
            // and the trailing 4-byte CRC.
            op_and_len(w, op::ATTACHMENT, header_buf.len() + data.len() + 4)?;

            let mut checksummer = CountingCrcWriter::new(w);
            checksummer.write_all(&header_buf)?;
            checksummer.write_all(data)?;
            let (w, crc) = checksummer.finalize();
            w.write_u32::<LE>(crc)?;
        }
        Record::AttachmentIndex(ai) => record!(op::ATTACHMENT_INDEX, ai),
        Record::Statistics(s) => record!(op::STATISTICS, s),
        Record::Metadata(m) => record!(op::METADATA, m),
        Record::MetadataIndex(mi) => record!(op::METADATA_INDEX, mi),
        Record::SummaryOffset(so) => record!(op::SUMMARY_OFFSET, so),
        Record::EndOfData(eod) => record!(op::END_OF_DATA, eod),
        _ => todo!(),
    };
    Ok(())
}

/// Options for the MCAP [`Writer`].
#[derive(Debug, Clone)]
pub struct WriteOptions {
    compression: Option<Compression>,
    profile: String,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            compression: Some(Compression::Zstd),
            profile: String::new(),
        }
    }
}

impl WriteOptions {
    pub fn new() -> Self {
        Self::default()
    }

    /// Specifies the compression (if any) to use on chunks.
    pub fn compression(self, compression: Option<Compression>) -> Self {
        Self {
            compression,
            ..self
        }
    }

    /// Specifies the profile recorded in the file's header.
    pub fn profile<S: Into<String>>(self, profile: S) -> Self {
        Self {
            profile: profile.into(),
            ..self
        }
    }

    /// Creates a [`Writer`] that writes to `w` using these options.
    pub fn create<'a, W: Write + Seek>(self, w: W) -> McapResult<Writer<'a, W>> {
        Writer::with_options(w, self)
    }
}
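
// A minimal usage sketch of the options builder above (the output path, the
// "ros2" profile string, and the choice of LZ4 are all illustrative):
//
//     let file = std::io::BufWriter::new(std::fs::File::create("out.mcap")?);
//     let mut writer = WriteOptions::new()
//         .compression(Some(Compression::Lz4))
//         .profile("ros2")
//         .create(file)?;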

/// Writes an MCAP file to the given writer.
///
/// Call [`finish()`](Self::finish) when done to flush the stream and check
/// for errors; otherwise it runs (and unwraps its result) on drop.
pub struct Writer<'a, W: Write + Seek> {
    writer: Option<WriteMode<W>>,
    compression: Option<Compression>,
    schemas: HashMap<Schema<'a>, u16>,
    channels: HashMap<Channel<'a>, u16>,
    stats: records::Statistics,
    chunk_indexes: Vec<records::ChunkIndex>,
    attachment_indexes: Vec<records::AttachmentIndex>,
    metadata_indexes: Vec<records::MetadataIndex>,
}

impl<'a, W: Write + Seek> Writer<'a, W> {
    pub fn new(writer: W) -> McapResult<Self> {
        Self::with_options(writer, WriteOptions::default())
    }

    fn with_options(mut writer: W, opts: WriteOptions) -> McapResult<Self> {
        writer.write_all(MAGIC)?;

        write_record(
            &mut writer,
            &Record::Header(records::Header {
                profile: opts.profile,
                library: String::from("mcap-rs-") + env!("CARGO_PKG_VERSION"),
            }),
        )?;

        Ok(Self {
            writer: Some(WriteMode::Raw(writer)),
            compression: opts.compression,
            schemas: HashMap::new(),
            channels: HashMap::new(),
            stats: records::Statistics::default(),
            chunk_indexes: Vec::new(),
            attachment_indexes: Vec::new(),
            metadata_indexes: Vec::new(),
        })
    }

    /// Adds a channel (and its schema, if any), returning its ID.
    /// If the channel was added previously, its existing ID is returned.
    pub fn add_channel(&mut self, chan: &Channel<'a>) -> McapResult<u16> {
        let schema_id = match &chan.schema {
            Some(s) => self.add_schema(s)?,
            None => 0,
        };

        if let Some(id) = self.channels.get(chan) {
            return Ok(*id);
        }

        self.stats.channel_count += 1;

        let next_channel_id = self.channels.len() as u16;
        assert!(self
            .channels
            .insert(chan.clone(), next_channel_id)
            .is_none());
        self.chunkin_time()?
            .write_channel(next_channel_id, schema_id, chan)?;
        Ok(next_channel_id)
    }

    fn add_schema(&mut self, schema: &Schema<'a>) -> McapResult<u16> {
        if let Some(id) = self.schemas.get(schema) {
            return Ok(*id);
        }

        self.stats.schema_count += 1;

        // Schema IDs start at 1; ID 0 means "no schema".
        let next_schema_id = self.schemas.len() as u16 + 1;
        assert!(self
            .schemas
            .insert(schema.clone(), next_schema_id)
            .is_none());
        self.chunkin_time()?.write_schema(next_schema_id, schema)?;
        Ok(next_schema_id)
    }

    /// Writes a message (adding its channel and schema first, if needed).
    pub fn write(&mut self, message: &Message<'a>) -> McapResult<()> {
        let channel_id = self.add_channel(&message.channel)?;
        let header = MessageHeader {
            channel_id,
            sequence: message.sequence,
            log_time: message.log_time,
            publish_time: message.publish_time,
        };
        let data: &[u8] = &message.data;
        self.write_to_known_channel(&header, data)
    }

    /// Writes a message to a previously-added channel, given its ID.
    ///
    /// Unlike [`write()`](Self::write), this skips re-hashing the channel
    /// and schema to check whether they have already been added.
    pub fn write_to_known_channel(
        &mut self,
        header: &MessageHeader,
        data: &[u8],
    ) -> McapResult<()> {
        // Make sure the channel ID was actually registered;
        // a linear scan of the map's values is fine for a handful of channels.
        if !self.channels.values().any(|id| *id == header.channel_id) {
            return Err(McapError::UnknownChannel(
                header.sequence,
                header.channel_id,
            ));
        }

        self.stats.message_count += 1;
        self.stats.message_start_time = match self.stats.message_start_time {
            0 => header.log_time,
            nz => nz.min(header.log_time),
        };
        self.stats.message_end_time = match self.stats.message_end_time {
            0 => header.log_time,
            nz => nz.max(header.log_time),
        };
        *self
            .stats
            .channel_message_counts
            .entry(header.channel_id)
            .or_insert(0) += 1;

        self.chunkin_time()?.write_message(header, data)?;
        Ok(())
    }
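
    // A sketch of the intended call pattern; `channel`, `now`, and `payload`
    // are hypothetical bindings:
    //
    //     let id = writer.add_channel(&channel)?;
    //     writer.write_to_known_channel(
    //         &MessageHeader {
    //             channel_id: id,
    //             sequence: 0,
    //             log_time: now,
    //             publish_time: now,
    //         },
    //         &payload,
    //     )?;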

    /// Writes an attachment to the file, finishing any open chunk first
    /// (attachments don't live in chunks).
    pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> {
        self.stats.attachment_count += 1;

        let header = records::AttachmentHeader {
            log_time: attachment.log_time,
            create_time: attachment.create_time,
            name: attachment.name.clone(),
            content_type: attachment.content_type.clone(),
            data_len: attachment.data.len() as u64,
        };

        let w = self.finish_chunk()?;

        let offset = w.stream_position()?;

        write_record(
            w,
            &Record::Attachment {
                header,
                data: &attachment.data,
            },
        )?;

        let length = w.stream_position()? - offset;
        self.attachment_indexes.push(records::AttachmentIndex {
            offset,
            length,
            log_time: attachment.log_time,
            create_time: attachment.create_time,
            data_size: attachment.data.len() as u64,
            name: attachment.name.clone(),
            content_type: attachment.content_type.clone(),
        });

        Ok(())
    }
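
    // For example (a sketch; the fields shown are the ones this module reads
    // from `Attachment`, `payload` is a hypothetical `&[u8]`, and `data` is
    // assumed to be a `Cow<'_, [u8]>`):
    //
    //     writer.attach(&Attachment {
    //         log_time: 0,
    //         create_time: 0,
    //         name: "calibration.json".into(),
    //         content_type: "application/json".into(),
    //         data: Cow::Borrowed(payload),
    //     })?;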

    /// Writes a metadata record, finishing any open chunk first.
    pub fn write_metadata(&mut self, metadata: &Metadata) -> McapResult<()> {
        self.stats.metadata_count += 1;

        let w = self.finish_chunk()?;
        let offset = w.stream_position()?;

        write_record(w, &Record::Metadata(metadata.clone()))?;

        let length = w.stream_position()? - offset;

        self.metadata_indexes.push(records::MetadataIndex {
            offset,
            length,
            name: metadata.name.clone(),
        });

        Ok(())
    }
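
    // For example (a sketch; per the MCAP spec a Metadata record carries a
    // `name` and a string-to-string `metadata` map, though only `name` is
    // read by this module):
    //
    //     writer.write_metadata(&Metadata {
    //         name: "session-info".into(),
    //         metadata: [("robot".into(), "unit-42".into())].into(),
    //     })?;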

    /// Finishes any open chunk and flushes the underlying writer.
    pub fn flush(&mut self) -> McapResult<()> {
        self.finish_chunk()?.flush()?;
        Ok(())
    }

    /// `expect()` message for when `self.writer` is `None`,
    /// which should only happen after [`finish()`](Self::finish).
    const WHERE_WRITER: &'static str = "Trying to write a record on a finished MCAP";

    /// Starts a chunk if we haven't already, and returns the active one.
    fn chunkin_time(&mut self) -> McapResult<&mut ChunkWriter<W>> {
        // We can't move the writer out from behind `&mut self`, so
        // temporarily take() it out of the Option and put the
        // (possibly new) mode back below.
        let prev_writer = self.writer.take().expect(Self::WHERE_WRITER);

        self.writer = Some(match prev_writer {
            WriteMode::Raw(w) => {
                self.stats.chunk_count += 1;
                WriteMode::Chunk(ChunkWriter::new(w, self.compression)?)
            }
            chunk => chunk,
        });

        match &mut self.writer {
            Some(WriteMode::Chunk(c)) => Ok(c),
            _ => unreachable!(),
        }
    }

    /// Finishes the current chunk, if we have one, and returns the raw writer.
    fn finish_chunk(&mut self) -> McapResult<&mut W> {
        // See chunkin_time() for why we take() the writer here.
        let prev_writer = self.writer.take().expect(Self::WHERE_WRITER);

        self.writer = Some(match prev_writer {
            WriteMode::Chunk(c) => {
                let (w, index) = c.finish()?;
                self.chunk_indexes.push(index);
                WriteMode::Raw(w)
            }
            raw => raw,
        });

        match &mut self.writer {
            Some(WriteMode::Raw(w)) => Ok(w),
            _ => unreachable!(),
        }
    }

    /// Finishes the MCAP file: closes any open chunk, then writes the
    /// end-of-data record, the summary section (schemas, channels, indexes,
    /// statistics, summary offsets), and the footer.
    ///
    /// Subsequent calls do nothing.
    pub fn finish(&mut self) -> McapResult<()> {
        if self.writer.is_none() {
            // We already called finish().
            return Ok(());
        }

        // Finish any chunk we were working on and update indexes, stats, etc.
        self.finish_chunk()?;

        // Take the writer out of self; self.writer becoming None makes
        // subsequent writes fail.
        let mut writer = match self.writer.take() {
            // finish_chunk() above put us back into raw writes.
            Some(WriteMode::Raw(w)) => w,
            _ => unreachable!(),
        };
        let writer = &mut writer;

        write_record(writer, &Record::EndOfData(records::EndOfData::default()))?;

        // Take ownership of the accumulated summary data so we can write it out.
        let mut stats = records::Statistics::default();
        std::mem::swap(&mut stats, &mut self.stats);

        let mut chunk_indexes = Vec::new();
        std::mem::swap(&mut chunk_indexes, &mut self.chunk_indexes);

        let mut attachment_indexes = Vec::new();
        std::mem::swap(&mut attachment_indexes, &mut self.attachment_indexes);

        let mut metadata_indexes = Vec::new();
        std::mem::swap(&mut metadata_indexes, &mut self.metadata_indexes);

        // Collect channels and schemas, sorted by ID.
        struct ChannelSummary<'a> {
            channel: Channel<'a>,
            channel_id: u16,
            schema_id: u16,
        }

        let mut all_channels: Vec<ChannelSummary<'_>> = self
            .channels
            .drain()
            .map(|(channel, channel_id)| {
                let schema_id = match &channel.schema {
                    Some(s) => *self.schemas.get(s).unwrap(),
                    None => 0,
                };

                ChannelSummary {
                    channel,
                    channel_id,
                    schema_id,
                }
            })
            .collect();
        all_channels.sort_unstable_by_key(|cs| cs.channel_id);

        let mut all_schemas: Vec<(Schema<'_>, u16)> = self.schemas.drain().collect();
        all_schemas.sort_unstable_by_key(|(_, v)| *v);

        let mut offsets = Vec::new();

        let summary_start = writer.stream_position()?;

        // Everything in the summary section runs through a checksumming
        // writer so we can emit the summary CRC in the footer.
        let mut ccw = CountingCrcWriter::new(writer);

        fn posit<W: Write + Seek>(ccw: &mut CountingCrcWriter<W>) -> io::Result<u64> {
            ccw.get_mut().stream_position()
        }

        // Write all schemas.
        let schemas_start = summary_start;
        for (schema, id) in all_schemas {
            let header = records::SchemaHeader {
                id,
                name: schema.name,
                encoding: schema.encoding,
                data_len: schema.data.len() as u32,
            };
            let data = schema.data;

            write_record(&mut ccw, &Record::Schema { header, data })?;
        }
        let schemas_end = posit(&mut ccw)?;
        if schemas_end - schemas_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::SCHEMA,
                group_start: schemas_start,
                group_length: schemas_end - schemas_start,
            });
        }

        // Write all channels.
        let channels_start = schemas_end;
        for cs in all_channels {
            let rec = records::Channel {
                id: cs.channel_id,
                schema_id: cs.schema_id,
                topic: cs.channel.topic,
                message_encoding: cs.channel.message_encoding,
                metadata: cs.channel.metadata,
            };
            write_record(&mut ccw, &Record::Channel(rec))?;
        }
        let channels_end = posit(&mut ccw)?;
        if channels_end - channels_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::CHANNEL,
                group_start: channels_start,
                group_length: channels_end - channels_start,
            });
        }

        // Write the chunk indexes.
        let chunk_indexes_start = channels_end;
        for index in chunk_indexes {
            write_record(&mut ccw, &Record::ChunkIndex(index))?;
        }
        let chunk_indexes_end = posit(&mut ccw)?;
        if chunk_indexes_end - chunk_indexes_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::CHUNK_INDEX,
                group_start: chunk_indexes_start,
                group_length: chunk_indexes_end - chunk_indexes_start,
            });
        }

        // Write the attachment indexes.
        let attachment_indexes_start = chunk_indexes_end;
        for index in attachment_indexes {
            write_record(&mut ccw, &Record::AttachmentIndex(index))?;
        }
        let attachment_indexes_end = posit(&mut ccw)?;
        if attachment_indexes_end - attachment_indexes_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::ATTACHMENT_INDEX,
                group_start: attachment_indexes_start,
                group_length: attachment_indexes_end - attachment_indexes_start,
            });
        }

        // Write the metadata indexes.
        let metadata_indexes_start = attachment_indexes_end;
        for index in metadata_indexes {
            write_record(&mut ccw, &Record::MetadataIndex(index))?;
        }
        let metadata_indexes_end = posit(&mut ccw)?;
        if metadata_indexes_end - metadata_indexes_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::METADATA_INDEX,
                group_start: metadata_indexes_start,
                group_length: metadata_indexes_end - metadata_indexes_start,
            });
        }

        // Write the statistics record.
        let stats_start = metadata_indexes_end;
        write_record(&mut ccw, &Record::Statistics(stats))?;
        let stats_end = posit(&mut ccw)?;
        assert!(stats_end > stats_start);
        offsets.push(records::SummaryOffset {
            group_opcode: op::STATISTICS,
            group_start: stats_start,
            group_length: stats_end - stats_start,
        });

        // Write the summary offsets we've been accumulating.
        let summary_offset_start = stats_end;
        for offset in offsets {
            write_record(&mut ccw, &Record::SummaryOffset(offset))?;
        }

        // The footer is 20 bytes: summary_start (8), summary_offset_start (8),
        // and the summary CRC (4). The CRC covers everything written through
        // the checksummer, including the footer's opcode, length, and the
        // two offsets, so write those before finalizing it.
        op_and_len(&mut ccw, op::FOOTER, 20)?;
        ccw.write_u64::<LE>(summary_start)?;
        ccw.write_u64::<LE>(summary_offset_start)?;

        let (writer, summary_crc) = ccw.finalize();

        writer.write_u32::<LE>(summary_crc)?;

        writer.write_all(MAGIC)?;
        writer.flush()?;
        Ok(())
    }
}

impl<'a, W: Write + Seek> Drop for Writer<'a, W> {
    fn drop(&mut self) {
        self.finish().unwrap()
    }
}

enum Compressor<W: Write> {
    Null(W),
    Zstd(zstd::Encoder<'static, W>),
    Lz4(lz4::Encoder<W>),
}

impl<W: Write> Compressor<W> {
    fn finish(self) -> io::Result<W> {
        Ok(match self {
            Compressor::Null(w) => w,
            Compressor::Zstd(w) => w.finish()?,
            Compressor::Lz4(w) => {
                let (w, err) = w.finish();
                err?;
                w
            }
        })
    }
}

impl<W: Write> Write for Compressor<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            Compressor::Null(w) => w.write(buf),
            Compressor::Zstd(w) => w.write(buf),
            Compressor::Lz4(w) => w.write(buf),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        match self {
            Compressor::Null(w) => w.flush(),
            Compressor::Zstd(w) => w.flush(),
            Compressor::Lz4(w) => w.flush(),
        }
    }
}

struct ChunkWriter<W: Write> {
    header_start: u64,
    stream_start: u64,
    header: records::ChunkHeader,
    compressor: CountingCrcWriter<Compressor<W>>,
    indexes: BTreeMap<u16, Vec<records::MessageIndexEntry>>,
}

impl<W: Write + Seek> ChunkWriter<W> {
    fn new(mut writer: W, compression: Option<Compression>) -> McapResult<Self> {
        let header_start = writer.stream_position()?;

        // Write a dummy opcode and length; we seek back and patch them
        // in finish() once the chunk's actual size is known.
        op_and_len(&mut writer, op::CHUNK, !0)?;

        let compression_name = match compression {
            Some(Compression::Zstd) => "zstd",
            Some(Compression::Lz4) => "lz4",
            None => "",
        };

        // Likewise, write a dummy chunk header; its sizes, CRC, and time
        // range are also patched in finish().
        let header = records::ChunkHeader {
            message_start_time: 0,
            message_end_time: 0,
            uncompressed_size: !0,
            uncompressed_crc: !0,
            compression: String::from(compression_name),
            compressed_size: !0,
        };

        writer.write_le(&header)?;
        let stream_start = writer.stream_position()?;

        let compressor = match compression {
            Some(Compression::Zstd) => {
                let mut enc = zstd::Encoder::new(writer, 0)?;
                enc.multithread(num_cpus::get_physical() as u32)?;
                Compressor::Zstd(enc)
            }
            Some(Compression::Lz4) => {
                let b = lz4::EncoderBuilder::new();
                Compressor::Lz4(b.build(writer)?)
            }
            None => Compressor::Null(writer),
        };
        let compressor = CountingCrcWriter::new(compressor);
        Ok(Self {
            compressor,
            header_start,
            stream_start,
            header,
            indexes: BTreeMap::new(),
        })
    }

    fn write_schema(&mut self, id: u16, schema: &Schema) -> McapResult<()> {
        let header = records::SchemaHeader {
            id,
            name: schema.name.clone(),
            encoding: schema.encoding.clone(),
            data_len: schema.data.len() as u32,
        };
        write_record(
            &mut self.compressor,
            &Record::Schema {
                header,
                data: Cow::Borrowed(&schema.data),
            },
        )?;
        Ok(())
    }

    fn write_channel(&mut self, id: u16, schema_id: u16, chan: &Channel) -> McapResult<()> {
        assert_eq!(schema_id == 0, chan.schema.is_none());

        let rec = records::Channel {
            id,
            schema_id,
            topic: chan.topic.clone(),
            message_encoding: chan.message_encoding.clone(),
            metadata: chan.metadata.clone(),
        };

        write_record(&mut self.compressor, &Record::Channel(rec))?;
        Ok(())
    }

    fn write_message(&mut self, header: &MessageHeader, data: &[u8]) -> McapResult<()> {
        self.header.message_start_time = match self.header.message_start_time {
            0 => header.log_time,
            nz => nz.min(header.log_time),
        };
        self.header.message_end_time = match self.header.message_end_time {
            0 => header.log_time,
            nz => nz.max(header.log_time),
        };

        // Record the message's offset within the uncompressed chunk data
        // for this channel's message index.
        self.indexes
            .entry(header.channel_id)
            .or_default()
            .push(records::MessageIndexEntry {
                log_time: header.log_time,
                offset: self.compressor.position(),
            });

        write_record(
            &mut self.compressor,
            &Record::Message {
                header: *header,
                data: Cow::Borrowed(data),
            },
        )?;
        Ok(())
    }

    fn finish(mut self) -> McapResult<(W, records::ChunkIndex)> {
        // Get the number of uncompressed bytes written and their CRC.
        self.header.uncompressed_size = self.compressor.position();
        let (stream, crc) = self.compressor.finalize();
        self.header.uncompressed_crc = crc;

        // Finalize the compression stream, which may hold buffered data.
        let mut writer = stream.finish()?;
        let end_of_stream = writer.stream_position()?;
        self.header.compressed_size = end_of_stream - self.stream_start;
        // The record length excludes its opcode (1 byte) and length field (8 bytes).
        let record_size = (end_of_stream - self.header_start) as usize - 9;

        // Seek back and patch the dummy opcode, length, and chunk header.
        writer.seek(SeekFrom::Start(self.header_start))?;
        op_and_len(&mut writer, op::CHUNK, record_size)?;
        writer.write_le(&self.header)?;
        assert_eq!(self.stream_start, writer.stream_position()?);
        assert_eq!(writer.seek(SeekFrom::End(0))?, end_of_stream);

        // Write a message index for each channel in the chunk,
        // recycling the same buffer between indexes.
        let mut message_index_offsets: BTreeMap<u16, u64> = BTreeMap::new();

        let mut index_buf = Vec::new();
        for (channel_id, records) in self.indexes {
            assert!(message_index_offsets
                .insert(channel_id, writer.stream_position()?)
                .is_none());
            index_buf.clear();
            let index = records::MessageIndex {
                channel_id,
                records,
            };

            Cursor::new(&mut index_buf).write_le(&index)?;
            op_and_len(&mut writer, op::MESSAGE_INDEX, index_buf.len())?;
            writer.write_all(&index_buf)?;
        }
        let end_of_indexes = writer.stream_position()?;

        let index = records::ChunkIndex {
            message_start_time: self.header.message_start_time,
            message_end_time: self.header.message_end_time,
            chunk_start_offset: self.header_start,
            chunk_length: end_of_stream - self.header_start,
            message_index_offsets,
            message_index_length: end_of_indexes - end_of_stream,
            compression: self.header.compression,
            compressed_size: self.header.compressed_size,
            uncompressed_size: self.header.uncompressed_size,
        };

        Ok((writer, index))
    }
}