websocket_client_async/
lib.rs1mod websocket_io;
2
3use crate::websocket_io::{WebSocketReader, WebSocketInner, WebsocketIO};
4use anyhow::{bail, Error, Result};
5use aqueue::Actor;
6use futures_util::AsyncWriteExt;
7use log::*;
8use std::future::Future;
9use std::ops::Deref;
10use std::sync::Arc;
11
12pub struct WebSocketClient {
13 inner: WebSocketInner,
14 disconnect: bool
15}
16
17impl WebSocketClient {
18 #[inline]
19 pub async fn connect<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
20 addr: &str,
21 input: impl FnOnce(A, Arc<Actor<Self>>, WebSocketReader) -> F + Send + 'static,
22 token: A,
23 ) -> Result<Arc<Actor<Self>>> {
24 let ws = WebsocketIO::new(addr).await?;
25 Self::init(input, token, ws)
26 }
27
28 #[inline]
29 pub async fn connect_wss<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
30 addr: &str,
31 input: impl FnOnce(A, Arc<Actor<Self>>, WebSocketReader) -> F + Send + 'static,
32 token: A,
33 ) -> Result<Arc<Actor<Self>>> {
34 let ws = WebsocketIO::new_wss(addr).await?;
35 Self::init(input, token, ws)
36 }
37
38 #[inline]
39 fn init<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
40 input: impl FnOnce(A, Arc<Actor<WebSocketClient>>, WebSocketReader) -> F + Send + 'static,
41 token: A,
42 ws: WebsocketIO,
43 ) -> Result<Arc<Actor<WebSocketClient>>, Error> {
44 let (reader, write) = ws.split();
45 let client = Arc::new(Actor::new(WebSocketClient {
46 disconnect: false,
47 inner: write,
48 }));
49 let read_client = client.clone();
50 wasm_bindgen_futures::spawn_local(async move {
51 let disconnect_client = read_client.clone();
52 let need_disconnect = match input(token, read_client, reader).await {
53 Ok(disconnect) => disconnect,
54 Err(err) => {
55 error!("reader error:{}", err);
56 true
57 }
58 };
59
60 if need_disconnect {
61 if let Err(er) = disconnect_client.disconnect().await {
62 error!("disconnect to{} err:{}", disconnect_client.get_addr(), er);
63 } else {
64 debug!("disconnect to {}", disconnect_client.get_addr())
65 }
66 } else {
67 debug!("{} reader is close", disconnect_client.get_addr());
68 }
69 });
70
71 Ok(client)
72 }
73
74 #[inline]
75 async fn close(&mut self) -> Result<()> {
76 if !self.disconnect {
77 match self.inner.ws.close() {
78 Ok(_) => {
79 self.disconnect = true;
80 }
81 Err(err) => bail!("websocket close error:{:?}", err),
82 }
83 }
84 Ok(())
85 }
86
87 #[inline]
88 async fn send<'a>(&'a mut self, buff: &'a [u8]) -> Result<usize> {
89 if !self.disconnect {
90 Ok(self.inner.write(buff).await?)
91 } else {
92 bail!("Send Error,Disconnect")
93 }
94 }
95
96 #[inline]
97 async fn send_all<'a>(&'a mut self, buff: &'a [u8]) -> Result<()> {
98 if !self.disconnect {
99 Ok(self.inner.write_all(buff).await?)
100 } else {
101 bail!("Send Error,Disconnect")
102 }
103 }
104
105 #[inline]
106 async fn flush(&mut self) -> Result<()> {
107 if !self.disconnect {
108 Ok(self.inner.flush().await?)
109 } else {
110 bail!("Send Error,Disconnect")
111 }
112 }
113}
114
115#[async_trait::async_trait]
116pub trait IWebSocketClient {
117 fn get_addr(&self) -> String;
118 async fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(&self, buff: B)
119 -> Result<usize>;
120 async fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
121 &self,
122 buff: B,
123 ) -> Result<()>;
124 async fn send_ref(&self, buff: &[u8]) -> Result<usize>;
125 async fn send_all_ref(&self, buff: &[u8]) -> Result<()>;
126 async fn flush(&self) -> Result<()>;
127 async fn disconnect(&self) -> Result<()>;
128 fn is_disconnect(&self) -> bool;
129}
130
131#[async_trait::async_trait]
132impl IWebSocketClient for Actor<WebSocketClient> {
133 #[inline]
134 fn get_addr(&self) -> String {
135 unsafe { self.deref_inner().inner.ws_url.clone() }
136 }
137 #[inline]
138 async fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(
139 &self,
140 buff: B,
141 ) -> Result<usize> {
142 self.inner_call(|inner| async move { inner.get_mut().send(&buff).await })
143 .await
144 }
145
146 #[inline]
147 async fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
148 &self,
149 buff: B,
150 ) -> Result<()> {
151 self.inner_call(|inner| async move { inner.get_mut().send_all(&buff).await })
152 .await
153 }
154
155 #[inline]
156 async fn send_ref(&self, buff: &[u8]) -> Result<usize> {
157 self.inner_call(|inner| async move { inner.get_mut().send(buff).await })
158 .await
159 }
160
161 #[inline]
162 async fn send_all_ref(&self, buff: &[u8]) -> Result<()> {
163 self.inner_call(|inner| async move { inner.get_mut().send_all(buff).await })
164 .await
165 }
166
167 #[inline]
168 async fn flush(&self) -> Result<()> {
169 self.inner_call(|inner| async move { inner.get_mut().flush().await })
170 .await
171 }
172 #[inline]
173 async fn disconnect(&self) -> Result<()> {
174 self.inner_call(|inner| async move { inner.get_mut().close().await })
175 .await
176 }
177 #[inline]
178 fn is_disconnect(&self) -> bool {
179 unsafe { self.deref_inner().disconnect }
180 }
181}