websocket-client-async 0.1.4

Asynchronous websocket client.
Documentation
mod websocket_io;

use crate::websocket_io::{WebSocketReader, WebSocketInner, WebsocketIO};
use anyhow::{bail, Error, Result};
use aqueue::Actor;
use futures_util::AsyncWriteExt;
use log::*;
use std::future::Future;
use std::ops::Deref;
use std::sync::Arc;

pub struct WebSocketClient {
    inner: WebSocketInner,
    disconnect: bool
}

impl WebSocketClient {
    #[inline]
    pub async fn connect<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
        addr: &str,
        input: impl FnOnce(A, Arc<Actor<Self>>, WebSocketReader) -> F + Send + 'static,
        token: A,
    ) -> Result<Arc<Actor<Self>>> {
        let ws = WebsocketIO::new(addr).await?;
        Self::init(input, token, ws)
    }

    #[inline]
    pub async fn connect_wss<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
        addr: &str,
        input: impl FnOnce(A, Arc<Actor<Self>>, WebSocketReader) -> F + Send + 'static,
        token: A,
    ) -> Result<Arc<Actor<Self>>> {
        let ws = WebsocketIO::new_wss(addr).await?;
        Self::init(input, token, ws)
    }

    #[inline]
    fn init<F: Future<Output = Result<bool>> + 'static, A: Send + 'static>(
        input: impl FnOnce(A, Arc<Actor<WebSocketClient>>, WebSocketReader) -> F + Send + 'static,
        token: A,
        ws: WebsocketIO,
    ) -> Result<Arc<Actor<WebSocketClient>>, Error> {
        let (reader, write) = ws.split();
        let client = Arc::new(Actor::new(WebSocketClient {
            disconnect: false,
            inner: write,
        }));
        let read_client = client.clone();
        wasm_bindgen_futures::spawn_local(async move {
            let disconnect_client = read_client.clone();
            let need_disconnect = match input(token, read_client, reader).await {
                Ok(disconnect) => disconnect,
                Err(err) => {
                    error!("reader error:{}", err);
                    true
                }
            };

            if need_disconnect {
                if let Err(er) = disconnect_client.disconnect().await {
                    error!("disconnect to{} err:{}", disconnect_client.get_addr(), er);
                } else {
                    debug!("disconnect to {}", disconnect_client.get_addr())
                }
            } else {
                debug!("{} reader is close", disconnect_client.get_addr());
            }
        });

        Ok(client)
    }

    #[inline]
    async fn close(&mut self) -> Result<()> {
        if !self.disconnect {
            match self.inner.ws.close() {
                Ok(_) => {
                    self.disconnect = true;
                }
                Err(err) => bail!("websocket close error:{:?}", err),
            }
        }
        Ok(())
    }

    #[inline]
    async fn send<'a>(&'a mut self, buff: &'a [u8]) -> Result<usize> {
        if !self.disconnect {
            Ok(self.inner.write(buff).await?)
        } else {
            bail!("Send Error,Disconnect")
        }
    }

    #[inline]
    async fn send_all<'a>(&'a mut self, buff: &'a [u8]) -> Result<()> {
        if !self.disconnect {
            Ok(self.inner.write_all(buff).await?)
        } else {
            bail!("Send Error,Disconnect")
        }
    }

    #[inline]
    async fn flush(&mut self) -> Result<()> {
        if !self.disconnect {
            Ok(self.inner.flush().await?)
        } else {
            bail!("Send Error,Disconnect")
        }
    }
}

#[async_trait::async_trait]
pub trait IWebSocketClient {
    fn get_addr(&self) -> String;
    async fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(&self, buff: B)
        -> Result<usize>;
    async fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
        &self,
        buff: B,
    ) -> Result<()>;
    async fn send_ref(&self, buff: &[u8]) -> Result<usize>;
    async fn send_all_ref(&self, buff: &[u8]) -> Result<()>;
    async fn flush(&self) -> Result<()>;
    async fn disconnect(&self) -> Result<()>;
    fn is_disconnect(&self) -> bool;
}

#[async_trait::async_trait]
impl IWebSocketClient for Actor<WebSocketClient> {
    #[inline]
    fn get_addr(&self) -> String {
        unsafe { self.deref_inner().inner.ws_url.clone() }
    }
    #[inline]
    async fn send<B: Deref<Target = [u8]> + Send + Sync + 'static>(
        &self,
        buff: B,
    ) -> Result<usize> {
        self.inner_call(|inner| async move { inner.get_mut().send(&buff).await })
            .await
    }

    #[inline]
    async fn send_all<B: Deref<Target = [u8]> + Send + Sync + 'static>(
        &self,
        buff: B,
    ) -> Result<()> {
        self.inner_call(|inner| async move { inner.get_mut().send_all(&buff).await })
            .await
    }

    #[inline]
    async fn send_ref(&self, buff: &[u8]) -> Result<usize> {
        self.inner_call(|inner| async move { inner.get_mut().send(buff).await })
            .await
    }

    #[inline]
    async fn send_all_ref(&self, buff: &[u8]) -> Result<()> {
        self.inner_call(|inner| async move { inner.get_mut().send_all(buff).await })
            .await
    }

    #[inline]
    async fn flush(&self) -> Result<()> {
        self.inner_call(|inner| async move { inner.get_mut().flush().await })
            .await
    }
    #[inline]
    async fn disconnect(&self) -> Result<()> {
        self.inner_call(|inner| async move { inner.get_mut().close().await })
            .await
    }
    #[inline]
    fn is_disconnect(&self) -> bool {
        unsafe { self.deref_inner().disconnect }
    }
}