ntex_server/net/
accept.rs

1use std::time::{Duration, Instant};
2use std::{cell::Cell, fmt, io, sync::Arc, sync::mpsc, thread};
3use std::{collections::VecDeque, num::NonZeroUsize};
4
5use ntex_rt::System;
6use ntex_util::{future::Either, time::Millis, time::sleep};
7use polling::{Event, Events, Poller};
8
9use super::socket::{Connection, Listener, SocketAddr};
10use super::{Server, ServerStatus, Token};
11
12const EXIT_TIMEOUT: Duration = Duration::from_millis(100);
13const ERR_TIMEOUT: Duration = Duration::from_millis(500);
14const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
15
16#[derive(Debug)]
17pub enum AcceptorCommand {
18    Stop(oneshot::Sender<()>),
19    Terminate,
20    Pause,
21    Resume,
22    Timer,
23}
24
25#[derive(Debug)]
26struct ServerSocketInfo {
27    addr: SocketAddr,
28    token: Token,
29    sock: Listener,
30    registered: Cell<bool>,
31    timeout: Cell<Option<Instant>>,
32}
33
34#[derive(Debug, Clone)]
35pub struct AcceptNotify(Arc<Poller>, mpsc::Sender<AcceptorCommand>);
36
37impl AcceptNotify {
38    fn new(waker: Arc<Poller>, tx: mpsc::Sender<AcceptorCommand>) -> Self {
39        AcceptNotify(waker, tx)
40    }
41
42    pub fn send(&self, cmd: AcceptorCommand) {
43        let _ = self.1.send(cmd);
44        let _ = self.0.notify();
45    }
46}
47
48/// Streamin io accept loop
49pub struct AcceptLoop {
50    testing: bool,
51    notify: AcceptNotify,
52    inner: Option<(mpsc::Receiver<AcceptorCommand>, Arc<Poller>)>,
53    status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
54}
55
56impl Default for AcceptLoop {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62impl AcceptLoop {
63    /// Create accept loop
64    pub fn new() -> AcceptLoop {
65        // Create a poller instance
66        let poll = Arc::new(
67            Poller::new()
68                .map_err(|e| panic!("Cannot create Poller {e}"))
69                .unwrap(),
70        );
71
72        let (tx, rx) = mpsc::channel();
73        let notify = AcceptNotify::new(poll.clone(), tx);
74
75        AcceptLoop {
76            notify,
77            inner: Some((rx, poll)),
78            testing: false,
79            status_handler: None,
80        }
81    }
82
83    /// Get notification api for the loop
84    pub fn notify(&self) -> AcceptNotify {
85        self.notify.clone()
86    }
87
88    pub fn set_status_handler<F>(&mut self, f: F)
89    where
90        F: FnMut(ServerStatus) + Send + 'static,
91    {
92        self.status_handler = Some(Box::new(f));
93    }
94
95    pub fn testing(&mut self) {
96        self.testing = true;
97    }
98
99    /// Start accept loop
100    pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) {
101        let (tx, rx_start) = oneshot::channel();
102        let (rx, poll) = self
103            .inner
104            .take()
105            .expect("AcceptLoop cannot be used multiple times");
106
107        Accept::start(
108            tx,
109            rx,
110            poll,
111            socks,
112            srv,
113            self.notify.clone(),
114            self.testing,
115            self.status_handler.take(),
116        );
117
118        let _ = rx_start.recv();
119    }
120}
121
122impl fmt::Debug for AcceptLoop {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        f.debug_struct("AcceptLoop")
125            .field("notify", &self.notify)
126            .field("inner", &self.inner)
127            .field("status_handler", &self.status_handler.is_some())
128            .finish()
129    }
130}
131
132struct Accept {
133    poller: Arc<Poller>,
134    rx: mpsc::Receiver<AcceptorCommand>,
135    tx: Option<oneshot::Sender<()>>,
136    sockets: Vec<ServerSocketInfo>,
137    srv: Server,
138    notify: AcceptNotify,
139    testing: bool,
140    backpressure: bool,
141    backlog: VecDeque<Connection>,
142    status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
143}
144
145impl Accept {
146    #[allow(clippy::too_many_arguments)]
147    fn start(
148        tx: oneshot::Sender<()>,
149        rx: mpsc::Receiver<AcceptorCommand>,
150        poller: Arc<Poller>,
151        socks: Vec<(Token, Listener)>,
152        srv: Server,
153        notify: AcceptNotify,
154        testing: bool,
155        status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
156    ) {
157        let sys = System::current();
158
159        // start accept thread
160        let _ = thread::Builder::new()
161            .name("accept loop".to_owned())
162            .spawn(move || {
163                System::set_current(sys);
164                Accept::new(tx, rx, poller, socks, srv, notify, testing, status_handler)
165                    .poll()
166            });
167    }
168
169    #[allow(clippy::too_many_arguments)]
170    fn new(
171        tx: oneshot::Sender<()>,
172        rx: mpsc::Receiver<AcceptorCommand>,
173        poller: Arc<Poller>,
174        socks: Vec<(Token, Listener)>,
175        srv: Server,
176        notify: AcceptNotify,
177        testing: bool,
178        status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
179    ) -> Accept {
180        let mut sockets = Vec::new();
181        for (hnd_token, lst) in socks.into_iter() {
182            sockets.push(ServerSocketInfo {
183                addr: lst.local_addr(),
184                sock: lst,
185                token: hnd_token,
186                registered: Cell::new(false),
187                timeout: Cell::new(None),
188            });
189        }
190
191        Accept {
192            poller,
193            rx,
194            sockets,
195            notify,
196            srv,
197            testing,
198            status_handler,
199            tx: Some(tx),
200            backpressure: true,
201            backlog: VecDeque::new(),
202        }
203    }
204
205    fn update_status(&mut self, st: ServerStatus) {
206        if let Some(ref mut hnd) = self.status_handler {
207            (*hnd)(st)
208        }
209    }
210
211    fn poll(mut self) {
212        log::trace!("Starting server accept loop");
213
214        // Create storage for events
215        let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap());
216
217        let mut timeout = Some(Duration::ZERO);
218        loop {
219            events.clear();
220
221            if let Err(e) = self.poller.wait(&mut events, timeout) {
222                if e.kind() != io::ErrorKind::Interrupted {
223                    panic!("Cannot wait for events in poller: {e}")
224                }
225            } else if timeout.is_some() {
226                timeout = None;
227                let _ = self.tx.take().unwrap().send(());
228            }
229
230            for idx in 0..self.sockets.len() {
231                if self.sockets[idx].registered.get() {
232                    let readd = self.accept(idx);
233                    if readd {
234                        self.add_source(idx);
235                    }
236                }
237            }
238
239            match self.process_cmd() {
240                Either::Left(_) => events.clear(),
241                Either::Right(rx) => {
242                    // cleanup
243                    for info in self.sockets.drain(..) {
244                        info.sock.remove_source()
245                    }
246                    log::info!("Accept loop has been stopped");
247
248                    if let Some(rx) = rx {
249                        if !self.testing {
250                            thread::sleep(EXIT_TIMEOUT);
251                        }
252                        let _ = rx.send(());
253                    }
254
255                    break;
256                }
257            }
258        }
259    }
260
261    fn add_source(&self, idx: usize) {
262        let info = &self.sockets[idx];
263
264        loop {
265            // try to register poller source
266            let result = if info.registered.get() {
267                self.poller.modify(&info.sock, Event::readable(idx))
268            } else {
269                unsafe { self.poller.add(&info.sock, Event::readable(idx)) }
270            };
271            if let Err(err) = result {
272                if err.kind() == io::ErrorKind::WouldBlock {
273                    continue;
274                }
275                log::error!("Cannot register socket listener: {err}");
276
277                // sleep after error
278                info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
279
280                let notify = self.notify.clone();
281                System::current().arbiter().spawn(Box::pin(async move {
282                    sleep(ERR_SLEEP_TIMEOUT).await;
283                    notify.send(AcceptorCommand::Timer);
284                }));
285            } else {
286                info.registered.set(true);
287            }
288
289            break;
290        }
291    }
292
293    fn remove_source(&self, key: usize) {
294        let info = &self.sockets[key];
295
296        let result = if info.registered.get() {
297            self.poller.modify(&info.sock, Event::none(key))
298        } else {
299            return;
300        };
301
302        // stop listening for incoming connections
303        if let Err(err) = result {
304            log::error!("Cannot stop socket listener for {} err: {}", info.addr, err);
305        }
306    }
307
308    fn process_timer(&mut self) {
309        let now = Instant::now();
310        for key in 0..self.sockets.len() {
311            let info = &mut self.sockets[key];
312            if let Some(inst) = info.timeout.get() {
313                if now > inst && !self.backpressure {
314                    log::info!("Resuming socket listener on {} after timeout", info.addr);
315                    info.timeout.take();
316                    self.add_source(key);
317                }
318            }
319        }
320    }
321
322    fn process_cmd(&mut self) -> Either<(), Option<oneshot::Sender<()>>> {
323        loop {
324            match self.rx.try_recv() {
325                Ok(cmd) => match cmd {
326                    AcceptorCommand::Stop(rx) => {
327                        if !self.backpressure {
328                            log::info!("Stopping accept loop");
329                            self.backpressure(true);
330                        }
331                        break Either::Right(Some(rx));
332                    }
333                    AcceptorCommand::Terminate => {
334                        log::info!("Stopping accept loop");
335                        self.backpressure(true);
336                        break Either::Right(None);
337                    }
338                    AcceptorCommand::Pause => {
339                        if !self.backpressure {
340                            log::info!("Pausing accept loop");
341                            self.backpressure(true);
342                        }
343                    }
344                    AcceptorCommand::Resume => {
345                        if self.backpressure {
346                            log::info!("Resuming accept loop");
347                            self.backpressure(false);
348                        }
349                    }
350                    AcceptorCommand::Timer => {
351                        self.process_timer();
352                    }
353                },
354                Err(err) => {
355                    break match err {
356                        mpsc::TryRecvError::Empty => Either::Left(()),
357                        mpsc::TryRecvError::Disconnected => {
358                            log::error!("Dropping accept loop");
359                            self.backpressure(true);
360                            Either::Right(None)
361                        }
362                    };
363                }
364            }
365        }
366    }
367
368    fn backpressure(&mut self, on: bool) {
369        self.update_status(if on {
370            ServerStatus::NotReady
371        } else {
372            ServerStatus::Ready
373        });
374
375        if self.backpressure && !on {
376            // handle backlog
377            while let Some(msg) = self.backlog.pop_front() {
378                if let Err(msg) = self.srv.process(msg) {
379                    log::trace!("Server is unavailable");
380                    self.backlog.push_front(msg);
381                    return;
382                }
383            }
384
385            // re-enable acceptors
386            self.backpressure = false;
387            for (key, info) in self.sockets.iter().enumerate() {
388                if info.timeout.get().is_none() {
389                    // socket with timeout will re-register itself after timeout
390                    log::info!(
391                        "Resuming socket listener on {} after back-pressure",
392                        info.addr
393                    );
394                    self.add_source(key);
395                }
396            }
397        } else if !self.backpressure && on {
398            self.backpressure = true;
399            for key in 0..self.sockets.len() {
400                // disable err timeout
401                let info = &mut self.sockets[key];
402                if info.timeout.take().is_none() {
403                    log::info!("Stopping socket listener on {}", info.addr);
404                    self.remove_source(key);
405                }
406            }
407        }
408    }
409
410    fn accept(&mut self, token: usize) -> bool {
411        loop {
412            if let Some(info) = self.sockets.get_mut(token) {
413                match info.sock.accept() {
414                    Ok(Some(io)) => {
415                        let msg = Connection {
416                            io,
417                            token: info.token,
418                        };
419                        if let Err(msg) = self.srv.process(msg) {
420                            log::trace!("Server is unavailable");
421                            self.backlog.push_back(msg);
422                            self.backpressure(true);
423                            return false;
424                        }
425                    }
426                    Ok(None) => return true,
427                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
428                    Err(ref e) if connection_error(e) => continue,
429                    Err(e) => {
430                        log::error!("Error accepting socket: {e}");
431
432                        // sleep after error
433                        info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
434
435                        let notify = self.notify.clone();
436                        System::current().arbiter().spawn(Box::pin(async move {
437                            sleep(ERR_SLEEP_TIMEOUT).await;
438                            notify.send(AcceptorCommand::Timer);
439                        }));
440                        return false;
441                    }
442                }
443            }
444        }
445    }
446}
447
448/// This function defines errors that are per-connection. Which basically
449/// means that if we get this error from `accept()` system call it means
450/// next connection might be ready to be accepted.
451///
452/// All other errors will incur a timeout before next `accept()` is performed.
453/// The timeout is useful to handle resource exhaustion errors like ENFILE
454/// and EMFILE. Otherwise, could enter into tight loop.
455fn connection_error(e: &io::Error) -> bool {
456    e.kind() == io::ErrorKind::ConnectionRefused
457        || e.kind() == io::ErrorKind::ConnectionAborted
458        || e.kind() == io::ErrorKind::ConnectionReset
459}