websocket_server_async/
peer.rs1use crate::stream::MaybeRustlsStream;
2use anyhow::{bail, ensure, Result};
3use aqueue::Actor;
4use futures_util::stream::SplitSink;
5use futures_util::SinkExt;
6use std::net::SocketAddr;
7use std::sync::Arc;
8use tokio::net::TcpStream;
9use tokio_tungstenite::tungstenite::Message;
10use tokio_tungstenite::WebSocketStream;
11
12pub struct WSPeer {
13 addr: SocketAddr,
14 sender: Option<SplitSink<WebSocketStream<MaybeRustlsStream<TcpStream>>, Message>>,
15}
16
17impl WSPeer {
18 #[inline]
20 pub fn new(
21 addr: SocketAddr,
22 sender: SplitSink<WebSocketStream<MaybeRustlsStream<TcpStream>>, Message>,
23 ) -> Arc<Actor<WSPeer>> {
24 Arc::new(Actor::new(WSPeer {
25 addr,
26 sender: Some(sender),
27 }))
28 }
29 #[inline]
31 fn is_disconnect(&self) -> bool {
32 self.sender.is_none()
33 }
34
35 #[inline]
37 async fn send_message(&mut self, message: Message) -> Result<()> {
38 if let Some(ref mut sender) = self.sender {
39 sender.send(message).await?;
40 Ok(())
41 } else {
42 bail!("ConnectionReset")
43 }
44 }
45
46 #[inline]
48 async fn send_vec(&mut self, buff: Vec<u8>) -> Result<usize> {
49 if let Some(ref mut sender) = self.sender {
50 let len = buff.len();
51 sender.send(Message::Binary(buff)).await?;
52 Ok(len)
53 } else {
54 bail!("ConnectionReset")
55 }
56 }
57
58 #[inline]
60 async fn send<'a>(&'a mut self, buff: &'a [u8]) -> Result<usize> {
61 if let Some(ref mut sender) = self.sender {
62 sender.send(Message::binary(buff)).await?;
63 Ok(buff.len())
64 } else {
65 bail!("ConnectionReset")
66 }
67 }
68
69 #[inline]
71 async fn flush(&mut self) -> Result<()> {
72 if let Some(ref mut sender) = self.sender {
73 sender.flush().await?;
74 Ok(())
75 } else {
76 bail!("ConnectionReset")
77 }
78 }
79
80 #[inline]
82 async fn disconnect(&mut self) -> Result<()> {
83 if let Some(mut sender) = self.sender.take() {
84 sender.close().await?;
85 Ok(())
86 } else {
87 Ok(())
88 }
89 }
90}
91
92pub trait IPeer: Sync + Send {
93 fn addr(&self) -> SocketAddr;
94 fn is_disconnect(&self) -> impl std::future::Future<Output = Result<bool>>;
95 fn send_message(&self, message: Message) -> impl std::future::Future<Output = Result<()>>;
96 fn send(&self, buff: Vec<u8>) -> impl std::future::Future<Output = Result<usize>>;
97 fn send_all(&self, buff: Vec<u8>) -> impl std::future::Future<Output = Result<()>>;
98 fn send_ref(&self, buff: &[u8]) -> impl std::future::Future<Output = Result<usize>>;
99 fn send_all_ref(&self, buff: &[u8]) -> impl std::future::Future<Output = Result<()>>;
100 fn flush(&self) -> impl std::future::Future<Output = Result<()>>;
101 fn disconnect(&self) -> impl std::future::Future<Output = Result<()>>;
102}
103
104impl IPeer for Actor<WSPeer> {
105 #[inline]
106 fn addr(&self) -> SocketAddr {
107 unsafe { self.deref_inner().addr }
108 }
109
110 #[inline]
111 async fn is_disconnect(&self) -> Result<bool> {
112 self.inner_call(|inner| async move { Ok(inner.get().is_disconnect()) })
113 .await
114 }
115 #[inline]
116 async fn send_message(&self, message: Message) -> Result<()> {
117 self.inner_call(|inner| async move { inner.get_mut().send_message(message).await })
118 .await
119 }
120
121 #[inline]
122 async fn send(&self, buff: Vec<u8>) -> Result<usize> {
123 ensure!(!buff.is_empty(), "send buff is null");
124 self.inner_call(|inner| async move { inner.get_mut().send_vec(buff).await })
125 .await
126 }
127 #[inline]
128 async fn send_all(&self, buff: Vec<u8>) -> Result<()> {
129 ensure!(!buff.is_empty(), "send buff is null");
130 self.inner_call(|inner| async move {
131 inner.get_mut().send_vec(buff).await?;
132 Ok(())
133 })
134 .await
135 }
136 #[inline]
137 async fn send_ref(&self, buff: &[u8]) -> Result<usize> {
138 self.inner_call(|inner| async move { inner.get_mut().send(buff).await })
139 .await
140 }
141
142 #[inline]
143 async fn send_all_ref(&self, buff: &[u8]) -> Result<()> {
144 self.inner_call(|inner| async move {
145 inner.get_mut().send(buff).await?;
146 Ok(())
147 })
148 .await
149 }
150
151 #[inline]
152 async fn flush(&self) -> Result<()> {
153 self.inner_call(|inner| async move { inner.get_mut().flush().await })
154 .await
155 }
156
157 #[inline]
158 async fn disconnect(&self) -> Result<()> {
159 self.inner_call(|inner| async move { inner.get_mut().disconnect().await })
160 .await
161 }
162}