1use std::sync::mpsc as sync_mpsc;
2use std::time::Duration;
3use std::{io, thread};
4
5use scrappy_rt::time::{delay_until, Instant};
6use scrappy_rt::System;
7use log::{error, info};
8use slab::Slab;
9
10use crate::server::Server;
11use crate::socket::{SocketAddr, SocketListener, StdListener};
12use crate::worker::{Conn, WorkerClient};
13use crate::Token;
14
15pub(crate) enum Command {
16 Pause,
17 Resume,
18 Stop,
19 Worker(WorkerClient),
20}
21
22struct ServerSocketInfo {
23 addr: SocketAddr,
24 token: Token,
25 sock: SocketListener,
26 timeout: Option<Instant>,
27}
28
29#[derive(Clone)]
30pub(crate) struct AcceptNotify(mio::SetReadiness);
31
32impl AcceptNotify {
33 pub(crate) fn new(ready: mio::SetReadiness) -> Self {
34 AcceptNotify(ready)
35 }
36
37 pub(crate) fn notify(&self) {
38 let _ = self.0.set_readiness(mio::Ready::readable());
39 }
40}
41
42impl Default for AcceptNotify {
43 fn default() -> Self {
44 AcceptNotify::new(mio::Registration::new2().1)
45 }
46}
47
48pub(crate) struct AcceptLoop {
49 cmd_reg: Option<mio::Registration>,
50 cmd_ready: mio::SetReadiness,
51 notify_reg: Option<mio::Registration>,
52 notify_ready: mio::SetReadiness,
53 tx: sync_mpsc::Sender<Command>,
54 rx: Option<sync_mpsc::Receiver<Command>>,
55 srv: Option<Server>,
56}
57
58impl AcceptLoop {
59 pub fn new(srv: Server) -> AcceptLoop {
60 let (tx, rx) = sync_mpsc::channel();
61 let (cmd_reg, cmd_ready) = mio::Registration::new2();
62 let (notify_reg, notify_ready) = mio::Registration::new2();
63
64 AcceptLoop {
65 tx,
66 cmd_ready,
67 cmd_reg: Some(cmd_reg),
68 notify_ready,
69 notify_reg: Some(notify_reg),
70 rx: Some(rx),
71 srv: Some(srv),
72 }
73 }
74
75 pub fn send(&self, msg: Command) {
76 let _ = self.tx.send(msg);
77 let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
78 }
79
80 pub fn get_notify(&self) -> AcceptNotify {
81 AcceptNotify::new(self.notify_ready.clone())
82 }
83
84 pub(crate) fn start(
85 &mut self,
86 socks: Vec<(Token, StdListener)>,
87 workers: Vec<WorkerClient>,
88 ) {
89 let srv = self.srv.take().expect("Can not re-use AcceptInfo");
90
91 Accept::start(
92 self.rx.take().expect("Can not re-use AcceptInfo"),
93 self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
94 self.notify_reg.take().expect("Can not re-use AcceptInfo"),
95 socks,
96 srv,
97 workers,
98 );
99 }
100}
101
102struct Accept {
103 poll: mio::Poll,
104 rx: sync_mpsc::Receiver<Command>,
105 sockets: Slab<ServerSocketInfo>,
106 workers: Vec<WorkerClient>,
107 srv: Server,
108 timer: (mio::Registration, mio::SetReadiness),
109 next: usize,
110 backpressure: bool,
111}
112
113const DELTA: usize = 100;
114const CMD: mio::Token = mio::Token(0);
115const TIMER: mio::Token = mio::Token(1);
116const NOTIFY: mio::Token = mio::Token(2);
117
118fn connection_error(e: &io::Error) -> bool {
126 e.kind() == io::ErrorKind::ConnectionRefused
127 || e.kind() == io::ErrorKind::ConnectionAborted
128 || e.kind() == io::ErrorKind::ConnectionReset
129}
130
131impl Accept {
132 #![allow(clippy::too_many_arguments)]
133 pub(crate) fn start(
134 rx: sync_mpsc::Receiver<Command>,
135 cmd_reg: mio::Registration,
136 notify_reg: mio::Registration,
137 socks: Vec<(Token, StdListener)>,
138 srv: Server,
139 workers: Vec<WorkerClient>,
140 ) {
141 let sys = System::current();
142
143 let _ = thread::Builder::new()
145 .name("scrappy-server accept loop".to_owned())
146 .spawn(move || {
147 System::set_current(sys);
148 let mut accept = Accept::new(rx, socks, workers, srv);
149
150 if let Err(err) = accept.poll.register(
152 &cmd_reg,
153 CMD,
154 mio::Ready::readable(),
155 mio::PollOpt::edge(),
156 ) {
157 panic!("Can not register Registration: {}", err);
158 }
159
160 if let Err(err) = accept.poll.register(
162 ¬ify_reg,
163 NOTIFY,
164 mio::Ready::readable(),
165 mio::PollOpt::edge(),
166 ) {
167 panic!("Can not register Registration: {}", err);
168 }
169
170 accept.poll();
171 });
172 }
173
174 fn new(
175 rx: sync_mpsc::Receiver<Command>,
176 socks: Vec<(Token, StdListener)>,
177 workers: Vec<WorkerClient>,
178 srv: Server,
179 ) -> Accept {
180 let poll = match mio::Poll::new() {
182 Ok(poll) => poll,
183 Err(err) => panic!("Can not create mio::Poll: {}", err),
184 };
185
186 let mut sockets = Slab::new();
188 for (hnd_token, lst) in socks.into_iter() {
189 let addr = lst.local_addr();
190
191 let server = lst.into_listener();
192 let entry = sockets.vacant_entry();
193 let token = entry.key();
194
195 if let Err(err) = poll.register(
197 &server,
198 mio::Token(token + DELTA),
199 mio::Ready::readable(),
200 mio::PollOpt::edge(),
201 ) {
202 panic!("Can not register io: {}", err);
203 }
204
205 entry.insert(ServerSocketInfo {
206 addr,
207 token: hnd_token,
208 sock: server,
209 timeout: None,
210 });
211 }
212
213 let (tm, tmr) = mio::Registration::new2();
215 if let Err(err) =
216 poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
217 {
218 panic!("Can not register Registration: {}", err);
219 }
220
221 Accept {
222 poll,
223 rx,
224 sockets,
225 workers,
226 srv,
227 next: 0,
228 timer: (tm, tmr),
229 backpressure: false,
230 }
231 }
232
233 fn poll(&mut self) {
234 let mut events = mio::Events::with_capacity(128);
236
237 loop {
238 if let Err(err) = self.poll.poll(&mut events, None) {
239 panic!("Poll error: {}", err);
240 }
241
242 for event in events.iter() {
243 let token = event.token();
244 match token {
245 CMD => {
246 if !self.process_cmd() {
247 return;
248 }
249 }
250 TIMER => self.process_timer(),
251 NOTIFY => self.backpressure(false),
252 _ => {
253 let token = usize::from(token);
254 if token < DELTA {
255 continue;
256 }
257 self.accept(token - DELTA);
258 }
259 }
260 }
261 }
262 }
263
264 fn process_timer(&mut self) {
265 let now = Instant::now();
266 for (token, info) in self.sockets.iter_mut() {
267 if let Some(inst) = info.timeout.take() {
268 if now > inst {
269 if let Err(err) = self.poll.register(
270 &info.sock,
271 mio::Token(token + DELTA),
272 mio::Ready::readable(),
273 mio::PollOpt::edge(),
274 ) {
275 error!("Can not register server socket {}", err);
276 } else {
277 info!("Resume accepting connections on {}", info.addr);
278 }
279 } else {
280 info.timeout = Some(inst);
281 }
282 }
283 }
284 }
285
286 fn process_cmd(&mut self) -> bool {
287 loop {
288 match self.rx.try_recv() {
289 Ok(cmd) => match cmd {
290 Command::Pause => {
291 for (_, info) in self.sockets.iter_mut() {
292 if let Err(err) = self.poll.deregister(&info.sock) {
293 error!("Can not deregister server socket {}", err);
294 } else {
295 info!("Paused accepting connections on {}", info.addr);
296 }
297 }
298 }
299 Command::Resume => {
300 for (token, info) in self.sockets.iter() {
301 if let Err(err) = self.poll.register(
302 &info.sock,
303 mio::Token(token + DELTA),
304 mio::Ready::readable(),
305 mio::PollOpt::edge(),
306 ) {
307 error!("Can not resume socket accept process: {}", err);
308 } else {
309 info!(
310 "Accepting connections on {} has been resumed",
311 info.addr
312 );
313 }
314 }
315 }
316 Command::Stop => {
317 for (_, info) in self.sockets.iter() {
318 let _ = self.poll.deregister(&info.sock);
319 }
320 return false;
321 }
322 Command::Worker(worker) => {
323 self.backpressure(false);
324 self.workers.push(worker);
325 }
326 },
327 Err(err) => match err {
328 sync_mpsc::TryRecvError::Empty => break,
329 sync_mpsc::TryRecvError::Disconnected => {
330 for (_, info) in self.sockets.iter() {
331 let _ = self.poll.deregister(&info.sock);
332 }
333 return false;
334 }
335 },
336 }
337 }
338 true
339 }
340
341 fn backpressure(&mut self, on: bool) {
342 if self.backpressure {
343 if !on {
344 self.backpressure = false;
345 for (token, info) in self.sockets.iter() {
346 if let Err(err) = self.poll.register(
347 &info.sock,
348 mio::Token(token + DELTA),
349 mio::Ready::readable(),
350 mio::PollOpt::edge(),
351 ) {
352 error!("Can not resume socket accept process: {}", err);
353 } else {
354 info!("Accepting connections on {} has been resumed", info.addr);
355 }
356 }
357 }
358 } else if on {
359 self.backpressure = true;
360 for (_, info) in self.sockets.iter() {
361 let _ = self.poll.deregister(&info.sock);
362 }
363 }
364 }
365
366 fn accept_one(&mut self, mut msg: Conn) {
367 if self.backpressure {
368 while !self.workers.is_empty() {
369 match self.workers[self.next].send(msg) {
370 Ok(_) => (),
371 Err(tmp) => {
372 self.srv.worker_faulted(self.workers[self.next].idx);
373 msg = tmp;
374 self.workers.swap_remove(self.next);
375 if self.workers.is_empty() {
376 error!("No workers");
377 return;
378 } else if self.workers.len() <= self.next {
379 self.next = 0;
380 }
381 continue;
382 }
383 }
384 self.next = (self.next + 1) % self.workers.len();
385 break;
386 }
387 } else {
388 let mut idx = 0;
389 while idx < self.workers.len() {
390 idx += 1;
391 if self.workers[self.next].available() {
392 match self.workers[self.next].send(msg) {
393 Ok(_) => {
394 self.next = (self.next + 1) % self.workers.len();
395 return;
396 }
397 Err(tmp) => {
398 self.srv.worker_faulted(self.workers[self.next].idx);
399 msg = tmp;
400 self.workers.swap_remove(self.next);
401 if self.workers.is_empty() {
402 error!("No workers");
403 self.backpressure(true);
404 return;
405 } else if self.workers.len() <= self.next {
406 self.next = 0;
407 }
408 continue;
409 }
410 }
411 }
412 self.next = (self.next + 1) % self.workers.len();
413 }
414 self.backpressure(true);
416 self.accept_one(msg);
417 }
418 }
419
420 fn accept(&mut self, token: usize) {
421 loop {
422 let msg = if let Some(info) = self.sockets.get_mut(token) {
423 match info.sock.accept() {
424 Ok(Some((io, addr))) => Conn {
425 io,
426 token: info.token,
427 peer: Some(addr),
428 },
429 Ok(None) => return,
430 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
431 Err(ref e) if connection_error(e) => continue,
432 Err(e) => {
433 error!("Error accepting connection: {}", e);
434 if let Err(err) = self.poll.deregister(&info.sock) {
435 error!("Can not deregister server socket {}", err);
436 }
437
438 info.timeout = Some(Instant::now() + Duration::from_millis(500));
440
441 let r = self.timer.1.clone();
442 System::current().arbiter().send(Box::pin(async move {
443 delay_until(Instant::now() + Duration::from_millis(510)).await;
444 let _ = r.set_readiness(mio::Ready::readable());
445 }));
446 return;
447 }
448 }
449 } else {
450 return;
451 };
452
453 self.accept_one(msg);
454 }
455 }
456}