tokio_websocket_client/
client.rs

1use crate::{Message, command::Command};
2use std::time::Duration;
3use flume::RecvTimeoutError;
4
5/// A connected client that can be used to send messages to the server.
6///
7/// # Example
8/// ```
9/// # use reqwest_websocket::RequestBuilderExt;
10/// # use tokio_websocket_client::{
11/// #     CloseCode,
12/// #     Connector,
13/// #     Handler,
14/// #     Message,
15/// #     RetryStrategy,
16/// #     StreamWrapper,
17/// #     connect,
18/// # };
19/// #
20/// # struct DummyHandler;
21/// #
22/// # impl Handler for DummyHandler {
23/// #
24/// #     async fn on_text(&mut self, text: &str) {
25/// #         log::info!("on_text received: {text}");
26/// #     }
27/// #
28/// #     async fn on_binary(&mut self, buffer: &[u8]) {
29/// #         log::info!("on_binary received: {buffer:?}");
30/// #     }
31/// #
32/// #     async fn on_close(&mut self, code: CloseCode, reason: &str) -> RetryStrategy {
33/// #         log::info!("on_close received: {code:?}: {reason}");
34/// #         RetryStrategy::Close
35/// #     }
36/// #
37/// #     async fn on_connect(&mut self) {
38/// #         log::info!("on_connect");
39/// #     }
40/// #
41/// #     async fn on_connect_failure(&mut self) -> RetryStrategy {
42/// #         log::info!("on_connect_failure");
43/// #         RetryStrategy::Close
44/// #     }
45/// #
46/// #     async fn on_disconnect(&mut self) -> RetryStrategy {
47/// #         log::info!("on_disconnect");
48/// #         RetryStrategy::Close
49/// #     }
50/// # }
51/// #
52/// # struct DummyMessage(reqwest_websocket::Message);
53/// #
54/// # impl From<reqwest_websocket::Message> for DummyMessage {
55/// #     fn from(message: reqwest_websocket::Message) -> Self {
56/// #         Self(message)
57/// #     }
58/// # }
59/// #
60/// # impl From<DummyMessage> for reqwest_websocket::Message {
61/// #     fn from(message: DummyMessage) -> Self {
62/// #         message.0
63/// #     }
64/// # }
65/// #
66/// # impl From<DummyMessage> for Message {
67/// #     fn from(other: DummyMessage) -> Message {
68/// #         match other {
69/// #             DummyMessage(reqwest_websocket::Message::Text(data)) => Message::Text(data),
70/// #             DummyMessage(reqwest_websocket::Message::Binary(data)) => Message::Binary(data),
71/// #             DummyMessage(reqwest_websocket::Message::Ping(data)) => Message::Ping(data),
72/// #             DummyMessage(reqwest_websocket::Message::Pong(data)) => Message::Pong(data),
73/// #             DummyMessage(reqwest_websocket::Message::Close { code, reason }) => {
74/// #                 Message::Close(CloseCode::from(u16::from(code)), reason)
75/// #             }
76/// #         }
77/// #     }
78/// # }
79/// #
80/// # impl From<Message> for DummyMessage {
81/// #     fn from(msg: Message) -> Self {
82/// #         match msg {
83/// #             Message::Text(data) => Self(reqwest_websocket::Message::Text(data)),
84/// #             Message::Binary(data) => Self(reqwest_websocket::Message::Binary(data)),
85/// #             Message::Ping(data) => Self(reqwest_websocket::Message::Ping(data)),
86/// #             Message::Pong(data) => Self(reqwest_websocket::Message::Pong(data)),
87/// #             Message::Close(code, reason) => Self(reqwest_websocket::Message::Close {
88/// #                 code: reqwest_websocket::CloseCode::from(u16::from(code)),
89/// #                 reason,
90/// #             }),
91/// #         }
92/// #     }
93/// # }
94/// #
95/// # struct DummyConnector;
96/// #
97/// # impl Connector for DummyConnector {
98/// #     type Item = DummyMessage;
99/// #     type BackendStream = reqwest_websocket::WebSocket;
100/// #     type BackendMessage = reqwest_websocket::Message;
101/// #     type Error = reqwest_websocket::Error;
102/// #
103/// #     async fn connect() -> Result<
104/// #         StreamWrapper<'static, Self::BackendStream, Self::BackendMessage, Self::Item, Self::Error>,
105/// #         Self::Error,
106/// #     > {
107/// #         let response = reqwest::Client::default()
108/// #             .get("ws://echo.websocket.org/")
109/// #             .upgrade()
110/// #             .send()
111/// #             .await?;
112/// #
113/// #         response.into_websocket().await.map(StreamWrapper::from)
114/// #     }
115/// # }
116/// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
117/// # rt.block_on(async move {
118///  let Some(client) = connect(DummyConnector, DummyHandler).await else {
119///     log::info!("Failed to connect");
120///     return;
121///  };
122///  
123///  client.text("hello world").await.unwrap();
124/// # });
125/// ```
126#[derive(Debug, Clone)]
127pub struct Client {
128    pub(crate) to_send: flume::Sender<Message>,
129    pub(crate) command_tx: flume::Sender<Command>,
130    pub(crate) confirm_close_rx: flume::Receiver<()>,
131}
132
133impl From<Client> for flume::Sender<Message> {
134    fn from(client: Client) -> Self {
135        client.to_send
136    }
137}
138
139impl Client {
140    /// Send a text message to the server.
141    ///
142    /// # Errors
143    /// Returns an [`Error`](flume::SendError) if all receivers have been dropped.
144    pub async fn text(&self, message: impl Into<String>) -> Result<(), flume::SendError<Message>> {
145        let message = message.into();
146        log::debug!("Sending text: {message}");
147
148        self.to_send.send_async(Message::Text(message)).await
149    }
150
151    /// Send a text message to the server.
152    ///
153    /// # Errors
154    /// Returns an [`Error`](flume::SendError) if all receivers have been dropped.
155    pub fn blocking_text(
156        &self,
157        message: impl Into<String>,
158    ) -> Result<(), flume::SendError<Message>> {
159        let message = message.into();
160        log::debug!("Sending text: {message}");
161
162        self.to_send.send(Message::Text(message))
163    }
164
165    /// Send a text message to the server allowing to set up a timeout.
166    ///
167    /// # Errors
168    /// Returns an [`Error`](flume::SendTimeoutError) if all receivers have been dropped or the timeout has been reached.
169    pub fn blocking_text_timeout(
170        &self,
171        message: impl Into<String>,
172        timeout: Duration,
173    ) -> Result<(), flume::SendTimeoutError<Message>> {
174        let message = message.into();
175        log::debug!("Sending text: {message}");
176
177        self.to_send.send_timeout(Message::Text(message), timeout)
178    }
179
180    /// Send a binary message to the server.
181    ///
182    /// # Errors
183    /// Returns an [`Error`](flume::SendError) if all receivers have been dropped.
184    pub async fn binary(
185        &self,
186        message: impl IntoIterator<Item = u8>,
187    ) -> Result<(), flume::SendError<Message>> {
188        let message = message.into_iter().collect();
189        log::debug!("Sending binary: {message:?}");
190
191        self.to_send.send_async(Message::Binary(message)).await
192    }
193
194    /// Send a binary message to the server.
195    ///
196    /// # Errors
197    /// Returns an [`Error`](flume::SendError) if all receivers have been dropped.
198    pub fn blocking_binary(
199        &self,
200        message: impl IntoIterator<Item = u8>,
201    ) -> Result<(), flume::SendError<Message>> {
202        let message = message.into_iter().collect();
203        log::debug!("Sending binary: {message:?}");
204
205        self.to_send.send(Message::Binary(message))
206    }
207
208    /// Send a binary message to the server.
209    ///
210    /// # Errors
211    /// Returns an [`Error`](flume::SendTimeoutError) if all receivers have been dropped or the timeout has been reached.
212    pub fn blocking_binary_timeout(
213        &self,
214        message: impl IntoIterator<Item = u8>,
215        timeout: Duration,
216    ) -> Result<(), flume::SendTimeoutError<Message>> {
217        let message = message.into_iter().collect();
218        log::debug!("Sending text: {:?}", &message);
219
220        self.to_send.send_timeout(Message::Binary(message), timeout)
221    }
222
223    /// Force the socket to reconnect, this can be useful when server address can change.
224    ///
225    /// # Errors
226    /// Return an [`Error`](flume::SendError) if the receiver is dropped.
227    pub async fn force_reconnect(&self) -> Result<(), flume::SendError<Command>> {
228        self.command_tx.send_async(Command::Reconnect).await
229    }
230
231    /// Force the socket to reconnect, this can be useful when server address can change.
232    ///
233    /// # Errors
234    /// Return an [`Error`](flume::SendError) if the receiver is dropped.
235    pub fn blocking_force_reconnect(&self) -> Result<(), flume::SendError<Command>> {
236        self.command_tx.send(Command::Reconnect)
237    }
238
239    /// Force the socket to reconnect, this can be useful when server address can change.
240    ///
241    /// # Errors
242    /// Return an [`Error`](flume::SendTimeoutError) if the receiver is dropped.
243    pub fn blocking_force_reconnect_timeout(
244        &self,
245        timeout: Duration,
246    ) -> Result<(), flume::SendTimeoutError<Command>> {
247        self.command_tx.send_timeout(Command::Reconnect, timeout)
248    }
249
250    /// Allow the [`Client`] to close the connection.
251    ///
252    /// This will also stop any background task to avoid any reconnection.
253    ///
254    /// # Errors
255    /// Return an [`Error`](flume::SendError) if the receiver is dropped.
256    pub async fn close(&self) -> std::io::Result<()> {
257        self.command_tx.send_async(Command::Close).await.map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))?;
258        self.confirm_close_rx.recv_async().await.map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))
259    }
260
261    /// Allow the [`Client`] to close the connection.
262    ///
263    /// This will also stop any background task to avoid any reconnection.
264    ///
265    /// # Errors
266    /// Return an [`Error`](flume::SendError) if the receiver is dropped.
267    pub fn blocking_close(self) -> std::io::Result<()> {
268        self.command_tx.send(Command::Close).map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))?;
269        self.confirm_close_rx.recv().map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))
270    }
271
272    /// Allow the [`Client`] to close the connection.
273    ///
274    /// This will also stop any background task to avoid any reconnection.
275    ///
276    /// # Errors
277    /// Return an [`Error`](flume::SendError) if the receiver is dropped.
278    pub fn blocking_close_timeout(
279        self,
280        request_timeout: Duration,
281        confirmation_timeout: Duration,
282    ) -> std::io::Result<()> {
283        match self.command_tx.send_timeout(Command::Close, request_timeout) {
284            Ok(()) => {},
285            Err(reason) => {
286                return match &reason {
287                    flume::SendTimeoutError::Disconnected(_) => Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, reason)),
288                    flume::SendTimeoutError::Timeout(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, reason)),
289                }
290            },
291        }
292        self.confirm_close_rx.recv_timeout(confirmation_timeout).map_err(|err| match &err {
293            RecvTimeoutError::Disconnected => std::io::Error::new(std::io::ErrorKind::BrokenPipe, err),
294            RecvTimeoutError::Timeout => std::io::Error::new(std::io::ErrorKind::TimedOut, err),
295        })
296    }
297}