1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
//! MCAP writer
use std::fs::File;
use std::io::{BufWriter, Seek};
use std::path::Path;
use std::sync::{Arc, Weak};
use std::{fmt::Debug, io::Write};
use crate::library_version::get_library_version;
use crate::sink_channel_filter::SinkChannelFilterFn;
use crate::{ChannelDescriptor, Context, FoxgloveError, Sink, SinkChannelFilter};
/// An attachment to store in an MCAP file.
///
/// Attachments are arbitrary binary data that can be stored alongside messages.
/// Common uses include storing configuration files, calibration data, or other
/// reference material related to the recording.
pub use mcap::Attachment as McapAttachment;
/// Compression options for content in an MCAP file
pub use mcap::Compression as McapCompression;
/// Options for use with an [`McapWriter`][crate::McapWriter].
pub use mcap::WriteOptions as McapWriteOptions;
mod mcap_sink;
use mcap_sink::McapSink;
/// An MCAP writer for logging events.
///
/// ### Buffering
///
/// Logged messages are buffered in a [`BufWriter`]. When the writer is dropped, the buffered
/// messages are flushed to the writer and the writer is closed.
#[must_use]
#[derive(Clone)]
pub struct McapWriter {
options: McapWriteOptions,
context: Arc<Context>,
channel_filter: Option<Arc<dyn SinkChannelFilter>>,
}
impl Debug for McapWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("McapWriter")
.field("options", &self.options)
.field("context", &self.context)
.finish_non_exhaustive()
}
}
impl From<McapWriteOptions> for McapWriter {
fn from(value: McapWriteOptions) -> Self {
let options = value.library(get_library_version());
Self {
options,
context: Context::get_default(),
channel_filter: None,
}
}
}
impl Default for McapWriter {
fn default() -> Self {
Self::from(McapWriteOptions::default())
}
}
impl McapWriter {
/// Instantiates a new MCAP writer with default options.
pub fn new() -> Self {
Self::default()
}
/// Instantiates a new MCAP writer with the provided options.
/// The library option is ignored.
pub fn with_options(options: McapWriteOptions) -> Self {
options.into()
}
/// Sets the context for this sink.
#[doc(hidden)]
pub fn context(mut self, ctx: &Arc<Context>) -> Self {
self.context = ctx.clone();
self
}
/// Sets a [`SinkChannelFilter`] for this file.
pub fn channel_filter(mut self, filter: Arc<dyn SinkChannelFilter>) -> Self {
self.channel_filter = Some(filter);
self
}
/// Sets a channel filter for this file. See [`SinkChannelFilter`] for more information.
pub fn channel_filter_fn(
mut self,
filter: impl Fn(&ChannelDescriptor) -> bool + Sync + Send + 'static,
) -> Self {
self.channel_filter = Some(Arc::new(SinkChannelFilterFn(filter)));
self
}
/// Begins logging events to the specified writer.
///
/// Returns a handle. When the handle is dropped, the recording will be flushed to the writer
/// and closed. Alternatively, the caller may choose to call [`McapWriterHandle::close`] to
/// manually flush the recording and recover the writer.
pub fn create<W>(self, writer: W) -> Result<McapWriterHandle<W>, FoxgloveError>
where
W: Write + Seek + Send + 'static,
{
let sink = McapSink::new(writer, self.options, self.channel_filter)?;
self.context.add_sink(sink.clone());
Ok(McapWriterHandle {
sink,
context: Arc::downgrade(&self.context),
})
}
/// Creates a new write-only buffered file, and begins logging events to it.
///
/// If the file already exists, this call will fail with
/// [`AlreadyExists`](`std::io::ErrorKind::AlreadyExists`).
///
/// If you want more control over how the file is opened, or you want to write to something
/// other than a file, use [`McapWriter::create`].
pub fn create_new_buffered_file<P>(
self,
path: P,
) -> Result<McapWriterHandle<BufWriter<File>>, FoxgloveError>
where
P: AsRef<Path>,
{
let file = File::create_new(path)?;
let writer = BufWriter::new(file);
self.create(writer)
}
}
/// A handle to an MCAP file writer.
///
/// When this handle is dropped, the writer will unregister from the [`Context`], stop logging
/// events, and flush any buffered data to the writer.
#[must_use]
#[derive(Debug)]
pub struct McapWriterHandle<W: Write + Seek + Send + 'static> {
sink: Arc<McapSink<W>>,
context: Weak<Context>,
}
impl<W: Write + Seek + Send + 'static> McapWriterHandle<W> {
/// Stops logging events, flushes buffered data, and returns the writer.
pub fn close(self) -> Result<W, FoxgloveError> {
// It's safe to unwrap the `Option<W>` because `McapWriterHandle` doesn't implement clone,
// and this method consumes self.
self.finish().map(|w| w.expect("not finished"))
}
fn finish(&self) -> Result<Option<W>, FoxgloveError> {
if let Some(context) = self.context.upgrade() {
context.remove_sink(self.sink.id());
}
self.sink.finish()
}
/// Writes MCAP metadata to the file.
///
/// If the metadata map is empty, this method returns early without writing anything.
///
/// # Arguments
/// * `name` - Name identifier for this metadata record
/// * `metadata` - Key-value pairs to store (empty map will be skipped)
///
pub fn write_metadata(
&self,
name: &str,
metadata: std::collections::BTreeMap<String, String>,
) -> Result<(), FoxgloveError> {
self.sink.write_metadata(name, metadata)
}
/// Writes an attachment to the MCAP file.
///
/// Attachments are arbitrary binary data that can be stored alongside messages.
/// Common uses include storing configuration files, calibration data, or other
/// reference material related to the recording.
///
/// # Example
/// ```no_run
/// use std::borrow::Cow;
/// use foxglove::{McapWriter, McapAttachment};
///
/// let mcap = McapWriter::new()
/// .create_new_buffered_file("test.mcap")
/// .expect("create failed");
///
/// mcap.attach(&McapAttachment {
/// log_time: 0,
/// create_time: 0,
/// name: "config.json".to_string(),
/// media_type: "application/json".to_string(),
/// data: Cow::Borrowed(br#"{"setting": true}"#),
/// }).expect("attach failed");
///
/// mcap.close().expect("close failed");
/// ```
pub fn attach(&self, attachment: &McapAttachment<'_>) -> Result<(), FoxgloveError> {
self.sink.attach(attachment)
}
}
impl<W: Write + Seek + Send + 'static> Drop for McapWriterHandle<W> {
fn drop(&mut self) {
if let Err(e) = self.finish() {
tracing::warn!("{e}");
}
}
}