1use std::time::{Duration, Instant};
2use std::{cell::Cell, fmt, io, sync::mpsc, sync::Arc, thread};
3use std::{collections::VecDeque, num::NonZeroUsize};
4
5use ntex_rt::System;
6use ntex_util::{future::Either, time::sleep, time::Millis};
7use polling::{Event, Events, Poller};
8
9use super::socket::{Connection, Listener, SocketAddr};
10use super::{Server, ServerStatus, Token};
11
/// Pause taken by the accept thread, after its listeners have been released,
/// before a `Stop` request is acknowledged.
const EXIT_TIMEOUT: Duration = Duration::from_millis(100);

/// How far in the future a listener's retry deadline is placed after a
/// registration or accept error.
const ERR_TIMEOUT: Duration = Duration::from_millis(500);
14const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
15
16#[derive(Debug)]
17pub enum AcceptorCommand {
18 Stop(oneshot::Sender<()>),
19 Terminate,
20 Pause,
21 Resume,
22 Timer,
23}
24
25#[derive(Debug)]
26struct ServerSocketInfo {
27 addr: SocketAddr,
28 token: Token,
29 sock: Listener,
30 registered: Cell<bool>,
31 timeout: Cell<Option<Instant>>,
32}
33
34#[derive(Debug, Clone)]
35pub struct AcceptNotify(Arc<Poller>, mpsc::Sender<AcceptorCommand>);
36
37impl AcceptNotify {
38 fn new(waker: Arc<Poller>, tx: mpsc::Sender<AcceptorCommand>) -> Self {
39 AcceptNotify(waker, tx)
40 }
41
42 pub fn send(&self, cmd: AcceptorCommand) {
43 let _ = self.1.send(cmd);
44 let _ = self.0.notify();
45 }
46}
47
48pub struct AcceptLoop {
50 notify: AcceptNotify,
51 inner: Option<(mpsc::Receiver<AcceptorCommand>, Arc<Poller>)>,
52 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
53}
54
55impl Default for AcceptLoop {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl AcceptLoop {
62 pub fn new() -> AcceptLoop {
64 let poll = Arc::new(
66 Poller::new()
67 .map_err(|e| panic!("Cannot create Polller {}", e))
68 .unwrap(),
69 );
70
71 let (tx, rx) = mpsc::channel();
72 let notify = AcceptNotify::new(poll.clone(), tx);
73
74 AcceptLoop {
75 notify,
76 inner: Some((rx, poll)),
77 status_handler: None,
78 }
79 }
80
81 pub fn notify(&self) -> AcceptNotify {
83 self.notify.clone()
84 }
85
86 pub fn set_status_handler<F>(&mut self, f: F)
87 where
88 F: FnMut(ServerStatus) + Send + 'static,
89 {
90 self.status_handler = Some(Box::new(f));
91 }
92
93 pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) {
95 let (tx, rx_start) = oneshot::channel();
96 let (rx, poll) = self
97 .inner
98 .take()
99 .expect("AcceptLoop cannot be used multiple times");
100
101 Accept::start(
102 tx,
103 rx,
104 poll,
105 socks,
106 srv,
107 self.notify.clone(),
108 self.status_handler.take(),
109 );
110
111 let _ = rx_start.recv();
112 }
113}
114
115impl fmt::Debug for AcceptLoop {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("AcceptLoop")
118 .field("notify", &self.notify)
119 .field("inner", &self.inner)
120 .field("status_handler", &self.status_handler.is_some())
121 .finish()
122 }
123}
124
125struct Accept {
126 poller: Arc<Poller>,
127 rx: mpsc::Receiver<AcceptorCommand>,
128 tx: Option<oneshot::Sender<()>>,
129 sockets: Vec<ServerSocketInfo>,
130 srv: Server,
131 notify: AcceptNotify,
132 backpressure: bool,
133 backlog: VecDeque<Connection>,
134 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
135}
136
137impl Accept {
138 fn start(
139 tx: oneshot::Sender<()>,
140 rx: mpsc::Receiver<AcceptorCommand>,
141 poller: Arc<Poller>,
142 socks: Vec<(Token, Listener)>,
143 srv: Server,
144 notify: AcceptNotify,
145 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
146 ) {
147 let sys = System::current();
148
149 let _ = thread::Builder::new()
151 .name("ntex-server accept loop".to_owned())
152 .spawn(move || {
153 System::set_current(sys);
154 Accept::new(tx, rx, poller, socks, srv, notify, status_handler).poll()
155 });
156 }
157
158 fn new(
159 tx: oneshot::Sender<()>,
160 rx: mpsc::Receiver<AcceptorCommand>,
161 poller: Arc<Poller>,
162 socks: Vec<(Token, Listener)>,
163 srv: Server,
164 notify: AcceptNotify,
165 status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
166 ) -> Accept {
167 let mut sockets = Vec::new();
168 for (hnd_token, lst) in socks.into_iter() {
169 sockets.push(ServerSocketInfo {
170 addr: lst.local_addr(),
171 sock: lst,
172 token: hnd_token,
173 registered: Cell::new(false),
174 timeout: Cell::new(None),
175 });
176 }
177
178 Accept {
179 poller,
180 rx,
181 sockets,
182 notify,
183 srv,
184 status_handler,
185 tx: Some(tx),
186 backpressure: true,
187 backlog: VecDeque::new(),
188 }
189 }
190
191 fn update_status(&mut self, st: ServerStatus) {
192 if let Some(ref mut hnd) = self.status_handler {
193 (*hnd)(st)
194 }
195 }
196
197 fn poll(&mut self) {
198 log::trace!("Starting server accept loop");
199
200 let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap());
202
203 let mut timeout = Some(Duration::ZERO);
204 loop {
205 if let Err(e) = self.poller.wait(&mut events, timeout) {
206 if e.kind() != io::ErrorKind::Interrupted {
207 panic!("Cannot wait for events in poller: {}", e)
208 }
209 } else if timeout.is_some() {
210 timeout = None;
211 let _ = self.tx.take().unwrap().send(());
212 }
213
214 for idx in 0..self.sockets.len() {
215 if self.sockets[idx].registered.get() {
216 let readd = self.accept(idx);
217 if readd {
218 self.add_source(idx);
219 }
220 }
221 }
222
223 match self.process_cmd() {
224 Either::Left(_) => events.clear(),
225 Either::Right(rx) => {
226 for info in self.sockets.drain(..) {
228 info.sock.remove_source()
229 }
230 log::info!("Accept loop has been stopped");
231
232 if let Some(rx) = rx {
233 thread::sleep(EXIT_TIMEOUT);
234 let _ = rx.send(());
235 }
236
237 break;
238 }
239 }
240 }
241 }
242
243 fn add_source(&self, idx: usize) {
244 let info = &self.sockets[idx];
245
246 loop {
247 let result = if info.registered.get() {
249 self.poller.modify(&info.sock, Event::readable(idx))
250 } else {
251 unsafe { self.poller.add(&info.sock, Event::readable(idx)) }
252 };
253 if let Err(err) = result {
254 if err.kind() == io::ErrorKind::WouldBlock {
255 continue;
256 }
257 log::error!("Cannot register socket listener: {}", err);
258
259 info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
261
262 let notify = self.notify.clone();
263 System::current().arbiter().spawn(Box::pin(async move {
264 sleep(ERR_SLEEP_TIMEOUT).await;
265 notify.send(AcceptorCommand::Timer);
266 }));
267 } else {
268 info.registered.set(true);
269 }
270
271 break;
272 }
273 }
274
275 fn remove_source(&self, key: usize) {
276 let info = &self.sockets[key];
277
278 let result = if info.registered.get() {
279 self.poller.modify(&info.sock, Event::none(key))
280 } else {
281 return;
282 };
283
284 if let Err(err) = result {
286 log::error!("Cannot stop socket listener for {} err: {}", info.addr, err);
287 }
288 }
289
290 fn process_timer(&mut self) {
291 let now = Instant::now();
292 for key in 0..self.sockets.len() {
293 let info = &mut self.sockets[key];
294 if let Some(inst) = info.timeout.get() {
295 if now > inst && !self.backpressure {
296 log::info!("Resuming socket listener on {} after timeout", info.addr);
297 info.timeout.take();
298 self.add_source(key);
299 }
300 }
301 }
302 }
303
304 fn process_cmd(&mut self) -> Either<(), Option<oneshot::Sender<()>>> {
305 loop {
306 match self.rx.try_recv() {
307 Ok(cmd) => match cmd {
308 AcceptorCommand::Stop(rx) => {
309 if !self.backpressure {
310 log::info!("Stopping accept loop");
311 self.backpressure(true);
312 }
313 break Either::Right(Some(rx));
314 }
315 AcceptorCommand::Terminate => {
316 log::info!("Stopping accept loop");
317 self.backpressure(true);
318 break Either::Right(None);
319 }
320 AcceptorCommand::Pause => {
321 if !self.backpressure {
322 log::info!("Pausing accept loop");
323 self.backpressure(true);
324 }
325 }
326 AcceptorCommand::Resume => {
327 if self.backpressure {
328 log::info!("Resuming accept loop");
329 self.backpressure(false);
330 }
331 }
332 AcceptorCommand::Timer => {
333 self.process_timer();
334 }
335 },
336 Err(err) => {
337 break match err {
338 mpsc::TryRecvError::Empty => Either::Left(()),
339 mpsc::TryRecvError::Disconnected => {
340 log::error!("Dropping accept loop");
341 self.backpressure(true);
342 Either::Right(None)
343 }
344 };
345 }
346 }
347 }
348 }
349
350 fn backpressure(&mut self, on: bool) {
351 self.update_status(if on {
352 ServerStatus::NotReady
353 } else {
354 ServerStatus::Ready
355 });
356
357 if self.backpressure && !on {
358 while let Some(msg) = self.backlog.pop_front() {
360 if let Err(msg) = self.srv.process(msg) {
361 log::trace!("Server is unavailable");
362 self.backlog.push_front(msg);
363 return;
364 }
365 }
366
367 self.backpressure = false;
369 for (key, info) in self.sockets.iter().enumerate() {
370 if info.timeout.get().is_none() {
371 log::info!(
373 "Resuming socket listener on {} after back-pressure",
374 info.addr
375 );
376 self.add_source(key);
377 }
378 }
379 } else if !self.backpressure && on {
380 self.backpressure = true;
381 for key in 0..self.sockets.len() {
382 let info = &mut self.sockets[key];
384 if info.timeout.take().is_none() {
385 log::info!("Stopping socket listener on {}", info.addr);
386 self.remove_source(key);
387 }
388 }
389 }
390 }
391
392 fn accept(&mut self, token: usize) -> bool {
393 loop {
394 if let Some(info) = self.sockets.get_mut(token) {
395 match info.sock.accept() {
396 Ok(Some(io)) => {
397 let msg = Connection {
398 io,
399 token: info.token,
400 };
401 if let Err(msg) = self.srv.process(msg) {
402 log::trace!("Server is unavailable");
403 self.backlog.push_back(msg);
404 self.backpressure(true);
405 return false;
406 }
407 }
408 Ok(None) => return true,
409 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
410 Err(ref e) if connection_error(e) => continue,
411 Err(e) => {
412 log::error!("Error accepting socket: {}", e);
413
414 info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
416
417 let notify = self.notify.clone();
418 System::current().arbiter().spawn(Box::pin(async move {
419 sleep(ERR_SLEEP_TIMEOUT).await;
420 notify.send(AcceptorCommand::Timer);
421 }));
422 return false;
423 }
424 }
425 }
426 }
427 }
428}
429
/// Returns `true` when `e` is a refused, aborted or reset connection.
///
/// `Accept::accept` skips these errors and tries the next accept immediately.
fn connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}