1use std::any::{Any, TypeId};
2use std::collections::VecDeque;
3use std::sync::{Arc, atomic::AtomicBool, atomic::AtomicUsize, atomic::Ordering};
4use std::time::{Duration, Instant};
5use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
6
7use async_channel::{Receiver, Sender, unbounded};
8use futures_timer::Delay;
9use parking_lot::{Mutex, RwLock};
10
11use crate::arbiter::Arbiter;
12use crate::pool::ThreadPool;
13use crate::{BlockingResult, Builder, Handle, HashMap, HashSet, Runner, SystemRunner};
14
/// Process-wide counter; `System::start` takes the next value as the id of the new system.
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);

thread_local!(
    // Ping history of the arbiters, keyed by arbiter id. `ping_arbiters` pushes the
    // newest record to the front and keeps at most 10 records per arbiter.
    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
        RefCell::new(HashMap::default());
);
21
/// Registry of the arbiters that belong to a system.
#[derive(Default)]
struct Arbiters {
    /// Arbiters indexed by id.
    all: HashMap<Id, Arbiter>,
    /// The same arbiters in registration order.
    list: Vec<Arbiter>,
}
27
/// Numeric identifier; used for systems (see `System::id`) and as the key of arbiters.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Id(pub(crate) usize);
31
/// Cheaply clonable handle to a running system; all clones share one `SystemInner`.
pub struct System(Arc<SystemInner>);
34
/// Shared state behind a `System` handle.
struct SystemInner {
    /// Value taken from `SYSTEM_COUNT` when the system was started.
    id: usize,
    /// The system's own arbiter, created by `Arbiter::new_system`.
    arbiter: Arbiter,
    /// Configuration the system was started with.
    config: SystemConfig,
    /// Sending side of the command channel (used by `stop_with_code`).
    sender: Sender<SystemCommand>,
    /// Receiving side of the command channel (cloned into `SystemSupport`).
    receiver: Receiver<SystemCommand>,
    /// Type-indexed value storage backing `System::get_value`.
    storage: RwLock<HashMap<TypeId, Box<dyn Any + Sync + Send>>>,
    /// Arbiters currently registered with the system.
    arbiters: Mutex<Arbiters>,
    /// Whether signal handling has been enabled through `enable_signals`.
    signals: AtomicBool,
    /// Thread pool used by `spawn_blocking`.
    pool: ThreadPool,
}
46
/// Configuration of a system.
#[derive(Clone)]
pub struct SystemConfig {
    /// System name; also passed to the thread pool and the system arbiter.
    pub(super) name: String,
    // NOTE(review): presumably the stack size for threads created by the system —
    // not used in this file, confirm against the builder.
    pub(super) stack_size: usize,
    /// Value reported by `System::stop_on_panic`.
    pub(super) stop_on_panic: bool,
    /// Interval between arbiter ping rounds, in milliseconds.
    pub(super) ping_interval: usize,
    /// Passed to `ThreadPool::new` as the pool limit.
    pub(super) pool_limit: usize,
    /// Passed to `ThreadPool::new` as the receive timeout.
    pub(super) pool_recv_timeout: Duration,
    /// Value reported by `testing()`.
    pub(super) testing: bool,
    /// Runner the system was built with.
    pub(super) runner: Arc<dyn Runner>,
}
58
thread_local!(
    // System associated with the current thread, if any; see `System::current`.
    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
);
62
63impl Clone for System {
64 fn clone(&self) -> Self {
65 Self(self.0.clone())
66 }
67}
68
69impl System {
70 pub(super) fn start(config: SystemConfig) -> (Self, oneshot::Receiver<i32>) {
72 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
73 let (sender, receiver) = unbounded();
74
75 let pool =
76 ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
77 let (arbiter, controller) = Arbiter::new_system(id, config.name.clone());
78
79 let mut arbiters = Arbiters::default();
80 arbiters.all.insert(arbiter.id(), arbiter.clone());
81 arbiters.list.push(arbiter.clone());
82
83 let sys = System(Arc::new(SystemInner {
84 id,
85 config,
86 arbiter,
87 sender,
88 receiver,
89 pool,
90 arbiters: Mutex::new(arbiters),
91 storage: RwLock::new(HashMap::default()),
92 signals: AtomicBool::new(false),
93 }));
94 System::set_current(sys.clone());
95
96 let (stop_tx, stop) = oneshot::channel();
97
98 crate::spawn(SystemSupport::new(&sys, stop_tx).run());
100 crate::spawn(controller.run(sys.clone()));
101
102 (sys, stop)
103 }
104
105 pub fn build() -> Builder {
110 Builder::new()
111 }
112
113 #[allow(clippy::new_ret_no_self)]
114 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
118 Self::build().name(name).build(runner)
119 }
120
121 #[allow(clippy::new_ret_no_self)]
122 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
126 Self::build().name(name).build_with(config)
127 }
128
129 pub fn current() -> System {
135 CURRENT.with(|cell| match *cell.borrow() {
136 Some(ref sys) => sys.clone(),
137 None => panic!("System is not running"),
138 })
139 }
140
141 pub fn try_current() -> Option<System> {
143 CURRENT.with(|cell| cell.borrow().as_ref().map(Clone::clone))
144 }
145
146 #[doc(hidden)]
148 pub fn set_current(sys: System) {
149 CURRENT.with(|s| {
150 *s.borrow_mut() = Some(sys);
151 });
152 }
153
154 pub(crate) fn register_arbiter(&self, arb: Arbiter) {
155 CURRENT.with(|s| {
156 *s.borrow_mut() = Some(self.clone());
157 });
158 let mut arbiters = self.0.arbiters.lock();
159 arbiters.all.insert(arb.id(), arb.clone());
160 arbiters.list.push(arb);
161 }
162
163 pub(crate) fn unregister_arbiter(&self, id: Id) {
164 CURRENT.with(|s| {
165 *s.borrow_mut() = None;
166 });
167 let mut arbiters = self.0.arbiters.lock();
168 if let Some(hnd) = arbiters.all.remove(&id) {
169 for (idx, arb) in arbiters.list.iter().enumerate() {
170 if &hnd == arb {
171 arbiters.list.remove(idx);
172 break;
173 }
174 }
175 }
176 }
177
178 pub(super) fn remove_current() {
179 CURRENT.with(|cell| {
180 cell.borrow_mut().take();
181 });
182 }
183
184 pub fn id(&self) -> Id {
186 Id(self.0.id)
187 }
188
189 pub fn name(&self) -> &str {
191 &self.0.config.name
192 }
193
194 pub fn stop(&self) {
196 self.stop_with_code(0);
197 }
198
199 pub fn stop_with_code(&self, code: i32) {
201 let _ = self.0.sender.try_send(SystemCommand::Exit(code));
202 }
203
204 pub fn stop_on_panic(&self) -> bool {
209 self.0.config.stop_on_panic
210 }
211
212 pub fn signals(&self) -> bool {
214 self.0.signals.load(Ordering::Relaxed)
215 }
216
217 pub fn enable_signals(&self) {
219 if !self.signals() {
220 crate::signals::start(self);
221 self.0.signals.store(true, Ordering::Relaxed);
222 }
223 }
224
225 pub fn disable_signals(&self) {
227 if self.signals() {
228 crate::signals::stop(self);
229 self.0.signals.store(false, Ordering::Relaxed);
230 }
231 }
232
233 pub fn arbiter(&self) -> Arbiter {
239 self.0.arbiter.clone()
240 }
241
242 pub fn list_arbiters<F, R>(&self, f: F) -> R
247 where
248 F: FnOnce(&[Arbiter]) -> R,
249 {
250 f(&self.0.arbiters.lock().list)
251 }
252
253 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
258 where
259 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
260 {
261 PINGS.with(|pings| {
262 if let Some(recs) = pings.borrow().get(&id) {
263 f(Some(recs))
264 } else {
265 f(None)
266 }
267 })
268 }
269
270 #[cfg(target_os = "linux")]
271 #[doc(hidden)]
272 pub fn set_latency_callback<F: Fn(ntex_error::Backtrace) + 'static>(f: F) {
279 unsafe {
280 ARB_CB = Some(Box::new(f));
281 }
282 }
283
284 pub fn config(&self) -> SystemConfig {
286 self.0.config.clone()
287 }
288
289 #[inline]
290 pub fn handle(&self) -> Handle {
292 self.arbiter().handle().clone()
293 }
294
295 pub fn testing(&self) -> bool {
297 self.0.config.testing()
298 }
299
300 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
304 where
305 F: FnOnce() -> R + Send + 'static,
306 R: Send + 'static,
307 {
308 self.0.pool.execute(f)
309 }
310
311 pub fn get_value<T>(&self, f: impl FnOnce() -> T) -> T
316 where
317 T: Clone + Send + Sync + 'static,
318 {
319 if let Some(boxed) = self.0.storage.read().get(&TypeId::of::<T>())
320 && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
321 {
322 val.clone()
323 } else {
324 let val = f();
325 self.0
326 .storage
327 .write()
328 .insert(TypeId::of::<T>(), Box::new(val.clone()));
329 val
330 }
331 }
332}
333
impl SystemConfig {
    /// Returns the value of the `testing` flag.
    #[inline]
    pub fn testing(&self) -> bool {
        self.testing
    }
}
341
342impl fmt::Debug for System {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 f.debug_struct("System")
345 .field("id", &self.0.id)
346 .field("config", &self.0.config)
347 .field("signals", &self.signals())
348 .field("pool", &self.0.pool)
349 .finish()
350 }
351}
352
353impl fmt::Debug for SystemConfig {
354 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355 f.debug_struct("SystemConfig")
356 .field("name", &self.name)
357 .field("testing", &self.testing)
358 .field("stack_size", &self.stack_size)
359 .field("stop_on_panic", &self.stop_on_panic)
360 .finish()
361 }
362}
363
/// Commands delivered to the system support task over the command channel.
#[derive(Debug)]
pub(super) enum SystemCommand {
    /// Stop all arbiters and report the given exit code.
    Exit(i32),
}
368
/// State of the task that processes `SystemCommand`s for a system.
#[derive(Debug)]
struct SystemSupport {
    /// System this task serves.
    sys: System,
    /// Sender for the exit code; taken on the first `Exit` command.
    stop: Option<oneshot::Sender<i32>>,
    /// Receiving side of the system's command channel.
    commands: Receiver<SystemCommand>,
    /// Delay between arbiter ping rounds.
    ping_interval: Duration,
}
376
377impl SystemSupport {
378 fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
379 Self {
380 sys: sys.clone(),
381 stop: Some(stop),
382 commands: sys.0.receiver.clone(),
383 ping_interval: Duration::from_millis(sys.0.config.ping_interval as u64),
384 }
385 }
386
387 async fn run(mut self) {
388 crate::spawn(ping_arbiters(self.sys.clone(), self.ping_interval));
389
390 loop {
391 match self.commands.recv().await {
392 Ok(SystemCommand::Exit(code)) => {
393 log::debug!("Stopping system with {code} code");
394
395 let mut arbiters = self.sys.0.arbiters.lock();
397 for arb in arbiters.list.drain(..) {
398 arb.stop();
399 }
400 arbiters.all.clear();
401
402 if let Some(stop) = self.stop.take() {
404 let _ = stop.send(code);
405 }
406 }
407 Err(_) => {
408 log::debug!("System stopped");
409 return;
410 }
411 }
412 }
413 }
414}
415
/// A single ping sent to an arbiter.
#[derive(Copy, Clone, Debug)]
pub struct PingRecord {
    /// Time at which the ping round was started.
    pub start: Instant,
    /// Round-trip time; `None` until the arbiter has answered.
    pub rtt: Option<Duration>,
}
423
424async fn ping_arbiters(sys: System, interval: Duration) {
425 let pings = Rc::new(RefCell::new(HashSet::default()));
426
427 loop {
428 {
430 pings.borrow_mut().clear();
431
432 let start = Instant::now();
433 let arbiters = sys.0.arbiters.lock();
434
435 for arb in &arbiters.list {
436 let id = arb.id();
437 let pings = pings.clone();
438 let fut = arb.handle().spawn(async move {
439 yield_to().await;
440 });
441
442 PINGS.with(|pings| {
444 let mut p = pings.borrow_mut();
445 let recs = p.entry(arb.id()).or_default();
446 recs.push_front(PingRecord { start, rtt: None });
447 recs.truncate(10);
448 });
449
450 crate::spawn(async move {
451 if fut.await.is_ok() {
452 pings.borrow_mut().insert(id);
453
454 PINGS.with(|pings| {
455 pings
456 .borrow_mut()
457 .get_mut(&id)
458 .unwrap()
459 .front_mut()
460 .unwrap()
461 .rtt = Some(start.elapsed());
462 });
463 }
464 });
465 }
466 }
467
468 Delay::new(interval).await;
469
470 #[cfg(target_os = "linux")]
472 {
473 const SPIN: Duration = Duration::from_micros(100);
474
475 let mut no_pongs = Vec::new();
476
477 {
478 for arb in &sys.0.arbiters.lock().list {
479 let pong = pings.borrow_mut().remove(&arb.id());
480 if !pong {
481 no_pongs.push(arb.clone());
482 }
483 }
484 }
485
486 for arb in no_pongs {
487 log::error!("Arbiter {}({:?}) did not return pong", arb.name(), arb.id());
489
490 *CAPTURED.lock() = None;
492 EXPECTED_TID.store(arb.tid(), Ordering::Release);
493 unsafe {
494 libc::syscall(
495 libc::SYS_tgkill,
496 libc::getpid(),
497 arb.tid(),
498 libc::SIGUSR2,
499 );
500 }
501
502 for _ in 0..1000 {
504 Delay::new(SPIN).await;
505 if let Some(bt) = CAPTURED.lock().take() {
506 let bt = ntex_error::Backtrace::from(bt);
507 #[allow(static_mut_refs)]
508 if let Some(f) = unsafe { ARB_CB.as_ref() } {
509 f(bt);
510 } else {
511 bt.resolver().resolve();
512 log::error!(
513 "Worker does not returned pong within {interval:?} time.\n{bt:?}"
514 );
515 }
516 break;
517 }
518 }
519 }
520 }
521 }
522}
523
/// Yields to the executor exactly once.
///
/// The first poll wakes the task and returns `Pending`; the following poll
/// completes.
async fn yield_to() {
    use std::task::Poll;

    let mut yielded = false;

    std::future::poll_fn(|cx| {
        if yielded {
            return Poll::Ready(());
        }
        yielded = true;
        // Ask to be polled again right away so the task is only rescheduled.
        cx.waker().wake_by_ref();
        Poll::Pending
    })
    .await;
}
546
// Optional callback installed by `System::set_latency_callback`; `ping_arbiters`
// passes it the backtrace of an arbiter that missed a ping.
// NOTE(review): `static mut` accessed without synchronization — confirm that
// writes cannot overlap with reads.
#[cfg(target_os = "linux")]
static mut ARB_CB: Option<Box<dyn Fn(ntex_error::Backtrace)>> = None;

// Thread id for which `sig_usr2` should capture a backtrace.
#[cfg(target_os = "linux")]
static EXPECTED_TID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
// Backtrace stored by `sig_usr2` and taken by `ping_arbiters`.
#[cfg(target_os = "linux")]
static CAPTURED: Mutex<Option<ntex_error::BacktraceRaw>> = Mutex::new(None);
554
/// Captures a raw backtrace of the calling thread into `CAPTURED` when the
/// thread's id equals `EXPECTED_TID`.
///
/// Only does work on Linux; on other unix targets the body is empty.
///
/// NOTE(review): presumably called from the SIGUSR2 handler — confirm. If so,
/// locking `CAPTURED` here is not async-signal-safe and could deadlock should
/// the signalled thread already hold that mutex.
#[track_caller]
#[cfg(target_family = "unix")]
pub(crate) fn sig_usr2() {
    #[cfg(target_os = "linux")]
    #[allow(clippy::cast_possible_truncation)]
    {
        // Kernel thread id of the current thread.
        let tid = unsafe { libc::syscall(libc::SYS_gettid) } as i32;
        // Ignore the signal on any thread other than the one being inspected.
        if EXPECTED_TID.load(Ordering::Acquire) == tid {
            let bt = ntex_error::BacktraceRaw::new(panic::Location::caller());
            *CAPTURED.lock() = Some(bt);
        }
    }
}