ntex_server/net/
accept.rs

1use std::time::{Duration, Instant};
2use std::{cell::Cell, fmt, io, sync::mpsc, sync::Arc, thread};
3use std::{collections::VecDeque, num::NonZeroUsize};
4
5use ntex_rt::System;
6use ntex_util::{future::Either, time::sleep, time::Millis};
7use polling::{Event, Events, Poller};
8
9use super::socket::{Connection, Listener, SocketAddr};
10use super::{Server, ServerStatus, Token};
11
12const EXIT_TIMEOUT: Duration = Duration::from_millis(100);
13const ERR_TIMEOUT: Duration = Duration::from_millis(500);
14const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
15
16#[derive(Debug)]
17pub enum AcceptorCommand {
18    Stop(oneshot::Sender<()>),
19    Terminate,
20    Pause,
21    Resume,
22    Timer,
23}
24
25#[derive(Debug)]
26struct ServerSocketInfo {
27    addr: SocketAddr,
28    token: Token,
29    sock: Listener,
30    registered: Cell<bool>,
31    timeout: Cell<Option<Instant>>,
32}
33
34#[derive(Debug, Clone)]
35pub struct AcceptNotify(Arc<Poller>, mpsc::Sender<AcceptorCommand>);
36
37impl AcceptNotify {
38    fn new(waker: Arc<Poller>, tx: mpsc::Sender<AcceptorCommand>) -> Self {
39        AcceptNotify(waker, tx)
40    }
41
42    pub fn send(&self, cmd: AcceptorCommand) {
43        let _ = self.1.send(cmd);
44        let _ = self.0.notify();
45    }
46}
47
48/// Streamin io accept loop
49pub struct AcceptLoop {
50    notify: AcceptNotify,
51    inner: Option<(mpsc::Receiver<AcceptorCommand>, Arc<Poller>)>,
52    status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
53}
54
55impl Default for AcceptLoop {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl AcceptLoop {
62    /// Create accept loop
63    pub fn new() -> AcceptLoop {
64        // Create a poller instance
65        let poll = Arc::new(
66            Poller::new()
67                .map_err(|e| panic!("Cannot create Polller {}", e))
68                .unwrap(),
69        );
70
71        let (tx, rx) = mpsc::channel();
72        let notify = AcceptNotify::new(poll.clone(), tx);
73
74        AcceptLoop {
75            notify,
76            inner: Some((rx, poll)),
77            status_handler: None,
78        }
79    }
80
81    /// Get notification api for the loop
82    pub fn notify(&self) -> AcceptNotify {
83        self.notify.clone()
84    }
85
86    pub fn set_status_handler<F>(&mut self, f: F)
87    where
88        F: FnMut(ServerStatus) + Send + 'static,
89    {
90        self.status_handler = Some(Box::new(f));
91    }
92
93    /// Start accept loop
94    pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) {
95        let (tx, rx_start) = oneshot::channel();
96        let (rx, poll) = self
97            .inner
98            .take()
99            .expect("AcceptLoop cannot be used multiple times");
100
101        Accept::start(
102            tx,
103            rx,
104            poll,
105            socks,
106            srv,
107            self.notify.clone(),
108            self.status_handler.take(),
109        );
110
111        let _ = rx_start.recv();
112    }
113}
114
115impl fmt::Debug for AcceptLoop {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.debug_struct("AcceptLoop")
118            .field("notify", &self.notify)
119            .field("inner", &self.inner)
120            .field("status_handler", &self.status_handler.is_some())
121            .finish()
122    }
123}
124
125struct Accept {
126    poller: Arc<Poller>,
127    rx: mpsc::Receiver<AcceptorCommand>,
128    tx: Option<oneshot::Sender<()>>,
129    sockets: Vec<ServerSocketInfo>,
130    srv: Server,
131    notify: AcceptNotify,
132    backpressure: bool,
133    backlog: VecDeque<Connection>,
134    status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
135}
136
137impl Accept {
138    fn start(
139        tx: oneshot::Sender<()>,
140        rx: mpsc::Receiver<AcceptorCommand>,
141        poller: Arc<Poller>,
142        socks: Vec<(Token, Listener)>,
143        srv: Server,
144        notify: AcceptNotify,
145        status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
146    ) {
147        let sys = System::current();
148
149        // start accept thread
150        let _ = thread::Builder::new()
151            .name("accept loop".to_owned())
152            .spawn(move || {
153                System::set_current(sys);
154                Accept::new(tx, rx, poller, socks, srv, notify, status_handler).poll()
155            });
156    }
157
158    fn new(
159        tx: oneshot::Sender<()>,
160        rx: mpsc::Receiver<AcceptorCommand>,
161        poller: Arc<Poller>,
162        socks: Vec<(Token, Listener)>,
163        srv: Server,
164        notify: AcceptNotify,
165        status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
166    ) -> Accept {
167        let mut sockets = Vec::new();
168        for (hnd_token, lst) in socks.into_iter() {
169            sockets.push(ServerSocketInfo {
170                addr: lst.local_addr(),
171                sock: lst,
172                token: hnd_token,
173                registered: Cell::new(false),
174                timeout: Cell::new(None),
175            });
176        }
177
178        Accept {
179            poller,
180            rx,
181            sockets,
182            notify,
183            srv,
184            status_handler,
185            tx: Some(tx),
186            backpressure: true,
187            backlog: VecDeque::new(),
188        }
189    }
190
191    fn update_status(&mut self, st: ServerStatus) {
192        if let Some(ref mut hnd) = self.status_handler {
193            (*hnd)(st)
194        }
195    }
196
197    fn poll(&mut self) {
198        log::trace!("Starting server accept loop");
199
200        // Create storage for events
201        let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap());
202
203        let mut timeout = Some(Duration::ZERO);
204        loop {
205            events.clear();
206
207            if let Err(e) = self.poller.wait(&mut events, timeout) {
208                if e.kind() != io::ErrorKind::Interrupted {
209                    panic!("Cannot wait for events in poller: {}", e)
210                }
211            } else if timeout.is_some() {
212                timeout = None;
213                let _ = self.tx.take().unwrap().send(());
214            }
215
216            for idx in 0..self.sockets.len() {
217                if self.sockets[idx].registered.get() {
218                    let readd = self.accept(idx);
219                    if readd {
220                        self.add_source(idx);
221                    }
222                }
223            }
224
225            match self.process_cmd() {
226                Either::Left(_) => events.clear(),
227                Either::Right(rx) => {
228                    // cleanup
229                    for info in self.sockets.drain(..) {
230                        info.sock.remove_source()
231                    }
232                    log::info!("Accept loop has been stopped");
233
234                    if let Some(rx) = rx {
235                        thread::sleep(EXIT_TIMEOUT);
236                        let _ = rx.send(());
237                    }
238
239                    break;
240                }
241            }
242        }
243    }
244
245    fn add_source(&self, idx: usize) {
246        let info = &self.sockets[idx];
247
248        loop {
249            // try to register poller source
250            let result = if info.registered.get() {
251                self.poller.modify(&info.sock, Event::readable(idx))
252            } else {
253                unsafe { self.poller.add(&info.sock, Event::readable(idx)) }
254            };
255            if let Err(err) = result {
256                if err.kind() == io::ErrorKind::WouldBlock {
257                    continue;
258                }
259                log::error!("Cannot register socket listener: {}", err);
260
261                // sleep after error
262                info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
263
264                let notify = self.notify.clone();
265                System::current().arbiter().spawn(Box::pin(async move {
266                    sleep(ERR_SLEEP_TIMEOUT).await;
267                    notify.send(AcceptorCommand::Timer);
268                }));
269            } else {
270                info.registered.set(true);
271            }
272
273            break;
274        }
275    }
276
277    fn remove_source(&self, key: usize) {
278        let info = &self.sockets[key];
279
280        let result = if info.registered.get() {
281            self.poller.modify(&info.sock, Event::none(key))
282        } else {
283            return;
284        };
285
286        // stop listening for incoming connections
287        if let Err(err) = result {
288            log::error!("Cannot stop socket listener for {} err: {}", info.addr, err);
289        }
290    }
291
292    fn process_timer(&mut self) {
293        let now = Instant::now();
294        for key in 0..self.sockets.len() {
295            let info = &mut self.sockets[key];
296            if let Some(inst) = info.timeout.get() {
297                if now > inst && !self.backpressure {
298                    log::info!("Resuming socket listener on {} after timeout", info.addr);
299                    info.timeout.take();
300                    self.add_source(key);
301                }
302            }
303        }
304    }
305
306    fn process_cmd(&mut self) -> Either<(), Option<oneshot::Sender<()>>> {
307        loop {
308            match self.rx.try_recv() {
309                Ok(cmd) => match cmd {
310                    AcceptorCommand::Stop(rx) => {
311                        if !self.backpressure {
312                            log::info!("Stopping accept loop");
313                            self.backpressure(true);
314                        }
315                        break Either::Right(Some(rx));
316                    }
317                    AcceptorCommand::Terminate => {
318                        log::info!("Stopping accept loop");
319                        self.backpressure(true);
320                        break Either::Right(None);
321                    }
322                    AcceptorCommand::Pause => {
323                        if !self.backpressure {
324                            log::info!("Pausing accept loop");
325                            self.backpressure(true);
326                        }
327                    }
328                    AcceptorCommand::Resume => {
329                        if self.backpressure {
330                            log::info!("Resuming accept loop");
331                            self.backpressure(false);
332                        }
333                    }
334                    AcceptorCommand::Timer => {
335                        self.process_timer();
336                    }
337                },
338                Err(err) => {
339                    break match err {
340                        mpsc::TryRecvError::Empty => Either::Left(()),
341                        mpsc::TryRecvError::Disconnected => {
342                            log::error!("Dropping accept loop");
343                            self.backpressure(true);
344                            Either::Right(None)
345                        }
346                    };
347                }
348            }
349        }
350    }
351
352    fn backpressure(&mut self, on: bool) {
353        self.update_status(if on {
354            ServerStatus::NotReady
355        } else {
356            ServerStatus::Ready
357        });
358
359        if self.backpressure && !on {
360            // handle backlog
361            while let Some(msg) = self.backlog.pop_front() {
362                if let Err(msg) = self.srv.process(msg) {
363                    log::trace!("Server is unavailable");
364                    self.backlog.push_front(msg);
365                    return;
366                }
367            }
368
369            // re-enable acceptors
370            self.backpressure = false;
371            for (key, info) in self.sockets.iter().enumerate() {
372                if info.timeout.get().is_none() {
373                    // socket with timeout will re-register itself after timeout
374                    log::info!(
375                        "Resuming socket listener on {} after back-pressure",
376                        info.addr
377                    );
378                    self.add_source(key);
379                }
380            }
381        } else if !self.backpressure && on {
382            self.backpressure = true;
383            for key in 0..self.sockets.len() {
384                // disable err timeout
385                let info = &mut self.sockets[key];
386                if info.timeout.take().is_none() {
387                    log::info!("Stopping socket listener on {}", info.addr);
388                    self.remove_source(key);
389                }
390            }
391        }
392    }
393
394    fn accept(&mut self, token: usize) -> bool {
395        loop {
396            if let Some(info) = self.sockets.get_mut(token) {
397                match info.sock.accept() {
398                    Ok(Some(io)) => {
399                        let msg = Connection {
400                            io,
401                            token: info.token,
402                        };
403                        if let Err(msg) = self.srv.process(msg) {
404                            log::trace!("Server is unavailable");
405                            self.backlog.push_back(msg);
406                            self.backpressure(true);
407                            return false;
408                        }
409                    }
410                    Ok(None) => return true,
411                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
412                    Err(ref e) if connection_error(e) => continue,
413                    Err(e) => {
414                        log::error!("Error accepting socket: {}", e);
415
416                        // sleep after error
417                        info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
418
419                        let notify = self.notify.clone();
420                        System::current().arbiter().spawn(Box::pin(async move {
421                            sleep(ERR_SLEEP_TIMEOUT).await;
422                            notify.send(AcceptorCommand::Timer);
423                        }));
424                        return false;
425                    }
426                }
427            }
428        }
429    }
430}
431
432/// This function defines errors that are per-connection. Which basically
433/// means that if we get this error from `accept()` system call it means
434/// next connection might be ready to be accepted.
435///
436/// All other errors will incur a timeout before next `accept()` is performed.
437/// The timeout is useful to handle resource exhaustion errors like ENFILE
438/// and EMFILE. Otherwise, could enter into tight loop.
439fn connection_error(e: &io::Error) -> bool {
440    e.kind() == io::ErrorKind::ConnectionRefused
441        || e.kind() == io::ErrorKind::ConnectionAborted
442        || e.kind() == io::ErrorKind::ConnectionReset
443}