1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use log::*;
use std::cell::RefCell;
use std::error::Error;
use std::future::Future;
use std::net::SocketAddr;
use std::option::Option::Some;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, ToSocketAddrs};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use xbinary::XBWrite;
use crate::peer::TCPPeer;
pub type ConnectEventType = fn(SocketAddr) -> bool;
pub struct TCPServer<I, R>
where
I: Fn(TCPPeer) -> R + Send + Sync + 'static,
R: Future<Output = ()> + Send,
{
listener: RefCell<Option<TcpListener>>,
connect_event: RefCell<Option<ConnectEventType>>,
input_event: Arc<I>,
}
impl<I, R> TCPServer<I, R>
where
I: Fn(TCPPeer) -> R + Send + Sync + 'static,
R: Future<Output = ()> + Send,
{
pub async fn new<T: ToSocketAddrs>(
addr: T,
input: I,
) -> Result<TCPServer<I, R>, Box<dyn Error>> {
let listener = TcpListener::bind(addr).await?;
Ok(TCPServer {
listener: RefCell::new(Some(listener)),
connect_event: RefCell::new(None),
input_event: Arc::new(input),
})
}
pub fn set_connection_event(&self, f: ConnectEventType) {
self.connect_event.replace(Some(f));
}
pub async fn start(&self) -> Result<(), Box<dyn Error>> {
if let Some(mut listener) = self.listener.borrow_mut().take() {
loop {
let (socket, addr) = listener.accept().await?;
if let Some(connect_event) = *self.connect_event.borrow() {
if !connect_event(addr) {
warn!("addr:{} not connect", addr);
continue;
}
}
trace!("start read:{}", addr);
let (tx, mut rx): (Sender<XBWrite>, Receiver<XBWrite>) = channel(1024);
let (reader, mut sender) = socket.into_split();
tokio::spawn(async move {
while let Some(buff) = rx.recv().await {
if buff.is_empty() {
if let Err(er) = sender.shutdown().await {
error!("{} disconnect error:{}", addr, er);
}
break;
} else if let Err(er) = sender.write(&buff).await {
error!("{} send buffer error:{}", addr, er);
}
}
});
let peer = TCPPeer::new(addr, reader, tx);
let input = self.input_event.clone();
tokio::spawn(async move {
(*input)(peer).await;
});
}
}
Err("not listener or repeat start".into())
}
}