ntex_server/net/
accept.rs

1use std::time::{Duration, Instant};
2use std::{cell::Cell, fmt, io, sync::mpsc, sync::Arc, thread};
3use std::{collections::VecDeque, num::NonZeroUsize};
4
5use ntex_rt::System;
6use ntex_util::{future::Either, time::sleep, time::Millis};
7use polling::{Event, Events, Poller};
8
9use super::socket::{Connection, Listener, SocketAddr};
10use super::{Server, ServerStatus, Token};
11
/// Pause taken by the accept thread after it has shut down and before it
/// acknowledges an `AcceptorCommand::Stop` request.
const EXIT_TIMEOUT: Duration = Duration::from_millis(100);
/// How long a listener is kept out of polling after an accept or
/// registration error; added to `Instant::now()` to form the socket deadline.
const ERR_TIMEOUT: Duration = Duration::from_millis(500);
/// Delay before the `AcceptorCommand::Timer` wake-up is sent after an error.
/// Presumably a little longer than `ERR_TIMEOUT` so the deadline has already
/// passed when the timer command is handled - TODO confirm.
const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
15
/// Commands understood by the accept loop thread.
#[derive(Debug)]
pub enum AcceptorCommand {
    /// Stop the loop; the sender is signalled once the loop has cleaned up.
    Stop(oneshot::Sender<()>),
    /// Stop the loop without sending any acknowledgement.
    Terminate,
    /// Turn back-pressure on: stop listening for new connections.
    Pause,
    /// Turn back-pressure off: flush the backlog and listen again.
    Resume,
    /// Re-check listeners that were put on an error timeout.
    Timer,
}
24
/// Per-listener bookkeeping kept by the accept loop.
#[derive(Debug)]
struct ServerSocketInfo {
    // local address of the listener, used in log messages
    addr: SocketAddr,
    // token attached to every connection accepted from this listener
    token: Token,
    // the listening socket itself
    sock: Listener,
    // true once the socket has been added to the poller; decides between
    // `Poller::add` and `Poller::modify` on later registrations
    registered: Cell<bool>,
    // deadline set after an error; while it is set the listener is only
    // resumed by the timer handler
    timeout: Cell<Option<Instant>>,
}
33
/// Cloneable handle for sending commands to the accept loop.
///
/// Field `0` is the poller used to wake the loop, field `1` is the sending
/// half of the command channel.
#[derive(Debug, Clone)]
pub struct AcceptNotify(Arc<Poller>, mpsc::Sender<AcceptorCommand>);
36
37impl AcceptNotify {
38    fn new(waker: Arc<Poller>, tx: mpsc::Sender<AcceptorCommand>) -> Self {
39        AcceptNotify(waker, tx)
40    }
41
42    pub fn send(&self, cmd: AcceptorCommand) {
43        let _ = self.1.send(cmd);
44        let _ = self.0.notify();
45    }
46}
47
/// Streaming io accept loop
pub struct AcceptLoop {
    // handle handed out to callers that need to send commands to the loop
    notify: AcceptNotify,
    // command receiver and poller; taken out when the loop is started
    inner: Option<(mpsc::Receiver<AcceptorCommand>, Arc<Poller>)>,
    // optional callback informed about server status changes
    status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}
54
55impl Default for AcceptLoop {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl AcceptLoop {
62    /// Create accept loop
63    pub fn new() -> AcceptLoop {
64        // Create a poller instance
65        let poll = Arc::new(
66            Poller::new()
67                .map_err(|e| panic!("Cannot create Polller {}", e))
68                .unwrap(),
69        );
70
71        let (tx, rx) = mpsc::channel();
72        let notify = AcceptNotify::new(poll.clone(), tx);
73
74        AcceptLoop {
75            notify,
76            inner: Some((rx, poll)),
77            status_handler: None,
78        }
79    }
80
81    /// Get notification api for the loop
82    pub fn notify(&self) -> AcceptNotify {
83        self.notify.clone()
84    }
85
86    pub fn set_status_handler<F>(&mut self, f: F)
87    where
88        F: FnMut(ServerStatus) + Send + 'static,
89    {
90        self.status_handler = Some(Box::new(f));
91    }
92
93    /// Start accept loop
94    pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) {
95        let (tx, rx_start) = oneshot::channel();
96        let (rx, poll) = self
97            .inner
98            .take()
99            .expect("AcceptLoop cannot be used multiple times");
100
101        Accept::start(
102            tx,
103            rx,
104            poll,
105            socks,
106            srv,
107            self.notify.clone(),
108            self.status_handler.take(),
109        );
110
111        let _ = rx_start.recv();
112    }
113}
114
115impl fmt::Debug for AcceptLoop {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.debug_struct("AcceptLoop")
118            .field("notify", &self.notify)
119            .field("inner", &self.inner)
120            .field("status_handler", &self.status_handler.is_some())
121            .finish()
122    }
123}
124
/// State owned by the accept thread.
struct Accept {
    // poller the listeners are registered with
    poller: Arc<Poller>,
    // incoming commands for the loop
    rx: mpsc::Receiver<AcceptorCommand>,
    // start-up signal; taken and fired after the first successful poll
    tx: Option<oneshot::Sender<()>>,
    // listeners; the index in this vector is used as the poller event key
    sockets: Vec<ServerSocketInfo>,
    // server handle that accepted connections are passed to
    srv: Server,
    // handle used to send `Timer` commands back to this loop
    notify: AcceptNotify,
    // true while accepting is paused
    backpressure: bool,
    // connections that were accepted but rejected by the server so far
    backlog: VecDeque<Connection>,
    // optional callback informed about status changes
    status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}
136
137impl Accept {
138    fn start(
139        tx: oneshot::Sender<()>,
140        rx: mpsc::Receiver<AcceptorCommand>,
141        poller: Arc<Poller>,
142        socks: Vec<(Token, Listener)>,
143        srv: Server,
144        notify: AcceptNotify,
145        status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
146    ) {
147        let sys = System::current();
148
149        // start accept thread
150        let _ = thread::Builder::new()
151            .name("ntex-server accept loop".to_owned())
152            .spawn(move || {
153                System::set_current(sys);
154                Accept::new(tx, rx, poller, socks, srv, notify, status_handler).poll()
155            });
156    }
157
158    fn new(
159        tx: oneshot::Sender<()>,
160        rx: mpsc::Receiver<AcceptorCommand>,
161        poller: Arc<Poller>,
162        socks: Vec<(Token, Listener)>,
163        srv: Server,
164        notify: AcceptNotify,
165        status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
166    ) -> Accept {
167        let mut sockets = Vec::new();
168        for (hnd_token, lst) in socks.into_iter() {
169            sockets.push(ServerSocketInfo {
170                addr: lst.local_addr(),
171                sock: lst,
172                token: hnd_token,
173                registered: Cell::new(false),
174                timeout: Cell::new(None),
175            });
176        }
177
178        Accept {
179            poller,
180            rx,
181            sockets,
182            notify,
183            srv,
184            status_handler,
185            tx: Some(tx),
186            backpressure: true,
187            backlog: VecDeque::new(),
188        }
189    }
190
191    fn update_status(&mut self, st: ServerStatus) {
192        if let Some(ref mut hnd) = self.status_handler {
193            (*hnd)(st)
194        }
195    }
196
197    fn poll(&mut self) {
198        log::trace!("Starting server accept loop");
199
200        // Create storage for events
201        let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap());
202
203        let mut timeout = Some(Duration::ZERO);
204        loop {
205            if let Err(e) = self.poller.wait(&mut events, timeout) {
206                if e.kind() != io::ErrorKind::Interrupted {
207                    panic!("Cannot wait for events in poller: {}", e)
208                }
209            } else if timeout.is_some() {
210                timeout = None;
211                let _ = self.tx.take().unwrap().send(());
212            }
213
214            for idx in 0..self.sockets.len() {
215                if self.sockets[idx].registered.get() {
216                    let readd = self.accept(idx);
217                    if readd {
218                        self.add_source(idx);
219                    }
220                }
221            }
222
223            match self.process_cmd() {
224                Either::Left(_) => events.clear(),
225                Either::Right(rx) => {
226                    // cleanup
227                    for info in self.sockets.drain(..) {
228                        info.sock.remove_source()
229                    }
230                    log::info!("Accept loop has been stopped");
231
232                    if let Some(rx) = rx {
233                        thread::sleep(EXIT_TIMEOUT);
234                        let _ = rx.send(());
235                    }
236
237                    break;
238                }
239            }
240        }
241    }
242
243    fn add_source(&self, idx: usize) {
244        let info = &self.sockets[idx];
245
246        loop {
247            // try to register poller source
248            let result = if info.registered.get() {
249                self.poller.modify(&info.sock, Event::readable(idx))
250            } else {
251                unsafe { self.poller.add(&info.sock, Event::readable(idx)) }
252            };
253            if let Err(err) = result {
254                if err.kind() == io::ErrorKind::WouldBlock {
255                    continue;
256                }
257                log::error!("Cannot register socket listener: {}", err);
258
259                // sleep after error
260                info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
261
262                let notify = self.notify.clone();
263                System::current().arbiter().spawn(Box::pin(async move {
264                    sleep(ERR_SLEEP_TIMEOUT).await;
265                    notify.send(AcceptorCommand::Timer);
266                }));
267            } else {
268                info.registered.set(true);
269            }
270
271            break;
272        }
273    }
274
275    fn remove_source(&self, key: usize) {
276        let info = &self.sockets[key];
277
278        let result = if info.registered.get() {
279            self.poller.modify(&info.sock, Event::none(key))
280        } else {
281            return;
282        };
283
284        // stop listening for incoming connections
285        if let Err(err) = result {
286            log::error!("Cannot stop socket listener for {} err: {}", info.addr, err);
287        }
288    }
289
290    fn process_timer(&mut self) {
291        let now = Instant::now();
292        for key in 0..self.sockets.len() {
293            let info = &mut self.sockets[key];
294            if let Some(inst) = info.timeout.get() {
295                if now > inst && !self.backpressure {
296                    log::info!("Resuming socket listener on {} after timeout", info.addr);
297                    info.timeout.take();
298                    self.add_source(key);
299                }
300            }
301        }
302    }
303
304    fn process_cmd(&mut self) -> Either<(), Option<oneshot::Sender<()>>> {
305        loop {
306            match self.rx.try_recv() {
307                Ok(cmd) => match cmd {
308                    AcceptorCommand::Stop(rx) => {
309                        if !self.backpressure {
310                            log::info!("Stopping accept loop");
311                            self.backpressure(true);
312                        }
313                        break Either::Right(Some(rx));
314                    }
315                    AcceptorCommand::Terminate => {
316                        log::info!("Stopping accept loop");
317                        self.backpressure(true);
318                        break Either::Right(None);
319                    }
320                    AcceptorCommand::Pause => {
321                        if !self.backpressure {
322                            log::info!("Pausing accept loop");
323                            self.backpressure(true);
324                        }
325                    }
326                    AcceptorCommand::Resume => {
327                        if self.backpressure {
328                            log::info!("Resuming accept loop");
329                            self.backpressure(false);
330                        }
331                    }
332                    AcceptorCommand::Timer => {
333                        self.process_timer();
334                    }
335                },
336                Err(err) => {
337                    break match err {
338                        mpsc::TryRecvError::Empty => Either::Left(()),
339                        mpsc::TryRecvError::Disconnected => {
340                            log::error!("Dropping accept loop");
341                            self.backpressure(true);
342                            Either::Right(None)
343                        }
344                    };
345                }
346            }
347        }
348    }
349
350    fn backpressure(&mut self, on: bool) {
351        self.update_status(if on {
352            ServerStatus::NotReady
353        } else {
354            ServerStatus::Ready
355        });
356
357        if self.backpressure && !on {
358            // handle backlog
359            while let Some(msg) = self.backlog.pop_front() {
360                if let Err(msg) = self.srv.process(msg) {
361                    log::trace!("Server is unavailable");
362                    self.backlog.push_front(msg);
363                    return;
364                }
365            }
366
367            // re-enable acceptors
368            self.backpressure = false;
369            for (key, info) in self.sockets.iter().enumerate() {
370                if info.timeout.get().is_none() {
371                    // socket with timeout will re-register itself after timeout
372                    log::info!(
373                        "Resuming socket listener on {} after back-pressure",
374                        info.addr
375                    );
376                    self.add_source(key);
377                }
378            }
379        } else if !self.backpressure && on {
380            self.backpressure = true;
381            for key in 0..self.sockets.len() {
382                // disable err timeout
383                let info = &mut self.sockets[key];
384                if info.timeout.take().is_none() {
385                    log::info!("Stopping socket listener on {}", info.addr);
386                    self.remove_source(key);
387                }
388            }
389        }
390    }
391
392    fn accept(&mut self, token: usize) -> bool {
393        loop {
394            if let Some(info) = self.sockets.get_mut(token) {
395                match info.sock.accept() {
396                    Ok(Some(io)) => {
397                        let msg = Connection {
398                            io,
399                            token: info.token,
400                        };
401                        if let Err(msg) = self.srv.process(msg) {
402                            log::trace!("Server is unavailable");
403                            self.backlog.push_back(msg);
404                            self.backpressure(true);
405                            return false;
406                        }
407                    }
408                    Ok(None) => return true,
409                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
410                    Err(ref e) if connection_error(e) => continue,
411                    Err(e) => {
412                        log::error!("Error accepting socket: {}", e);
413
414                        // sleep after error
415                        info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
416
417                        let notify = self.notify.clone();
418                        System::current().arbiter().spawn(Box::pin(async move {
419                            sleep(ERR_SLEEP_TIMEOUT).await;
420                            notify.send(AcceptorCommand::Timer);
421                        }));
422                        return false;
423                    }
424                }
425            }
426        }
427    }
428}
429
/// Tells whether an `accept()` failure concerns only the connection that was
/// being accepted (refused, aborted or reset), in which case the next
/// connection may well be ready and accepting can continue right away.
///
/// Every other error makes the caller wait for a timeout before the next
/// `accept()`. The timeout is useful to handle resource exhaustion errors
/// like ENFILE and EMFILE, which would otherwise cause a tight loop.
fn connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}
439        || e.kind() == io::ErrorKind::ConnectionAborted
440        || e.kind() == io::ErrorKind::ConnectionReset
441}