1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
//! Websocket server
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use crate::sink_channel_filter::{SinkChannelFilter, SinkChannelFilterFn};
use crate::websocket::service::Service;
use crate::websocket::PlaybackState;
#[cfg(feature = "tls")]
use crate::websocket::TlsIdentity;
use crate::websocket::{
create_server, AssetHandler, AsyncAssetHandlerFn, BlockingAssetHandlerFn, Capability, Client,
ConnectionGraph, Parameter, Server, ServerOptions, ShutdownHandle, Status,
};
use crate::{get_runtime_handle, AppUrl, ChannelDescriptor, Context, FoxgloveError};
/// A WebSocket server for live visualization in Foxglove.
///
/// After your server is started, you can open the Foxglove app to visualize your data. See [Connecting to data].
///
/// ### Buffering
///
/// Logged messages are queued in a channel for each client and delivered in a background task. If a
/// queue fills, perhaps because of a slow client, then the oldest messages will be dropped. The
/// queue size is configurable with [`WebSocketServer::message_backlog_size`] when creating the
/// server.
///
/// Other protocol messages, including status updates, are delivered from a separate "control"
/// queue, using the same configured queue size. If the control queue fills, then the slow client is
/// dropped.
///
/// [Connecting to data]: https://docs.foxglove.dev/docs/connecting-to-data/introduction
#[must_use]
#[derive(Debug)]
pub struct WebSocketServer {
host: String,
port: u16,
options: ServerOptions,
context: Arc<Context>,
}
impl Default for WebSocketServer {
fn default() -> Self {
let options = ServerOptions {
session_id: Some(Server::generate_session_id()),
..ServerOptions::default()
};
Self {
host: "127.0.0.1".into(),
port: 8765,
options,
context: Context::get_default(),
}
}
}
impl WebSocketServer {
/// Creates a new websocket server with default options.
pub fn new() -> Self {
Self::default()
}
/// Set the websocket server name to advertise to clients.
///
/// By default, the server is not given a name.
pub fn name(mut self, name: impl Into<String>) -> Self {
self.options.name = Some(name.into());
self
}
/// Bind a TCP port.
///
/// `port` may be 0, in which case an available port will be automatically selected.
///
/// By default, the server will bind to `127.0.0.1:8765`.
pub fn bind(mut self, host: impl Into<String>, port: u16) -> Self {
self.host = host.into();
self.port = port;
self
}
/// Sets a [`SinkChannelFilter`] for connected clients.
///
/// The filter is a function that takes a channel and returns a boolean indicating whether the
/// channel should be logged.
pub fn channel_filter(mut self, filter: Arc<dyn SinkChannelFilter>) -> Self {
self.options.channel_filter = Some(filter);
self
}
/// Sets a channel filter for connected clients. See [`SinkChannelFilter`] for more information.
pub fn channel_filter_fn(
mut self,
filter: impl Fn(&ChannelDescriptor) -> bool + Sync + Send + 'static,
) -> Self {
self.options.channel_filter = Some(Arc::new(SinkChannelFilterFn(filter)));
self
}
/// Configure TLS with a PEM-formatted x509 certificate chain and pkcs8 private key.
/// If enabled, the server will only accept connections using wss://.
/// If TLS configuration fails, starting the server will result in an error.
#[doc(hidden)]
#[cfg(feature = "tls")]
pub fn tls(mut self, tls_identity: TlsIdentity) -> Self {
self.options.tls_identity = Some(tls_identity);
self
}
/// Sets the server capabilities to advertise to the client.
///
/// By default, the server does not advertise any capabilities.
pub fn capabilities(mut self, capabilities: impl IntoIterator<Item = Capability>) -> Self {
self.options.capabilities = Some(capabilities.into_iter().collect());
self
}
/// Sets server metadata.
#[doc(hidden)]
pub fn server_info(mut self, info: HashMap<String, String>) -> Self {
self.options.server_info = Some(info);
self
}
/// Declare the time range for playback, in absolute nanoseconds. This applies if the server is playing back a fixed time range of data.
/// This will add the PlaybackControl capability to the server.
pub fn playback_time_range(mut self, start_time: u64, end_time: u64) -> Self {
self.options.playback_time_range = Some((start_time, end_time));
self
}
/// Configure an event listener to receive client message events.
pub fn listener(mut self, listener: Arc<dyn crate::websocket::ServerListener>) -> Self {
self.options.listener = Some(listener);
self
}
/// Configure the handler for fetching assets.
/// There can only be one asset handler, exclusive with the other fetch_asset_handler methods.
pub fn fetch_asset_handler(mut self, handler: Box<dyn AssetHandler>) -> Self {
self.options.fetch_asset_handler = Some(handler);
self
}
/// Configure a synchronous, blocking function as a fetch asset handler.
/// There can only be one asset handler, exclusive with the other fetch_asset_handler methods.
pub fn fetch_asset_handler_blocking_fn<F, T, Err>(mut self, handler: F) -> Self
where
F: Fn(Client, String) -> Result<T, Err> + Send + Sync + 'static,
T: AsRef<[u8]>,
Err: Display,
{
self.options.fetch_asset_handler =
Some(Box::new(BlockingAssetHandlerFn(Arc::new(handler))));
self
}
/// Configure an asynchronous function as a fetch asset handler.
/// There can only be one asset handler, exclusive with the other fetch_asset_handler methods.
pub fn fetch_asset_handler_async_fn<F, Fut, T, Err>(mut self, handler: F) -> Self
where
F: Fn(Client, String) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<T, Err>> + Send + 'static,
T: AsRef<[u8]>,
Err: Display,
{
self.options.fetch_asset_handler = Some(Box::new(AsyncAssetHandlerFn(Arc::new(handler))));
self
}
/// Set the message backlog size.
///
/// The server buffers outgoing log entries into a queue. If the backlog size is exceeded, the
/// oldest entries will be dropped.
///
/// By default, the server will buffer 1024 messages.
pub fn message_backlog_size(mut self, size: usize) -> Self {
self.options.message_backlog_size = Some(size);
self
}
/// Configure the set of services to advertise to clients.
///
/// Automatically adds [`Capability::Services`] to the set of advertised capabilities.
///
/// Note that services can by dynamically registered and unregistered later using
/// [`WebSocketServerHandle::add_services`] and [`WebSocketServerHandle::remove_services`].
pub fn services(mut self, services: impl IntoIterator<Item = Service>) -> Self {
self.options.services.clear();
for service in services {
let name = service.name().to_string();
if let Some(s) = self.options.services.insert(name.clone(), service) {
tracing::warn!("Redefining service {}", s.name());
}
}
self
}
/// Configure the set of supported encodings for client requests.
///
/// This is used for both client-side publishing as well as service call request/responses.
pub fn supported_encodings(
mut self,
encodings: impl IntoIterator<Item = impl Into<String>>,
) -> Self {
self.options.supported_encodings = Some(encodings.into_iter().map(|e| e.into()).collect());
self
}
/// Set a session ID.
///
/// This allows the client to understand if the connection is a re-connection or if it is
/// connecting to a new server instance. This can for example be a timestamp or a UUID.
///
/// By default, this is set to the number of milliseconds since the unix epoch.
pub fn session_id(mut self, id: impl Into<String>) -> Self {
self.options.session_id = Some(id.into());
self
}
/// Configure the tokio runtime for the server to use for async tasks.
///
/// By default, the server will use either the current runtime (if started with
/// [`WebSocketServer::start`]), or spawn its own internal runtime (if started with
/// [`WebSocketServer::start_blocking`]).
#[doc(hidden)]
pub fn tokio_runtime(mut self, handle: &tokio::runtime::Handle) -> Self {
self.options.runtime = Some(handle.clone());
self
}
/// Sets the context for this sink.
pub fn context(mut self, ctx: &Arc<Context>) -> Self {
self.context = ctx.clone();
self
}
/// Starts the websocket server.
///
/// Returns a handle that can optionally be used to gracefully shutdown the server. The caller
/// can safely drop the handle, and the server will run forever.
pub async fn start(self) -> Result<WebSocketServerHandle, FoxgloveError> {
let server = create_server(&self.context, self.options)?;
let addr = server.start(&self.host, self.port).await?;
Ok(WebSocketServerHandle(server, addr))
}
/// Starts the websocket server.
///
/// Returns a handle that can optionally be used to gracefully shutdown the server. The caller
/// can safely drop the handle, and the server will run forever.
///
/// If you choose to use this blocking interface rather than [`WebSocketServer::start`],
/// the SDK will spawn a multi-threaded [tokio] runtime.
///
/// This method will panic if invoked from an asynchronous execution context. Use
/// [`WebSocketServer::start`] instead.
///
/// [tokio]: https://docs.rs/tokio/latest/tokio/
pub fn start_blocking(mut self) -> Result<WebSocketServerHandle, FoxgloveError> {
let runtime = self
.options
.runtime
.get_or_insert_with(get_runtime_handle)
.clone();
let handle = runtime.block_on(self.start())?;
Ok(handle)
}
}
/// A handle to the websocket server.
///
/// This handle can safely be dropped and the server will run forever.
pub struct WebSocketServerHandle(Arc<Server>, SocketAddr);
impl Debug for WebSocketServerHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("WebSocketServerHandle").finish()
}
}
impl WebSocketServerHandle {
/// Returns the local port that the server is listening on.
pub fn port(&self) -> u16 {
self.1.port()
}
/// Returns the number of currently connected clients.
pub fn client_count(&self) -> usize {
self.0.client_count()
}
/// Returns an app URL to open the websocket as a data source.
pub fn app_url(&self) -> AppUrl {
let protocol = if self.0.is_tls_configured() {
"wss"
} else {
"ws"
};
AppUrl::new().with_websocket(format!("{protocol}://{}:{}", self.1.ip(), self.1.port()))
}
/// Advertises support for the provided services.
///
/// These services will be available for clients to use until they are removed with
/// [`remove_services`][WebSocketServerHandle::remove_services].
///
/// This method will fail if the server was not configured with [`Capability::Services`].
pub fn add_services(
&self,
services: impl IntoIterator<Item = Service>,
) -> Result<(), FoxgloveError> {
self.0.add_services(services.into_iter().collect())
}
/// Removes services that were previously advertised.
pub fn remove_services(&self, names: impl IntoIterator<Item = impl AsRef<str>>) {
self.0.remove_services(names);
}
/// Publishes the current server timestamp to all clients.
///
/// Requires the [`Time`](crate::websocket::Capability::Time) capability.
pub fn broadcast_time(&self, timestamp_nanos: u64) {
self.0.broadcast_time(timestamp_nanos);
}
/// Publish the current playback state to all clients.
///
/// Requires the [`PlaybackControl`](crate::websocket::Capability::PlaybackControl) capability.
pub fn broadcast_playback_state(&self, playback_state: PlaybackState) {
self.0.broadcast_playback_state(playback_state);
}
/// Sets a new session ID and notifies all clients, causing them to reset their state.
/// If no session ID is provided, generates a new one based on the current timestamp.
pub fn clear_session(&self, new_session_id: Option<String>) {
self.0.clear_session(new_session_id);
}
/// Publishes parameter values to all subscribed clients.
pub fn publish_parameter_values(&self, parameters: Vec<Parameter>) {
self.0.publish_parameter_values(parameters);
}
/// Publishes a status message to all clients.
///
/// This can be used to communicate information, warnings, and errors to the Foxglove app. An
/// ID may be included in the status to later remove it by referencing that ID.
pub fn publish_status(&self, status: Status) {
self.0.publish_status(status);
}
/// Removes status messages by id from all clients.
pub fn remove_status(&self, status_ids: Vec<String>) {
self.0.remove_status(status_ids);
}
/// Publishes a [ConnectionGraph] update to all subscribed clients.
///
/// Requires the [`ConnectionGraph`](crate::websocket::Capability::ConnectionGraph) capability.
///
/// The update is published as a difference from the current graph to replacement_graph.
/// When a client first subscribes to connection graph updates, it receives the current graph.
pub fn publish_connection_graph(
&self,
replacement_graph: ConnectionGraph,
) -> Result<(), FoxgloveError> {
self.0.replace_connection_graph(replacement_graph)
}
/// Gracefully shut down the websocket server.
///
/// Returns a handle that can be used to wait for the graceful shutdown to complete. If the
/// handle is dropped, all client tasks will be immediately aborted.
pub fn stop(self) -> ShutdownHandle {
self.0.stop().expect("this wrapper can only call stop once")
}
}