1use std::time::{Duration, Instant};
2use std::{cell::Cell, fmt, io, sync::Arc, sync::mpsc, thread};
3use std::{collections::VecDeque, num::NonZeroUsize};
4
5use ntex_rt::System;
6use ntex_util::{future::Either, time::Millis, time::sleep};
7use polling::{Event, Events, Poller};
8
9use super::socket::{Connection, Listener, SocketAddr};
10use super::{Server, ServerStatus, Token};
11
12const EXIT_TIMEOUT: Duration = Duration::from_millis(100);
13const ERR_TIMEOUT: Duration = Duration::from_millis(500);
14const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
15
16#[derive(Debug)]
17pub enum AcceptorCommand {
18 Stop(oneshot::Sender<()>),
19 Terminate,
20 Pause,
21 Resume,
22 Timer,
23}
24
25#[derive(Debug)]
26struct ServerSocketInfo {
27 addr: SocketAddr,
28 token: Token,
29 sock: Listener,
30 registered: Cell<bool>,
31 timeout: Cell<Option<Instant>>,
32}
33
34#[derive(Debug, Clone)]
35pub struct AcceptNotify(Arc<Poller>, mpsc::Sender<AcceptorCommand>);
36
37impl AcceptNotify {
38 fn new(waker: Arc<Poller>, tx: mpsc::Sender<AcceptorCommand>) -> Self {
39 AcceptNotify(waker, tx)
40 }
41
42 pub fn send(&self, cmd: AcceptorCommand) {
43 let _ = self.1.send(cmd);
44 let _ = self.0.notify();
45 }
46}
47
48pub struct AcceptLoop {
50 testing: bool,
51 notify: AcceptNotify,
52 inner: Option<(mpsc::Receiver<AcceptorCommand>, Arc<Poller>)>,
53 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
54}
55
56impl Default for AcceptLoop {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62impl AcceptLoop {
63 pub fn new() -> AcceptLoop {
65 let poll = Arc::new(
67 Poller::new()
68 .map_err(|e| panic!("Cannot create Poller {e}"))
69 .unwrap(),
70 );
71
72 let (tx, rx) = mpsc::channel();
73 let notify = AcceptNotify::new(poll.clone(), tx);
74
75 AcceptLoop {
76 notify,
77 inner: Some((rx, poll)),
78 testing: false,
79 status_handler: None,
80 }
81 }
82
83 pub fn notify(&self) -> AcceptNotify {
85 self.notify.clone()
86 }
87
88 pub fn set_status_handler<F>(&mut self, f: F)
89 where
90 F: FnMut(ServerStatus) + Send + 'static,
91 {
92 self.status_handler = Some(Box::new(f));
93 }
94
95 pub fn testing(&mut self) {
96 self.testing = true;
97 }
98
99 pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) {
101 let (tx, rx_start) = oneshot::channel();
102 let (rx, poll) = self
103 .inner
104 .take()
105 .expect("AcceptLoop cannot be used multiple times");
106
107 Accept::start(
108 tx,
109 rx,
110 poll,
111 socks,
112 srv,
113 self.notify.clone(),
114 self.testing,
115 self.status_handler.take(),
116 );
117
118 let _ = rx_start.recv();
119 }
120}
121
122impl fmt::Debug for AcceptLoop {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 f.debug_struct("AcceptLoop")
125 .field("notify", &self.notify)
126 .field("inner", &self.inner)
127 .field("status_handler", &self.status_handler.is_some())
128 .finish()
129 }
130}
131
132struct Accept {
133 poller: Arc<Poller>,
134 rx: mpsc::Receiver<AcceptorCommand>,
135 tx: Option<oneshot::Sender<()>>,
136 sockets: Vec<ServerSocketInfo>,
137 srv: Server,
138 notify: AcceptNotify,
139 testing: bool,
140 backpressure: bool,
141 backlog: VecDeque<Connection>,
142 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
143}
144
145impl Accept {
146 #[allow(clippy::too_many_arguments)]
147 fn start(
148 tx: oneshot::Sender<()>,
149 rx: mpsc::Receiver<AcceptorCommand>,
150 poller: Arc<Poller>,
151 socks: Vec<(Token, Listener)>,
152 srv: Server,
153 notify: AcceptNotify,
154 testing: bool,
155 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
156 ) {
157 let sys = System::current();
158
159 let _ = thread::Builder::new()
161 .name("accept loop".to_owned())
162 .spawn(move || {
163 System::set_current(sys);
164 Accept::new(tx, rx, poller, socks, srv, notify, testing, status_handler)
165 .poll()
166 });
167 }
168
169 #[allow(clippy::too_many_arguments)]
170 fn new(
171 tx: oneshot::Sender<()>,
172 rx: mpsc::Receiver<AcceptorCommand>,
173 poller: Arc<Poller>,
174 socks: Vec<(Token, Listener)>,
175 srv: Server,
176 notify: AcceptNotify,
177 testing: bool,
178 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
179 ) -> Accept {
180 let mut sockets = Vec::new();
181 for (hnd_token, lst) in socks.into_iter() {
182 sockets.push(ServerSocketInfo {
183 addr: lst.local_addr(),
184 sock: lst,
185 token: hnd_token,
186 registered: Cell::new(false),
187 timeout: Cell::new(None),
188 });
189 }
190
191 Accept {
192 poller,
193 rx,
194 sockets,
195 notify,
196 srv,
197 testing,
198 status_handler,
199 tx: Some(tx),
200 backpressure: true,
201 backlog: VecDeque::new(),
202 }
203 }
204
205 fn update_status(&mut self, st: ServerStatus) {
206 if let Some(ref mut hnd) = self.status_handler {
207 (*hnd)(st)
208 }
209 }
210
211 fn poll(mut self) {
212 log::trace!("Starting server accept loop");
213
214 let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap());
216
217 let mut timeout = Some(Duration::ZERO);
218 loop {
219 events.clear();
220
221 if let Err(e) = self.poller.wait(&mut events, timeout) {
222 if e.kind() != io::ErrorKind::Interrupted {
223 panic!("Cannot wait for events in poller: {e}")
224 }
225 } else if timeout.is_some() {
226 timeout = None;
227 let _ = self.tx.take().unwrap().send(());
228 }
229
230 for idx in 0..self.sockets.len() {
231 if self.sockets[idx].registered.get() {
232 let readd = self.accept(idx);
233 if readd {
234 self.add_source(idx);
235 }
236 }
237 }
238
239 match self.process_cmd() {
240 Either::Left(_) => events.clear(),
241 Either::Right(rx) => {
242 for info in self.sockets.drain(..) {
244 info.sock.remove_source()
245 }
246 log::info!("Accept loop has been stopped");
247
248 if let Some(rx) = rx {
249 if !self.testing {
250 thread::sleep(EXIT_TIMEOUT);
251 }
252 let _ = rx.send(());
253 }
254
255 break;
256 }
257 }
258 }
259 }
260
261 fn add_source(&self, idx: usize) {
262 let info = &self.sockets[idx];
263
264 loop {
265 let result = if info.registered.get() {
267 self.poller.modify(&info.sock, Event::readable(idx))
268 } else {
269 unsafe { self.poller.add(&info.sock, Event::readable(idx)) }
270 };
271 if let Err(err) = result {
272 if err.kind() == io::ErrorKind::WouldBlock {
273 continue;
274 }
275 log::error!("Cannot register socket listener: {err}");
276
277 info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
279
280 let notify = self.notify.clone();
281 System::current().arbiter().spawn(Box::pin(async move {
282 sleep(ERR_SLEEP_TIMEOUT).await;
283 notify.send(AcceptorCommand::Timer);
284 }));
285 } else {
286 info.registered.set(true);
287 }
288
289 break;
290 }
291 }
292
293 fn remove_source(&self, key: usize) {
294 let info = &self.sockets[key];
295
296 let result = if info.registered.get() {
297 self.poller.modify(&info.sock, Event::none(key))
298 } else {
299 return;
300 };
301
302 if let Err(err) = result {
304 log::error!("Cannot stop socket listener for {} err: {}", info.addr, err);
305 }
306 }
307
308 fn process_timer(&mut self) {
309 let now = Instant::now();
310 for key in 0..self.sockets.len() {
311 let info = &mut self.sockets[key];
312 if let Some(inst) = info.timeout.get() {
313 if now > inst && !self.backpressure {
314 log::info!("Resuming socket listener on {} after timeout", info.addr);
315 info.timeout.take();
316 self.add_source(key);
317 }
318 }
319 }
320 }
321
322 fn process_cmd(&mut self) -> Either<(), Option<oneshot::Sender<()>>> {
323 loop {
324 match self.rx.try_recv() {
325 Ok(cmd) => match cmd {
326 AcceptorCommand::Stop(rx) => {
327 if !self.backpressure {
328 log::info!("Stopping accept loop");
329 self.backpressure(true);
330 }
331 break Either::Right(Some(rx));
332 }
333 AcceptorCommand::Terminate => {
334 log::info!("Stopping accept loop");
335 self.backpressure(true);
336 break Either::Right(None);
337 }
338 AcceptorCommand::Pause => {
339 if !self.backpressure {
340 log::info!("Pausing accept loop");
341 self.backpressure(true);
342 }
343 }
344 AcceptorCommand::Resume => {
345 if self.backpressure {
346 log::info!("Resuming accept loop");
347 self.backpressure(false);
348 }
349 }
350 AcceptorCommand::Timer => {
351 self.process_timer();
352 }
353 },
354 Err(err) => {
355 break match err {
356 mpsc::TryRecvError::Empty => Either::Left(()),
357 mpsc::TryRecvError::Disconnected => {
358 log::error!("Dropping accept loop");
359 self.backpressure(true);
360 Either::Right(None)
361 }
362 };
363 }
364 }
365 }
366 }
367
368 fn backpressure(&mut self, on: bool) {
369 self.update_status(if on {
370 ServerStatus::NotReady
371 } else {
372 ServerStatus::Ready
373 });
374
375 if self.backpressure && !on {
376 while let Some(msg) = self.backlog.pop_front() {
378 if let Err(msg) = self.srv.process(msg) {
379 log::trace!("Server is unavailable");
380 self.backlog.push_front(msg);
381 return;
382 }
383 }
384
385 self.backpressure = false;
387 for (key, info) in self.sockets.iter().enumerate() {
388 if info.timeout.get().is_none() {
389 log::info!(
391 "Resuming socket listener on {} after back-pressure",
392 info.addr
393 );
394 self.add_source(key);
395 }
396 }
397 } else if !self.backpressure && on {
398 self.backpressure = true;
399 for key in 0..self.sockets.len() {
400 let info = &mut self.sockets[key];
402 if info.timeout.take().is_none() {
403 log::info!("Stopping socket listener on {}", info.addr);
404 self.remove_source(key);
405 }
406 }
407 }
408 }
409
410 fn accept(&mut self, token: usize) -> bool {
411 loop {
412 if let Some(info) = self.sockets.get_mut(token) {
413 match info.sock.accept() {
414 Ok(Some(io)) => {
415 let msg = Connection {
416 io,
417 token: info.token,
418 };
419 if let Err(msg) = self.srv.process(msg) {
420 log::trace!("Server is unavailable");
421 self.backlog.push_back(msg);
422 self.backpressure(true);
423 return false;
424 }
425 }
426 Ok(None) => return true,
427 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
428 Err(ref e) if connection_error(e) => continue,
429 Err(e) => {
430 log::error!("Error accepting socket: {e}");
431
432 info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
434
435 let notify = self.notify.clone();
436 System::current().arbiter().spawn(Box::pin(async move {
437 sleep(ERR_SLEEP_TIMEOUT).await;
438 notify.send(AcceptorCommand::Timer);
439 }));
440 return false;
441 }
442 }
443 }
444 }
445 }
446}
447
448fn connection_error(e: &io::Error) -> bool {
456 e.kind() == io::ErrorKind::ConnectionRefused
457 || e.kind() == io::ErrorKind::ConnectionAborted
458 || e.kind() == io::ErrorKind::ConnectionReset
459}