1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use smallvec::SmallVec;
use crate::metadata::Metadata;
use crate::{ChannelId, FoxgloveError, RawChannel};
/// Uniquely identifies a [`Sink`] in the context of this program.
///
/// The ID wraps a [`NonZeroU64`], so zero is never a valid sink ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct SinkId(NonZeroU64);

impl SinkId {
    /// Wraps an existing non-zero value as a `SinkId`.
    ///
    /// No uniqueness check is performed here; use [`SinkId::next`] to allocate a fresh ID.
    pub fn new(id: NonZeroU64) -> Self {
        Self(id)
    }

    /// Allocates the next sink ID from a process-wide counter.
    ///
    /// The counter starts at 1 and is incremented by one on every call.
    ///
    /// # Panics
    ///
    /// Panics if the counter has wrapped around to zero, which would require `u64::MAX`
    /// previous allocations.
    pub fn next() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        // Only the atomicity of the increment matters; no other memory is published through
        // this counter, so relaxed ordering is sufficient.
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        // `fetch_add` wraps on overflow, so verify the value instead of assuming it is
        // non-zero. This keeps the function free of `unsafe`.
        let non_zero_id = NonZeroU64::new(id).expect("sink ID counter overflowed");
        Self::new(non_zero_id)
    }
}
impl std::fmt::Display for SinkId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<SinkId> for u64 {
fn from(id: SinkId) -> Self {
id.0.get()
}
}
impl From<SinkId> for NonZeroU64 {
fn from(id: SinkId) -> Self {
id.0
}
}
/// A destination for logged messages: a [`Sink`] takes a message from a channel and writes it
/// somewhere.
///
/// Every sink is `Send + Sync`, so it can be shared between threads. In most cases you will use
/// one of the provided implementations, such as [`McapWriter`](crate::McapWriter) or
/// [`WebSocketServer`](crate::WebSocketServer), rather than implementing this trait yourself.
#[doc(hidden)]
pub trait Sink: Send + Sync {
    /// Returns the unique ID of this sink.
    fn id(&self) -> SinkId;

    /// Writes one message for `channel` to this sink.
    ///
    /// `msg` holds the message bytes. `metadata` carries optional message metadata, which some
    /// sink implementations may use.
    fn log(
        &self,
        channel: &RawChannel,
        msg: &[u8],
        metadata: &Metadata,
    ) -> Result<(), FoxgloveError>;

    /// Notifies the sink that new channels are available within the [`Context`][ctx].
    ///
    /// Implementors are free to ignore this callback and instead keep track of which channels
    /// they have seen, doing any per-channel setup the first time a channel shows up.
    ///
    /// When a sink is first registered with a context, this callback is invoked automatically
    /// with every channel already registered to that context.
    ///
    /// It is NOT safe to call [`Context::subscribe_channels`][sub] from within this callback. A
    /// sink that manages its subscriptions dynamically and wants to subscribe right away should
    /// instead return the IDs of the channels it wants.
    ///
    /// The return value is ignored for sinks that [auto-subscribe][Sink::auto_subscribe] to all
    /// channels.
    ///
    /// The default implementation returns `None`.
    ///
    /// [ctx]: crate::Context
    /// [sub]: crate::Context::subscribe_channels
    fn add_channels(&self, _channel: &[&Arc<RawChannel>]) -> Option<Vec<ChannelId>> {
        None
    }

    /// Notifies the sink that a single new channel is available within the
    /// [`Context`][crate::Context].
    ///
    /// Refer to [`Sink::add_channels`] for the details of the contract.
    ///
    /// A sink that manages its subscriptions dynamically may return `true` to subscribe to the
    /// channel immediately.
    ///
    /// The default implementation forwards to [`Sink::add_channels`] with a one-element slice
    /// and returns `true` only if the returned list contains this channel's ID.
    #[doc(hidden)]
    fn add_channel(&self, channel: &Arc<RawChannel>) -> bool {
        match self.add_channels(&[channel]) {
            Some(subscribed) => subscribed.contains(&channel.id()),
            None => false,
        }
    }

    /// Notifies the sink that a channel has been unregistered from the [`Context`][ctx].
    ///
    /// This is the place to drop per-channel state or take any other follow-up action.
    ///
    /// A sink that manages its subscriptions dynamically does not need to call
    /// [`Context::unsubscribe_channels`][unsub] here: removing a channel automatically removes
    /// the subscriptions for it.
    ///
    /// The default implementation does nothing.
    ///
    /// [ctx]: crate::Context
    /// [unsub]: crate::Context::unsubscribe_channels
    fn remove_channel(&self, _channel: &RawChannel) {}

    /// Reports whether this sink automatically subscribes to all channels.
    ///
    /// The default implementation returns `true`.
    ///
    /// Returning `false` signals that the sink intends to manage its subscriptions dynamically,
    /// by way of [`Sink::add_channel`], [`Context::subscribe_channels`][sub], and
    /// [`Context::unsubscribe_channels`][unsub].
    ///
    /// [sub]: crate::Context::subscribe_channels
    /// [unsub]: crate::Context::unsubscribe_channels
    fn auto_subscribe(&self) -> bool {
        true
    }
}
/// A small group of sinks.
///
/// This is a [`SmallVec`] of `Arc<dyn Sink>` handles with an inline array size of six. We use
/// a [`SmallVec`] to improve cache locality and reduce heap allocations when working with a
/// small number of sinks, which is typically the case.
pub(crate) type SmallSinkVec = SmallVec<[Arc<dyn Sink>; 6]>;