1use std::pin::Pin;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time;
6
7use requiem_rt::time::{delay_until, Delay, Instant};
8use requiem_rt::{spawn, Arbiter};
9use requiem_utils::counter::Counter;
10use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
11use futures::channel::oneshot;
12use futures::future::{join_all, LocalBoxFuture, MapOk};
13use futures::{Future, FutureExt, Stream, TryFutureExt};
14use log::{error, info, trace};
15
16use crate::accept::AcceptNotify;
17use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
18use crate::socket::{SocketAddr, StdStream};
19use crate::Token;
20
/// Command sent from a `WorkerClient` to its worker: handle this connection.
///
/// Produced by `WorkerClient::send`; consumed by the worker's `Available` and
/// `Unavailable` states, which pass `Conn::io` to the service selected by `Conn::token`.
pub(crate) struct WorkerCommand(Conn);
22
/// Request for a worker to stop, sent via `WorkerClient::stop`.
pub(crate) struct StopCommand {
    /// `true`: services are sent `ServerMessage::Shutdown` and the worker waits (up to
    /// its shutdown timeout) for open connections to reach zero.
    /// `false`: services are sent `ServerMessage::ForceShutdown` and the worker stops
    /// immediately.
    graceful: bool,
    /// Completed by the worker with `true` when it stopped with no open connections,
    /// or `false` when it stopped while connections were still open (forced stop, or
    /// graceful shutdown timeout).
    result: oneshot::Sender<bool>,
}
29
/// A connection handed to a worker for processing.
#[derive(Debug)]
pub(crate) struct Conn {
    /// The connection's stream; passed to the service as `ServerMessage::Connect(io)`.
    pub io: StdStream,
    /// Selects the worker service that handles this connection (`services[token.0]`).
    pub token: Token,
    /// Peer address, if known. Not read by the worker code in this file.
    pub peer: Option<SocketAddr>,
}
36
/// Capacity used to initialise each thread's `MAX_CONNS_COUNTER` (default 25600).
/// Updated by `max_concurrent_connections`.
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
38
/// Sets the capacity given to each thread's connection counter (`MAX_CONNS_COUNTER`).
///
/// The value is read only when a thread first touches its `MAX_CONNS_COUNTER`
/// thread-local. Call this before workers are started; threads whose counter already
/// exists keep their old capacity.
pub fn max_concurrent_connections(num: usize) {
    MAX_CONNS.store(num, Ordering::Relaxed);
}
48
49pub(crate) fn num_connections() -> usize {
50 MAX_CONNS_COUNTER.with(|conns| conns.total())
51}
52
thread_local! {
    /// Per-thread connection counter. It is created lazily, on the thread's first
    /// access, with the value of `MAX_CONNS` at that moment. `Worker::start` clones it
    /// into the worker, which takes guards from it (`conns.get()`) and checks it in
    /// `check_readiness` (`conns.available(cx)`).
    static MAX_CONNS_COUNTER: Counter =
        Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
57
/// Cloneable handle used to talk to a worker started with `Worker::start`.
#[derive(Clone)]
pub(crate) struct WorkerClient {
    /// Index of the worker, as passed to `Worker::start`.
    pub idx: usize,
    /// Sends connections to the worker.
    tx1: UnboundedSender<WorkerCommand>,
    /// Sends stop requests to the worker.
    tx2: UnboundedSender<StopCommand>,
    /// Availability flag; a clone of the one held by the worker.
    avail: WorkerAvailability,
}
65
66impl WorkerClient {
67 pub fn new(
68 idx: usize,
69 tx1: UnboundedSender<WorkerCommand>,
70 tx2: UnboundedSender<StopCommand>,
71 avail: WorkerAvailability,
72 ) -> Self {
73 WorkerClient {
74 idx,
75 tx1,
76 tx2,
77 avail,
78 }
79 }
80
81 pub fn send(&self, msg: Conn) -> Result<(), Conn> {
82 self.tx1
83 .unbounded_send(WorkerCommand(msg))
84 .map_err(|msg| msg.into_inner().0)
85 }
86
87 pub fn available(&self) -> bool {
88 self.avail.available()
89 }
90
91 pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
92 let (result, rx) = oneshot::channel();
93 let _ = self.tx2.unbounded_send(StopCommand { graceful, result });
94 rx
95 }
96}
97
/// Availability flag shared between a worker and its `WorkerClient`s.
///
/// Clones share the same `Arc<AtomicBool>`. Switching the flag from `false` to `true`
/// also calls `AcceptNotify::notify`.
#[derive(Clone)]
pub(crate) struct WorkerAvailability {
    /// Notified on every `false -> true` transition of `available`.
    notify: AcceptNotify,
    /// The shared flag itself.
    available: Arc<AtomicBool>,
}
103
104impl WorkerAvailability {
105 pub fn new(notify: AcceptNotify) -> Self {
106 WorkerAvailability {
107 notify,
108 available: Arc::new(AtomicBool::new(false)),
109 }
110 }
111
112 pub fn available(&self) -> bool {
113 self.available.load(Ordering::Acquire)
114 }
115
116 pub fn set(&self, val: bool) {
117 let old = self.available.swap(val, Ordering::Release);
118 if !old && val {
119 self.notify.notify()
120 }
121 }
122}
123
/// Worker future: receives connections and stop requests and dispatches connections
/// to its services. Started on its own `Arbiter` by `Worker::start`.
pub(crate) struct Worker {
    /// Incoming connections (from `WorkerClient::send`).
    rx: UnboundedReceiver<WorkerCommand>,
    /// Incoming stop requests (from `WorkerClient::stop`).
    rx2: UnboundedReceiver<StopCommand>,
    /// Service instances, indexed by `Token.0`.
    services: Vec<WorkerService>,
    /// Flag shared with the clients; set to `true` only while in `WorkerState::Available`.
    availability: WorkerAvailability,
    /// Clone of this thread's `MAX_CONNS_COUNTER`.
    conns: Counter,
    /// Service factories, indexed by `WorkerService::factory`.
    factories: Vec<Box<dyn InternalServiceFactory>>,
    /// Current state of the worker state machine.
    state: WorkerState,
    /// Maximum time a graceful shutdown waits before forcing the remaining services down.
    shutdown_timeout: time::Duration,
}
138
/// A service instance owned by the worker, plus its bookkeeping.
struct WorkerService {
    /// Index into `Worker::factories` of the factory that created this instance.
    factory: usize,
    /// Lifecycle status, maintained by `check_readiness`, `shutdown` and restarts.
    status: WorkerServiceStatus,
    /// The service itself.
    service: BoxedServerService,
}
144
145impl WorkerService {
146 fn created(&mut self, service: BoxedServerService) {
147 self.service = service;
148 self.status = WorkerServiceStatus::Unavailable;
149 }
150}
151
/// Lifecycle status of a single `WorkerService`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum WorkerServiceStatus {
    /// Last `poll_ready` returned `Ready(Ok)`.
    Available,
    /// Newly created, or last `poll_ready` returned `Pending`.
    Unavailable,
    /// `poll_ready` returned an error; no longer polled for readiness.
    Failed,
    /// Being recreated by its factory (`WorkerState::Restarting`).
    Restarting,
    /// Sent `ServerMessage::Shutdown` (graceful shutdown).
    Stopping,
    /// Sent `ServerMessage::ForceShutdown`.
    Stopped,
}
161
162impl Worker {
163 pub(crate) fn start(
164 idx: usize,
165 factories: Vec<Box<dyn InternalServiceFactory>>,
166 availability: WorkerAvailability,
167 shutdown_timeout: time::Duration,
168 ) -> WorkerClient {
169 let (tx1, rx) = unbounded();
170 let (tx2, rx2) = unbounded();
171 let avail = availability.clone();
172
173 Arbiter::new().send(
174 async move {
175 availability.set(false);
176 let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
177 rx,
178 rx2,
179 availability,
180 factories,
181 shutdown_timeout,
182 services: Vec::new(),
183 conns: conns.clone(),
184 state: WorkerState::Unavailable(Vec::new()),
185 });
186
187 let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
188 for (idx, factory) in wrk.factories.iter().enumerate() {
189 fut.push(factory.create().map_ok(move |r| {
190 r.into_iter()
191 .map(|(t, s): (Token, _)| (idx, t, s))
192 .collect::<Vec<_>>()
193 }));
194 }
195
196 spawn(async move {
197 let res = join_all(fut).await;
198 let res: Result<Vec<_>, _> = res.into_iter().collect();
199 match res {
200 Ok(services) => {
201 for item in services {
202 for (factory, token, service) in item {
203 assert_eq!(token.0, wrk.services.len());
204 wrk.services.push(WorkerService {
205 factory,
206 service,
207 status: WorkerServiceStatus::Unavailable,
208 });
209 }
210 }
211 }
212 Err(e) => {
213 error!("Can not start worker: {:?}", e);
214 Arbiter::current().stop();
215 }
216 }
217 wrk.await
218 });
219 }
220 .boxed(),
221 );
222
223 WorkerClient::new(idx, tx1, tx2, avail)
224 }
225
226 fn shutdown(&mut self, force: bool) {
227 if force {
228 self.services.iter_mut().for_each(|srv| {
229 if srv.status == WorkerServiceStatus::Available {
230 srv.status = WorkerServiceStatus::Stopped;
231 requiem_rt::spawn(
232 srv.service
233 .call((None, ServerMessage::ForceShutdown))
234 .map(|_| ()),
235 );
236 }
237 });
238 } else {
239 let timeout = self.shutdown_timeout;
240 self.services.iter_mut().for_each(move |srv| {
241 if srv.status == WorkerServiceStatus::Available {
242 srv.status = WorkerServiceStatus::Stopping;
243 requiem_rt::spawn(
244 srv.service
245 .call((None, ServerMessage::Shutdown(timeout)))
246 .map(|_| ()),
247 );
248 }
249 });
250 }
251 }
252
253 fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
254 let mut ready = self.conns.available(cx);
255 let mut failed = None;
256 for (idx, srv) in &mut self.services.iter_mut().enumerate() {
257 if srv.status == WorkerServiceStatus::Available
258 || srv.status == WorkerServiceStatus::Unavailable
259 {
260 match srv.service.poll_ready(cx) {
261 Poll::Ready(Ok(_)) => {
262 if srv.status == WorkerServiceStatus::Unavailable {
263 trace!(
264 "Service {:?} is available",
265 self.factories[srv.factory].name(Token(idx))
266 );
267 srv.status = WorkerServiceStatus::Available;
268 }
269 }
270 Poll::Pending => {
271 ready = false;
272
273 if srv.status == WorkerServiceStatus::Available {
274 trace!(
275 "Service {:?} is unavailable",
276 self.factories[srv.factory].name(Token(idx))
277 );
278 srv.status = WorkerServiceStatus::Unavailable;
279 }
280 }
281 Poll::Ready(Err(_)) => {
282 error!(
283 "Service {:?} readiness check returned error, restarting",
284 self.factories[srv.factory].name(Token(idx))
285 );
286 failed = Some((Token(idx), srv.factory));
287 srv.status = WorkerServiceStatus::Failed;
288 }
289 }
290 }
291 }
292 if let Some(idx) = failed {
293 Err(idx)
294 } else {
295 Ok(ready)
296 }
297 }
298}
299
/// State machine driven by `Worker::poll`.
enum WorkerState {
    /// All services ready; connections are pulled from `rx` and dispatched.
    Available,
    /// Not ready; holds connection(s) received but not yet dispatched.
    Unavailable(Vec<Conn>),
    /// A service is being recreated: (factory index, failed service's token,
    /// the factory's `create()` future).
    Restarting(
        usize,
        Token,
        Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
    ),
    /// Graceful shutdown in progress, with three fields:
    /// * a 1-second delay, re-armed whenever it fires, so the worker wakes up to
    ///   re-check the connection count;
    /// * the shutdown-timeout delay, after which shutdown is forced;
    /// * the sender that reports the outcome to the requester.
    Shutdown(
        Pin<Box<Delay>>,
        Pin<Box<Delay>>,
        Option<oneshot::Sender<bool>>,
    ),
}
314
315impl Future for Worker {
316 type Output = ();
317
318 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
319 if let Poll::Ready(Some(StopCommand { graceful, result })) =
321 Pin::new(&mut self.rx2).poll_next(cx)
322 {
323 self.availability.set(false);
324 let num = num_connections();
325 if num == 0 {
326 info!("Shutting down worker, 0 connections");
327 let _ = result.send(true);
328 return Poll::Ready(());
329 } else if graceful {
330 self.shutdown(false);
331 let num = num_connections();
332 if num != 0 {
333 info!("Graceful worker shutdown, {} connections", num);
334 self.state = WorkerState::Shutdown(
335 Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))),
336 Box::pin(delay_until(Instant::now() + self.shutdown_timeout)),
337 Some(result),
338 );
339 } else {
340 let _ = result.send(true);
341 return Poll::Ready(());
342 }
343 } else {
344 info!("Force shutdown worker, {} connections", num);
345 self.shutdown(true);
346 let _ = result.send(false);
347 return Poll::Ready(());
348 }
349 }
350
351 match self.state {
352 WorkerState::Unavailable(ref mut conns) => {
353 let conn = conns.pop();
354 match self.check_readiness(cx) {
355 Ok(true) => {
356 if let Some(conn) = conn {
358 let guard = self.conns.get();
359 let _ = self.services[conn.token.0]
360 .service
361 .call((Some(guard), ServerMessage::Connect(conn.io)));
362 } else {
363 self.state = WorkerState::Available;
364 self.availability.set(true);
365 }
366 self.poll(cx)
367 }
368 Ok(false) => {
369 if let Some(conn) = conn {
371 match self.state {
372 WorkerState::Unavailable(ref mut conns) => {
373 conns.push(conn);
374 }
375 _ => (),
376 }
377 }
378 Poll::Pending
379 }
380 Err((token, idx)) => {
381 trace!(
382 "Service {:?} failed, restarting",
383 self.factories[idx].name(token)
384 );
385 self.services[token.0].status = WorkerServiceStatus::Restarting;
386 self.state =
387 WorkerState::Restarting(idx, token, self.factories[idx].create());
388 self.poll(cx)
389 }
390 }
391 }
392 WorkerState::Restarting(idx, token, ref mut fut) => {
393 match Pin::new(fut).poll(cx) {
394 Poll::Ready(Ok(item)) => {
395 for (token, service) in item {
396 trace!(
397 "Service {:?} has been restarted",
398 self.factories[idx].name(token)
399 );
400 self.services[token.0].created(service);
401 self.state = WorkerState::Unavailable(Vec::new());
402 return self.poll(cx);
403 }
404 }
405 Poll::Ready(Err(_)) => {
406 panic!(
407 "Can not restart {:?} service",
408 self.factories[idx].name(token)
409 );
410 }
411 Poll::Pending => {
412 return Poll::Pending;
413 }
414 }
415 self.poll(cx)
416 }
417 WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {
418 let num = num_connections();
419 if num == 0 {
420 let _ = tx.take().unwrap().send(true);
421 Arbiter::current().stop();
422 return Poll::Ready(());
423 }
424
425 match t2.as_mut().poll(cx) {
427 Poll::Pending => (),
428 Poll::Ready(_) => {
429 let _ = tx.take().unwrap().send(false);
430 self.shutdown(true);
431 Arbiter::current().stop();
432 return Poll::Ready(());
433 }
434 }
435
436 match t1.as_mut().poll(cx) {
438 Poll::Pending => (),
439 Poll::Ready(_) => {
440 *t1 = Box::pin(delay_until(
441 Instant::now() + time::Duration::from_secs(1),
442 ));
443 let _ = t1.as_mut().poll(cx);
444 }
445 }
446 Poll::Pending
447 }
448 WorkerState::Available => {
449 loop {
450 match Pin::new(&mut self.rx).poll_next(cx) {
451 Poll::Ready(Some(WorkerCommand(msg))) => {
453 match self.check_readiness(cx) {
454 Ok(true) => {
455 let guard = self.conns.get();
456 let _ = self.services[msg.token.0]
457 .service
458 .call((Some(guard), ServerMessage::Connect(msg.io)));
459 continue;
460 }
461 Ok(false) => {
462 trace!("Worker is unavailable");
463 self.availability.set(false);
464 self.state = WorkerState::Unavailable(vec![msg]);
465 }
466 Err((token, idx)) => {
467 trace!(
468 "Service {:?} failed, restarting",
469 self.factories[idx].name(token)
470 );
471 self.availability.set(false);
472 self.services[token.0].status =
473 WorkerServiceStatus::Restarting;
474 self.state = WorkerState::Restarting(
475 idx,
476 token,
477 self.factories[idx].create(),
478 );
479 }
480 }
481 return self.poll(cx);
482 }
483 Poll::Pending => {
484 self.state = WorkerState::Available;
485 return Poll::Pending;
486 }
487 Poll::Ready(None) => return Poll::Ready(()),
488 }
489 }
490 }
491 }
492 }
493}