1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
//! MCAP Stream
//!
//! This module contains utilities for writing to stream of MCAP bytes, commonly used for streaming
//! responses from HTTP frameworks such as Axum.
use std::{
fmt::Debug,
io::{self, Seek, SeekFrom, Write},
pin::Pin,
sync::Arc,
task::Poll,
};
use tokio::sync::mpsc::{Receiver as TokioReceiver, Sender as TokioSender};
use bytes::{Bytes, BytesMut};
use futures::{Stream, ready};
use parking_lot::Mutex;
use crate::{ChannelBuilder, Context, FoxgloveError, McapWriteOptions, McapWriterHandle};
#[derive(Default)]
struct Inner {
buffer: BytesMut,
position: u64,
}
#[derive(Default, Clone)]
struct SharedBuffer(Arc<Mutex<Inner>>);
impl Debug for SharedBuffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SharedBuffer").finish_non_exhaustive()
}
}
impl Write for SharedBuffer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let inner = &mut *self.0.lock();
inner.buffer.extend_from_slice(buf);
inner.position += buf.len() as u64;
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl Seek for SharedBuffer {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
let inner = self.0.lock();
match pos {
SeekFrom::Start(n) if inner.position == n => Ok(n),
SeekFrom::Current(0) => Ok(inner.position),
_ => Err(std::io::Error::other("seek on unseekable file")),
}
}
}
/// Creates an [`McapStream`] and [`McapStreamHandle`] pair that can be used to encode logged
/// messages as a [`futures::Stream`] of MCAP bytes.
///
/// The returned [`McapStreamHandle`] can be used to create channels which will log messages to the
/// [`McapStream`]. [`McapStreamHandle::flush`] must be routinely called on the handle to push bytes
/// from the writer to the [`McapStream`]. When the recording is finished
/// [`McapStreamHandle::close`] must be called to ensure that all bytes have been flushed to the
/// [`McapStream`].
pub fn create_mcap_stream() -> (McapStreamHandle, McapStream) {
let buffer = SharedBuffer::default();
let context = Context::new();
let writer = context
.mcap_writer_with_options(McapWriteOptions::new().disable_seeking(true))
.create(buffer.clone())
.expect("writer has valid configuration");
let (sender, receiver) = tokio::sync::mpsc::channel(1);
let handle = McapStreamHandle {
buffer,
writer: Some(writer),
sender,
context,
};
(handle, McapStream { receiver })
}
/// A handle to an MCAP stream writer.
///
/// When this handle is dropped, the writer will unregister from the [`Context`] and stop logging
/// events. It will attempt to flush any buffered data but may fail if the [`McapStream`] is
/// currently full.
///
/// To ensure no data is lost, call the [`McapStreamHandle::close`] method instead of dropping.
#[must_use]
#[derive(Debug)]
pub struct McapStreamHandle {
writer: Option<McapWriterHandle<SharedBuffer>>,
buffer: SharedBuffer,
sender: TokioSender<BytesMut>,
context: Arc<Context>,
}
impl McapStreamHandle {
/// Returns a channel builder for a channel in the stream writer.
///
/// You should choose a unique topic name per channel for compatibility with the Foxglove app.
pub fn channel_builder(&self, topic: impl Into<String>) -> ChannelBuilder {
self.context.channel_builder(topic)
}
/// Stop logging events and flush any buffered data.
///
/// This method will return an error if the MCAP writer fails to finish or if the
/// [`McapStream`] has already been closed.
pub async fn close(mut self) -> Result<(), FoxgloveError> {
if let Some(writer) = self.writer.take() {
if let Err(e) = writer.close() {
// If an error occurred still flush the buffer. We'll likely get a truncated MCAP
// but anything that was successfully written will be there.
let _ = Self::flush_shared_buffer(&mut self.sender, &mut self.buffer).await;
return Err(e);
}
}
Ok(Self::flush_shared_buffer(&mut self.sender, &mut self.buffer).await?)
}
async fn flush_shared_buffer(
sender: &mut TokioSender<BytesMut>,
buffer: &mut SharedBuffer,
) -> io::Result<()> {
let bytes = {
let mut inner = buffer.0.lock();
inner.buffer.split()
};
if bytes.is_empty() {
return Ok(());
}
if sender.send(bytes).await.is_err() {
return Err(std::io::Error::other("McapStream channel was closed"));
}
Ok(())
}
/// Get the current size of the buffer.
///
/// This can be used in conjunction with [`McapStreamHandle::flush`] to ensure the buffer does
/// not grow unbounded.
pub fn buffer_size(&mut self) -> usize {
self.buffer.0.lock().buffer.len()
}
/// Flush the buffer from the MCAP writer to the [`McapStream`].
///
/// This method returns a future that will wait until the [`McapStream`] has capacity for the
/// flushed buffer.
pub async fn flush(&mut self) -> Result<(), FoxgloveError> {
Self::flush_shared_buffer(&mut self.sender, &mut self.buffer).await?;
Ok(())
}
}
impl Drop for McapStreamHandle {
fn drop(&mut self) {
if let Some(writer) = self.writer.take() {
if let Err(e) = writer.close() {
tracing::warn!("{e}");
}
}
let mut inner = self.buffer.0.lock();
let buffer = inner.buffer.split();
if !buffer.is_empty() {
// When the handle is dropped try and send the final buffer. If the channel is full or
// closed log as a warning.
if let Err(e) = self.sender.try_send(buffer) {
tracing::warn!("{e}");
}
}
}
}
/// A stream of MCAP bytes from a writer.
pub struct McapStream {
receiver: TokioReceiver<BytesMut>,
}
impl Stream for McapStream {
type Item = Bytes;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let Some(bytes) = ready!(self.receiver.poll_recv(cx)) else {
return Poll::Ready(None);
};
Poll::Ready(Some(bytes.freeze()))
}
}
#[cfg(test)]
mod tests {
use futures::StreamExt;
use std::convert::Infallible;
use crate::Encode;
use super::*;
struct Message {
data: f64,
}
impl Encode for Message {
type Error = Infallible;
fn get_schema() -> Option<crate::Schema> {
None
}
fn get_message_encoding() -> String {
"foo".to_string()
}
fn encode(&self, buf: &mut impl bytes::BufMut) -> Result<(), Self::Error> {
buf.put_f64(self.data);
Ok(())
}
}
#[tokio::test]
async fn test_write_to_stream() {
let (mut handle, mut stream) = create_mcap_stream();
let channel = handle.channel_builder("/topic").build::<Message>();
// Use another thread to write messages to the stream
tokio::spawn(async move {
for i in 0..100 {
channel.log(&Message { data: i as f64 });
handle.flush().await.unwrap();
}
handle.close().await.unwrap();
});
let mut mcap_bytes = vec![];
// Consume the stream and write the output to a vector.
//
// This stream will commonly be returned from an Axum handler as a streaming response.
while let Some(bytes) = stream.next().await {
mcap_bytes.extend_from_slice(&bytes[..]);
}
// The stream produces a complete MCAP file.
// Verify by loading the summary from the file.
let summary = mcap::Summary::read(&mcap_bytes[..]).unwrap().unwrap();
let stats = summary.stats.unwrap();
assert_eq!(stats.message_count, 100);
assert_eq!(stats.channel_count, 1);
}
}