cyfs_lib/base/
tcp_listener.rs1use cyfs_base::{BuckyError, BuckyResult};
2use cyfs_debug::Mutex;
3
4use async_std::net::{TcpListener, TcpStream};
5use async_std::stream::StreamExt;
6use async_std::task;
7use futures::future::{AbortHandle, Aborted};
8use std::net::SocketAddr;
9use std::sync::Arc;
10use once_cell::sync::OnceCell;
11
12
13#[async_trait::async_trait]
14pub trait BaseTcpListenerHandler: Send + Sync {
15 async fn on_accept(&self, stream: TcpStream) -> BuckyResult<()>;
16}
17
18pub type BaseTcpListenerHandlerRef = Arc<Box<dyn BaseTcpListenerHandler>>;
19
20struct BaseTcpListenerInner {
21 listen: SocketAddr,
22
23 handler: OnceCell<BaseTcpListenerHandlerRef>,
24
25 canceler: Option<AbortHandle>,
27
28 running_task: Option<async_std::task::JoinHandle<()>>,
29}
30
31impl BaseTcpListenerInner {
32 pub fn new(listen: SocketAddr) -> Self {
33 Self {
34 listen,
35 handler: OnceCell::new(),
36 canceler: None,
37 running_task: None,
38 }
39 }
40
41 fn bind_handler(&self, handler: BaseTcpListenerHandlerRef) {
42 if let Err(_) = self.handler.set(handler) {
43 unreachable!();
44 }
45 }
46
47 pub fn get_addr(&self) -> SocketAddr {
48 self.listen.clone()
49 }
50
51 pub fn get_listen(&self) -> String {
52 self.listen.to_string()
53 }
54}
55
56#[derive(Clone)]
57pub struct BaseTcpListener(Arc<Mutex<BaseTcpListenerInner>>);
58
59impl BaseTcpListener {
60 pub fn new(addr: SocketAddr) -> Self {
61 let inner = BaseTcpListenerInner::new(addr);
62
63 Self(Arc::new(Mutex::new(inner)))
64 }
65
66
67 pub fn bind_handler(&self, handler: BaseTcpListenerHandlerRef) {
68 self.0.lock().unwrap().bind_handler(handler)
69 }
70
71 pub fn get_addr(&self) -> SocketAddr {
72 self.0.lock().unwrap().get_addr()
73 }
74 pub fn get_listen(&self) -> String {
75 self.0.lock().unwrap().get_listen()
76 }
77
78 pub async fn start(&self) -> BuckyResult<()> {
79 let tcp_listener = self.create_listener().await?;
80
81 let this = self.clone();
82 let (release_task, handle) = futures::future::abortable(async move {
83 let _ = this.run_inner(tcp_listener).await;
84 });
85
86 let this = self.clone();
87 let task = async_std::task::spawn(async move {
88 match release_task.await {
89 Ok(_) => {
90 info!("tcp listener complete: {}", this.get_listen());
91 }
92 Err(Aborted) => {
93 info!("tcp listener cancelled: {}", this.get_listen());
94 }
95 }
96 });
97
98 {
99 let mut listener = self.0.lock().unwrap();
100 assert!(listener.canceler.is_none());
101 assert!(listener.running_task.is_none());
102 listener.canceler = Some(handle);
103 listener.running_task = Some(task);
104 }
105
106 Ok(())
107 }
108
109 pub async fn stop(&self) {
110 let canceler;
111 let running_task;
112
113 info!("will stop tcp listener: {}", self.get_listen());
114
115 {
116 let mut listener = self.0.lock().unwrap();
117 canceler = listener.canceler.take();
118 running_task = listener.running_task.take();
119 }
120
121 if let Some(canceler) = canceler {
122 canceler.abort();
123 let running_task = running_task.unwrap();
124 running_task.await;
125 info!("tcp listener stoped complete: {}", self.get_listen());
126 } else {
127 warn!("tcp listener not running: {}", self.get_listen());
128 }
129 }
130
131 async fn create_listener(&self) -> BuckyResult<TcpListener> {
132 let listen;
133 {
134 let listener = self.0.lock().unwrap();
135 listen = listener.listen.clone();
136 }
137
138 let tcp_listener = TcpListener::bind(listen).await.map_err(|e| {
139 let msg = format!(
140 "object tcp listener bind addr failed! addr={}, err={}",
141 listen, e
142 );
143 error!("{}", msg);
144
145 BuckyError::from(msg)
146 })?;
147
148 #[cfg(unix)]
149 {
150 use async_std::os::unix::io::AsRawFd;
151 if let Err(e) = cyfs_util::set_socket_reuseaddr(tcp_listener.as_raw_fd()) {
152 error!("set_socket_reuseaddr for {:?} error! err={}", listen, e);
153 }
154 }
155
156 let local_addr = tcp_listener.local_addr().map_err(|e| {
157 error!("get tcp listener local addr failed! {}", e);
158 BuckyError::from(e)
159 })?;
160
161 {
163 let mut listener = self.0.lock().unwrap();
164 info!(
165 "will update tcp listener local addr: {} -> {}",
166 listener.listen, local_addr
167 );
168 listener.listen = local_addr.clone();
169 }
170
171 Ok(tcp_listener)
172 }
173
174 async fn run_inner(&self, tcp_listener: TcpListener) -> BuckyResult<()> {
175 let listen;
176 let handler;
177 {
178 let listener = self.0.lock().unwrap();
179
180 listen = listener.listen.clone();
181 handler = listener.handler.get().unwrap().clone();
182 }
183
184 let addr = format!("http://{}", tcp_listener.local_addr().unwrap());
185 info!("tcp listener listen at {}", addr);
186
187 let mut incoming = tcp_listener.incoming();
188 loop {
189 match incoming.next().await {
190 Some(Ok(tcp_stream)) => {
191 debug!(
192 "tcp listener recv new connection from {:?}",
193 tcp_stream.peer_addr()
194 );
195
196 let handler = handler.clone();
197 task::spawn(async move {
198 if let Err(e) = handler.on_accept(tcp_stream).await {
199 error!(
200 "object tcp process http connection error: listen={} err={}",
201 listen, e
202 );
203 }
204 });
205 }
206 Some(Err(e)) => {
207 let listener = self.0.lock().unwrap();
209 error!(
210 "tcp listener recv connection error! listener={}, err={}",
211 listener.listen, e
212 );
213 }
214 None => {
215 info!("tcp listener recv connection finished! listen={}", listen);
216 break;
217 }
218 }
219 }
220
221 Ok(())
222 }
223}