cyfs_lib/base/
tcp_listener.rs

1use cyfs_base::{BuckyError, BuckyResult};
2use cyfs_debug::Mutex;
3
4use async_std::net::{TcpListener, TcpStream};
5use async_std::stream::StreamExt;
6use async_std::task;
7use futures::future::{AbortHandle, Aborted};
8use std::net::SocketAddr;
9use std::sync::Arc;
10use once_cell::sync::OnceCell;
11
12
13#[async_trait::async_trait]
14pub trait BaseTcpListenerHandler: Send + Sync {
15    async fn on_accept(&self, stream: TcpStream) -> BuckyResult<()>;
16}
17
18pub type BaseTcpListenerHandlerRef = Arc<Box<dyn BaseTcpListenerHandler>>;
19
20struct BaseTcpListenerInner {
21    listen: SocketAddr,
22
23    handler: OnceCell<BaseTcpListenerHandlerRef>,
24
25    // 取消listener的运行
26    canceler: Option<AbortHandle>,
27
28    running_task: Option<async_std::task::JoinHandle<()>>,
29}
30
31impl BaseTcpListenerInner {
32    pub fn new(listen: SocketAddr) -> Self {
33        Self {
34            listen,
35            handler: OnceCell::new(),
36            canceler: None,
37            running_task: None,
38        }
39    }
40
41    fn bind_handler(&self,  handler: BaseTcpListenerHandlerRef) {
42        if let Err(_) = self.handler.set(handler) {
43            unreachable!();
44        }
45    }
46
47    pub fn get_addr(&self) -> SocketAddr {
48        self.listen.clone()
49    }
50
51    pub fn get_listen(&self) -> String {
52        self.listen.to_string()
53    }
54}
55
56#[derive(Clone)]
57pub struct BaseTcpListener(Arc<Mutex<BaseTcpListenerInner>>);
58
59impl BaseTcpListener {
60    pub fn new(addr: SocketAddr) -> Self {
61        let inner = BaseTcpListenerInner::new(addr);
62
63        Self(Arc::new(Mutex::new(inner)))
64    }
65
66    
67    pub fn bind_handler(&self,  handler: BaseTcpListenerHandlerRef) {
68        self.0.lock().unwrap().bind_handler(handler)
69    }
70
71    pub fn get_addr(&self) -> SocketAddr {
72        self.0.lock().unwrap().get_addr()
73    }
74    pub fn get_listen(&self) -> String {
75        self.0.lock().unwrap().get_listen()
76    }
77
78    pub async fn start(&self) -> BuckyResult<()> {
79        let tcp_listener = self.create_listener().await?;
80
81        let this = self.clone();
82        let (release_task, handle) = futures::future::abortable(async move {
83            let _ = this.run_inner(tcp_listener).await;
84        });
85
86        let this = self.clone();
87        let task = async_std::task::spawn(async move {
88            match release_task.await {
89                Ok(_) => {
90                    info!("tcp listener complete: {}", this.get_listen());
91                }
92                Err(Aborted) => {
93                    info!("tcp listener cancelled: {}", this.get_listen());
94                }
95            }
96        });
97
98        {
99            let mut listener = self.0.lock().unwrap();
100            assert!(listener.canceler.is_none());
101            assert!(listener.running_task.is_none());
102            listener.canceler = Some(handle);
103            listener.running_task = Some(task);
104        }
105
106        Ok(())
107    }
108
109    pub async fn stop(&self) {
110        let canceler;
111        let running_task;
112
113        info!("will stop tcp listener: {}", self.get_listen());
114
115        {
116            let mut listener = self.0.lock().unwrap();
117            canceler = listener.canceler.take();
118            running_task = listener.running_task.take();
119        }
120
121        if let Some(canceler) = canceler {
122            canceler.abort();
123            let running_task = running_task.unwrap();
124            running_task.await;
125            info!("tcp listener stoped complete: {}", self.get_listen());
126        } else {
127            warn!("tcp listener not running: {}", self.get_listen());
128        }
129    }
130
131    async fn create_listener(&self) -> BuckyResult<TcpListener> {
132        let listen;
133        {
134            let listener = self.0.lock().unwrap();
135            listen = listener.listen.clone();
136        }
137
138        let tcp_listener = TcpListener::bind(listen).await.map_err(|e| {
139            let msg = format!(
140                "object tcp listener bind addr failed! addr={}, err={}",
141                listen, e
142            );
143            error!("{}", msg);
144
145            BuckyError::from(msg)
146        })?;
147
148        #[cfg(unix)]
149        {
150            use async_std::os::unix::io::AsRawFd;
151            if let Err(e) = cyfs_util::set_socket_reuseaddr(tcp_listener.as_raw_fd()) {
152                error!("set_socket_reuseaddr for {:?} error! err={}", listen, e);
153            }
154        }
155
156        let local_addr = tcp_listener.local_addr().map_err(|e| {
157            error!("get tcp listener local addr failed! {}", e);
158            BuckyError::from(e)
159        })?;
160
161        // 更新本地的local addr
162        {
163            let mut listener = self.0.lock().unwrap();
164            info!(
165                "will update tcp listener local addr: {} -> {}",
166                listener.listen, local_addr
167            );
168            listener.listen = local_addr.clone();
169        }
170
171        Ok(tcp_listener)
172    }
173
174    async fn run_inner(&self, tcp_listener: TcpListener) -> BuckyResult<()> {
175        let listen;
176        let handler;
177        {
178            let listener = self.0.lock().unwrap();
179
180            listen = listener.listen.clone();
181            handler = listener.handler.get().unwrap().clone();
182        }
183
184        let addr = format!("http://{}", tcp_listener.local_addr().unwrap());
185        info!("tcp listener listen at {}", addr);
186
187        let mut incoming = tcp_listener.incoming();
188        loop {
189            match incoming.next().await {
190                Some(Ok(tcp_stream)) => {
191                    debug!(
192                        "tcp listener recv new connection from {:?}",
193                        tcp_stream.peer_addr()
194                    );
195
196                    let handler = handler.clone();
197                    task::spawn(async move {
198                        if let Err(e) = handler.on_accept(tcp_stream).await {
199                            error!(
200                                "object tcp process http connection error: listen={} err={}",
201                                listen, e
202                            );
203                        }
204                    });
205                }
206                Some(Err(e)) => {
207                    // FIXME 返回错误后如何处理?是否要停止
208                    let listener = self.0.lock().unwrap();
209                    error!(
210                        "tcp listener recv connection error! listener={}, err={}",
211                        listener.listen, e
212                    );
213                }
214                None => {
215                    info!("tcp listener recv connection finished! listen={}", listen);
216                    break;
217                }
218            }
219        }
220
221        Ok(())
222    }
223}