1use std::any::{Any, TypeId};
2use std::collections::VecDeque;
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::time::{Duration, Instant};
5use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
6
7use async_channel::{Receiver, Sender, unbounded};
8use futures_timer::Delay;
9use parking_lot::{Mutex, RwLock};
10
11use crate::arbiter::Arbiter;
12use crate::pool::ThreadPool;
13use crate::{BlockingResult, Builder, Handle, HashMap, HashSet, Runner, SystemRunner};
14
/// Counter handing out a distinct id to every system created in this process
/// (incremented once per `System::start`).
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
16
thread_local!(
    /// Recent ping history per arbiter: newest record first, at most 10 records.
    /// Written by `ping_arbiters` and read by `System::list_arbiter_pings`; being
    /// thread-local, it is only populated on the thread that runs the ping loop.
    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
        RefCell::new(HashMap::default());
);
21
/// Registry of the arbiters that belong to one system.
#[derive(Default)]
struct Arbiters {
    /// Registered arbiters keyed by their id.
    all: HashMap<Id, Arbiter>,
    /// Registered arbiters in registration order.
    list: Vec<Arbiter>,
}
27
/// Numeric identifier, used for systems (`System::id`) and as the key under
/// which arbiters are registered.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Id(pub(crate) usize);
31
/// Handle to a system; clones share the same state through an `Arc`.
pub struct System(Arc<SystemInner>);
34
/// State shared by all handles of one system.
struct SystemInner {
    /// Distinct number taken from `SYSTEM_COUNT`.
    id: usize,
    /// The system's own arbiter (also present in `arbiters`).
    arbiter: Arbiter,
    config: SystemConfig,
    /// Sending half of the command channel, used by `stop_with_code`.
    sender: Sender<SystemCommand>,
    /// Receiving half of the command channel, consumed by `SystemSupport`.
    receiver: Receiver<SystemCommand>,
    /// One value per type, managed by `System::get_value`.
    storage: RwLock<HashMap<TypeId, Box<dyn Any + Sync + Send>>>,
    /// Arbiters registered with this system.
    arbiters: Mutex<Arbiters>,
    /// Thread pool used by `System::spawn_blocking`.
    pool: ThreadPool,
}
45
/// Configuration a system is started with.
#[derive(Clone)]
pub struct SystemConfig {
    /// System name; also used to name the thread pool and the system arbiter.
    pub(super) name: String,
    /// Stack size setting; not read in this module (presumably for arbiter
    /// threads — confirm at the use site).
    pub(super) stack_size: usize,
    /// Exposed via `System::stop_on_panic`.
    pub(super) stop_on_panic: bool,
    /// Interval between arbiter ping rounds, in milliseconds.
    pub(super) ping_interval: usize,
    /// Limit passed to `ThreadPool::new`.
    pub(super) pool_limit: usize,
    /// Receive timeout passed to `ThreadPool::new`.
    pub(super) pool_recv_timeout: Duration,
    /// Exposed via `System::signals_disabled`.
    pub(super) disable_signals: bool,
    /// Exposed via `System::testing` / `SystemConfig::testing`.
    pub(super) testing: bool,
    /// Runner implementation; not used in this module.
    pub(super) runner: Arc<dyn Runner>,
}
58
thread_local!(
    /// The system that is current for this thread, if any.
    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
);
62
63impl Clone for System {
64 fn clone(&self) -> Self {
65 Self(self.0.clone())
66 }
67}
68
69impl System {
70 pub(super) fn start(config: SystemConfig) -> (Self, oneshot::Receiver<i32>) {
72 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
73 let (sender, receiver) = unbounded();
74
75 let pool =
76 ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
77 let (arbiter, controller) = Arbiter::new_system(id, config.name.clone());
78
79 let mut arbiters = Arbiters::default();
80 arbiters.all.insert(arbiter.id(), arbiter.clone());
81 arbiters.list.push(arbiter.clone());
82
83 let sys = System(Arc::new(SystemInner {
84 id,
85 config,
86 arbiter,
87 sender,
88 receiver,
89 pool,
90 arbiters: Mutex::new(arbiters),
91 storage: RwLock::new(HashMap::default()),
92 }));
93 System::set_current(sys.clone());
94
95 let (stop_tx, stop) = oneshot::channel();
96
97 crate::spawn(SystemSupport::new(&sys, stop_tx).run());
99 crate::spawn(controller.run(sys.clone()));
100
101 (sys, stop)
102 }
103
104 pub fn build() -> Builder {
109 Builder::new()
110 }
111
112 #[allow(clippy::new_ret_no_self)]
113 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
117 Self::build().name(name).build(runner)
118 }
119
120 #[allow(clippy::new_ret_no_self)]
121 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
125 Self::build().name(name).build_with(config)
126 }
127
128 pub fn current() -> System {
134 CURRENT.with(|cell| match *cell.borrow() {
135 Some(ref sys) => sys.clone(),
136 None => panic!("System is not running"),
137 })
138 }
139
140 pub fn try_current() -> Option<System> {
142 CURRENT.with(|cell| cell.borrow().as_ref().map(Clone::clone))
143 }
144
145 #[doc(hidden)]
147 pub fn set_current(sys: System) {
148 CURRENT.with(|s| {
149 *s.borrow_mut() = Some(sys);
150 });
151 }
152
153 pub(crate) fn register_arbiter(&self, arb: Arbiter) {
154 CURRENT.with(|s| {
155 *s.borrow_mut() = Some(self.clone());
156 });
157 let mut arbiters = self.0.arbiters.lock();
158 arbiters.all.insert(arb.id(), arb.clone());
159 arbiters.list.push(arb);
160 }
161
162 pub(crate) fn unregister_arbiter(&self, id: Id) {
163 CURRENT.with(|s| {
164 *s.borrow_mut() = None;
165 });
166 let mut arbiters = self.0.arbiters.lock();
167 if let Some(hnd) = arbiters.all.remove(&id) {
168 for (idx, arb) in arbiters.list.iter().enumerate() {
169 if &hnd == arb {
170 arbiters.list.remove(idx);
171 break;
172 }
173 }
174 }
175 }
176
177 pub(super) fn remove_current() {
178 CURRENT.with(|cell| {
179 cell.borrow_mut().take();
180 });
181 }
182
183 pub fn id(&self) -> Id {
185 Id(self.0.id)
186 }
187
188 pub fn name(&self) -> &str {
190 &self.0.config.name
191 }
192
193 pub fn stop(&self) {
195 self.stop_with_code(0);
196 }
197
198 pub fn stop_with_code(&self, code: i32) {
200 let _ = self.0.sender.try_send(SystemCommand::Exit(code));
201 }
202
203 pub fn stop_on_panic(&self) -> bool {
208 self.0.config.stop_on_panic
209 }
210
211 pub fn signals_disabled(&self) -> bool {
213 self.0.config.disable_signals
214 }
215
216 pub fn arbiter(&self) -> Arbiter {
222 self.0.arbiter.clone()
223 }
224
225 pub fn list_arbiters<F, R>(&self, f: F) -> R
230 where
231 F: FnOnce(&[Arbiter]) -> R,
232 {
233 f(&self.0.arbiters.lock().list)
234 }
235
236 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
241 where
242 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
243 {
244 PINGS.with(|pings| {
245 if let Some(recs) = pings.borrow().get(&id) {
246 f(Some(recs))
247 } else {
248 f(None)
249 }
250 })
251 }
252
253 pub fn config(&self) -> SystemConfig {
255 self.0.config.clone()
256 }
257
258 #[inline]
259 pub fn handle(&self) -> Handle {
261 self.arbiter().handle().clone()
262 }
263
264 pub fn testing(&self) -> bool {
266 self.0.config.testing()
267 }
268
269 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
273 where
274 F: FnOnce() -> R + Send + 'static,
275 R: Send + 'static,
276 {
277 self.0.pool.execute(f)
278 }
279
280 pub fn get_value<T>(&self, f: impl FnOnce() -> T) -> T
285 where
286 T: Clone + Send + Sync + 'static,
287 {
288 if let Some(boxed) = self.0.storage.read().get(&TypeId::of::<T>())
289 && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
290 {
291 val.clone()
292 } else {
293 let val = f();
294 self.0
295 .storage
296 .write()
297 .insert(TypeId::of::<T>(), Box::new(val.clone()));
298 val
299 }
300 }
301}
302
303impl SystemConfig {
304 #[inline]
305 pub fn testing(&self) -> bool {
307 self.testing
308 }
309}
310
311impl fmt::Debug for System {
312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313 f.debug_struct("System")
314 .field("id", &self.0.id)
315 .field("config", &self.0.config)
316 .field("pool", &self.0.pool)
317 .finish()
318 }
319}
320
321impl fmt::Debug for SystemConfig {
322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323 f.debug_struct("SystemConfig")
324 .field("name", &self.name)
325 .field("testing", &self.testing)
326 .field("stack_size", &self.stack_size)
327 .field("stop_on_panic", &self.stop_on_panic)
328 .field("signals_disabled", &self.disable_signals)
329 .finish()
330 }
331}
332
/// Commands handled by the system support task (`SystemSupport::run`).
#[derive(Debug)]
pub(super) enum SystemCommand {
    /// Stop all registered arbiters and report the given exit code.
    Exit(i32),
}
337
/// State of the background task that processes [`SystemCommand`]s.
#[derive(Debug)]
struct SystemSupport {
    sys: System,
    /// Receives the exit code; taken (set to `None`) by the first `Exit`.
    stop: Option<oneshot::Sender<i32>>,
    /// Clone of the system's command receiver.
    commands: Receiver<SystemCommand>,
    /// Delay between arbiter ping rounds.
    ping_interval: Duration,
}
345
346impl SystemSupport {
347 fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
348 Self {
349 sys: sys.clone(),
350 stop: Some(stop),
351 commands: sys.0.receiver.clone(),
352 ping_interval: Duration::from_millis(sys.0.config.ping_interval as u64),
353 }
354 }
355
356 async fn run(mut self) {
357 crate::spawn(ping_arbiters(self.sys.clone(), self.ping_interval));
358
359 loop {
360 match self.commands.recv().await {
361 Ok(SystemCommand::Exit(code)) => {
362 log::debug!("Stopping system with {code} code");
363
364 let mut arbiters = self.sys.0.arbiters.lock();
366 for arb in arbiters.list.drain(..) {
367 arb.stop();
368 }
369 arbiters.all.clear();
370
371 if let Some(stop) = self.stop.take() {
373 let _ = stop.send(code);
374 }
375 }
376 Err(_) => {
377 log::debug!("System stopped");
378 return;
379 }
380 }
381 }
382 }
383}
384
/// One ping of an arbiter.
#[derive(Copy, Clone, Debug)]
pub struct PingRecord {
    /// When the ping round was started.
    pub start: Instant,
    /// Round-trip time; `None` until the arbiter has answered.
    pub rtt: Option<Duration>,
}
392
393async fn ping_arbiters(sys: System, interval: Duration) {
394 let pings = Rc::new(RefCell::new(HashSet::default()));
395
396 loop {
397 {
399 pings.borrow_mut().clear();
400
401 let start = Instant::now();
402 let arbiters = sys.0.arbiters.lock();
403
404 for arb in &arbiters.list {
405 let id = arb.id();
406 let pings = pings.clone();
407 let fut = arb.handle().spawn(async move {
408 yield_to().await;
409 });
410
411 PINGS.with(|pings| {
413 let mut p = pings.borrow_mut();
414 let recs = p.entry(arb.id()).or_default();
415 recs.push_front(PingRecord { start, rtt: None });
416 recs.truncate(10);
417 });
418
419 crate::spawn(async move {
420 if fut.await.is_ok() {
421 pings.borrow_mut().insert(id);
422
423 PINGS.with(|pings| {
424 pings
425 .borrow_mut()
426 .get_mut(&id)
427 .unwrap()
428 .front_mut()
429 .unwrap()
430 .rtt = Some(start.elapsed());
431 });
432 }
433 });
434 }
435 }
436
437 Delay::new(interval).await;
438
439 #[cfg(target_os = "linux")]
441 {
442 const SPIN: Duration = Duration::from_micros(100);
443
444 let mut no_pongs = Vec::new();
445
446 {
447 for arb in &sys.0.arbiters.lock().list {
448 let pong = pings.borrow_mut().remove(&arb.id());
449 if !pong {
450 no_pongs.push(arb.clone());
451 }
452 }
453 }
454
455 for arb in no_pongs {
456 log::error!("Arbiter {}({:?}) did not return pong", arb.name(), arb.id());
458
459 *CAPTURED.lock() = None;
461 EXPECTED_TID.store(arb.tid(), Ordering::Release);
462 unsafe {
463 libc::syscall(
464 libc::SYS_tgkill,
465 libc::getpid(),
466 arb.tid(),
467 libc::SIGUSR2,
468 );
469 }
470
471 for _ in 0..1000 {
473 Delay::new(SPIN).await;
474 if let Some(bt) = CAPTURED.lock().take() {
475 let bt = ntex_error::Backtrace::from(bt);
476 bt.resolver().resolve();
477 log::error!(
478 "Worker does not returned pong within {interval:?} time.\n{bt:?}"
479 );
480 break;
481 }
482 }
483 }
484 }
485 }
486}
487
/// Suspends the calling task exactly once.
///
/// The inner future wakes itself and returns `Pending` on its first poll, then
/// `Ready` on the next. The task therefore passes once more through its
/// executor before completing.
async fn yield_to() {
    use std::task::{Context, Poll};

    /// Single-shot yield point.
    struct YieldOnce {
        yielded: bool,
    }

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if std::mem::replace(&mut self.yielded, true) {
                Poll::Ready(())
            } else {
                // Request an immediate re-poll before reporting `Pending`.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    YieldOnce { yielded: false }.await
}
510
/// Thread id whose backtrace `ping_arbiters` is currently requesting;
/// `sig_usr2` only captures when the current thread id equals this value.
#[cfg(target_os = "linux")]
static EXPECTED_TID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
/// Backtrace published by `sig_usr2` and taken by `ping_arbiters`.
#[cfg(target_os = "linux")]
static CAPTURED: Mutex<Option<ntex_error::BacktraceRaw>> = Mutex::new(None);
515
/// Captures a backtrace of the current thread into `CAPTURED` if this thread is
/// the one named by `EXPECTED_TID`. On non-Linux Unix targets this is a no-op.
///
/// Presumably called from the process' SIGUSR2 handler, which is installed
/// elsewhere — confirm at the registration site.
///
/// NOTE(review): if this runs inside a signal handler, the parking_lot lock and
/// `BacktraceRaw::new` (which likely allocates) are not async-signal-safe.
/// Confirm this trade-off is acceptable.
#[track_caller]
#[cfg(target_family = "unix")]
pub(crate) fn sig_usr2() {
    #[cfg(target_os = "linux")]
    #[allow(clippy::cast_possible_truncation)] // kernel thread ids fit in `pid_t` (i32)
    {
        // SAFETY: `gettid` takes no arguments and only returns the caller's thread id.
        let tid = unsafe { libc::syscall(libc::SYS_gettid) } as i32;
        if EXPECTED_TID.load(Ordering::Acquire) == tid {
            let bt = ntex_error::BacktraceRaw::new(panic::Location::caller());
            *CAPTURED.lock() = Some(bt);
        }
    }
}