Skip to main content

scrappy_server/
accept.rs

1use std::sync::mpsc as sync_mpsc;
2use std::time::Duration;
3use std::{io, thread};
4
5use scrappy_rt::time::{delay_until, Instant};
6use scrappy_rt::System;
7use log::{error, info};
8use slab::Slab;
9
10use crate::server::Server;
11use crate::socket::{SocketAddr, SocketListener, StdListener};
12use crate::worker::{Conn, WorkerClient};
13use crate::Token;
14
/// Control messages consumed by the accept thread in `Accept::process_cmd`.
pub(crate) enum Command {
    /// Deregister every listening socket from the poll instance.
    Pause,
    /// Register every listening socket with the poll instance again.
    Resume,
    /// Deregister every listening socket and leave the accept loop.
    Stop,
    /// Lift backpressure (if active) and add this worker to the dispatch list.
    Worker(WorkerClient),
}
21
/// Per-listener state kept by the accept loop.
struct ServerSocketInfo {
    /// Local address of the listener; in this file it is only used in log messages.
    addr: SocketAddr,
    /// Handler token copied into every `Conn` accepted on this socket.
    token: Token,
    /// The listening socket that gets registered with `mio::Poll`.
    sock: SocketListener,
    /// Set by `Accept::accept` after a non-connection error: the instant after
    /// which `Accept::process_timer` registers the socket again.
    timeout: Option<Instant>,
}
28
29#[derive(Clone)]
30pub(crate) struct AcceptNotify(mio::SetReadiness);
31
32impl AcceptNotify {
33    pub(crate) fn new(ready: mio::SetReadiness) -> Self {
34        AcceptNotify(ready)
35    }
36
37    pub(crate) fn notify(&self) {
38        let _ = self.0.set_readiness(mio::Ready::readable());
39    }
40}
41
42impl Default for AcceptNotify {
43    fn default() -> Self {
44        AcceptNotify::new(mio::Registration::new2().1)
45    }
46}
47
/// Owner-side handle for the accept thread.
///
/// The `Option` fields hold the pieces that `start` moves into the accept
/// thread; they are `None` once `start` has been called.
pub(crate) struct AcceptLoop {
    /// Registration polled by the accept thread under the `CMD` token.
    cmd_reg: Option<mio::Registration>,
    /// Readiness handle paired with `cmd_reg`; set readable by `send`.
    cmd_ready: mio::SetReadiness,
    /// Registration polled by the accept thread under the `NOTIFY` token.
    notify_reg: Option<mio::Registration>,
    /// Readiness handle paired with `notify_reg`; cloned out by `get_notify`.
    notify_ready: mio::SetReadiness,
    /// Sending half of the command channel.
    tx: sync_mpsc::Sender<Command>,
    /// Receiving half of the command channel, handed to the accept thread.
    rx: Option<sync_mpsc::Receiver<Command>>,
    /// Server handle, handed to the accept thread.
    srv: Option<Server>,
}
57
58impl AcceptLoop {
59    pub fn new(srv: Server) -> AcceptLoop {
60        let (tx, rx) = sync_mpsc::channel();
61        let (cmd_reg, cmd_ready) = mio::Registration::new2();
62        let (notify_reg, notify_ready) = mio::Registration::new2();
63
64        AcceptLoop {
65            tx,
66            cmd_ready,
67            cmd_reg: Some(cmd_reg),
68            notify_ready,
69            notify_reg: Some(notify_reg),
70            rx: Some(rx),
71            srv: Some(srv),
72        }
73    }
74
75    pub fn send(&self, msg: Command) {
76        let _ = self.tx.send(msg);
77        let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
78    }
79
80    pub fn get_notify(&self) -> AcceptNotify {
81        AcceptNotify::new(self.notify_ready.clone())
82    }
83
84    pub(crate) fn start(
85        &mut self,
86        socks: Vec<(Token, StdListener)>,
87        workers: Vec<WorkerClient>,
88    ) {
89        let srv = self.srv.take().expect("Can not re-use AcceptInfo");
90
91        Accept::start(
92            self.rx.take().expect("Can not re-use AcceptInfo"),
93            self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
94            self.notify_reg.take().expect("Can not re-use AcceptInfo"),
95            socks,
96            srv,
97            workers,
98        );
99    }
100}
101
/// State owned by the accept thread.
struct Accept {
    /// Poll instance that multiplexes listeners, commands, timer and notify events.
    poll: mio::Poll,
    /// Receiving half of the command channel, drained in `process_cmd`.
    rx: sync_mpsc::Receiver<Command>,
    /// Listening sockets; the slab key plus `DELTA` is the socket's poll token.
    sockets: Slab<ServerSocketInfo>,
    /// Workers that accepted connections are dispatched to.
    workers: Vec<WorkerClient>,
    /// Server handle, told about workers whose `send` failed.
    srv: Server,
    /// Registration pair polled under `TIMER`; the readiness half is fired by
    /// the delayed task scheduled in `accept` after an accept error.
    timer: (mio::Registration, mio::SetReadiness),
    /// Index into `workers` of the worker to try next.
    next: usize,
    /// `true` while listeners are deregistered because no worker was available.
    backpressure: bool,
}
112
/// Offset added to a listener's slab key to form its poll token, which keeps
/// listener tokens apart from the fixed tokens below (`poll` ignores any
/// other token smaller than `DELTA`).
const DELTA: usize = 100;
/// Poll token of the command registration.
const CMD: mio::Token = mio::Token(0);
/// Poll token of the error back-off timer registration.
const TIMER: mio::Token = mio::Token(1);
/// Poll token of the notify registration; an event on it lifts backpressure.
const NOTIFY: mio::Token = mio::Token(2);
117
/// Tells whether an `accept()` failure is specific to the connection that was
/// being accepted, in which case the next pending connection may well succeed
/// and accepting can continue immediately.
///
/// Any other error makes the caller back off for a while before calling
/// `accept()` again. That delay matters for resource exhaustion errors such as
/// ENFILE and EMFILE, which would otherwise spin in a tight loop.
fn connection_error(e: &io::Error) -> bool {
    match e.kind() {
        io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionAborted
        | io::ErrorKind::ConnectionReset => true,
        _ => false,
    }
}
130
131impl Accept {
132    #![allow(clippy::too_many_arguments)]
133    pub(crate) fn start(
134        rx: sync_mpsc::Receiver<Command>,
135        cmd_reg: mio::Registration,
136        notify_reg: mio::Registration,
137        socks: Vec<(Token, StdListener)>,
138        srv: Server,
139        workers: Vec<WorkerClient>,
140    ) {
141        let sys = System::current();
142
143        // start accept thread
144        let _ = thread::Builder::new()
145            .name("scrappy-server accept loop".to_owned())
146            .spawn(move || {
147                System::set_current(sys);
148                let mut accept = Accept::new(rx, socks, workers, srv);
149
150                // Start listening for incoming commands
151                if let Err(err) = accept.poll.register(
152                    &cmd_reg,
153                    CMD,
154                    mio::Ready::readable(),
155                    mio::PollOpt::edge(),
156                ) {
157                    panic!("Can not register Registration: {}", err);
158                }
159
160                // Start listening for notify updates
161                if let Err(err) = accept.poll.register(
162                    &notify_reg,
163                    NOTIFY,
164                    mio::Ready::readable(),
165                    mio::PollOpt::edge(),
166                ) {
167                    panic!("Can not register Registration: {}", err);
168                }
169
170                accept.poll();
171            });
172    }
173
174    fn new(
175        rx: sync_mpsc::Receiver<Command>,
176        socks: Vec<(Token, StdListener)>,
177        workers: Vec<WorkerClient>,
178        srv: Server,
179    ) -> Accept {
180        // Create a poll instance
181        let poll = match mio::Poll::new() {
182            Ok(poll) => poll,
183            Err(err) => panic!("Can not create mio::Poll: {}", err),
184        };
185
186        // Start accept
187        let mut sockets = Slab::new();
188        for (hnd_token, lst) in socks.into_iter() {
189            let addr = lst.local_addr();
190
191            let server = lst.into_listener();
192            let entry = sockets.vacant_entry();
193            let token = entry.key();
194
195            // Start listening for incoming connections
196            if let Err(err) = poll.register(
197                &server,
198                mio::Token(token + DELTA),
199                mio::Ready::readable(),
200                mio::PollOpt::edge(),
201            ) {
202                panic!("Can not register io: {}", err);
203            }
204
205            entry.insert(ServerSocketInfo {
206                addr,
207                token: hnd_token,
208                sock: server,
209                timeout: None,
210            });
211        }
212
213        // Timer
214        let (tm, tmr) = mio::Registration::new2();
215        if let Err(err) =
216            poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
217        {
218            panic!("Can not register Registration: {}", err);
219        }
220
221        Accept {
222            poll,
223            rx,
224            sockets,
225            workers,
226            srv,
227            next: 0,
228            timer: (tm, tmr),
229            backpressure: false,
230        }
231    }
232
233    fn poll(&mut self) {
234        // Create storage for events
235        let mut events = mio::Events::with_capacity(128);
236
237        loop {
238            if let Err(err) = self.poll.poll(&mut events, None) {
239                panic!("Poll error: {}", err);
240            }
241
242            for event in events.iter() {
243                let token = event.token();
244                match token {
245                    CMD => {
246                        if !self.process_cmd() {
247                            return;
248                        }
249                    }
250                    TIMER => self.process_timer(),
251                    NOTIFY => self.backpressure(false),
252                    _ => {
253                        let token = usize::from(token);
254                        if token < DELTA {
255                            continue;
256                        }
257                        self.accept(token - DELTA);
258                    }
259                }
260            }
261        }
262    }
263
264    fn process_timer(&mut self) {
265        let now = Instant::now();
266        for (token, info) in self.sockets.iter_mut() {
267            if let Some(inst) = info.timeout.take() {
268                if now > inst {
269                    if let Err(err) = self.poll.register(
270                        &info.sock,
271                        mio::Token(token + DELTA),
272                        mio::Ready::readable(),
273                        mio::PollOpt::edge(),
274                    ) {
275                        error!("Can not register server socket {}", err);
276                    } else {
277                        info!("Resume accepting connections on {}", info.addr);
278                    }
279                } else {
280                    info.timeout = Some(inst);
281                }
282            }
283        }
284    }
285
286    fn process_cmd(&mut self) -> bool {
287        loop {
288            match self.rx.try_recv() {
289                Ok(cmd) => match cmd {
290                    Command::Pause => {
291                        for (_, info) in self.sockets.iter_mut() {
292                            if let Err(err) = self.poll.deregister(&info.sock) {
293                                error!("Can not deregister server socket {}", err);
294                            } else {
295                                info!("Paused accepting connections on {}", info.addr);
296                            }
297                        }
298                    }
299                    Command::Resume => {
300                        for (token, info) in self.sockets.iter() {
301                            if let Err(err) = self.poll.register(
302                                &info.sock,
303                                mio::Token(token + DELTA),
304                                mio::Ready::readable(),
305                                mio::PollOpt::edge(),
306                            ) {
307                                error!("Can not resume socket accept process: {}", err);
308                            } else {
309                                info!(
310                                    "Accepting connections on {} has been resumed",
311                                    info.addr
312                                );
313                            }
314                        }
315                    }
316                    Command::Stop => {
317                        for (_, info) in self.sockets.iter() {
318                            let _ = self.poll.deregister(&info.sock);
319                        }
320                        return false;
321                    }
322                    Command::Worker(worker) => {
323                        self.backpressure(false);
324                        self.workers.push(worker);
325                    }
326                },
327                Err(err) => match err {
328                    sync_mpsc::TryRecvError::Empty => break,
329                    sync_mpsc::TryRecvError::Disconnected => {
330                        for (_, info) in self.sockets.iter() {
331                            let _ = self.poll.deregister(&info.sock);
332                        }
333                        return false;
334                    }
335                },
336            }
337        }
338        true
339    }
340
341    fn backpressure(&mut self, on: bool) {
342        if self.backpressure {
343            if !on {
344                self.backpressure = false;
345                for (token, info) in self.sockets.iter() {
346                    if let Err(err) = self.poll.register(
347                        &info.sock,
348                        mio::Token(token + DELTA),
349                        mio::Ready::readable(),
350                        mio::PollOpt::edge(),
351                    ) {
352                        error!("Can not resume socket accept process: {}", err);
353                    } else {
354                        info!("Accepting connections on {} has been resumed", info.addr);
355                    }
356                }
357            }
358        } else if on {
359            self.backpressure = true;
360            for (_, info) in self.sockets.iter() {
361                let _ = self.poll.deregister(&info.sock);
362            }
363        }
364    }
365
366    fn accept_one(&mut self, mut msg: Conn) {
367        if self.backpressure {
368            while !self.workers.is_empty() {
369                match self.workers[self.next].send(msg) {
370                    Ok(_) => (),
371                    Err(tmp) => {
372                        self.srv.worker_faulted(self.workers[self.next].idx);
373                        msg = tmp;
374                        self.workers.swap_remove(self.next);
375                        if self.workers.is_empty() {
376                            error!("No workers");
377                            return;
378                        } else if self.workers.len() <= self.next {
379                            self.next = 0;
380                        }
381                        continue;
382                    }
383                }
384                self.next = (self.next + 1) % self.workers.len();
385                break;
386            }
387        } else {
388            let mut idx = 0;
389            while idx < self.workers.len() {
390                idx += 1;
391                if self.workers[self.next].available() {
392                    match self.workers[self.next].send(msg) {
393                        Ok(_) => {
394                            self.next = (self.next + 1) % self.workers.len();
395                            return;
396                        }
397                        Err(tmp) => {
398                            self.srv.worker_faulted(self.workers[self.next].idx);
399                            msg = tmp;
400                            self.workers.swap_remove(self.next);
401                            if self.workers.is_empty() {
402                                error!("No workers");
403                                self.backpressure(true);
404                                return;
405                            } else if self.workers.len() <= self.next {
406                                self.next = 0;
407                            }
408                            continue;
409                        }
410                    }
411                }
412                self.next = (self.next + 1) % self.workers.len();
413            }
414            // enable backpressure
415            self.backpressure(true);
416            self.accept_one(msg);
417        }
418    }
419
420    fn accept(&mut self, token: usize) {
421        loop {
422            let msg = if let Some(info) = self.sockets.get_mut(token) {
423                match info.sock.accept() {
424                    Ok(Some((io, addr))) => Conn {
425                        io,
426                        token: info.token,
427                        peer: Some(addr),
428                    },
429                    Ok(None) => return,
430                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
431                    Err(ref e) if connection_error(e) => continue,
432                    Err(e) => {
433                        error!("Error accepting connection: {}", e);
434                        if let Err(err) = self.poll.deregister(&info.sock) {
435                            error!("Can not deregister server socket {}", err);
436                        }
437
438                        // sleep after error
439                        info.timeout = Some(Instant::now() + Duration::from_millis(500));
440
441                        let r = self.timer.1.clone();
442                        System::current().arbiter().send(Box::pin(async move {
443                            delay_until(Instant::now() + Duration::from_millis(510)).await;
444                            let _ = r.set_readiness(mio::Ready::readable());
445                        }));
446                        return;
447                    }
448                }
449            } else {
450                return;
451            };
452
453            self.accept_one(msg);
454        }
455    }
456}