websocket_client_async/
lib.rs

1mod websocket_io;
2
3use crate::websocket_io::{WebSocketReader, WebSocketInner, WebsocketIO};
4use anyhow::{bail, Error, Result};
5use aqueue::Actor;
6use futures_util::AsyncWriteExt;
7use log::*;
8use std::future::Future;
9use std::ops::Deref;
10use std::sync::Arc;
11
12pub struct WebSocketClient {
13    inner: WebSocketInner,
14    disconnect: bool
15}
16
17impl WebSocketClient {
18    #[inline]
19    pub async fn connect<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
20        addr: &str,
21        input: impl FnOnce(A, Arc<Actor<Self>>, WebSocketReader) -> F + Send + 'static,
22        token: A,
23    ) -> Result<Arc<Actor<Self>>> {
24        let ws = WebsocketIO::new(addr).await?;
25        Self::init(input, token, ws)
26    }
27
28    #[inline]
29    pub async fn connect_wss<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
30        addr: &str,
31        input: impl FnOnce(A, Arc<Actor<Self>>, WebSocketReader) -> F + Send + 'static,
32        token: A,
33    ) -> Result<Arc<Actor<Self>>> {
34        let ws = WebsocketIO::new_wss(addr).await?;
35        Self::init(input, token, ws)
36    }
37
38    #[inline]
39    fn init<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
40        input: impl FnOnce(A, Arc<Actor<WebSocketClient>>, WebSocketReader) -> F + Send + 'static,
41        token: A,
42        ws: WebsocketIO,
43    ) -> Result<Arc<Actor<WebSocketClient>>, Error> {
44        let (reader, write) = ws.split();
45        let client = Arc::new(Actor::new(WebSocketClient {
46            disconnect: false,
47            inner: write,
48        }));
49        let read_client = client.clone();
50        wasm_bindgen_futures::spawn_local(async move {
51            let disconnect_client = read_client.clone();
52            let need_disconnect = match input(token, read_client, reader).await {
53                Ok(disconnect) => disconnect,
54                Err(err) => {
55                    error!("reader error:{}", err);
56                    true
57                }
58            };
59
60            if need_disconnect {
61                if let Err(er) = disconnect_client.disconnect().await {
62                    error!("disconnect to{} err:{}", disconnect_client.get_addr(), er);
63                } else {
64                    debug!("disconnect to {}", disconnect_client.get_addr())
65                }
66            } else {
67                debug!("{} reader is close", disconnect_client.get_addr());
68            }
69        });
70
71        Ok(client)
72    }
73
74    #[inline]
75    async fn close(&mut self) -> Result<()> {
76        if !self.disconnect {
77            match self.inner.ws.close() {
78                Ok(_) => {
79                    self.disconnect = true;
80                }
81                Err(err) => bail!("websocket close error:{:?}", err),
82            }
83        }
84        Ok(())
85    }
86
87    #[inline]
88    async fn send<'a>(&'a mut self, buff: &'a [u8]) -> Result<usize> {
89        if !self.disconnect {
90            Ok(self.inner.write(buff).await?)
91        } else {
92            bail!("Send Error,Disconnect")
93        }
94    }
95
96    #[inline]
97    async fn send_all<'a>(&'a mut self, buff: &'a [u8]) -> Result<()> {
98        if !self.disconnect {
99            Ok(self.inner.write_all(buff).await?)
100        } else {
101            bail!("Send Error,Disconnect")
102        }
103    }
104
105    #[inline]
106    async fn flush(&mut self) -> Result<()> {
107        if !self.disconnect {
108            Ok(self.inner.flush().await?)
109        } else {
110            bail!("Send Error,Disconnect")
111        }
112    }
113}
114
115#[async_trait::async_trait]
116pub trait IWebSocketClient {
117    fn get_addr(&self) -> String;
118    async fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(&self, buff: B)
119        -> Result<usize>;
120    async fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
121        &self,
122        buff: B,
123    ) -> Result<()>;
124    async fn send_ref(&self, buff: &[u8]) -> Result<usize>;
125    async fn send_all_ref(&self, buff: &[u8]) -> Result<()>;
126    async fn flush(&self) -> Result<()>;
127    async fn disconnect(&self) -> Result<()>;
128    fn is_disconnect(&self) -> bool;
129}
130
131#[async_trait::async_trait]
132impl IWebSocketClient for Actor<WebSocketClient> {
133    #[inline]
134    fn get_addr(&self) -> String {
135        unsafe { self.deref_inner().inner.ws_url.clone() }
136    }
137    #[inline]
138    async fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(
139        &self,
140        buff: B,
141    ) -> Result<usize> {
142        self.inner_call(|inner| async move { inner.get_mut().send(&buff).await })
143            .await
144    }
145
146    #[inline]
147    async fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
148        &self,
149        buff: B,
150    ) -> Result<()> {
151        self.inner_call(|inner| async move { inner.get_mut().send_all(&buff).await })
152            .await
153    }
154
155    #[inline]
156    async fn send_ref(&self, buff: &[u8]) -> Result<usize> {
157        self.inner_call(|inner| async move { inner.get_mut().send(buff).await })
158            .await
159    }
160
161    #[inline]
162    async fn send_all_ref(&self, buff: &[u8]) -> Result<()> {
163        self.inner_call(|inner| async move { inner.get_mut().send_all(buff).await })
164            .await
165    }
166
167    #[inline]
168    async fn flush(&self) -> Result<()> {
169        self.inner_call(|inner| async move { inner.get_mut().flush().await })
170            .await
171    }
172    #[inline]
173    async fn disconnect(&self) -> Result<()> {
174        self.inner_call(|inner| async move { inner.get_mut().close().await })
175            .await
176    }
177    #[inline]
178    fn is_disconnect(&self) -> bool {
179        unsafe { self.deref_inner().disconnect }
180    }
181}