mcap_rs/write.rs

//! Write MCAP files

use std::{
    borrow::Cow,
    collections::{BTreeMap, HashMap},
    io::{self, prelude::*, Cursor, SeekFrom},
};

use binrw::prelude::*;
use byteorder::{WriteBytesExt, LE};

use crate::{
    io_utils::CountingCrcWriter,
    records::{self, op, MessageHeader, Record},
    Attachment, Channel, Compression, McapError, McapResult, Message, Schema, MAGIC,
};

pub use records::Metadata;

enum WriteMode<W: Write + Seek> {
    Raw(W),
    Chunk(ChunkWriter<W>),
}

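/// Writes a record's framing: a one-byte opcode followed by the record's
/// length as a little-endian u64, which is how every MCAP record is laid out.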
fn op_and_len<W: Write>(w: &mut W, op: u8, len: usize) -> io::Result<()> {
    w.write_u8(op)?;
    w.write_u64::<LE>(len as u64)?;
    Ok(())
}

fn write_record<W: Write>(w: &mut W, r: &Record) -> io::Result<()> {
    // Annoying: our stream isn't Seek if we're writing to a compressed chunk stream,
    // so we need an intermediate buffer.
    macro_rules! record {
        ($op:expr, $b:ident) => {{
            let mut rec_buf = Vec::new();
            Cursor::new(&mut rec_buf).write_le($b).unwrap();

            op_and_len(w, $op, rec_buf.len())?;
            w.write_all(&rec_buf)?;
        }};
    }

    macro_rules! header_and_data {
        ($op:expr, $header:ident, $data:ident) => {{
            let mut header_buf = Vec::new();
            Cursor::new(&mut header_buf).write_le($header).unwrap();

            op_and_len(w, $op, header_buf.len() + $data.len())?;
            w.write_all(&header_buf)?;
            w.write_all($data)?;
        }};
    }

    match r {
        Record::Header(h) => record!(op::HEADER, h),
        Record::Footer(_) => {
            unreachable!("Footer handles its own serialization because its CRC is self-referencing")
        }
        Record::Schema { header, data } => header_and_data!(op::SCHEMA, header, data),
        Record::Channel(c) => record!(op::CHANNEL, c),
        Record::Message { header, data } => header_and_data!(op::MESSAGE, header, data),
        Record::Chunk { .. } => {
            unreachable!("Chunks handle their own serialization due to seeking shenanigans")
        }
        Record::MessageIndex(_) => {
            unreachable!("MessageIndexes handle their own serialization to recycle the buffer between indexes")
        }
        Record::ChunkIndex(c) => record!(op::CHUNK_INDEX, c),
        Record::Attachment { header, data } => {
            assert_eq!(header.data_len, data.len() as u64);

            // Can't use header_and_data since we need to checksum those,
            // but not the op and len.
            let mut header_buf = Vec::new();
            Cursor::new(&mut header_buf).write_le(header).unwrap();
            op_and_len(w, op::ATTACHMENT, header_buf.len() + data.len() + 4)?; // 4 for crc

            let mut checksummer = CountingCrcWriter::new(w);
            checksummer.write_all(&header_buf)?;
            checksummer.write_all(data)?;
            let (w, crc) = checksummer.finalize();
            w.write_u32::<LE>(crc)?;
        }
        Record::AttachmentIndex(ai) => record!(op::ATTACHMENT_INDEX, ai),
        Record::Statistics(s) => record!(op::STATISTICS, s),
        Record::Metadata(m) => record!(op::METADATA, m),
        Record::MetadataIndex(mi) => record!(op::METADATA_INDEX, mi),
        Record::SummaryOffset(so) => record!(op::SUMMARY_OFFSET, so),
        Record::EndOfData(eod) => record!(op::END_OF_DATA, eod),
        _ => todo!(),
    };
    Ok(())
}

#[derive(Debug, Clone)]
pub struct WriteOptions {
    compression: Option<Compression>,
    profile: String,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            compression: Some(Compression::Zstd),
            profile: String::new(),
        }
    }
}

impl WriteOptions {
    pub fn new() -> Self {
        Self::default()
    }

    /// Specifies the compression each chunk will use, if any.
    pub fn compression(self, compression: Option<Compression>) -> Self {
        Self {
            compression,
            ..self
        }
    }

    /// Specifies the profile string written to the file's Header record.
    pub fn profile<S: Into<String>>(self, profile: S) -> Self {
        Self {
            profile: profile.into(),
            ..self
        }
    }
    /// Creates a [`Writer`] which writes to `w` using the given options.
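    ///
    /// # Example
    ///
    /// A rough sketch of the builder flow (not compiled here; module paths and
    /// imports are assumed):
    ///
    /// ```ignore
    /// let mut out = std::io::Cursor::new(Vec::new());
    /// let mut writer = WriteOptions::new()
    ///     .compression(Some(Compression::Lz4))
    ///     .profile("my_profile")
    ///     .create(&mut out)?;
    /// // ...write messages, attachments, metadata...
    /// writer.finish()?;
    /// ```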
    pub fn create<'a, W: Write + Seek>(self, w: W) -> McapResult<Writer<'a, W>> {
        Writer::with_options(w, self)
    }
}

/// Writes an MCAP file to the given [writer](Write).
///
/// Users should call [`finish()`](Self::finish) to flush the stream
/// and check for errors when done; otherwise the result will be unwrapped on drop.
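///
/// # Example
///
/// A rough sketch of typical use (not compiled here; how `messages` and their
/// [`Channel`]s are built is elided and depends on the rest of the crate):
///
/// ```ignore
/// let file = std::io::BufWriter::new(std::fs::File::create("out.mcap")?);
/// let mut writer = Writer::new(file)?;
/// for message in messages {
///     writer.write(&message)?;
/// }
/// writer.finish()?;
/// ```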
pub struct Writer<'a, W: Write + Seek> {
    writer: Option<WriteMode<W>>,
    compression: Option<Compression>,
    schemas: HashMap<Schema<'a>, u16>,
    channels: HashMap<Channel<'a>, u16>,
    stats: records::Statistics,
    chunk_indexes: Vec<records::ChunkIndex>,
    attachment_indexes: Vec<records::AttachmentIndex>,
    metadata_indexes: Vec<records::MetadataIndex>,
}

impl<'a, W: Write + Seek> Writer<'a, W> {
    /// Creates a writer with the default [`WriteOptions`] (zstd-compressed chunks, no profile).
    pub fn new(writer: W) -> McapResult<Self> {
        Self::with_options(writer, WriteOptions::default())
    }

    fn with_options(mut writer: W, opts: WriteOptions) -> McapResult<Self> {
        writer.write_all(MAGIC)?;

        write_record(
            &mut writer,
            &Record::Header(records::Header {
                profile: opts.profile,
                library: String::from("mcap-rs-") + env!("CARGO_PKG_VERSION"),
            }),
        )?;

        Ok(Self {
            writer: Some(WriteMode::Raw(writer)),
            compression: opts.compression,
            schemas: HashMap::new(),
            channels: HashMap::new(),
            stats: records::Statistics::default(),
            chunk_indexes: Vec::new(),
            attachment_indexes: Vec::new(),
            metadata_indexes: Vec::new(),
        })
    }

    /// Adds a channel (and its provided schema, if any), returning its ID.
    ///
    /// Useful with subsequent calls to [`write_to_known_channel()`](Self::write_to_known_channel).
    pub fn add_channel(&mut self, chan: &Channel<'a>) -> McapResult<u16> {
        let schema_id = match &chan.schema {
            Some(s) => self.add_schema(s)?,
            None => 0,
        };

        if let Some(id) = self.channels.get(chan) {
            return Ok(*id);
        }

        self.stats.channel_count += 1;

        let next_channel_id = self.channels.len() as u16;
        assert!(self
            .channels
            .insert(chan.clone(), next_channel_id)
            .is_none());
        self.chunkin_time()?
            .write_channel(next_channel_id, schema_id, chan)?;
        Ok(next_channel_id)
    }

    fn add_schema(&mut self, schema: &Schema<'a>) -> McapResult<u16> {
        if let Some(id) = self.schemas.get(schema) {
            return Ok(*id);
        }

        self.stats.schema_count += 1;

        // Schema IDs cannot be zero; that's the sentinel value in a channel
        // for "no schema".
        let next_schema_id = self.schemas.len() as u16 + 1;
        assert!(self
            .schemas
            .insert(schema.clone(), next_schema_id)
            .is_none());
        self.chunkin_time()?.write_schema(next_schema_id, schema)?;
        Ok(next_schema_id)
    }

    /// Writes the given message (and its provided channel, if needed).
    pub fn write(&mut self, message: &Message<'a>) -> McapResult<()> {
        let channel_id = self.add_channel(&message.channel)?;
        let header = MessageHeader {
            channel_id,
            sequence: message.sequence,
            log_time: message.log_time,
            publish_time: message.publish_time,
        };
        let data: &[u8] = &message.data;
        self.write_to_known_channel(&header, data)
    }

    /// Writes a message to an added channel, given its ID.
    ///
    /// This skips hash lookups of the channel and schema if you already added them.
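    ///
    /// A rough sketch (not compiled here; `chan` and `payload` come from elsewhere,
    /// and timestamps are in nanoseconds):
    ///
    /// ```ignore
    /// let channel_id = writer.add_channel(&chan)?;
    /// writer.write_to_known_channel(
    ///     &MessageHeader {
    ///         channel_id,
    ///         sequence: 0,
    ///         log_time: 1_700_000_000_000_000_000,
    ///         publish_time: 1_700_000_000_000_000_000,
    ///     },
    ///     &payload,
    /// )?;
    /// ```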
    pub fn write_to_known_channel(
        &mut self,
        header: &MessageHeader,
        data: &[u8],
    ) -> McapResult<()> {
        // The number of channels should be relatively small,
        // so do a quick linear search to make sure we're not being given a bogus ID.
        if !self.channels.values().any(|id| *id == header.channel_id) {
            return Err(McapError::UnknownChannel(
                header.sequence,
                header.channel_id,
            ));
        }

        self.stats.message_count += 1;
        self.stats.message_start_time = match self.stats.message_start_time {
            0 => header.log_time,
            nz => nz.min(header.log_time),
        };
        self.stats.message_end_time = match self.stats.message_end_time {
            0 => header.log_time,
            nz => nz.max(header.log_time),
        };
        *self
            .stats
            .channel_message_counts
            .entry(header.channel_id)
            .or_insert(0) += 1;

        self.chunkin_time()?.write_message(header, data)?;
        Ok(())
    }

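    /// Writes the given attachment.
    ///
    /// Attachments are written outside of chunks, directly to the underlying writer,
    /// so any chunk in progress is finished first.
    ///
    /// A rough sketch (not compiled here; the exact field types of [`Attachment`],
    /// e.g. whether `data` is a `Cow`, are assumptions):
    ///
    /// ```ignore
    /// writer.attach(&Attachment {
    ///     name: String::from("calibration.json"),
    ///     content_type: String::from("application/json"),
    ///     log_time: 1_700_000_000_000_000_000,
    ///     create_time: 1_700_000_000_000_000_000,
    ///     data: std::borrow::Cow::Borrowed(json_bytes), // json_bytes: &[u8]
    /// })?;
    /// ```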
    pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> {
        self.stats.attachment_count += 1;

        let header = records::AttachmentHeader {
            log_time: attachment.log_time,
            create_time: attachment.create_time,
            name: attachment.name.clone(),
            content_type: attachment.content_type.clone(),
            data_len: attachment.data.len() as u64,
        };

        // Attachments don't live in chunks.
        let w = self.finish_chunk()?;

        let offset = w.stream_position()?;

        write_record(
            w,
            &Record::Attachment {
                header,
                data: &attachment.data,
            },
        )?;

        let length = w.stream_position()? - offset;
        self.attachment_indexes.push(records::AttachmentIndex {
            offset,
            length,
            log_time: attachment.log_time,
            create_time: attachment.create_time,
            data_size: attachment.data.len() as u64,
            name: attachment.name.clone(),
            content_type: attachment.content_type.clone(),
        });

        Ok(())
    }

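    /// Writes the given metadata record.
    ///
    /// Like attachments, metadata is written outside of chunks.
    ///
    /// A rough sketch (not compiled here; the name of the string-map field on
    /// [`Metadata`] is an assumption):
    ///
    /// ```ignore
    /// let mut kv = std::collections::BTreeMap::new();
    /// kv.insert(String::from("robot"), String::from("unit-42"));
    /// writer.write_metadata(&Metadata {
    ///     name: String::from("deployment"),
    ///     metadata: kv,
    /// })?;
    /// ```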
    pub fn write_metadata(&mut self, metadata: &Metadata) -> McapResult<()> {
        self.stats.metadata_count += 1;

        let w = self.finish_chunk()?;
        let offset = w.stream_position()?;

        // Should we specialize this to avoid taking a clone of the map?
        write_record(w, &Record::Metadata(metadata.clone()))?;

        let length = w.stream_position()? - offset;

        self.metadata_indexes.push(records::MetadataIndex {
            offset,
            length,
            name: metadata.name.clone(),
        });

        Ok(())
    }

    /// Finishes the current chunk, if we have one, and flushes the underlying
    /// [writer](Write).
    ///
    /// We finish the chunk to guarantee that the file can be streamed by future
    /// readers at least up to this point.
    /// (The alternative is to just flush the writer mid-chunk.
    /// But if we did that, and then writing was suddenly interrupted afterwards,
    /// readers would have to try to recover a half-written chunk,
    /// probably with an unfinished compression stream.)
    ///
    /// Note that lossless compression schemes like LZ4 and Zstd improve
    /// as they go, so larger chunks will tend to have better compression.
    /// (Of course, this depends heavily on the entropy of what's being compressed!
    /// A stream of zeroes will compress great at any chunk size, and a stream
    /// of random data will compress terribly at any chunk size.)
    pub fn flush(&mut self) -> McapResult<()> {
        self.finish_chunk()?.flush()?;
        Ok(())
    }

    /// The `.expect()` message used when we go to write and `self.writer` is `None`,
    /// which should only happen after [`Writer::finish()`] was called.
    const WHERE_WRITER: &'static str = "Trying to write a record on a finished MCAP";

    /// Starts a new chunk if we haven't done so already.
    fn chunkin_time(&mut self) -> McapResult<&mut ChunkWriter<W>> {
        // Some Rust trickery: we can't just move the writer out of self.writer,
        // leave it empty for a bit, and then replace it with a ChunkWriter.
        // (That would leave it in an unspecified state if we bailed partway!)
        // Instead, self.writer is an Option we briefly take() from while we set
        // up the chunker.
        // The writer will only be None if finish() was called.
        let prev_writer = self.writer.take().expect(Self::WHERE_WRITER);

        self.writer = Some(match prev_writer {
            WriteMode::Raw(w) => {
                // It's chunkin time.
                self.stats.chunk_count += 1;
                WriteMode::Chunk(ChunkWriter::new(w, self.compression)?)
            }
            chunk => chunk,
        });

        match &mut self.writer {
            Some(WriteMode::Chunk(c)) => Ok(c),
            _ => unreachable!(),
        }
    }

    /// Finishes the current chunk, if we have one.
    fn finish_chunk(&mut self) -> McapResult<&mut W> {
        // See above
        let prev_writer = self.writer.take().expect(Self::WHERE_WRITER);

        self.writer = Some(match prev_writer {
            WriteMode::Chunk(c) => {
                let (w, index) = c.finish()?;
                self.chunk_indexes.push(index);
                WriteMode::Raw(w)
            }
            raw => raw,
        });

        match &mut self.writer {
            Some(WriteMode::Raw(w)) => Ok(w),
            _ => unreachable!(),
        }
    }

    /// Finishes any current chunk and writes out the rest of the file.
    ///
    /// Subsequent calls to other methods will panic.
    pub fn finish(&mut self) -> McapResult<()> {
        if self.writer.is_none() {
            // We already called finish().
            // Maybe we're dropping after the user called it?
            return Ok(());
        }

        // Finish any chunk we were working on and update stats, indexes, etc.
        self.finish_chunk()?;

        // Grab the writer - self.writer becoming None makes subsequent writes fail.
        let mut writer = match self.writer.take() {
            // We called finish_chunk() above, so we're back to raw writes for
            // the summary section.
            Some(WriteMode::Raw(w)) => w,
            _ => unreachable!(),
        };
        let writer = &mut writer;

        // We're done with the data section!
        write_record(writer, &Record::EndOfData(records::EndOfData::default()))?;

        // Take all the data we need, swapping in empty containers.
        // Without this, we get yelled at for moving things out of a mutable ref
        // (&mut self).
        // (We could get around all this noise by having finish() take self,
        // but then it wouldn't be droppable _and_ finish...able.)
        let mut stats = records::Statistics::default();
        std::mem::swap(&mut stats, &mut self.stats);

        let mut chunk_indexes = Vec::new();
        std::mem::swap(&mut chunk_indexes, &mut self.chunk_indexes);

        let mut attachment_indexes = Vec::new();
        std::mem::swap(&mut attachment_indexes, &mut self.attachment_indexes);

        let mut metadata_indexes = Vec::new();
        std::mem::swap(&mut metadata_indexes, &mut self.metadata_indexes);

        // Make some Schema and Channel lists for the summary section.
        // Be sure to grab schema IDs for the channels from the schema hash map before we drain it!
        struct ChannelSummary<'a> {
            channel: Channel<'a>,
            channel_id: u16,
            schema_id: u16,
        }

        let mut all_channels: Vec<ChannelSummary<'_>> = self
            .channels
            .drain()
            .map(|(channel, channel_id)| {
                let schema_id = match &channel.schema {
                    Some(s) => *self.schemas.get(s).unwrap(),
                    None => 0,
                };

                ChannelSummary {
                    channel,
                    channel_id,
                    schema_id,
                }
            })
            .collect();
        all_channels.sort_unstable_by_key(|cs| cs.channel_id);

        let mut all_schemas: Vec<(Schema<'_>, u16)> = self.schemas.drain().collect();
        all_schemas.sort_unstable_by_key(|(_, v)| *v);

        let mut offsets = Vec::new();

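        // The summary section we emit below is laid out as:
        //   schema records, channel records, chunk indexes, attachment indexes,
        //   metadata indexes, statistics, summary offsets, and finally the footer,
        //   whose CRC covers everything from the start of the summary.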
        let summary_start = writer.stream_position()?;

        // Let's get a CRC of the summary section.
        let mut ccw = CountingCrcWriter::new(writer);

        fn posit<W: Write + Seek>(ccw: &mut CountingCrcWriter<W>) -> io::Result<u64> {
            ccw.get_mut().stream_position()
        }

        // Write all schemas.
        let schemas_start = summary_start;
        for (schema, id) in all_schemas {
            let header = records::SchemaHeader {
                id,
                name: schema.name,
                encoding: schema.encoding,
                data_len: schema.data.len() as u32,
            };
            let data = schema.data;

            write_record(&mut ccw, &Record::Schema { header, data })?;
        }
        let schemas_end = posit(&mut ccw)?;
        if schemas_end - schemas_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::SCHEMA,
                group_start: schemas_start,
                group_length: schemas_end - schemas_start,
            });
        }

        // Write all channels.
        let channels_start = schemas_end;
        for cs in all_channels {
            let rec = records::Channel {
                id: cs.channel_id,
                schema_id: cs.schema_id,
                topic: cs.channel.topic,
                message_encoding: cs.channel.message_encoding,
                metadata: cs.channel.metadata,
            };
            write_record(&mut ccw, &Record::Channel(rec))?;
        }
        let channels_end = posit(&mut ccw)?;
        if channels_end - channels_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::CHANNEL,
                group_start: channels_start,
                group_length: channels_end - channels_start,
            });
        }

        // Write all chunk indexes.
        let chunk_indexes_start = channels_end;
        for index in chunk_indexes {
            write_record(&mut ccw, &Record::ChunkIndex(index))?;
        }
        let chunk_indexes_end = posit(&mut ccw)?;
        if chunk_indexes_end - chunk_indexes_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::CHUNK_INDEX,
                group_start: chunk_indexes_start,
                group_length: chunk_indexes_end - chunk_indexes_start,
            });
        }

        // ...and attachment indexes
        let attachment_indexes_start = chunk_indexes_end;
        for index in attachment_indexes {
            write_record(&mut ccw, &Record::AttachmentIndex(index))?;
        }
        let attachment_indexes_end = posit(&mut ccw)?;
        if attachment_indexes_end - attachment_indexes_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::ATTACHMENT_INDEX,
                group_start: attachment_indexes_start,
                group_length: attachment_indexes_end - attachment_indexes_start,
            });
        }

        // ...and metadata indexes
        let metadata_indexes_start = attachment_indexes_end;
        for index in metadata_indexes {
            write_record(&mut ccw, &Record::MetadataIndex(index))?;
        }
        let metadata_indexes_end = posit(&mut ccw)?;
        if metadata_indexes_end - metadata_indexes_start > 0 {
            offsets.push(records::SummaryOffset {
                group_opcode: op::METADATA_INDEX,
                group_start: metadata_indexes_start,
                group_length: metadata_indexes_end - metadata_indexes_start,
            });
        }

        let stats_start = metadata_indexes_end;
        write_record(&mut ccw, &Record::Statistics(stats))?;
        let stats_end = posit(&mut ccw)?;
        assert!(stats_end > stats_start);
        offsets.push(records::SummaryOffset {
            group_opcode: op::STATISTICS,
            group_start: stats_start,
            group_length: stats_end - stats_start,
        });

        // Write the summary offsets we've been accumulating
        let summary_offset_start = stats_end;
        for offset in offsets {
            write_record(&mut ccw, &Record::SummaryOffset(offset))?;
        }

        // Wat: the CRC in the footer _includes_ the footer's own summary_start
        // and summary_offset_start fields, so write those through the checksummer,
        // then append the CRC itself.
        op_and_len(&mut ccw, op::FOOTER, 20)?; // 8-byte start + 8-byte offset start + 4-byte CRC
        ccw.write_u64::<LE>(summary_start)?;
        ccw.write_u64::<LE>(summary_offset_start)?;

        let (writer, summary_crc) = ccw.finalize();

        writer.write_u32::<LE>(summary_crc)?;

        writer.write_all(MAGIC)?;
        writer.flush()?;
        Ok(())
    }
}

impl<'a, W: Write + Seek> Drop for Writer<'a, W> {
    fn drop(&mut self) {
        self.finish().unwrap()
    }
}

enum Compressor<W: Write> {
    Null(W),
    Zstd(zstd::Encoder<'static, W>),
    Lz4(lz4::Encoder<W>),
}

impl<W: Write> Compressor<W> {
    fn finish(self) -> io::Result<W> {
        Ok(match self {
            Compressor::Null(w) => w,
            Compressor::Zstd(w) => w.finish()?,
            Compressor::Lz4(w) => {
                let (w, err) = w.finish();
                err?;
                w
            }
        })
    }
}

impl<W: Write> Write for Compressor<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            Compressor::Null(w) => w.write(buf),
            Compressor::Zstd(w) => w.write(buf),
            Compressor::Lz4(w) => w.write(buf),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        match self {
            Compressor::Null(w) => w.flush(),
            Compressor::Zstd(w) => w.flush(),
            Compressor::Lz4(w) => w.flush(),
        }
    }
}

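/// Writes records into a single chunk.
///
/// Data is streamed through an optional compressor and a counting CRC writer.
/// Since the chunk's sizes and CRC aren't known until it's done, the chunk
/// header is first written with placeholder values; [`ChunkWriter::finish`]
/// seeks back to patch it, writes the message indexes, and returns a
/// [`records::ChunkIndex`] for the summary section.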
struct ChunkWriter<W: Write> {
    header_start: u64,
    stream_start: u64,
    header: records::ChunkHeader,
    compressor: CountingCrcWriter<Compressor<W>>,
    indexes: BTreeMap<u16, Vec<records::MessageIndexEntry>>,
}

impl<W: Write + Seek> ChunkWriter<W> {
    fn new(mut writer: W, compression: Option<Compression>) -> McapResult<Self> {
        let header_start = writer.stream_position()?;

        // Write a placeholder record length; finish() patches it.
        op_and_len(&mut writer, op::CHUNK, !0)?;

        let compression_name = match compression {
            Some(Compression::Zstd) => "zstd",
            Some(Compression::Lz4) => "lz4",
            None => "",
        };

        // Write a placeholder chunk header; the sizes and CRC get patched in finish().
        let header = records::ChunkHeader {
            message_start_time: 0,
            message_end_time: 0,
            uncompressed_size: !0,
            uncompressed_crc: !0,
            compression: String::from(compression_name),
            compressed_size: !0,
        };

        writer.write_le(&header)?;
        let stream_start = writer.stream_position()?;

        let compressor = match compression {
            Some(Compression::Zstd) => {
                let mut enc = zstd::Encoder::new(writer, 0)?;
                enc.multithread(num_cpus::get_physical() as u32)?;
                Compressor::Zstd(enc)
            }
            Some(Compression::Lz4) => {
                let b = lz4::EncoderBuilder::new();
                Compressor::Lz4(b.build(writer)?)
            }
            None => Compressor::Null(writer),
        };
        let compressor = CountingCrcWriter::new(compressor);
        Ok(Self {
            compressor,
            header_start,
            stream_start,
            header,
            indexes: BTreeMap::new(),
        })
    }

    fn write_schema(&mut self, id: u16, schema: &Schema) -> McapResult<()> {
        let header = records::SchemaHeader {
            id,
            name: schema.name.clone(),
            encoding: schema.encoding.clone(),
            data_len: schema.data.len() as u32,
        };
        write_record(
            &mut self.compressor,
            &Record::Schema {
                header,
                data: Cow::Borrowed(&schema.data),
            },
        )?;
        Ok(())
    }

    fn write_channel(&mut self, id: u16, schema_id: u16, chan: &Channel) -> McapResult<()> {
        assert_eq!(schema_id == 0, chan.schema.is_none());

        let rec = records::Channel {
            id,
            schema_id,
            topic: chan.topic.clone(),
            message_encoding: chan.message_encoding.clone(),
            metadata: chan.metadata.clone(),
        };

        write_record(&mut self.compressor, &Record::Channel(rec))?;
        Ok(())
    }

    fn write_message(&mut self, header: &MessageHeader, data: &[u8]) -> McapResult<()> {
        // Update min/max time
        self.header.message_start_time = match self.header.message_start_time {
            0 => header.log_time,
            nz => nz.min(header.log_time),
        };
        self.header.message_end_time = match self.header.message_end_time {
            0 => header.log_time,
            nz => nz.max(header.log_time),
        };

        // Add an index for this message
        self.indexes
            .entry(header.channel_id)
            .or_default()
            .push(records::MessageIndexEntry {
                log_time: header.log_time,
                offset: self.compressor.position(),
            });

        write_record(
            &mut self.compressor,
            &Record::Message {
                header: *header,
                data: Cow::Borrowed(data),
            },
        )?;
        Ok(())
    }

    fn finish(mut self) -> McapResult<(W, records::ChunkIndex)> {
        // Get the number of uncompressed bytes written and the CRC.
        self.header.uncompressed_size = self.compressor.position();
        let (stream, crc) = self.compressor.finalize();
        self.header.uncompressed_crc = crc;

        // Finalize the compression stream - it maintains an internal buffer.
        let mut writer = stream.finish()?;
        let end_of_stream = writer.stream_position()?;
        self.header.compressed_size = end_of_stream - self.stream_start;
        let record_size = (end_of_stream - self.header_start) as usize - 9; // 1 byte op, 8 byte len

        // Back up, write our finished header, then continue at the end of the stream.
        writer.seek(SeekFrom::Start(self.header_start))?;
        op_and_len(&mut writer, op::CHUNK, record_size)?;
        writer.write_le(&self.header)?;
        assert_eq!(self.stream_start, writer.stream_position()?);
        assert_eq!(writer.seek(SeekFrom::End(0))?, end_of_stream);

        // Write our message indexes
        let mut message_index_offsets: BTreeMap<u16, u64> = BTreeMap::new();

        let mut index_buf = Vec::new();
        for (channel_id, records) in self.indexes {
            assert!(message_index_offsets
                .insert(channel_id, writer.stream_position()?)
                .is_none());
            index_buf.clear();
            let index = records::MessageIndex {
                channel_id,
                records,
            };

            Cursor::new(&mut index_buf).write_le(&index)?;
            op_and_len(&mut writer, op::MESSAGE_INDEX, index_buf.len())?;
            writer.write_all(&index_buf)?;
        }
        let end_of_indexes = writer.stream_position()?;

        let index = records::ChunkIndex {
            message_start_time: self.header.message_start_time,
            message_end_time: self.header.message_end_time,
            chunk_start_offset: self.header_start,
            chunk_length: end_of_stream - self.header_start,
            message_index_offsets,
            message_index_length: end_of_indexes - end_of_stream,
            compression: self.header.compression,
            compressed_size: self.header.compressed_size,
            uncompressed_size: self.header.uncompressed_size,
        };

        Ok((writer, index))
    }
}