1use std::time::{Duration, Instant};
2use std::{cell::Cell, fmt, io, sync::mpsc, sync::Arc, thread};
3use std::{collections::VecDeque, num::NonZeroUsize};
4
5use ntex_rt::System;
6use ntex_util::{future::Either, time::sleep, time::Millis};
7use polling::{Event, Events, Poller};
8
9use super::socket::{Connection, Listener, SocketAddr};
10use super::{Server, ServerStatus, Token};
11
12const EXIT_TIMEOUT: Duration = Duration::from_millis(100);
13const ERR_TIMEOUT: Duration = Duration::from_millis(500);
14const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
15
16#[derive(Debug)]
17pub enum AcceptorCommand {
18 Stop(oneshot::Sender<()>),
19 Terminate,
20 Pause,
21 Resume,
22 Timer,
23}
24
25#[derive(Debug)]
26struct ServerSocketInfo {
27 addr: SocketAddr,
28 token: Token,
29 sock: Listener,
30 registered: Cell<bool>,
31 timeout: Cell<Option<Instant>>,
32}
33
34#[derive(Debug, Clone)]
35pub struct AcceptNotify(Arc<Poller>, mpsc::Sender<AcceptorCommand>);
36
37impl AcceptNotify {
38 fn new(waker: Arc<Poller>, tx: mpsc::Sender<AcceptorCommand>) -> Self {
39 AcceptNotify(waker, tx)
40 }
41
42 pub fn send(&self, cmd: AcceptorCommand) {
43 let _ = self.1.send(cmd);
44 let _ = self.0.notify();
45 }
46}
47
48pub struct AcceptLoop {
50 notify: AcceptNotify,
51 inner: Option<(mpsc::Receiver<AcceptorCommand>, Arc<Poller>)>,
52 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
53}
54
55impl Default for AcceptLoop {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl AcceptLoop {
62 pub fn new() -> AcceptLoop {
64 let poll = Arc::new(
66 Poller::new()
67 .map_err(|e| panic!("Cannot create Polller {}", e))
68 .unwrap(),
69 );
70
71 let (tx, rx) = mpsc::channel();
72 let notify = AcceptNotify::new(poll.clone(), tx);
73
74 AcceptLoop {
75 notify,
76 inner: Some((rx, poll)),
77 status_handler: None,
78 }
79 }
80
81 pub fn notify(&self) -> AcceptNotify {
83 self.notify.clone()
84 }
85
86 pub fn set_status_handler<F>(&mut self, f: F)
87 where
88 F: FnMut(ServerStatus) + Send + 'static,
89 {
90 self.status_handler = Some(Box::new(f));
91 }
92
93 pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) {
95 let (tx, rx_start) = oneshot::channel();
96 let (rx, poll) = self
97 .inner
98 .take()
99 .expect("AcceptLoop cannot be used multiple times");
100
101 Accept::start(
102 tx,
103 rx,
104 poll,
105 socks,
106 srv,
107 self.notify.clone(),
108 self.status_handler.take(),
109 );
110
111 let _ = rx_start.recv();
112 }
113}
114
115impl fmt::Debug for AcceptLoop {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("AcceptLoop")
118 .field("notify", &self.notify)
119 .field("inner", &self.inner)
120 .field("status_handler", &self.status_handler.is_some())
121 .finish()
122 }
123}
124
125struct Accept {
126 poller: Arc<Poller>,
127 rx: mpsc::Receiver<AcceptorCommand>,
128 tx: Option<oneshot::Sender<()>>,
129 sockets: Vec<ServerSocketInfo>,
130 srv: Server,
131 notify: AcceptNotify,
132 backpressure: bool,
133 backlog: VecDeque<Connection>,
134 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
135}
136
137impl Accept {
138 fn start(
139 tx: oneshot::Sender<()>,
140 rx: mpsc::Receiver<AcceptorCommand>,
141 poller: Arc<Poller>,
142 socks: Vec<(Token, Listener)>,
143 srv: Server,
144 notify: AcceptNotify,
145 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
146 ) {
147 let sys = System::current();
148
149 let _ = thread::Builder::new()
151 .name("accept loop".to_owned())
152 .spawn(move || {
153 System::set_current(sys);
154 Accept::new(tx, rx, poller, socks, srv, notify, status_handler).poll()
155 });
156 }
157
158 fn new(
159 tx: oneshot::Sender<()>,
160 rx: mpsc::Receiver<AcceptorCommand>,
161 poller: Arc<Poller>,
162 socks: Vec<(Token, Listener)>,
163 srv: Server,
164 notify: AcceptNotify,
165 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
166 ) -> Accept {
167 let mut sockets = Vec::new();
168 for (hnd_token, lst) in socks.into_iter() {
169 sockets.push(ServerSocketInfo {
170 addr: lst.local_addr(),
171 sock: lst,
172 token: hnd_token,
173 registered: Cell::new(false),
174 timeout: Cell::new(None),
175 });
176 }
177
178 Accept {
179 poller,
180 rx,
181 sockets,
182 notify,
183 srv,
184 status_handler,
185 tx: Some(tx),
186 backpressure: true,
187 backlog: VecDeque::new(),
188 }
189 }
190
191 fn update_status(&mut self, st: ServerStatus) {
192 if let Some(ref mut hnd) = self.status_handler {
193 (*hnd)(st)
194 }
195 }
196
197 fn poll(&mut self) {
198 log::trace!("Starting server accept loop");
199
200 let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap());
202
203 let mut timeout = Some(Duration::ZERO);
204 loop {
205 events.clear();
206
207 if let Err(e) = self.poller.wait(&mut events, timeout) {
208 if e.kind() != io::ErrorKind::Interrupted {
209 panic!("Cannot wait for events in poller: {}", e)
210 }
211 } else if timeout.is_some() {
212 timeout = None;
213 let _ = self.tx.take().unwrap().send(());
214 }
215
216 for idx in 0..self.sockets.len() {
217 if self.sockets[idx].registered.get() {
218 let readd = self.accept(idx);
219 if readd {
220 self.add_source(idx);
221 }
222 }
223 }
224
225 match self.process_cmd() {
226 Either::Left(_) => events.clear(),
227 Either::Right(rx) => {
228 for info in self.sockets.drain(..) {
230 info.sock.remove_source()
231 }
232 log::info!("Accept loop has been stopped");
233
234 if let Some(rx) = rx {
235 thread::sleep(EXIT_TIMEOUT);
236 let _ = rx.send(());
237 }
238
239 break;
240 }
241 }
242 }
243 }
244
245 fn add_source(&self, idx: usize) {
246 let info = &self.sockets[idx];
247
248 loop {
249 let result = if info.registered.get() {
251 self.poller.modify(&info.sock, Event::readable(idx))
252 } else {
253 unsafe { self.poller.add(&info.sock, Event::readable(idx)) }
254 };
255 if let Err(err) = result {
256 if err.kind() == io::ErrorKind::WouldBlock {
257 continue;
258 }
259 log::error!("Cannot register socket listener: {}", err);
260
261 info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
263
264 let notify = self.notify.clone();
265 System::current().arbiter().spawn(Box::pin(async move {
266 sleep(ERR_SLEEP_TIMEOUT).await;
267 notify.send(AcceptorCommand::Timer);
268 }));
269 } else {
270 info.registered.set(true);
271 }
272
273 break;
274 }
275 }
276
277 fn remove_source(&self, key: usize) {
278 let info = &self.sockets[key];
279
280 let result = if info.registered.get() {
281 self.poller.modify(&info.sock, Event::none(key))
282 } else {
283 return;
284 };
285
286 if let Err(err) = result {
288 log::error!("Cannot stop socket listener for {} err: {}", info.addr, err);
289 }
290 }
291
292 fn process_timer(&mut self) {
293 let now = Instant::now();
294 for key in 0..self.sockets.len() {
295 let info = &mut self.sockets[key];
296 if let Some(inst) = info.timeout.get() {
297 if now > inst && !self.backpressure {
298 log::info!("Resuming socket listener on {} after timeout", info.addr);
299 info.timeout.take();
300 self.add_source(key);
301 }
302 }
303 }
304 }
305
306 fn process_cmd(&mut self) -> Either<(), Option<oneshot::Sender<()>>> {
307 loop {
308 match self.rx.try_recv() {
309 Ok(cmd) => match cmd {
310 AcceptorCommand::Stop(rx) => {
311 if !self.backpressure {
312 log::info!("Stopping accept loop");
313 self.backpressure(true);
314 }
315 break Either::Right(Some(rx));
316 }
317 AcceptorCommand::Terminate => {
318 log::info!("Stopping accept loop");
319 self.backpressure(true);
320 break Either::Right(None);
321 }
322 AcceptorCommand::Pause => {
323 if !self.backpressure {
324 log::info!("Pausing accept loop");
325 self.backpressure(true);
326 }
327 }
328 AcceptorCommand::Resume => {
329 if self.backpressure {
330 log::info!("Resuming accept loop");
331 self.backpressure(false);
332 }
333 }
334 AcceptorCommand::Timer => {
335 self.process_timer();
336 }
337 },
338 Err(err) => {
339 break match err {
340 mpsc::TryRecvError::Empty => Either::Left(()),
341 mpsc::TryRecvError::Disconnected => {
342 log::error!("Dropping accept loop");
343 self.backpressure(true);
344 Either::Right(None)
345 }
346 };
347 }
348 }
349 }
350 }
351
352 fn backpressure(&mut self, on: bool) {
353 self.update_status(if on {
354 ServerStatus::NotReady
355 } else {
356 ServerStatus::Ready
357 });
358
359 if self.backpressure && !on {
360 while let Some(msg) = self.backlog.pop_front() {
362 if let Err(msg) = self.srv.process(msg) {
363 log::trace!("Server is unavailable");
364 self.backlog.push_front(msg);
365 return;
366 }
367 }
368
369 self.backpressure = false;
371 for (key, info) in self.sockets.iter().enumerate() {
372 if info.timeout.get().is_none() {
373 log::info!(
375 "Resuming socket listener on {} after back-pressure",
376 info.addr
377 );
378 self.add_source(key);
379 }
380 }
381 } else if !self.backpressure && on {
382 self.backpressure = true;
383 for key in 0..self.sockets.len() {
384 let info = &mut self.sockets[key];
386 if info.timeout.take().is_none() {
387 log::info!("Stopping socket listener on {}", info.addr);
388 self.remove_source(key);
389 }
390 }
391 }
392 }
393
394 fn accept(&mut self, token: usize) -> bool {
395 loop {
396 if let Some(info) = self.sockets.get_mut(token) {
397 match info.sock.accept() {
398 Ok(Some(io)) => {
399 let msg = Connection {
400 io,
401 token: info.token,
402 };
403 if let Err(msg) = self.srv.process(msg) {
404 log::trace!("Server is unavailable");
405 self.backlog.push_back(msg);
406 self.backpressure(true);
407 return false;
408 }
409 }
410 Ok(None) => return true,
411 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
412 Err(ref e) if connection_error(e) => continue,
413 Err(e) => {
414 log::error!("Error accepting socket: {}", e);
415
416 info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
418
419 let notify = self.notify.clone();
420 System::current().arbiter().spawn(Box::pin(async move {
421 sleep(ERR_SLEEP_TIMEOUT).await;
422 notify.send(AcceptorCommand::Timer);
423 }));
424 return false;
425 }
426 }
427 }
428 }
429 }
430}
431
/// Returns `true` for error kinds that describe a single failed connection
/// (refused, aborted or reset).
fn connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}