1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use crate::peer::{IPeer, WSPeer};
use anyhow::{bail, Result};
use aqueue::Actor;
use futures_util::stream::SplitStream;
use futures_util::StreamExt;
use log::*;
use std::future::Future;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio::task::JoinHandle;
use tokio_tungstenite::{accept_async, WebSocketStream};
pub type ConnectEventType = fn(SocketAddr) -> bool;
pub struct WebSocketServer<I, R, T> {
listener: Option<TcpListener>,
connect_event: Option<ConnectEventType>,
input_event: Arc<I>,
_phantom1: PhantomData<R>,
_phantom2: PhantomData<T>,
}
unsafe impl<I, R, T> Send for WebSocketServer<I, R, T> {}
unsafe impl<I, R, T> Sync for WebSocketServer<I, R, T> {}
impl<I, R, T> WebSocketServer<I, R, T>
where
I: Fn(SplitStream<WebSocketStream<TcpStream>>, Arc<Actor<WSPeer>>, T) -> R
+ Send
+ Sync
+ 'static,
R: Future<Output = Result<()>> + Send + 'static,
T: Clone + Send + 'static,
{
pub(crate) async fn new<A: ToSocketAddrs>(
addr: A,
input: I,
connect_event: Option<ConnectEventType>,
) -> Result<Arc<Actor<WebSocketServer<I, R, T>>>> {
let listener = TcpListener::bind(addr).await?;
Ok(Arc::new(Actor::new(WebSocketServer {
listener: Some(listener),
connect_event,
input_event: Arc::new(input),
_phantom1: PhantomData::default(),
_phantom2: PhantomData::default(),
})))
}
pub async fn start(&mut self, token: T) -> Result<JoinHandle<Result<()>>> {
if let Some(listener) = self.listener.take() {
let connect_event = self.connect_event.take();
let input_event = self.input_event.clone();
let join: JoinHandle<Result<()>> = tokio::spawn(async move {
loop {
let (socket, addr) = listener.accept().await?;
if let Some(ref connect_event) = connect_event {
if !connect_event(addr) {
warn!("addr:{} not connect", addr);
continue;
}
}
trace!("start read:{}", addr);
let input = input_event.clone();
let peer_token = token.clone();
tokio::spawn(async move {
match accept_async(socket).await {
Ok(ws_stream) => {
let (sender, reader) = ws_stream.split();
let peer = WSPeer::new(addr, sender);
if let Err(err) = (*input)(reader, peer.clone(), peer_token).await {
error!("input data error:{}", err);
}
if let Err(er) = peer.disconnect().await {
debug!("disconnect client:{:?} err:{}", peer.addr(), er);
} else {
debug!("{} disconnect", peer.addr())
}
}
Err(err) => {
error!("init websocket stream error:{:?}", err);
}
}
});
}
});
return Ok(join);
}
bail!("not listener or repeat start")
}
}
#[async_trait::async_trait]
pub trait IWebSocketServer<T> {
async fn start(&self, token: T) -> Result<JoinHandle<Result<()>>>;
async fn start_block(&self, token: T) -> Result<()>;
}
#[async_trait::async_trait]
impl<I, R, T> IWebSocketServer<T> for Actor<WebSocketServer<I, R, T>>
where
I: Fn(SplitStream<WebSocketStream<TcpStream>>, Arc<Actor<WSPeer>>, T) -> R
+ Send
+ Sync
+ 'static,
R: Future<Output = Result<()>> + Send + 'static,
T: Clone + Send + Sync + 'static,
{
async fn start(&self, token: T) -> Result<JoinHandle<Result<()>>> {
self.inner_call(|inner| async move { inner.get_mut().start(token).await })
.await
}
async fn start_block(&self, token: T) -> Result<()> {
Self::start(self, token).await?.await??;
Ok(())
}
}