ezrpc 0.1.1

Ergonomic, flexible and Zero-cost RPC framework
Documentation
//! Websocket adapter implementation, for network communication, can also be used in WebPage via WASM

use crate::Service;
use async_lock::Mutex;
use std::{net::SocketAddr, sync::Arc};
use tokio::net::{
    tcp::{OwnedReadHalf, OwnedWriteHalf},
    TcpListener, TcpStream,
};
use ws_tool::{
    codec::{AsyncFrameCodec, AsyncFrameRecv, AsyncFrameSend, FrameConfig},
    frame::OpCode,
    http::Request,
    *,
};

pub struct WsAdapter {
    pub peer_addr: Option<SocketAddr>,
    pub request: Option<Request<()>>,
    sender: Mutex<AsyncFrameSend<OwnedWriteHalf>>,
    recver: Mutex<AsyncFrameRecv<OwnedReadHalf>>,
}

impl WsAdapter {
    pub async fn bind_accept(addr: SocketAddr) -> anyhow::Result<Self> {
        let listener = TcpListener::bind(addr).await?;
        Self::accept(&listener).await
    }

    pub async fn accept(listener: &TcpListener) -> anyhow::Result<Self> {
        let (stream, addr) = listener.accept().await?;
        let (request, server) =
            ServerBuilder::async_accept(stream, codec::default_handshake_handler, |req, stream| {
                let config = FrameConfig {
                    mask_send_frame: false,
                    ..Default::default()
                };
                Ok((req, AsyncFrameCodec::new_with(stream, config)))
            })
            .await?;
        let (r, s) = server.split();
        Ok(Self {
            peer_addr: Some(addr),
            request: Some(request),
            sender: Mutex::new(s),
            recver: Mutex::new(r),
        })
    }

    pub async fn accept_from_stream(stream: TcpStream) -> anyhow::Result<Self> {
        let peer_addr = stream.peer_addr().ok();
        let (request, server) =
            ServerBuilder::async_accept(stream, codec::default_handshake_handler, |req, stream| {
                let config = FrameConfig {
                    mask_send_frame: false,
                    ..Default::default()
                };
                Ok((req, AsyncFrameCodec::new_with(stream, config)))
            })
            .await?;
        let (r, s) = server.split();
        Ok(Self {
            peer_addr,
            request: Some(request),
            sender: Mutex::new(s),
            recver: Mutex::new(r),
        })
    }

    pub async fn connect<A: ToString>(address: A) -> anyhow::Result<Self> {
        let client = ClientBuilder::new()
            .async_connect(address.to_string().parse()?, AsyncFrameCodec::check_fn)
            .await?;
        let (r, s) = client.split();
        Ok(Self {
            peer_addr: None,
            request: None,
            sender: Mutex::new(s),
            recver: Mutex::new(r),
        })
    }
}

const MAX_PACK_SIZE: usize = 60 * 1024;

#[async_trait::async_trait]
impl crate::Adapter for WsAdapter {
    async fn send(&self, pack: Vec<u8>) -> anyhow::Result<()> {
        let mut sender = self.sender.lock().await;

        for c in pack.chunks(MAX_PACK_SIZE) {
            sender.send(OpCode::Binary, c).await?;
        }

        Ok(())
    }

    async fn recv(&self) -> anyhow::Result<Vec<u8>> {
        let mut recver = self.recver.lock().await;
        let mut result = vec![];

        loop {
            let msg = recver.receive().await?;
            if msg.code().is_close() {
                return Err(crate::error::Disconnect.into());
            }

            let msg = msg.payload();
            result.extend_from_slice(msg.as_ref());
            if msg.as_ref().len() < MAX_PACK_SIZE {
                break;
            }
        }

        Ok(result)
    }
}

impl crate::session::Session {
    pub fn ws_adapter(&self) -> Option<Arc<WsAdapter>> {
        self.downcast_adapter()
    }

    pub async fn ws_connect<A: ToString + AsRef<str>, S: Service>(
        address: A,
        service: S,
    ) -> anyhow::Result<Self> {
        let a = WsAdapter::connect(address).await?;
        Ok(Self::from(a, service))
    }

    #[cfg(not(target_arch = "wasm32"))]
    pub async fn ws_accept<A: tokio::net::ToSocketAddrs, S: Service>(
        address: A,
        service: S,
    ) -> anyhow::Result<Self> {
        let a = WsAdapter::accept(&tokio::net::TcpListener::bind(address).await?).await?;
        Ok(Self::from(a, service))
    }
}