1use crate::error::Result;
2use crate::peer::TCPPeer;
3use aqueue::Actor;
4use log::*;
5use std::error::Error;
6use std::future::Future;
7use std::marker::PhantomData;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::io::{AsyncRead, AsyncWrite, ReadHalf};
11use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
12use tokio::task::JoinHandle;
13
14pub type ConnectEventType = fn(SocketAddr) -> bool;
15
16pub struct TCPServer<I, R, T, B, C, IST> {
17 listener: Option<TcpListener>,
18 connect_event: Option<ConnectEventType>,
19 stream_init: Arc<IST>,
20 input_event: Option<I>,
21 _phantom1: PhantomData<R>,
22 _phantom2: PhantomData<T>,
23 _phantom3: PhantomData<C>,
24 _phantom4: PhantomData<B>,
25}
26
27unsafe impl<I, R, T, B, C, IST> Send for TCPServer<I, R, T, B, C, IST> {}
28unsafe impl<I, R, T, B, C, IST> Sync for TCPServer<I, R, T, B, C, IST> {}
29
30impl<I, R, T, B, C, IST> TCPServer<I, R, T, B, C, IST>
31where
32 I: Fn(ReadHalf<C>, Arc<TCPPeer<C>>, T) -> R + Send + Clone + 'static,
33 R: Future<Output = anyhow::Result<()>> + Send + 'static,
34 T: Clone + Send + 'static,
35 B: Future<Output = anyhow::Result<C>> + Send + 'static,
36 C: AsyncRead + AsyncWrite + Send + Sync + 'static,
37 IST: Fn(TcpStream) -> B + Send + Sync + 'static,
38{
39 pub(crate) async fn new<A: ToSocketAddrs>(
41 addr: A,
42 stream_init: IST,
43 input: I,
44 connect_event: Option<ConnectEventType>,
45 ) -> Result<Arc<Actor<TCPServer<I, R, T, B, C, IST>>>, Box<dyn Error>> {
46 let listener = TcpListener::bind(addr).await?;
47 Ok(Arc::new(Actor::new(TCPServer {
48 listener: Some(listener),
49 connect_event,
50 stream_init: Arc::new(stream_init),
51 input_event: Some(input),
52 _phantom1: Default::default(),
53 _phantom2: Default::default(),
54 _phantom3: Default::default(),
55 _phantom4: Default::default(),
56 })))
57 }
58
59 pub async fn start(&mut self, token: T) -> Result<JoinHandle<anyhow::Result<()>>> {
61 if let Some(listener) = self.listener.take() {
62 let connect_event = self.connect_event.take();
63 let input_event = self.input_event.take().unwrap();
64 let stream_init = self.stream_init.clone();
65 let join: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
66 loop {
67 let (socket, addr) = listener.accept().await?;
68 if let Some(ref connect_event) = connect_event {
69 if !connect_event(addr) {
70 warn!("addr:{} not connect", addr);
71 continue;
72 }
73 }
74 trace!("start read:{}", addr);
75 let input = input_event.clone();
76 let peer_token = token.clone();
77 let stream_init = stream_init.clone();
78 tokio::spawn(async move {
79 match (*stream_init)(socket).await {
80 Ok(socket) => {
81 let (reader, sender) = tokio::io::split(socket);
82 let peer = TCPPeer::new(addr, sender);
83 if let Err(err) = input(reader, peer.clone(), peer_token).await {
84 error!("input data error:{}", err);
85 }
86 if let Err(er) = peer.disconnect().await {
87 debug!("disconnect client:{:?} err:{}", peer.addr(), er);
88 } else {
89 debug!("{} disconnect", peer.addr())
90 }
91 }
92 Err(err) => {
93 warn!("init stream err:{}", err);
94 }
95 }
96 });
97 }
98 });
99
100 Ok(join)
101 } else {
102 Err(crate::error::Error::NotListenerError)
103 }
104 }
105}
106
107#[async_trait::async_trait]
108pub trait ITCPServer<T> {
109 async fn start(&self, token: T) -> anyhow::Result<JoinHandle<anyhow::Result<()>>>;
110 async fn start_block(&self, token: T) -> anyhow::Result<()>;
111}
112
113#[async_trait::async_trait]
114impl<I, R, T, B, C, IST> ITCPServer<T> for Actor<TCPServer<I, R, T, B, C, IST>>
115where
116 I: Fn(ReadHalf<C>, Arc<TCPPeer<C>>, T) -> R + Send + Sync + Clone + 'static,
117 R: Future<Output = anyhow::Result<()>> + Send + 'static,
118 T: Clone + Send + Sync + 'static,
119 B: Future<Output = anyhow::Result<C>> + Send + 'static,
120 C: AsyncRead + AsyncWrite + Send + Sync + 'static,
121 IST: Fn(TcpStream) -> B + Send + Sync + 'static,
122{
123 async fn start(&self, token: T) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
124 self.inner_call(|inner| async move { Ok(inner.get_mut().start(token).await?) })
125 .await
126 }
127
128 async fn start_block(&self, token: T) -> anyhow::Result<()> {
129 Self::start(self, token).await?.await??;
130 Ok(())
131 }
132}