tokio_websocket_client/client.rs
1use crate::{Message, command::Command};
2use std::time::Duration;
3use flume::RecvTimeoutError;
4
5/// A connected client that can be used to send messages to the server.
6///
7/// # Example
8/// ```
9/// # use reqwest_websocket::RequestBuilderExt;
10/// # use tokio_websocket_client::{
11/// # CloseCode,
12/// # Connector,
13/// # Handler,
14/// # Message,
15/// # RetryStrategy,
16/// # StreamWrapper,
17/// # connect,
18/// # };
19/// #
20/// # struct DummyHandler;
21/// #
22/// # impl Handler for DummyHandler {
23/// #
24/// # async fn on_text(&mut self, text: &str) {
25/// # log::info!("on_text received: {text}");
26/// # }
27/// #
28/// # async fn on_binary(&mut self, buffer: &[u8]) {
29/// # log::info!("on_binary received: {buffer:?}");
30/// # }
31/// #
32/// # async fn on_close(&mut self, code: CloseCode, reason: &str) -> RetryStrategy {
33/// # log::info!("on_close received: {code:?}: {reason}");
34/// # RetryStrategy::Close
35/// # }
36/// #
37/// # async fn on_connect(&mut self) {
38/// # log::info!("on_connect");
39/// # }
40/// #
41/// # async fn on_connect_failure(&mut self) -> RetryStrategy {
42/// # log::info!("on_connect_failure");
43/// # RetryStrategy::Close
44/// # }
45/// #
46/// # async fn on_disconnect(&mut self) -> RetryStrategy {
47/// # log::info!("on_disconnect");
48/// # RetryStrategy::Close
49/// # }
50/// # }
51/// #
52/// # struct DummyMessage(reqwest_websocket::Message);
53/// #
54/// # impl From<reqwest_websocket::Message> for DummyMessage {
55/// # fn from(message: reqwest_websocket::Message) -> Self {
56/// # Self(message)
57/// # }
58/// # }
59/// #
60/// # impl From<DummyMessage> for reqwest_websocket::Message {
61/// # fn from(message: DummyMessage) -> Self {
62/// # message.0
63/// # }
64/// # }
65/// #
66/// # impl From<DummyMessage> for Message {
67/// # fn from(other: DummyMessage) -> Message {
68/// # match other {
69/// # DummyMessage(reqwest_websocket::Message::Text(data)) => Message::Text(data),
70/// # DummyMessage(reqwest_websocket::Message::Binary(data)) => Message::Binary(data),
71/// # DummyMessage(reqwest_websocket::Message::Ping(data)) => Message::Ping(data),
72/// # DummyMessage(reqwest_websocket::Message::Pong(data)) => Message::Pong(data),
73/// # DummyMessage(reqwest_websocket::Message::Close { code, reason }) => {
74/// # Message::Close(CloseCode::from(u16::from(code)), reason)
75/// # }
76/// # }
77/// # }
78/// # }
79/// #
80/// # impl From<Message> for DummyMessage {
81/// # fn from(msg: Message) -> Self {
82/// # match msg {
83/// # Message::Text(data) => Self(reqwest_websocket::Message::Text(data)),
84/// # Message::Binary(data) => Self(reqwest_websocket::Message::Binary(data)),
85/// # Message::Ping(data) => Self(reqwest_websocket::Message::Ping(data)),
86/// # Message::Pong(data) => Self(reqwest_websocket::Message::Pong(data)),
87/// # Message::Close(code, reason) => Self(reqwest_websocket::Message::Close {
88/// # code: reqwest_websocket::CloseCode::from(u16::from(code)),
89/// # reason,
90/// # }),
91/// # }
92/// # }
93/// # }
94/// #
95/// # struct DummyConnector;
96/// #
97/// # impl Connector for DummyConnector {
98/// # type Item = DummyMessage;
99/// # type BackendStream = reqwest_websocket::WebSocket;
100/// # type BackendMessage = reqwest_websocket::Message;
101/// # type Error = reqwest_websocket::Error;
102/// #
103/// # async fn connect() -> Result<
104/// # StreamWrapper<'static, Self::BackendStream, Self::BackendMessage, Self::Item, Self::Error>,
105/// # Self::Error,
106/// # > {
107/// # let response = reqwest::Client::default()
108/// # .get("ws://echo.websocket.org/")
109/// # .upgrade()
110/// # .send()
111/// # .await?;
112/// #
113/// # response.into_websocket().await.map(StreamWrapper::from)
114/// # }
115/// # }
116/// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
117/// # rt.block_on(async move {
118/// let Some(client) = connect(DummyConnector, DummyHandler).await else {
119/// log::info!("Failed to connect");
120/// return;
121/// };
122///
123/// client.text("hello world").await.unwrap();
124/// # });
125/// ```
126#[derive(Debug, Clone)]
127pub struct Client {
128 pub(crate) to_send: flume::Sender<Message>,
129 pub(crate) command_tx: flume::Sender<Command>,
130 pub(crate) confirm_close_rx: flume::Receiver<()>,
131}
132
133impl From<Client> for flume::Sender<Message> {
134 fn from(client: Client) -> Self {
135 client.to_send
136 }
137}
138
139impl Client {
140 /// Send a text message to the server.
141 ///
142 /// # Errors
143 /// Returns an [`Error`](flume::SendError) if all receivers have been dropped.
144 pub async fn text(&self, message: impl Into<String>) -> Result<(), flume::SendError<Message>> {
145 let message = message.into();
146 log::debug!("Sending text: {message}");
147
148 self.to_send.send_async(Message::Text(message)).await
149 }
150
151 /// Send a text message to the server.
152 ///
153 /// # Errors
154 /// Returns an [`Error`](flume::SendError) if all receivers have been dropped.
155 pub fn blocking_text(
156 &self,
157 message: impl Into<String>,
158 ) -> Result<(), flume::SendError<Message>> {
159 let message = message.into();
160 log::debug!("Sending text: {message}");
161
162 self.to_send.send(Message::Text(message))
163 }
164
165 /// Send a text message to the server allowing to set up a timeout.
166 ///
167 /// # Errors
168 /// Returns an [`Error`](flume::SendTimeoutError) if all receivers have been dropped or the timeout has been reached.
169 pub fn blocking_text_timeout(
170 &self,
171 message: impl Into<String>,
172 timeout: Duration,
173 ) -> Result<(), flume::SendTimeoutError<Message>> {
174 let message = message.into();
175 log::debug!("Sending text: {message}");
176
177 self.to_send.send_timeout(Message::Text(message), timeout)
178 }
179
180 /// Send a binary message to the server.
181 ///
182 /// # Errors
183 /// Returns an [`Error`](flume::SendError) if all receivers have been dropped.
184 pub async fn binary(
185 &self,
186 message: impl IntoIterator<Item = u8>,
187 ) -> Result<(), flume::SendError<Message>> {
188 let message = message.into_iter().collect();
189 log::debug!("Sending binary: {message:?}");
190
191 self.to_send.send_async(Message::Binary(message)).await
192 }
193
194 /// Send a binary message to the server.
195 ///
196 /// # Errors
197 /// Returns an [`Error`](flume::SendError) if all receivers have been dropped.
198 pub fn blocking_binary(
199 &self,
200 message: impl IntoIterator<Item = u8>,
201 ) -> Result<(), flume::SendError<Message>> {
202 let message = message.into_iter().collect();
203 log::debug!("Sending binary: {message:?}");
204
205 self.to_send.send(Message::Binary(message))
206 }
207
208 /// Send a binary message to the server.
209 ///
210 /// # Errors
211 /// Returns an [`Error`](flume::SendTimeoutError) if all receivers have been dropped or the timeout has been reached.
212 pub fn blocking_binary_timeout(
213 &self,
214 message: impl IntoIterator<Item = u8>,
215 timeout: Duration,
216 ) -> Result<(), flume::SendTimeoutError<Message>> {
217 let message = message.into_iter().collect();
218 log::debug!("Sending text: {:?}", &message);
219
220 self.to_send.send_timeout(Message::Binary(message), timeout)
221 }
222
223 /// Force the socket to reconnect, this can be useful when server address can change.
224 ///
225 /// # Errors
226 /// Return an [`Error`](flume::SendError) if the receiver is dropped.
227 pub async fn force_reconnect(&self) -> Result<(), flume::SendError<Command>> {
228 self.command_tx.send_async(Command::Reconnect).await
229 }
230
231 /// Force the socket to reconnect, this can be useful when server address can change.
232 ///
233 /// # Errors
234 /// Return an [`Error`](flume::SendError) if the receiver is dropped.
235 pub fn blocking_force_reconnect(&self) -> Result<(), flume::SendError<Command>> {
236 self.command_tx.send(Command::Reconnect)
237 }
238
239 /// Force the socket to reconnect, this can be useful when server address can change.
240 ///
241 /// # Errors
242 /// Return an [`Error`](flume::SendTimeoutError) if the receiver is dropped.
243 pub fn blocking_force_reconnect_timeout(
244 &self,
245 timeout: Duration,
246 ) -> Result<(), flume::SendTimeoutError<Command>> {
247 self.command_tx.send_timeout(Command::Reconnect, timeout)
248 }
249
250 /// Allow the [`Client`] to close the connection.
251 ///
252 /// This will also stop any background task to avoid any reconnection.
253 ///
254 /// # Errors
255 /// Return an [`Error`](flume::SendError) if the receiver is dropped.
256 pub async fn close(&self) -> std::io::Result<()> {
257 self.command_tx.send_async(Command::Close).await.map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))?;
258 self.confirm_close_rx.recv_async().await.map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))
259 }
260
261 /// Allow the [`Client`] to close the connection.
262 ///
263 /// This will also stop any background task to avoid any reconnection.
264 ///
265 /// # Errors
266 /// Return an [`Error`](flume::SendError) if the receiver is dropped.
267 pub fn blocking_close(self) -> std::io::Result<()> {
268 self.command_tx.send(Command::Close).map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))?;
269 self.confirm_close_rx.recv().map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))
270 }
271
272 /// Allow the [`Client`] to close the connection.
273 ///
274 /// This will also stop any background task to avoid any reconnection.
275 ///
276 /// # Errors
277 /// Return an [`Error`](flume::SendError) if the receiver is dropped.
278 pub fn blocking_close_timeout(
279 self,
280 request_timeout: Duration,
281 confirmation_timeout: Duration,
282 ) -> std::io::Result<()> {
283 match self.command_tx.send_timeout(Command::Close, request_timeout) {
284 Ok(()) => {},
285 Err(reason) => {
286 return match &reason {
287 flume::SendTimeoutError::Disconnected(_) => Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, reason)),
288 flume::SendTimeoutError::Timeout(_) => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, reason)),
289 }
290 },
291 }
292 self.confirm_close_rx.recv_timeout(confirmation_timeout).map_err(|err| match &err {
293 RecvTimeoutError::Disconnected => std::io::Error::new(std::io::ErrorKind::BrokenPipe, err),
294 RecvTimeoutError::Timeout => std::io::Error::new(std::io::ErrorKind::TimedOut, err),
295 })
296 }
297}