websocket_server_async/
peer.rs

1use crate::stream::MaybeRustlsStream;
2use anyhow::{bail, ensure, Result};
3use aqueue::Actor;
4use futures_util::stream::SplitSink;
5use futures_util::SinkExt;
6use std::net::SocketAddr;
7use std::sync::Arc;
8use tokio::net::TcpStream;
9use tokio_tungstenite::tungstenite::Message;
10use tokio_tungstenite::WebSocketStream;
11
12pub struct WSPeer {
13    addr: SocketAddr,
14    sender: Option<SplitSink<WebSocketStream<MaybeRustlsStream<TcpStream>>, Message>>,
15}
16
17impl WSPeer {
18    /// 创建一个TCP PEER
19    #[inline]
20    pub fn new(
21        addr: SocketAddr,
22        sender: SplitSink<WebSocketStream<MaybeRustlsStream<TcpStream>>, Message>,
23    ) -> Arc<Actor<WSPeer>> {
24        Arc::new(Actor::new(WSPeer {
25            addr,
26            sender: Some(sender),
27        }))
28    }
29    /// 是否断线
30    #[inline]
31    fn is_disconnect(&self) -> bool {
32        self.sender.is_none()
33    }
34
35    /// 发送
36    #[inline]
37    async fn send_message(&mut self, message: Message) -> Result<()> {
38        if let Some(ref mut sender) = self.sender {
39            sender.send(message).await?;
40            Ok(())
41        } else {
42            bail!("ConnectionReset")
43        }
44    }
45
46    /// 发送
47    #[inline]
48    async fn send_vec(&mut self, buff: Vec<u8>) -> Result<usize> {
49        if let Some(ref mut sender) = self.sender {
50            let len = buff.len();
51            sender.send(Message::Binary(buff)).await?;
52            Ok(len)
53        } else {
54            bail!("ConnectionReset")
55        }
56    }
57
58    /// 发送
59    #[inline]
60    async fn send<'a>(&'a mut self, buff: &'a [u8]) -> Result<usize> {
61        if let Some(ref mut sender) = self.sender {
62            sender.send(Message::binary(buff)).await?;
63            Ok(buff.len())
64        } else {
65            bail!("ConnectionReset")
66        }
67    }
68
69    /// flush
70    #[inline]
71    async fn flush(&mut self) -> Result<()> {
72        if let Some(ref mut sender) = self.sender {
73            sender.flush().await?;
74            Ok(())
75        } else {
76            bail!("ConnectionReset")
77        }
78    }
79
80    /// 掐线
81    #[inline]
82    async fn disconnect(&mut self) -> Result<()> {
83        if let Some(mut sender) = self.sender.take() {
84            sender.close().await?;
85            Ok(())
86        } else {
87            Ok(())
88        }
89    }
90}
91
92pub trait IPeer: Sync + Send {
93    fn addr(&self) -> SocketAddr;
94    fn is_disconnect(&self) -> impl std::future::Future<Output = Result<bool>>;
95    fn send_message(&self, message: Message) -> impl std::future::Future<Output = Result<()>>;
96    fn send(&self, buff: Vec<u8>) -> impl std::future::Future<Output = Result<usize>>;
97    fn send_all(&self, buff: Vec<u8>) -> impl std::future::Future<Output = Result<()>>;
98    fn send_ref(&self, buff: &[u8]) -> impl std::future::Future<Output = Result<usize>>;
99    fn send_all_ref(&self, buff: &[u8]) -> impl std::future::Future<Output = Result<()>>;
100    fn flush(&self) -> impl std::future::Future<Output = Result<()>>;
101    fn disconnect(&self) -> impl std::future::Future<Output = Result<()>>;
102}
103
104impl IPeer for Actor<WSPeer> {
105    #[inline]
106    fn addr(&self) -> SocketAddr {
107        unsafe { self.deref_inner().addr }
108    }
109
110    #[inline]
111    async fn is_disconnect(&self) -> Result<bool> {
112        self.inner_call(|inner| async move { Ok(inner.get().is_disconnect()) })
113            .await
114    }
115    #[inline]
116    async fn send_message(&self, message: Message) -> Result<()> {
117        self.inner_call(|inner| async move { inner.get_mut().send_message(message).await })
118            .await
119    }
120
121    #[inline]
122    async fn send(&self, buff: Vec<u8>) -> Result<usize> {
123        ensure!(!buff.is_empty(), "send buff is null");
124        self.inner_call(|inner| async move { inner.get_mut().send_vec(buff).await })
125            .await
126    }
127    #[inline]
128    async fn send_all(&self, buff: Vec<u8>) -> Result<()> {
129        ensure!(!buff.is_empty(), "send buff is null");
130        self.inner_call(|inner| async move {
131            inner.get_mut().send_vec(buff).await?;
132            Ok(())
133        })
134        .await
135    }
136    #[inline]
137    async fn send_ref(&self, buff: &[u8]) -> Result<usize> {
138        self.inner_call(|inner| async move { inner.get_mut().send(buff).await })
139            .await
140    }
141
142    #[inline]
143    async fn send_all_ref(&self, buff: &[u8]) -> Result<()> {
144        self.inner_call(|inner| async move {
145            inner.get_mut().send(buff).await?;
146            Ok(())
147        })
148        .await
149    }
150
151    #[inline]
152    async fn flush(&self) -> Result<()> {
153        self.inner_call(|inner| async move { inner.get_mut().flush().await })
154            .await
155    }
156
157    #[inline]
158    async fn disconnect(&self) -> Result<()> {
159        self.inner_call(|inner| async move { inner.get_mut().disconnect().await })
160            .await
161    }
162}