1use std::fmt::Display;
2use std::future::{poll_fn, Future};
3use std::num::NonZeroUsize;
4use std::panic;
5use std::pin::{pin, Pin};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11use async_stream::stream;
12use axum::extract::{Path, Query, State};
13use axum::response::IntoResponse;
14use axum::Extension;
15use axum_extra::TypedHeader;
16use bytes::Bytes;
17use bytestring::ByteString;
18use derive_more::From;
19use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt};
20use http::{HeaderValue, StatusCode};
21use prometheus::IntGauge;
22use scopeguard::{defer, ScopeGuard};
23use serde::Deserialize;
24use spacetimedb::client::messages::{
25 serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer, SwitchedServerMessage, ToProtocol,
26};
27use spacetimedb::client::{
28 ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageExecutionError, MessageHandleError,
29 MeteredReceiver, Protocol,
30};
31use spacetimedb::host::module_host::ClientConnectedError;
32use spacetimedb::host::NoSuchModule;
33use spacetimedb::util::spawn_rayon;
34use spacetimedb::worker_metrics::WORKER_METRICS;
35use spacetimedb::Identity;
36use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression};
37use spacetimedb_datastore::execution_context::WorkloadType;
38use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
39use std::time::Instant;
40use tokio::sync::{mpsc, watch};
41use tokio::task::JoinHandle;
42use tokio::time::error::Elapsed;
43use tokio::time::{sleep_until, timeout};
44use tokio_tungstenite::tungstenite::Utf8Bytes;
45
46use crate::auth::SpacetimeAuth;
47use crate::util::serde::humantime_duration;
48use crate::util::websocket::{
49 CloseCode, CloseFrame, Message as WsMessage, WebSocketConfig, WebSocketStream, WebSocketUpgrade, WsError,
50};
51use crate::util::{NameOrIdentity, XForwardedFor};
52use crate::{log_and_500, ControlStateDelegate, NodeDelegate};
53
54#[allow(clippy::declare_interior_mutable_const)]
55pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_api::TEXT_PROTOCOL);
56#[allow(clippy::declare_interior_mutable_const)]
57pub const BIN_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_api::BIN_PROTOCOL);
58
59pub trait HasWebSocketOptions {
60 fn websocket_options(&self) -> WebSocketOptions;
61}
62
63impl<T: HasWebSocketOptions> HasWebSocketOptions for Arc<T> {
64 fn websocket_options(&self) -> WebSocketOptions {
65 (**self).websocket_options()
66 }
67}
68
69#[derive(Deserialize)]
70pub struct SubscribeParams {
71 pub name_or_identity: NameOrIdentity,
72}
73
74#[derive(Deserialize)]
75pub struct SubscribeQueryParams {
76 pub connection_id: Option<ConnectionIdForUrl>,
77 #[serde(default)]
78 pub compression: Compression,
79 #[serde(default)]
82 pub light: bool,
83}
84
85pub fn generate_random_connection_id() -> ConnectionId {
86 ConnectionId::from_le_byte_array(rand::random())
87}
88
89pub async fn handle_websocket<S>(
90 State(ctx): State<S>,
91 Path(SubscribeParams { name_or_identity }): Path<SubscribeParams>,
92 Query(SubscribeQueryParams {
93 connection_id,
94 compression,
95 light,
96 }): Query<SubscribeQueryParams>,
97 forwarded_for: Option<TypedHeader<XForwardedFor>>,
98 Extension(auth): Extension<SpacetimeAuth>,
99 ws: WebSocketUpgrade,
100) -> axum::response::Result<impl IntoResponse>
101where
102 S: NodeDelegate + ControlStateDelegate + HasWebSocketOptions,
103{
104 if connection_id.is_some() {
105 log::debug!("The connection_id query parameter to the subscribe HTTP endpoint is internal and will be removed in a future version of SpacetimeDB.");
107 }
108
109 let connection_id = connection_id
110 .map(ConnectionId::from)
111 .unwrap_or_else(generate_random_connection_id);
112
113 if connection_id == ConnectionId::ZERO {
114 Err((
115 StatusCode::BAD_REQUEST,
116 "Invalid connection ID: the all-zeros ConnectionId is reserved.",
117 ))?;
118 }
119
120 let db_identity = name_or_identity.resolve(&ctx).await?;
121
122 let (res, ws_upgrade, protocol) =
123 ws.select_protocol([(BIN_PROTOCOL, Protocol::Binary), (TEXT_PROTOCOL, Protocol::Text)]);
124
125 let protocol = protocol.ok_or((StatusCode::BAD_REQUEST, "no valid protocol selected"))?;
126 let client_config = ClientConfig {
127 protocol,
128 compression,
129 tx_update_full: !light,
130 };
131
132 let database = ctx
136 .get_database_by_identity(&db_identity)
137 .unwrap()
138 .ok_or(StatusCode::NOT_FOUND)?;
139
140 let leader = ctx
141 .leader(database.id)
142 .await
143 .map_err(log_and_500)?
144 .ok_or(StatusCode::NOT_FOUND)?;
145
146 let identity_token = auth.creds.token().into();
147
148 let mut module_rx = leader.module_watcher().await.map_err(log_and_500)?;
149
150 let client_id = ClientActorId {
151 identity: auth.identity,
152 connection_id,
153 name: ctx.client_actor_index().next_client_name(),
154 };
155
156 let ws_config = WebSocketConfig::default()
157 .max_message_size(Some(0x2000000))
158 .max_frame_size(None)
159 .accept_unmasked_frames(false);
160 let ws_opts = ctx.websocket_options();
161
162 tokio::spawn(async move {
163 let ws = match ws_upgrade.upgrade(ws_config).await {
164 Ok(ws) => ws,
165 Err(err) => {
166 log::error!("websocket: WebSocket init error: {err}");
167 return;
168 }
169 };
170
171 let identity = client_id.identity;
172 let client_log_string = match forwarded_for {
173 Some(TypedHeader(XForwardedFor(ip))) => {
174 format!("ip {ip} with Identity {identity} and ConnectionId {connection_id}")
175 }
176 None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"),
177 };
178
179 log::debug!("websocket: New client connected from {client_log_string}");
180
181 let connected = match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await {
182 Ok(connected) => {
183 log::debug!("websocket: client_connected returned Ok for {client_log_string}");
184 connected
185 }
186 Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
187 log::info!(
188 "websocket: Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}"
189 );
190 return;
191 }
192 Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => {
193 log::warn!("websocket: ModuleHost died while {client_log_string} was connecting: {e:#}");
194 return;
195 }
196 };
197
198 log::debug!(
199 "websocket: Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection"
200 );
201
202 let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
203 let client =
204 ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await;
205
206 let message = IdentityTokenMessage {
212 identity: auth.identity,
213 token: identity_token,
214 connection_id,
215 };
216 if let Err(e) = client.send_message(message) {
217 log::warn!("websocket: Error sending IdentityToken message to {client_log_string}: {e}");
218 }
219 });
220
221 Ok(res)
222}
223
224struct ActorState {
225 pub client_id: ClientActorId,
226 pub database: Identity,
227 config: WebSocketOptions,
228 closed: AtomicBool,
229 got_pong: AtomicBool,
230}
231
232impl ActorState {
233 pub fn new(database: Identity, client_id: ClientActorId, config: WebSocketOptions) -> Self {
234 Self {
235 database,
236 client_id,
237 config,
238 closed: AtomicBool::new(false),
239 got_pong: AtomicBool::new(true),
240 }
241 }
242
243 pub fn closed(&self) -> bool {
244 self.closed.load(Ordering::Relaxed)
245 }
246
247 pub fn close(&self) -> bool {
248 self.closed.swap(true, Ordering::Relaxed)
249 }
250
251 pub fn set_ponged(&self) {
252 self.got_pong.store(true, Ordering::Relaxed);
253 }
254
255 pub fn reset_ponged(&self) -> bool {
256 self.got_pong.swap(false, Ordering::Relaxed)
257 }
258
259 pub fn next_idle_deadline(&self) -> Instant {
260 Instant::now() + self.config.idle_timeout
261 }
262}
263
264#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
266#[serde(rename_all = "kebab-case")]
267pub struct WebSocketOptions {
268 #[serde(with = "humantime_duration")]
275 #[serde(default = "WebSocketOptions::default_ping_interval")]
276 pub ping_interval: Duration,
277 #[serde(with = "humantime_duration")]
286 #[serde(default = "WebSocketOptions::default_idle_timeout")]
287 pub idle_timeout: Duration,
288 #[serde(with = "humantime_duration")]
293 #[serde(default = "WebSocketOptions::default_close_handshake_timeout")]
294 pub close_handshake_timeout: Duration,
295 #[serde(default = "WebSocketOptions::default_incoming_queue_length")]
301 pub incoming_queue_length: NonZeroUsize,
302}
303
304impl Default for WebSocketOptions {
305 fn default() -> Self {
306 Self::DEFAULT
307 }
308}
309
310impl WebSocketOptions {
311 const DEFAULT_PING_INTERVAL: Duration = Duration::from_secs(15);
312 const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
313 const DEFAULT_CLOSE_HANDSHAKE_TIMEOUT: Duration = Duration::from_millis(250);
314 const DEFAULT_INCOMING_QUEUE_LENGTH: NonZeroUsize = NonZeroUsize::new(2048).expect("2048 > 0, qed");
315
316 const DEFAULT: Self = Self {
317 ping_interval: Self::DEFAULT_PING_INTERVAL,
318 idle_timeout: Self::DEFAULT_IDLE_TIMEOUT,
319 close_handshake_timeout: Self::DEFAULT_CLOSE_HANDSHAKE_TIMEOUT,
320 incoming_queue_length: Self::DEFAULT_INCOMING_QUEUE_LENGTH,
321 };
322
323 const fn default_ping_interval() -> Duration {
324 Self::DEFAULT_PING_INTERVAL
325 }
326
327 const fn default_idle_timeout() -> Duration {
328 Self::DEFAULT_IDLE_TIMEOUT
329 }
330
331 const fn default_close_handshake_timeout() -> Duration {
332 Self::DEFAULT_CLOSE_HANDSHAKE_TIMEOUT
333 }
334
335 const fn default_incoming_queue_length() -> NonZeroUsize {
336 Self::DEFAULT_INCOMING_QUEUE_LENGTH
337 }
338}
339
340async fn ws_client_actor(
341 options: WebSocketOptions,
342 client: ClientConnection,
343 ws: WebSocketStream,
344 sendrx: MeteredReceiver<SerializableMessage>,
345) {
346 let mut client = scopeguard::guard(client, |client| {
348 tokio::spawn(client.disconnect());
349 });
350
351 ws_client_actor_inner(&mut client, options, ws, sendrx).await;
352
353 ScopeGuard::into_inner(client).disconnect().await;
354}
355
356async fn ws_client_actor_inner(
357 client: &mut ClientConnection,
358 config: WebSocketOptions,
359 ws: WebSocketStream,
360 sendrx: MeteredReceiver<SerializableMessage>,
361) {
362 let database = client.module.info().database_identity;
363 let client_id = client.id;
364 let client_closed_metric = WORKER_METRICS.ws_clients_closed_connection.with_label_values(&database);
365 let state = Arc::new(ActorState::new(database, client_id, config));
366
367 let (unordered_tx, unordered_rx) = mpsc::unbounded_channel();
369
370 let (ws_send, ws_recv) = ws.split();
372
373 let (idle_tx, idle_rx) = watch::channel(state.next_idle_deadline());
375 let idle_timer = ws_idle_timer(idle_rx);
376
377 let send_task = tokio::spawn(ws_send_loop(
380 state.clone(),
381 client.config,
382 ws_send,
383 sendrx,
384 unordered_rx,
385 ));
386 let recv_task = tokio::spawn(ws_recv_task(
388 state.clone(),
389 idle_tx,
390 client_closed_metric,
391 {
392 let client = client.clone();
393 move |data, timer| {
394 let client = client.clone();
395 async move { client.handle_message(data, timer).await }
396 }
397 },
398 unordered_tx.clone(),
399 ws_recv,
400 ));
401 let hotswap = {
402 let client = client.clone();
403 move || {
404 let mut client = client.clone();
405 async move { client.watch_module_host().await }
406 }
407 };
408
409 ws_main_loop(state, hotswap, idle_timer, send_task, recv_task, move |msg| {
410 let _ = unordered_tx.send(msg);
411 })
412 .await;
413 log::info!("Client connection ended: {client_id}");
414}
415
416async fn ws_main_loop<HotswapWatcher>(
520 state: Arc<ActorState>,
521 hotswap: impl Fn() -> HotswapWatcher,
522 idle_timer: impl Future<Output = ()>,
523 mut send_task: JoinHandle<()>,
524 mut recv_task: JoinHandle<()>,
525 unordered_tx: impl Fn(UnorderedWsMessage),
526) where
527 HotswapWatcher: Future<Output = Result<(), NoSuchModule>>,
528{
529 let abort_send = send_task.abort_handle();
531 let abort_recv = recv_task.abort_handle();
532 defer! {
533 abort_send.abort();
534 abort_recv.abort();
535 };
536 let mut ping_interval = tokio::time::interval(state.config.ping_interval);
538 let watch_hotswap = hotswap();
540
541 pin_mut!(watch_hotswap);
542 pin_mut!(idle_timer);
543
544 loop {
545 let closed = state.closed();
546
547 tokio::select! {
548 res = &mut send_task => {
561 if let Err(e) = res {
562 if e.is_panic() {
563 panic::resume_unwind(e.into_panic())
564 }
565 }
566 break;
567 },
568 res = &mut recv_task => {
569 if let Err(e) = res {
570 if e.is_panic() {
571 panic::resume_unwind(e.into_panic())
572 }
573 }
574 break;
575 },
576
577 _ = &mut idle_timer => {
579 log::warn!("Client {} timed out", state.client_id);
580 break;
581 },
582
583 res = &mut watch_hotswap, if !closed => {
588 if let Err(NoSuchModule) = res {
589 let close = CloseFrame {
590 code: CloseCode::Away,
591 reason: "module exited".into()
592 };
593 unordered_tx(close.into());
594 }
595 watch_hotswap.set(hotswap());
596 },
597
598 _ = ping_interval.tick(), if !closed => {
610 let was_ponged = state.reset_ponged();
611 if was_ponged {
612 unordered_tx(UnorderedWsMessage::Ping(Bytes::new()));
613 }
614 }
615 }
616 }
617}
618
619async fn ws_idle_timer(mut activity: watch::Receiver<Instant>) {
627 let mut deadline = *activity.borrow();
628 let sleep = sleep_until(deadline.into());
629 pin_mut!(sleep);
630
631 loop {
632 tokio::select! {
633 biased;
634
635 Ok(()) = activity.changed() => {
636 let new_deadline = *activity.borrow_and_update();
637 if new_deadline != deadline {
638 deadline = new_deadline;
639 sleep.as_mut().reset(deadline.into());
640 }
641 },
642
643 () = &mut sleep => {
644 break;
645 },
646 }
647 }
648}
649
650async fn ws_recv_task<MessageHandler>(
670 state: Arc<ActorState>,
671 idle_tx: watch::Sender<Instant>,
672 client_closed_metric: IntGauge,
673 message_handler: impl Fn(DataMessage, Instant) -> MessageHandler,
674 unordered_tx: mpsc::UnboundedSender<UnorderedWsMessage>,
675 ws: impl Stream<Item = Result<WsMessage, WsError>> + Unpin + Send + 'static,
676) where
677 MessageHandler: Future<Output = Result<(), MessageHandleError>>,
678{
679 let recv_queue = ws_recv_queue(state.clone(), unordered_tx.clone(), ws);
680 let recv_loop = pin!(ws_recv_loop(state.clone(), idle_tx, recv_queue));
681 let recv_handler = ws_client_message_handler(state.clone(), client_closed_metric, recv_loop);
682 pin_mut!(recv_handler);
683
684 while let Some((data, timer)) = recv_handler.next().await {
685 let result = message_handler(data, timer).await;
686 if let Err(e) = result {
687 if let MessageHandleError::Execution(err) = e {
688 log::error!("{err:#}");
689 if unordered_tx.send(err.into()).is_err() {
691 break;
692 }
693 continue;
694 }
695 log::debug!("Client caused error: {e}");
696 let close = CloseFrame {
697 code: CloseCode::Error,
698 reason: format!("{e:#}").into(),
699 };
700 if unordered_tx.send(close.into()).is_err() {
703 break;
704 };
705 }
706 }
707}
708
709fn ws_recv_loop(
720 state: Arc<ActorState>,
721 idle_tx: watch::Sender<Instant>,
722 mut ws: impl Stream<Item = Result<WsMessage, WsError>> + Unpin,
723) -> impl Stream<Item = ClientMessage> {
724 async fn next_message(
729 state: &ActorState,
730 ws: &mut (impl Stream<Item = Result<WsMessage, WsError>> + Unpin),
731 ) -> Option<Result<WsMessage, WsError>> {
732 if state.closed() {
733 log::trace!("drain websocket waiting for client close");
734 let res: Result<Option<Result<WsMessage, WsError>>, Elapsed> =
735 timeout(state.config.close_handshake_timeout, async {
736 while let Some(item) = ws.next().await {
737 match item {
738 Ok(message) => drop(message),
739 Err(e) => return Some(Err(e)),
740 }
741 }
742 None
743 })
744 .await;
745 match res {
746 Err(_elapsed) => {
747 log::warn!("timeout waiting for client close");
748 None
749 }
750 Ok(item) => item, }
752 } else {
753 log::trace!("await next client message without timeout");
754 ws.next().await
755 }
756 }
757
758 stream! {
759 loop {
760 let Some(res) = next_message(&state, &mut ws).await else {
761 log::trace!("recv stream exhausted");
762 break;
763 };
764 match res {
765 Ok(m) => {
766 idle_tx.send(state.next_idle_deadline()).ok();
767
768 if !state.closed() {
769 yield ClientMessage::from_message(m);
770 }
771 log::trace!("message received while already closed");
776 }
777 Err(e) => match e {
782 e @ (WsError::ConnectionClosed
783 | WsError::AlreadyClosed
784 | WsError::Io(_)
785 | WsError::Tls(_)
786 | WsError::Capacity(_)
787 | WsError::Protocol(_)
788 | WsError::WriteBufferFull(_)
789 | WsError::Utf8(_)
790 | WsError::AttackAttempt
791 | WsError::Url(_)
792 | WsError::Http(_)
793 | WsError::HttpFormat(_)) => {
794 log::warn!("Websocket receive error: {e}");
795 break;
796 }
797 },
798 }
799 }
800 }
801}
802
803fn ws_recv_queue(
817 state: Arc<ActorState>,
818 unordered_tx: mpsc::UnboundedSender<UnorderedWsMessage>,
819 mut ws: impl Stream<Item = Result<WsMessage, WsError>> + Unpin + Send + 'static,
820) -> impl Stream<Item = Result<WsMessage, WsError>> {
821 const CLOSE: UnorderedWsMessage = UnorderedWsMessage::Close(CloseFrame {
822 code: CloseCode::Again,
823 reason: Utf8Bytes::from_static("too many requests"),
824 });
825 let on_message_after_close = move |client_id| {
826 log::warn!("client {client_id} sent message after close or error");
827 };
828
829 let (tx, rx) = mpsc::channel(state.config.incoming_queue_length.get());
830 let rx = MeteredReceiverStream {
831 inner: MeteredReceiver::with_gauge(
832 rx,
833 WORKER_METRICS
834 .total_incoming_queue_length
835 .with_label_values(&state.database),
836 ),
837 };
838
839 tokio::spawn(async move {
840 while let Some(item) = ws.next().await {
841 if let Err(e) = tx.try_send(item) {
842 match e {
843 mpsc::error::TrySendError::Full(item) => {
845 if unordered_tx.send(CLOSE).is_err() {
854 state.close();
855 break;
856 }
857 if tx.send(item).await.is_err() {
868 on_message_after_close(state.client_id);
869 break;
870 }
871 }
872 mpsc::error::TrySendError::Closed(_item) => {
881 on_message_after_close(state.client_id);
882 break;
883 }
884 }
885 }
886 }
887 });
888
889 rx
890}
891
892struct MeteredReceiverStream<T> {
895 inner: MeteredReceiver<T>,
896}
897
898impl<T> Stream for MeteredReceiverStream<T> {
899 type Item = T;
900
901 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
902 self.inner.poll_recv(cx)
903 }
904}
905
906fn ws_client_message_handler(
916 state: Arc<ActorState>,
917 client_closed_metric: IntGauge,
918 mut messages: impl Stream<Item = ClientMessage> + Unpin,
919) -> impl Stream<Item = (DataMessage, Instant)> {
920 stream! {
921 while let Some(message) = messages.next().await {
922 match message {
923 ClientMessage::Message(message) => {
924 log::trace!("Received client message");
925 yield (message, Instant::now());
926 },
927 ClientMessage::Ping(_bytes) => {
928 log::trace!("Received ping from client {}", state.client_id);
929 },
932 ClientMessage::Pong(_bytes) => {
933 log::trace!("Received pong from client {}", state.client_id);
934 state.set_ponged();
935 },
936 ClientMessage::Close(close_frame) => {
937 log::trace!("Received Close frame from client {}: {:?}", state.client_id, close_frame);
938 let was_closed = state.close();
939 if !was_closed {
941 client_closed_metric.inc();
942 }
943 }
944 }
945 }
946 log::trace!("client message handler done");
947 }
948}
949
950#[derive(Debug, From)]
952enum UnorderedWsMessage {
953 Close(CloseFrame),
955 Ping(Bytes),
957 Error(MessageExecutionError),
962}
963
964async fn ws_send_loop(
986 state: Arc<ActorState>,
987 config: ClientConfig,
988 mut ws: impl Sink<WsMessage, Error: Display> + Unpin,
989 mut messages: MeteredReceiver<SerializableMessage>,
990 mut unordered: mpsc::UnboundedReceiver<UnorderedWsMessage>,
991) {
992 let mut messages_buf = Vec::with_capacity(32);
993 let mut serialize_buf = SerializeBuffer::new(config);
994
995 loop {
996 let closed = state.closed();
997
998 tokio::select! {
999 biased;
1002
1003 maybe_msg = unordered.recv() => {
1004 let Some(msg) = maybe_msg else {
1005 break;
1006 };
1007 if closed {
1012 continue;
1013 }
1014 match msg {
1015 UnorderedWsMessage::Close(close_frame) => {
1016 log::trace!("sending close frame");
1017 if let Err(e) = ws.send(WsMessage::Close(Some(close_frame))).await {
1018 log::warn!("error sending close frame: {e:#}");
1019 break;
1020 }
1021 state.close();
1025 messages.close();
1028 },
1029 UnorderedWsMessage::Ping(bytes) => {
1030 log::trace!("sending ping");
1031 if let Err(e) = ws.feed(WsMessage::Ping(bytes)).await {
1032 log::warn!("error sending ping: {e:#}");
1033 break;
1034 }
1035 },
1036 UnorderedWsMessage::Error(err) => {
1037 log::trace!("sending error result");
1038 let (msg_alloc, res) = send_message(
1039 &state.database,
1040 config,
1041 serialize_buf,
1042 None,
1043 &mut ws,
1044 err
1045 ).await;
1046 serialize_buf = msg_alloc;
1047
1048 if let Err(e) = res {
1049 log::warn!("websocket send error: {e}");
1050 break;
1051 }
1052 },
1053 }
1054 },
1055
1056 n = messages.recv_many(&mut messages_buf, 32), if !closed => {
1057 if n == 0 {
1058 continue;
1059 }
1060 log::trace!("sending {n} outgoing messages");
1061 for msg in messages_buf.drain(..n) {
1062 let (msg_alloc, res) = send_message(
1063 &state.database,
1064 config,
1065 serialize_buf,
1066 msg.workload().zip(msg.num_rows()),
1067 &mut ws,
1068 msg
1069 ).await;
1070 serialize_buf = msg_alloc;
1071
1072 if let Err(e) = res {
1073 log::warn!("websocket send error: {e}");
1074 return;
1075 }
1076 }
1077 },
1078 }
1079
1080 if let Err(e) = ws.flush().await {
1081 log::warn!("error flushing websocket: {e}");
1082 break;
1083 }
1084 }
1085}
1086
1087async fn send_message<S: Sink<WsMessage> + Unpin>(
1089 database_identity: &Identity,
1090 config: ClientConfig,
1091 serialize_buf: SerializeBuffer,
1092 metrics_metadata: Option<(WorkloadType, usize)>,
1093 ws: &mut S,
1094 message: impl ToProtocol<Encoded = SwitchedServerMessage> + Send + 'static,
1095) -> (SerializeBuffer, Result<(), S::Error>) {
1096 let (workload, num_rows) = metrics_metadata.unzip();
1097 let serialize_and_compress = |serialize_buf, message, config| {
1101 let start = Instant::now();
1102 let (msg_alloc, msg_data) = serialize(serialize_buf, message, config);
1103 (start.elapsed(), msg_alloc, msg_data)
1104 };
1105 let (timing, msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) {
1106 spawn_rayon(move || serialize_and_compress(serialize_buf, message, config)).await
1107 } else {
1108 serialize_and_compress(serialize_buf, message, config)
1109 };
1110 report_ws_sent_metrics(database_identity, workload, num_rows, timing, &msg_data);
1111
1112 let res = async {
1113 ws.feed(datamsg_to_wsmsg(msg_data)).await?;
1114 poll_fn(|cx| ws.poll_ready_unpin(cx)).await
1121 }
1122 .await;
1123 let buf = msg_alloc.try_reclaim().unwrap_or_else(|| SerializeBuffer::new(config));
1126
1127 (buf, res)
1128}
1129
1130#[derive(Debug)]
1131enum ClientMessage {
1132 Message(DataMessage),
1133 Ping(Bytes),
1134 Pong(Bytes),
1135 Close(Option<CloseFrame>),
1136}
1137
1138impl ClientMessage {
1139 fn from_message(msg: WsMessage) -> Self {
1140 match msg {
1141 WsMessage::Text(s) => Self::Message(DataMessage::Text(utf8bytes_to_bytestring(s))),
1142 WsMessage::Binary(b) => Self::Message(DataMessage::Binary(b)),
1143 WsMessage::Ping(b) => Self::Ping(b),
1144 WsMessage::Pong(b) => Self::Pong(b),
1145 WsMessage::Close(frame) => Self::Close(frame),
1146 WsMessage::Frame(_) => unreachable!(),
1148 }
1149 }
1150}
1151
1152fn report_ws_sent_metrics(
1154 addr: &Identity,
1155 workload: Option<WorkloadType>,
1156 num_rows: Option<usize>,
1157 serialize_duration: Duration,
1158 msg_ws: &DataMessage,
1159) {
1160 if let (Some(workload), Some(num_rows)) = (workload, num_rows) {
1163 WORKER_METRICS
1164 .websocket_sent_num_rows
1165 .with_label_values(addr, &workload)
1166 .observe(num_rows as f64);
1167 WORKER_METRICS
1168 .websocket_sent_msg_size
1169 .with_label_values(addr, &workload)
1170 .observe(msg_ws.len() as f64);
1171 }
1172
1173 WORKER_METRICS
1174 .websocket_serialize_secs
1175 .with_label_values(addr)
1176 .observe(serialize_duration.as_secs_f64());
1177}
1178
1179fn datamsg_to_wsmsg(msg: DataMessage) -> WsMessage {
1180 match msg {
1181 DataMessage::Text(text) => WsMessage::Text(bytestring_to_utf8bytes(text)),
1182 DataMessage::Binary(bin) => WsMessage::Binary(bin),
1183 }
1184}
1185
1186fn utf8bytes_to_bytestring(s: Utf8Bytes) -> ByteString {
1187 unsafe { ByteString::from_bytes_unchecked(Bytes::from(s)) }
1189}
1190fn bytestring_to_utf8bytes(s: ByteString) -> Utf8Bytes {
1191 unsafe { Utf8Bytes::from_bytes_unchecked(s.into_bytes()) }
1193}
1194
1195#[cfg(test)]
1196mod tests {
1197 use std::{
1198 future::Future,
1199 pin::Pin,
1200 sync::atomic::AtomicUsize,
1201 task::{Context, Poll},
1202 };
1203
1204 use anyhow::anyhow;
1205 use futures::{
1206 future::{self, Either, FutureExt as _},
1207 sink, stream,
1208 };
1209 use pretty_assertions::assert_matches;
1210 use spacetimedb::client::ClientName;
1211 use tokio::time::sleep;
1212
1213 use super::*;
1214
1215 fn dummy_client_id() -> ClientActorId {
1216 ClientActorId {
1217 identity: Identity::ZERO,
1218 connection_id: ConnectionId::ZERO,
1219 name: ClientName(0),
1220 }
1221 }
1222
1223 fn dummy_actor_state() -> ActorState {
1224 dummy_actor_state_with_config(<_>::default())
1225 }
1226
1227 fn dummy_actor_state_with_config(config: WebSocketOptions) -> ActorState {
1228 ActorState::new(Identity::ZERO, dummy_client_id(), config)
1229 }
1230
1231 #[tokio::test]
1232 async fn idle_timer_extends_sleep() {
1233 let timeout = Duration::from_millis(10);
1234
1235 let start = Instant::now();
1236 let (tx, rx) = watch::channel(start + timeout);
1237 tokio::join!(ws_idle_timer(rx), async {
1238 for _ in 0..5 {
1239 sleep(Duration::from_millis(1)).await;
1240 tx.send(Instant::now() + timeout).unwrap();
1241 }
1242 });
1243 let elapsed = start.elapsed();
1244 let expected = timeout + Duration::from_millis(5);
1245 assert!(
1246 elapsed >= expected,
1247 "{}ms elapsed, expected >= {}ms",
1248 elapsed.as_millis(),
1249 expected.as_millis(),
1250 );
1251 }
1252
1253 #[tokio::test]
1254 async fn recv_loop_terminates_when_input_exhausted() {
1255 let state = Arc::new(dummy_actor_state());
1256 let (idle_tx, _idle_rx) = watch::channel(Instant::now() + state.config.idle_timeout);
1257
1258 let input = stream::iter(vec![Ok(WsMessage::Ping(Bytes::new()))]);
1259 pin_mut!(input);
1260
1261 let recv_loop = ws_recv_loop(state, idle_tx, input);
1262 pin_mut!(recv_loop);
1263
1264 assert_matches!(recv_loop.next().await, Some(ClientMessage::Ping(_)));
1265 assert_matches!(recv_loop.next().await, None);
1266 }
1267
1268 #[tokio::test]
1269 async fn recv_loop_terminates_when_input_yields_err() {
1270 let state = Arc::new(dummy_actor_state());
1271 let (idle_tx, _idle_rx) = watch::channel(Instant::now() + state.config.idle_timeout);
1272
1273 let input = stream::iter(vec![
1274 Ok(WsMessage::Ping(Bytes::new())),
1275 Err(WsError::ConnectionClosed),
1276 Ok(WsMessage::Pong(Bytes::new())),
1277 ]);
1278 pin_mut!(input);
1279
1280 let recv_loop = ws_recv_loop(state, idle_tx, input);
1281 pin_mut!(recv_loop);
1282
1283 assert_matches!(recv_loop.next().await, Some(ClientMessage::Ping(_)));
1284 assert_matches!(recv_loop.next().await, None);
1285 }
1286
1287 #[tokio::test]
1288 async fn recv_loop_drains_remaining_messages_when_closed() {
1289 let state = Arc::new(dummy_actor_state());
1290 let (idle_tx, _idle_rx) = watch::channel(Instant::now() + state.config.idle_timeout);
1291
1292 let input = stream::iter(vec![
1293 Ok(WsMessage::Ping(Bytes::new())),
1294 Ok(WsMessage::Pong(Bytes::new())),
1295 ]);
1296 pin_mut!(input);
1297 {
1298 let recv_loop = ws_recv_loop(state.clone(), idle_tx, &mut input);
1299 pin_mut!(recv_loop);
1300
1301 state.close();
1302 assert_matches!(recv_loop.next().await, None);
1303 }
1304 assert_matches!(input.next().await, None);
1305 }
1306
1307 #[tokio::test]
1308 async fn recv_loop_stops_at_error_while_draining() {
1309 let state = Arc::new(dummy_actor_state());
1310 let (idle_tx, _idle_rx) = watch::channel(Instant::now() + state.config.idle_timeout);
1311
1312 let input = stream::iter(vec![
1313 Ok(WsMessage::Ping(Bytes::new())),
1314 Err(WsError::ConnectionClosed),
1315 Ok(WsMessage::Pong(Bytes::new())),
1316 ]);
1317 pin_mut!(input);
1318 {
1319 let recv_loop = ws_recv_loop(state.clone(), idle_tx, &mut input);
1320 pin_mut!(recv_loop);
1321
1322 state.close();
1323 assert_matches!(recv_loop.next().await, None);
1324 }
1325 assert_matches!(input.next().await, Some(Ok(WsMessage::Pong(_))));
1326 }
1327
1328 #[tokio::test]
1329 async fn recv_loop_updates_idle_channel() {
1330 let state = Arc::new(dummy_actor_state());
1331 let idle_deadline = Instant::now() + state.config.idle_timeout;
1332 let (idle_tx, mut idle_rx) = watch::channel(idle_deadline);
1333
1334 let input = stream::iter(vec![
1335 Ok(WsMessage::Ping(Bytes::new())),
1336 Ok(WsMessage::Pong(Bytes::new())),
1337 ]);
1338 let recv_loop = ws_recv_loop(state, idle_tx, input);
1339 pin_mut!(recv_loop);
1340
1341 let mut new_idle_deadline = *idle_rx.borrow();
1342 while let Some(message) = recv_loop.next().await {
1343 drop(message);
1344 assert!(idle_rx.has_changed().unwrap());
1345 new_idle_deadline = *idle_rx.borrow_and_update();
1346 }
1347 assert!(new_idle_deadline > idle_deadline);
1348 }
1349
1350 #[tokio::test]
1351 async fn client_message_handler_terminates_when_input_exhausted() {
1352 let state = Arc::new(dummy_actor_state());
1353 let metric = IntGauge::new("bleep", "unhelpful").unwrap();
1354
1355 let input = stream::iter(vec![
1356 ClientMessage::Ping(Bytes::new()),
1357 ClientMessage::Message(DataMessage::from("hello".to_owned())),
1358 ]);
1359 let handler = ws_client_message_handler(state, metric, input);
1360 pin_mut!(handler);
1361
1362 assert_matches!(
1363 handler.next().await,
1364 Some((DataMessage::Text(data), _instant)) if data == "hello"
1365 );
1366 assert_matches!(handler.next().await, None);
1367 }
1368
1369 #[tokio::test]
1370 async fn client_message_handler_updates_pong_and_closed_states_and_metric() {
1371 let state = Arc::new(dummy_actor_state());
1372 state.reset_ponged();
1373 let metric = IntGauge::new("bleep", "unhelpful").unwrap();
1374
1375 let input = stream::iter(vec![ClientMessage::Pong(Bytes::new()), ClientMessage::Close(None)]);
1376 let handler = ws_client_message_handler(state.clone(), metric.clone(), input);
1377 handler.map(drop).for_each(future::ready).await;
1378
1379 assert!(state.closed());
1380 assert!(state.reset_ponged());
1381 assert_eq!(metric.get(), 1);
1382 }
1383
1384 #[tokio::test]
1385 async fn send_loop_terminates_when_unordered_closed() {
1386 let state = Arc::new(dummy_actor_state());
1387 let (messages_tx, messages_rx) = mpsc::channel(64);
1388 let messages = MeteredReceiver::new(messages_rx);
1389 let (unordered_tx, unordered_rx) = mpsc::unbounded_channel();
1390
1391 let send_loop = ws_send_loop(state, ClientConfig::for_test(), sink::drain(), messages, unordered_rx);
1392 pin_mut!(send_loop);
1393
1394 assert!(is_pending(&mut send_loop).await);
1395 drop(messages_tx);
1396 assert!(is_pending(&mut send_loop).await);
1397
1398 drop(unordered_tx);
1399 send_loop.await;
1400 }
1401
1402 #[tokio::test]
1403 async fn send_loop_close_message_closes_state_and_messages() {
1404 let state = Arc::new(dummy_actor_state());
1405 let (messages_tx, messages_rx) = mpsc::channel(64);
1406 let messages = MeteredReceiver::new(messages_rx);
1407 let (unordered_tx, unordered_rx) = mpsc::unbounded_channel();
1408
1409 let send_loop = ws_send_loop(
1410 state.clone(),
1411 ClientConfig::for_test(),
1412 sink::drain(),
1413 messages,
1414 unordered_rx,
1415 );
1416 pin_mut!(send_loop);
1417
1418 unordered_tx
1419 .send(UnorderedWsMessage::Close(CloseFrame {
1420 code: CloseCode::Away,
1421 reason: "done".into(),
1422 }))
1423 .unwrap();
1424
1425 assert!(is_pending(&mut send_loop).await);
1426 assert!(state.closed());
1427 assert!(messages_tx.is_closed());
1428 }
1429
1430 #[tokio::test]
1431 async fn send_loop_terminates_if_sink_cant_be_fed() {
1432 let input = [
1433 Either::Left(UnorderedWsMessage::Close(CloseFrame {
1434 code: CloseCode::Away,
1435 reason: "bah!".into(),
1436 })),
1437 Either::Left(UnorderedWsMessage::Ping(Bytes::new())),
1438 Either::Left(UnorderedWsMessage::Error(MessageExecutionError {
1439 reducer: None,
1440 reducer_id: None,
1441 caller_identity: Identity::ZERO,
1442 caller_connection_id: None,
1443 err: anyhow!("it did not work"),
1444 })),
1445 Either::Right(SerializableMessage::Identity(IdentityTokenMessage {
1448 identity: Identity::ZERO,
1449 token: "macaron".into(),
1450 connection_id: ConnectionId::ZERO,
1451 })),
1452 ];
1453
1454 for msg in input {
1455 let state = Arc::new(dummy_actor_state());
1456 let (messages_tx, messages_rx) = mpsc::channel(64);
1457 let messages = MeteredReceiver::new(messages_rx);
1458 let (unordered_tx, unordered_rx) = mpsc::unbounded_channel();
1459
1460 let send_loop = ws_send_loop(
1461 state.clone(),
1462 ClientConfig::for_test(),
1463 UnfeedableSink,
1464 messages,
1465 unordered_rx,
1466 );
1467 pin_mut!(send_loop);
1468
1469 match msg {
1470 Either::Left(unordered) => unordered_tx.send(unordered).unwrap(),
1471 Either::Right(msg) => messages_tx.send(msg).await.unwrap(),
1472 }
1473 send_loop.await;
1474 }
1475 }
1476
1477 #[tokio::test]
1478 async fn send_loop_terminates_if_sink_cant_be_flushed() {
1479 let input = [
1480 Either::Left(UnorderedWsMessage::Close(CloseFrame {
1481 code: CloseCode::Away,
1482 reason: "bah!".into(),
1483 })),
1484 Either::Left(UnorderedWsMessage::Ping(Bytes::new())),
1485 Either::Left(UnorderedWsMessage::Error(MessageExecutionError {
1486 reducer: None,
1487 reducer_id: None,
1488 caller_identity: Identity::ZERO,
1489 caller_connection_id: None,
1490 err: anyhow!("it did not work"),
1491 })),
1492 Either::Right(SerializableMessage::Identity(IdentityTokenMessage {
1495 identity: Identity::ZERO,
1496 token: "macaron".into(),
1497 connection_id: ConnectionId::ZERO,
1498 })),
1499 ];
1500
1501 for msg in input {
1502 let state = Arc::new(dummy_actor_state());
1503 let (messages_tx, messages_rx) = mpsc::channel(64);
1504 let messages = MeteredReceiver::new(messages_rx);
1505 let (unordered_tx, unordered_rx) = mpsc::unbounded_channel();
1506
1507 let send_loop = ws_send_loop(
1508 state.clone(),
1509 ClientConfig::for_test(),
1510 UnflushableSink,
1511 messages,
1512 unordered_rx,
1513 );
1514 pin_mut!(send_loop);
1515
1516 match msg {
1517 Either::Left(unordered) => unordered_tx.send(unordered).unwrap(),
1518 Either::Right(msg) => messages_tx.send(msg).await.unwrap(),
1519 }
1520 send_loop.await;
1521 }
1522 }
1523
1524 #[tokio::test]
1525 async fn main_loop_terminates_if_either_send_or_recv_terminates() {
1526 let state = Arc::new(dummy_actor_state());
1527 ws_main_loop(
1528 state.clone(),
1529 future::pending,
1530 future::pending(),
1531 tokio::spawn(sleep(Duration::from_millis(10))),
1532 tokio::spawn(future::pending()),
1533 drop,
1534 )
1535 .await;
1536 ws_main_loop(
1537 state,
1538 future::pending,
1539 future::pending(),
1540 tokio::spawn(future::pending()),
1541 tokio::spawn(sleep(Duration::from_millis(10))),
1542 drop,
1543 )
1544 .await;
1545 }
1546
1547 #[tokio::test]
1548 async fn main_loop_terminates_on_idle_timeout() {
1549 let state = Arc::new(dummy_actor_state_with_config(WebSocketOptions {
1550 idle_timeout: Duration::from_millis(10),
1551 ..<_>::default()
1552 }));
1553 let (idle_tx, idle_rx) = watch::channel(state.next_idle_deadline());
1554
1555 let start = Instant::now();
1556 let mut t = tokio::spawn({
1557 let state = state.clone();
1558 async move {
1559 ws_main_loop(
1560 state,
1561 future::pending,
1562 ws_idle_timer(idle_rx),
1563 tokio::spawn(future::pending()),
1564 tokio::spawn(future::pending()),
1565 drop,
1566 )
1567 .await
1568 }
1569 });
1570
1571 let loop_start = Instant::now();
1572 for _ in 0..5 {
1573 sleep(Duration::from_millis(5)).await;
1574 idle_tx.send(state.next_idle_deadline()).unwrap();
1575 assert!(is_pending(&mut t).await);
1576 }
1577 let timeout = loop_start.elapsed() + Duration::from_millis(10);
1578
1579 t.await.unwrap();
1580 let elapsed = start.elapsed();
1581 assert!(elapsed >= timeout);
1582 assert!(elapsed < timeout + Duration::from_millis(10));
1583 }
1584
1585 #[tokio::test]
1586 async fn main_loop_keepalive_keeps_alive() {
1587 let state = Arc::new(dummy_actor_state_with_config(WebSocketOptions {
1588 ping_interval: Duration::from_millis(5),
1589 idle_timeout: Duration::from_millis(10),
1590 ..<_>::default()
1591 }));
1592 let (idle_tx, idle_rx) = watch::channel(state.next_idle_deadline());
1593 let unordered_tx = {
1596 let state = state.clone();
1597 let pings = AtomicUsize::new(0);
1598 move |m| {
1599 if let UnorderedWsMessage::Ping(_) = m {
1600 let n = pings.fetch_add(1, Ordering::Relaxed);
1601 if n < 5 {
1602 state.set_ponged();
1603 idle_tx.send(state.next_idle_deadline()).ok();
1604 }
1605 }
1606 }
1607 };
1608
1609 let start = Instant::now();
1610 let t = tokio::spawn({
1611 let state = state.clone();
1612 async move {
1613 ws_main_loop(
1614 state,
1615 future::pending,
1616 ws_idle_timer(idle_rx),
1617 tokio::spawn(future::pending()),
1618 tokio::spawn(future::pending()),
1619 unordered_tx,
1620 )
1621 .await
1622 }
1623 });
1624
1625 let expected_timeout = (5 * state.config.ping_interval) + state.config.idle_timeout;
1626 let res = timeout(expected_timeout, t).await;
1627 let elapsed = start.elapsed();
1628
1629 assert_matches!(res, Ok(Ok(())));
1631 assert!(elapsed >= expected_timeout - state.config.ping_interval);
1633 }
1634
1635 #[tokio::test]
1636 async fn main_loop_terminates_when_module_exits() {
1637 let state = Arc::new(dummy_actor_state());
1638
1639 let (_idle_tx, idle_rx) = watch::channel(state.next_idle_deadline());
1640 let unordered_tx = {
1641 let state = state.clone();
1642 move |m| {
1643 if let UnorderedWsMessage::Close(_) = m {
1644 state.close();
1645 }
1646 }
1647 };
1648
1649 let start = Instant::now();
1650 tokio::spawn(async move {
1651 let hotswap = || async {
1652 sleep(Duration::from_millis(5)).await;
1653 Err(NoSuchModule)
1654 };
1655
1656 ws_main_loop(
1657 state.clone(),
1658 hotswap,
1659 ws_idle_timer(idle_rx),
1660 tokio::spawn(async move {
1662 loop {
1663 if state.closed() {
1664 break;
1665 }
1666 sleep(Duration::from_millis(1)).await
1667 }
1668 }),
1669 tokio::spawn(future::pending()),
1670 unordered_tx,
1671 )
1672 .await
1673 })
1674 .await
1675 .unwrap();
1676 let elapsed = start.elapsed();
1677 assert!(elapsed >= Duration::from_millis(5));
1678 assert!(elapsed < Duration::from_millis(10));
1679 }
1680
1681 #[tokio::test]
1682 async fn recv_queue_sends_close_when_at_capacity() {
1683 let state = Arc::new(dummy_actor_state_with_config(WebSocketOptions {
1684 incoming_queue_length: 10.try_into().unwrap(),
1685 ..<_>::default()
1686 }));
1687
1688 let (unordered_tx, mut unordered_rx) = mpsc::unbounded_channel();
1689 let input = stream::iter((0..20).map(|i| Ok(WsMessage::text(format!("message {i}")))));
1690
1691 let received = ws_recv_queue(state, unordered_tx, input).collect::<Vec<_>>().await;
1692 assert_matches!(unordered_rx.recv().await, Some(UnorderedWsMessage::Close(_)));
1693 assert_eq!(received.len(), 20);
1695 }
1696
1697 #[tokio::test]
1698 async fn recv_queue_closes_state_if_sender_gone() {
1699 let state = Arc::new(dummy_actor_state_with_config(WebSocketOptions {
1700 incoming_queue_length: 10.try_into().unwrap(),
1701 ..<_>::default()
1702 }));
1703
1704 let (unordered_tx, _) = mpsc::unbounded_channel();
1705 let input = stream::iter((0..20).map(|i| Ok(WsMessage::text(format!("message {i}")))));
1706
1707 let received = ws_recv_queue(state.clone(), unordered_tx, input)
1708 .collect::<Vec<_>>()
1709 .await;
1710 assert!(state.closed());
1711 assert_eq!(received.len(), 10);
1713 }
1714
1715 async fn is_pending(fut: &mut (impl Future + Unpin)) -> bool {
1716 poll_fn(|cx| Poll::Ready(fut.poll_unpin(cx).is_pending())).await
1717 }
1718
1719 struct UnfeedableSink;
1720
1721 impl<T> Sink<T> for UnfeedableSink {
1722 type Error = &'static str;
1723
1724 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1725 Poll::Ready(Ok(()))
1726 }
1727
1728 fn start_send(self: Pin<&mut Self>, _: T) -> Result<(), Self::Error> {
1729 Err("don't feed the sink")
1730 }
1731
1732 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1733 Poll::Ready(Ok(()))
1734 }
1735
1736 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1737 Poll::Ready(Ok(()))
1738 }
1739 }
1740
1741 struct UnflushableSink;
1742
1743 impl<T> Sink<T> for UnflushableSink {
1744 type Error = &'static str;
1745
1746 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1747 Poll::Ready(Ok(()))
1748 }
1749
1750 fn start_send(self: Pin<&mut Self>, _: T) -> Result<(), Self::Error> {
1751 Ok(())
1752 }
1753
1754 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1755 Poll::Ready(Err("don't flush the sink"))
1756 }
1757
1758 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1759 Poll::Ready(Ok(()))
1760 }
1761 }
1762
1763 #[test]
1764 fn options_toml_roundtrip() {
1765 let options = WebSocketOptions::default();
1766 let toml = toml::to_string(&options).unwrap();
1767 assert_eq!(options, toml::from_str::<WebSocketOptions>(&toml).unwrap());
1768 }
1769
1770 #[test]
1771 fn options_from_partial_toml() {
1772 let toml = r#"
1773 ping-interval = "53s"
1774 idle-timeout = "1m 3s"
1775"#;
1776
1777 let expected = WebSocketOptions {
1778 ping_interval: Duration::from_secs(53),
1779 idle_timeout: Duration::from_secs(63),
1780 ..<_>::default()
1781 };
1782
1783 assert_eq!(expected, toml::from_str(toml).unwrap());
1784 }
1785}