Skip to main content

requiem_server/
worker.rs

1use std::pin::Pin;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time;
6
7use requiem_rt::time::{delay_until, Delay, Instant};
8use requiem_rt::{spawn, Arbiter};
9use requiem_utils::counter::Counter;
10use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
11use futures::channel::oneshot;
12use futures::future::{join_all, LocalBoxFuture, MapOk};
13use futures::{Future, FutureExt, Stream, TryFutureExt};
14use log::{error, info, trace};
15
16use crate::accept::AcceptNotify;
17use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
18use crate::socket::{SocketAddr, StdStream};
19use crate::Token;
20
/// Message that hands one accepted connection to a worker.
///
/// Built by `WorkerClient::send` and unwrapped again in the worker's `poll`.
pub(crate) struct WorkerCommand(Conn);
22
/// Stop worker message. The worker answers `true` on successful shutdown
/// and `false` if some connections are still alive.
pub(crate) struct StopCommand {
    /// `true` requests a graceful shutdown, `false` a forced one.
    graceful: bool,
    /// One-shot channel on which the worker reports the outcome.
    result: oneshot::Sender<bool>,
}
29
/// An accepted connection on its way to a worker.
#[derive(Debug)]
pub(crate) struct Conn {
    /// The accepted stream; passed to the service in `ServerMessage::Connect`.
    pub io: StdStream,
    /// Selects the target service: the worker indexes its service list with
    /// `token.0`.
    pub token: Token,
    /// Peer address, if any. Not read anywhere in this file.
    pub peer: Option<SocketAddr>,
}
36
/// Process-wide connection limit (default 25600). It is read when a thread
/// initialises its `MAX_CONNS_COUNTER`.
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
38
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default the limit is 25600 connections per worker.
///
/// The value is stored in a global and read by each thread when its
/// connection counter is first initialised, so call this before workers are
/// started; a thread whose counter already exists keeps the limit it read.
pub fn max_concurrent_connections(num: usize) {
    MAX_CONNS.store(num, Ordering::Relaxed);
}
48
49pub(crate) fn num_connections() -> usize {
50    MAX_CONNS_COUNTER.with(|conns| conns.total())
51}
52
thread_local! {
    // Per-thread connection counter. `thread_local!` values are initialised
    // lazily, so the limit is whatever `MAX_CONNS` holds when the thread first
    // touches the counter.
    static MAX_CONNS_COUNTER: Counter =
        Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
57
58#[derive(Clone)]
59pub(crate) struct WorkerClient {
60    pub idx: usize,
61    tx1: UnboundedSender<WorkerCommand>,
62    tx2: UnboundedSender<StopCommand>,
63    avail: WorkerAvailability,
64}
65
66impl WorkerClient {
67    pub fn new(
68        idx: usize,
69        tx1: UnboundedSender<WorkerCommand>,
70        tx2: UnboundedSender<StopCommand>,
71        avail: WorkerAvailability,
72    ) -> Self {
73        WorkerClient {
74            idx,
75            tx1,
76            tx2,
77            avail,
78        }
79    }
80
81    pub fn send(&self, msg: Conn) -> Result<(), Conn> {
82        self.tx1
83            .unbounded_send(WorkerCommand(msg))
84            .map_err(|msg| msg.into_inner().0)
85    }
86
87    pub fn available(&self) -> bool {
88        self.avail.available()
89    }
90
91    pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
92        let (result, rx) = oneshot::channel();
93        let _ = self.tx2.unbounded_send(StopCommand { graceful, result });
94        rx
95    }
96}
97
98#[derive(Clone)]
99pub(crate) struct WorkerAvailability {
100    notify: AcceptNotify,
101    available: Arc<AtomicBool>,
102}
103
104impl WorkerAvailability {
105    pub fn new(notify: AcceptNotify) -> Self {
106        WorkerAvailability {
107            notify,
108            available: Arc::new(AtomicBool::new(false)),
109        }
110    }
111
112    pub fn available(&self) -> bool {
113        self.available.load(Ordering::Acquire)
114    }
115
116    pub fn set(&self, val: bool) {
117        let old = self.available.swap(val, Ordering::Release);
118        if !old && val {
119            self.notify.notify()
120        }
121    }
122}
123
/// Service worker
///
/// Worker accepts Socket objects via unbounded channel and starts stream
/// processing.
pub(crate) struct Worker {
    /// Incoming connections.
    rx: UnboundedReceiver<WorkerCommand>,
    /// Incoming stop requests.
    rx2: UnboundedReceiver<StopCommand>,
    /// Running services; a connection's `token.0` is an index into this list.
    services: Vec<WorkerService>,
    /// Availability flag shared with the `WorkerClient`.
    availability: WorkerAvailability,
    /// Clone of the thread-local connection counter.
    conns: Counter,
    /// Factories used to create the services and to re-create failed ones.
    factories: Vec<Box<dyn InternalServiceFactory>>,
    /// Current state of the worker's state machine.
    state: WorkerState,
    /// Time limit for a graceful shutdown.
    shutdown_timeout: time::Duration,
}
138
139struct WorkerService {
140    factory: usize,
141    status: WorkerServiceStatus,
142    service: BoxedServerService,
143}
144
145impl WorkerService {
146    fn created(&mut self, service: BoxedServerService) {
147        self.service = service;
148        self.status = WorkerServiceStatus::Unavailable;
149    }
150}
151
/// Status of a single service as tracked by the worker.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum WorkerServiceStatus {
    /// The last readiness check succeeded.
    Available,
    /// Just created, or the last readiness check was still pending.
    Unavailable,
    /// The readiness check returned an error.
    Failed,
    /// The owning factory has been asked to create a replacement.
    Restarting,
    /// A graceful shutdown message has been sent to the service.
    Stopping,
    /// A forced shutdown message has been sent to the service.
    Stopped,
}
161
162impl Worker {
163    pub(crate) fn start(
164        idx: usize,
165        factories: Vec<Box<dyn InternalServiceFactory>>,
166        availability: WorkerAvailability,
167        shutdown_timeout: time::Duration,
168    ) -> WorkerClient {
169        let (tx1, rx) = unbounded();
170        let (tx2, rx2) = unbounded();
171        let avail = availability.clone();
172
173        Arbiter::new().send(
174            async move {
175                availability.set(false);
176                let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
177                    rx,
178                    rx2,
179                    availability,
180                    factories,
181                    shutdown_timeout,
182                    services: Vec::new(),
183                    conns: conns.clone(),
184                    state: WorkerState::Unavailable(Vec::new()),
185                });
186
187                let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
188                for (idx, factory) in wrk.factories.iter().enumerate() {
189                    fut.push(factory.create().map_ok(move |r| {
190                        r.into_iter()
191                            .map(|(t, s): (Token, _)| (idx, t, s))
192                            .collect::<Vec<_>>()
193                    }));
194                }
195
196                spawn(async move {
197                    let res = join_all(fut).await;
198                    let res: Result<Vec<_>, _> = res.into_iter().collect();
199                    match res {
200                        Ok(services) => {
201                            for item in services {
202                                for (factory, token, service) in item {
203                                    assert_eq!(token.0, wrk.services.len());
204                                    wrk.services.push(WorkerService {
205                                        factory,
206                                        service,
207                                        status: WorkerServiceStatus::Unavailable,
208                                    });
209                                }
210                            }
211                        }
212                        Err(e) => {
213                            error!("Can not start worker: {:?}", e);
214                            Arbiter::current().stop();
215                        }
216                    }
217                    wrk.await
218                });
219            }
220            .boxed(),
221        );
222
223        WorkerClient::new(idx, tx1, tx2, avail)
224    }
225
226    fn shutdown(&mut self, force: bool) {
227        if force {
228            self.services.iter_mut().for_each(|srv| {
229                if srv.status == WorkerServiceStatus::Available {
230                    srv.status = WorkerServiceStatus::Stopped;
231                    requiem_rt::spawn(
232                        srv.service
233                            .call((None, ServerMessage::ForceShutdown))
234                            .map(|_| ()),
235                    );
236                }
237            });
238        } else {
239            let timeout = self.shutdown_timeout;
240            self.services.iter_mut().for_each(move |srv| {
241                if srv.status == WorkerServiceStatus::Available {
242                    srv.status = WorkerServiceStatus::Stopping;
243                    requiem_rt::spawn(
244                        srv.service
245                            .call((None, ServerMessage::Shutdown(timeout)))
246                            .map(|_| ()),
247                    );
248                }
249            });
250        }
251    }
252
253    fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
254        let mut ready = self.conns.available(cx);
255        let mut failed = None;
256        for (idx, srv) in &mut self.services.iter_mut().enumerate() {
257            if srv.status == WorkerServiceStatus::Available
258                || srv.status == WorkerServiceStatus::Unavailable
259            {
260                match srv.service.poll_ready(cx) {
261                    Poll::Ready(Ok(_)) => {
262                        if srv.status == WorkerServiceStatus::Unavailable {
263                            trace!(
264                                "Service {:?} is available",
265                                self.factories[srv.factory].name(Token(idx))
266                            );
267                            srv.status = WorkerServiceStatus::Available;
268                        }
269                    }
270                    Poll::Pending => {
271                        ready = false;
272
273                        if srv.status == WorkerServiceStatus::Available {
274                            trace!(
275                                "Service {:?} is unavailable",
276                                self.factories[srv.factory].name(Token(idx))
277                            );
278                            srv.status = WorkerServiceStatus::Unavailable;
279                        }
280                    }
281                    Poll::Ready(Err(_)) => {
282                        error!(
283                            "Service {:?} readiness check returned error, restarting",
284                            self.factories[srv.factory].name(Token(idx))
285                        );
286                        failed = Some((Token(idx), srv.factory));
287                        srv.status = WorkerServiceStatus::Failed;
288                    }
289                }
290            }
291        }
292        if let Some(idx) = failed {
293            Err(idx)
294        } else {
295            Ok(ready)
296        }
297    }
298}
299
/// State machine driven by the worker's `poll`.
enum WorkerState {
    /// Connections are read from the channel and dispatched to services.
    Available,
    /// Readiness check did not pass yet; holds connections waiting to be
    /// dispatched.
    Unavailable(Vec<Conn>),
    /// A failed service is being re-created: factory index, token of the
    /// failed service and the factory's pending `create` future.
    Restarting(
        usize,
        Token,
        Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
    ),
    /// Graceful shutdown in progress: one-second re-check timer, overall
    /// shutdown deadline and the channel on which the result is reported.
    Shutdown(
        Pin<Box<Delay>>,
        Pin<Box<Delay>>,
        Option<oneshot::Sender<bool>>,
    ),
}
314
315impl Future for Worker {
316    type Output = ();
317
318    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
319        // `StopWorker` message handler
320        if let Poll::Ready(Some(StopCommand { graceful, result })) =
321            Pin::new(&mut self.rx2).poll_next(cx)
322        {
323            self.availability.set(false);
324            let num = num_connections();
325            if num == 0 {
326                info!("Shutting down worker, 0 connections");
327                let _ = result.send(true);
328                return Poll::Ready(());
329            } else if graceful {
330                self.shutdown(false);
331                let num = num_connections();
332                if num != 0 {
333                    info!("Graceful worker shutdown, {} connections", num);
334                    self.state = WorkerState::Shutdown(
335                        Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))),
336                        Box::pin(delay_until(Instant::now() + self.shutdown_timeout)),
337                        Some(result),
338                    );
339                } else {
340                    let _ = result.send(true);
341                    return Poll::Ready(());
342                }
343            } else {
344                info!("Force shutdown worker, {} connections", num);
345                self.shutdown(true);
346                let _ = result.send(false);
347                return Poll::Ready(());
348            }
349        }
350
351        match self.state {
352            WorkerState::Unavailable(ref mut conns) => {
353                let conn = conns.pop();
354                match self.check_readiness(cx) {
355                    Ok(true) => {
356                        // process requests from wait queue
357                        if let Some(conn) = conn {
358                            let guard = self.conns.get();
359                            let _ = self.services[conn.token.0]
360                                .service
361                                .call((Some(guard), ServerMessage::Connect(conn.io)));
362                        } else {
363                            self.state = WorkerState::Available;
364                            self.availability.set(true);
365                        }
366                        self.poll(cx)
367                    }
368                    Ok(false) => {
369                        // push connection back to queue
370                        if let Some(conn) = conn {
371                            match self.state {
372                                WorkerState::Unavailable(ref mut conns) => {
373                                    conns.push(conn);
374                                }
375                                _ => (),
376                            }
377                        }
378                        Poll::Pending
379                    }
380                    Err((token, idx)) => {
381                        trace!(
382                            "Service {:?} failed, restarting",
383                            self.factories[idx].name(token)
384                        );
385                        self.services[token.0].status = WorkerServiceStatus::Restarting;
386                        self.state =
387                            WorkerState::Restarting(idx, token, self.factories[idx].create());
388                        self.poll(cx)
389                    }
390                }
391            }
392            WorkerState::Restarting(idx, token, ref mut fut) => {
393                match Pin::new(fut).poll(cx) {
394                    Poll::Ready(Ok(item)) => {
395                        for (token, service) in item {
396                            trace!(
397                                "Service {:?} has been restarted",
398                                self.factories[idx].name(token)
399                            );
400                            self.services[token.0].created(service);
401                            self.state = WorkerState::Unavailable(Vec::new());
402                            return self.poll(cx);
403                        }
404                    }
405                    Poll::Ready(Err(_)) => {
406                        panic!(
407                            "Can not restart {:?} service",
408                            self.factories[idx].name(token)
409                        );
410                    }
411                    Poll::Pending => {
412                        return Poll::Pending;
413                    }
414                }
415                self.poll(cx)
416            }
417            WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {
418                let num = num_connections();
419                if num == 0 {
420                    let _ = tx.take().unwrap().send(true);
421                    Arbiter::current().stop();
422                    return Poll::Ready(());
423                }
424
425                // check graceful timeout
426                match t2.as_mut().poll(cx) {
427                    Poll::Pending => (),
428                    Poll::Ready(_) => {
429                        let _ = tx.take().unwrap().send(false);
430                        self.shutdown(true);
431                        Arbiter::current().stop();
432                        return Poll::Ready(());
433                    }
434                }
435
436                // sleep for 1 second and then check again
437                match t1.as_mut().poll(cx) {
438                    Poll::Pending => (),
439                    Poll::Ready(_) => {
440                        *t1 = Box::pin(delay_until(
441                            Instant::now() + time::Duration::from_secs(1),
442                        ));
443                        let _ = t1.as_mut().poll(cx);
444                    }
445                }
446                Poll::Pending
447            }
448            WorkerState::Available => {
449                loop {
450                    match Pin::new(&mut self.rx).poll_next(cx) {
451                        // handle incoming io stream
452                        Poll::Ready(Some(WorkerCommand(msg))) => {
453                            match self.check_readiness(cx) {
454                                Ok(true) => {
455                                    let guard = self.conns.get();
456                                    let _ = self.services[msg.token.0]
457                                        .service
458                                        .call((Some(guard), ServerMessage::Connect(msg.io)));
459                                    continue;
460                                }
461                                Ok(false) => {
462                                    trace!("Worker is unavailable");
463                                    self.availability.set(false);
464                                    self.state = WorkerState::Unavailable(vec![msg]);
465                                }
466                                Err((token, idx)) => {
467                                    trace!(
468                                        "Service {:?} failed, restarting",
469                                        self.factories[idx].name(token)
470                                    );
471                                    self.availability.set(false);
472                                    self.services[token.0].status =
473                                        WorkerServiceStatus::Restarting;
474                                    self.state = WorkerState::Restarting(
475                                        idx,
476                                        token,
477                                        self.factories[idx].create(),
478                                    );
479                                }
480                            }
481                            return self.poll(cx);
482                        }
483                        Poll::Pending => {
484                            self.state = WorkerState::Available;
485                            return Poll::Pending;
486                        }
487                        Poll::Ready(None) => return Poll::Ready(()),
488                    }
489                }
490            }
491        }
492    }
493}