tcp_channel_server/
tcpserver.rs

1use crate::error::Result;
2use crate::peer::TCPPeer;
3use aqueue::Actor;
4use log::*;
5use std::error::Error;
6use std::future::Future;
7use std::marker::PhantomData;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::io::{AsyncRead, AsyncWrite, ReadHalf};
11use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
12use tokio::task::JoinHandle;
13
14pub type ConnectEventType = fn(SocketAddr) -> bool;
15
16pub struct TCPServer<I, R, T, B, C, IST> {
17    listener: Option<TcpListener>,
18    connect_event: Option<ConnectEventType>,
19    stream_init: Arc<IST>,
20    input_event: Option<I>,
21    _phantom1: PhantomData<R>,
22    _phantom2: PhantomData<T>,
23    _phantom3: PhantomData<C>,
24    _phantom4: PhantomData<B>,
25}
26
27unsafe impl<I, R, T, B, C, IST> Send for TCPServer<I, R, T, B, C, IST> {}
28unsafe impl<I, R, T, B, C, IST> Sync for TCPServer<I, R, T, B, C, IST> {}
29
30impl<I, R, T, B, C, IST> TCPServer<I, R, T, B, C, IST>
31where
32    I: Fn(ReadHalf<C>, Arc<TCPPeer<C>>, T) -> R + Send + Clone + 'static,
33    R: Future<Output = anyhow::Result<()>> + Send + 'static,
34    T: Clone + Send + 'static,
35    B: Future<Output = anyhow::Result<C>> + Send + 'static,
36    C: AsyncRead + AsyncWrite + Send + Sync + 'static,
37    IST: Fn(TcpStream) -> B + Send + Sync + 'static,
38{
39    /// 创建一个新的TCP服务
40    pub(crate) async fn new<A: ToSocketAddrs>(
41        addr: A,
42        stream_init: IST,
43        input: I,
44        connect_event: Option<ConnectEventType>,
45    ) -> Result<Arc<Actor<TCPServer<I, R, T, B, C, IST>>>, Box<dyn Error>> {
46        let listener = TcpListener::bind(addr).await?;
47        Ok(Arc::new(Actor::new(TCPServer {
48            listener: Some(listener),
49            connect_event,
50            stream_init: Arc::new(stream_init),
51            input_event: Some(input),
52            _phantom1: Default::default(),
53            _phantom2: Default::default(),
54            _phantom3: Default::default(),
55            _phantom4: Default::default(),
56        })))
57    }
58
59    /// 启动TCP服务
60    pub async fn start(&mut self, token: T) -> Result<JoinHandle<anyhow::Result<()>>> {
61        if let Some(listener) = self.listener.take() {
62            let connect_event = self.connect_event.take();
63            let input_event = self.input_event.take().unwrap();
64            let stream_init = self.stream_init.clone();
65            let join: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
66                loop {
67                    let (socket, addr) = listener.accept().await?;
68                    if let Some(ref connect_event) = connect_event {
69                        if !connect_event(addr) {
70                            warn!("addr:{} not connect", addr);
71                            continue;
72                        }
73                    }
74                    trace!("start read:{}", addr);
75                    let input = input_event.clone();
76                    let peer_token = token.clone();
77                    let stream_init = stream_init.clone();
78                    tokio::spawn(async move {
79                        match (*stream_init)(socket).await {
80                            Ok(socket) => {
81                                let (reader, sender) = tokio::io::split(socket);
82                                let peer = TCPPeer::new(addr, sender);
83                                if let Err(err) = input(reader, peer.clone(), peer_token).await {
84                                    error!("input data error:{}", err);
85                                }
86                                if let Err(er) = peer.disconnect().await {
87                                    debug!("disconnect client:{:?} err:{}", peer.addr(), er);
88                                } else {
89                                    debug!("{} disconnect", peer.addr())
90                                }
91                            }
92                            Err(err) => {
93                                warn!("init stream err:{}", err);
94                            }
95                        }
96                    });
97                }
98            });
99
100            Ok(join)
101        } else {
102            Err(crate::error::Error::NotListenerError)
103        }
104    }
105}
106
107#[async_trait::async_trait]
108pub trait ITCPServer<T> {
109    async fn start(&self, token: T) -> anyhow::Result<JoinHandle<anyhow::Result<()>>>;
110    async fn start_block(&self, token: T) -> anyhow::Result<()>;
111}
112
113#[async_trait::async_trait]
114impl<I, R, T, B, C, IST> ITCPServer<T> for Actor<TCPServer<I, R, T, B, C, IST>>
115where
116    I: Fn(ReadHalf<C>, Arc<TCPPeer<C>>, T) -> R + Send + Sync + Clone + 'static,
117    R: Future<Output = anyhow::Result<()>> + Send + 'static,
118    T: Clone + Send + Sync + 'static,
119    B: Future<Output = anyhow::Result<C>> + Send + 'static,
120    C: AsyncRead + AsyncWrite + Send + Sync + 'static,
121    IST: Fn(TcpStream) -> B + Send + Sync + 'static,
122{
123    async fn start(&self, token: T) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
124        self.inner_call(|inner| async move { Ok(inner.get_mut().start(token).await?) })
125            .await
126    }
127
128    async fn start_block(&self, token: T) -> anyhow::Result<()> {
129        Self::start(self, token).await?.await??;
130        Ok(())
131    }
132}