1use std::any::{Any, TypeId};
2use std::collections::VecDeque;
3use std::sync::{Arc, atomic::AtomicBool, atomic::AtomicUsize, atomic::Ordering};
4use std::time::{Duration, Instant};
5use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
6
7use async_channel::{Receiver, Sender, unbounded};
8use futures_timer::Delay;
9use parking_lot::{Mutex, RwLock};
10
11use crate::arbiter::Arbiter;
12use crate::pool::ThreadPool;
13use crate::{BlockingResult, Builder, Handle, HashMap, HashSet, Runner, SystemRunner};
14
/// Monotonic counter handing out a distinct numeric id to every system created
/// by `System::start`.
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
16
thread_local!(
    /// Ping history per arbiter id, newest record first; `ping_arbiters` keeps at
    /// most 10 records per arbiter. Being thread-local, it is only populated on
    /// the thread that runs the ping task, and `System::list_arbiter_pings` reads
    /// the calling thread's copy.
    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
        RefCell::new(HashMap::default());
);
21
/// Registry of the arbiters that belong to one system.
#[derive(Default)]
struct Arbiters {
    /// Registered arbiters keyed by their id.
    all: HashMap<Id, Arbiter>,
    /// Registered arbiters in registration order.
    list: Vec<Arbiter>,
}
27
/// Opaque numeric identifier, used both for systems (`System::id`) and as the
/// key of registered arbiters.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Id(pub(crate) usize);
31
/// Handle to a runtime system. All clones share the same state through an `Arc`.
pub struct System(Arc<SystemInner>);
34
/// State shared by every clone of a [`System`].
struct SystemInner {
    /// Value taken from `SYSTEM_COUNT` when the system was started.
    id: usize,
    /// The system's own arbiter (also present in `arbiters`).
    arbiter: Arbiter,
    /// Configuration the system was started with.
    config: SystemConfig,
    /// Sending side of the command channel (used by `stop_with_code`).
    sender: Sender<SystemCommand>,
    /// Receiving side of the command channel (consumed by `SystemSupport`).
    receiver: Receiver<SystemCommand>,
    /// Per-type values managed by `System::get_value`.
    storage: RwLock<HashMap<TypeId, Box<dyn Any + Sync + Send>>>,
    /// Arbiters registered with this system.
    arbiters: Mutex<Arbiters>,
    /// Whether signal handling has been enabled through `enable_signals`.
    signals: AtomicBool,
    /// Thread pool backing `System::spawn_blocking`.
    pool: ThreadPool,
}
46
/// Settings a system is started with.
#[derive(Clone)]
pub struct SystemConfig {
    /// System name; also passed to the thread pool and the system arbiter.
    pub(super) name: String,
    /// NOTE(review): not read in this module — presumably the stack size for
    /// threads spawned by the system; confirm against its users.
    pub(super) stack_size: usize,
    /// Value reported by `System::stop_on_panic`.
    pub(super) stop_on_panic: bool,
    /// Interval between arbiter pings, in milliseconds.
    pub(super) ping_interval: usize,
    /// Limit passed to `ThreadPool::new`.
    pub(super) pool_limit: usize,
    /// Receive timeout passed to `ThreadPool::new`.
    pub(super) pool_recv_timeout: Duration,
    /// Value reported by `SystemConfig::testing` / `System::testing`.
    pub(super) testing: bool,
    /// Runner implementation; not used directly in this module.
    pub(super) runner: Arc<dyn Runner>,
}
58
thread_local!(
    /// The system associated with the current thread, if any.
    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
);
62
63impl Clone for System {
64 fn clone(&self) -> Self {
65 Self(self.0.clone())
66 }
67}
68
69impl System {
70 pub(super) fn start(config: SystemConfig) -> (Self, oneshot::Receiver<i32>) {
72 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
73 let (sender, receiver) = unbounded();
74
75 let pool =
76 ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
77 let (arbiter, controller) = Arbiter::new_system(id, config.name.clone());
78
79 let mut arbiters = Arbiters::default();
80 arbiters.all.insert(arbiter.id(), arbiter.clone());
81 arbiters.list.push(arbiter.clone());
82
83 let sys = System(Arc::new(SystemInner {
84 id,
85 config,
86 arbiter,
87 sender,
88 receiver,
89 pool,
90 arbiters: Mutex::new(arbiters),
91 storage: RwLock::new(HashMap::default()),
92 signals: AtomicBool::new(false),
93 }));
94 System::set_current(sys.clone());
95
96 let (stop_tx, stop) = oneshot::channel();
97
98 crate::spawn(SystemSupport::new(&sys, stop_tx).run());
100 crate::spawn(controller.run(sys.clone()));
101
102 (sys, stop)
103 }
104
105 pub fn build() -> Builder {
110 Builder::new()
111 }
112
113 #[allow(clippy::new_ret_no_self)]
114 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
118 Self::build().name(name).build(runner)
119 }
120
121 #[allow(clippy::new_ret_no_self)]
122 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
126 Self::build().name(name).build_with(config)
127 }
128
129 pub fn current() -> System {
135 CURRENT.with(|cell| match *cell.borrow() {
136 Some(ref sys) => sys.clone(),
137 None => panic!("System is not running"),
138 })
139 }
140
141 pub fn try_current() -> Option<System> {
143 CURRENT.with(|cell| cell.borrow().as_ref().map(Clone::clone))
144 }
145
146 #[doc(hidden)]
148 pub fn set_current(sys: System) {
149 CURRENT.with(|s| {
150 *s.borrow_mut() = Some(sys);
151 });
152 }
153
154 pub(crate) fn register_arbiter(&self, arb: Arbiter) {
155 CURRENT.with(|s| {
156 *s.borrow_mut() = Some(self.clone());
157 });
158 let mut arbiters = self.0.arbiters.lock();
159 arbiters.all.insert(arb.id(), arb.clone());
160 arbiters.list.push(arb);
161 }
162
163 pub(crate) fn unregister_arbiter(&self, id: Id) {
164 CURRENT.with(|s| {
165 *s.borrow_mut() = None;
166 });
167 let mut arbiters = self.0.arbiters.lock();
168 if let Some(hnd) = arbiters.all.remove(&id) {
169 for (idx, arb) in arbiters.list.iter().enumerate() {
170 if &hnd == arb {
171 arbiters.list.remove(idx);
172 break;
173 }
174 }
175 }
176 }
177
178 pub(super) fn remove_current() {
179 CURRENT.with(|cell| {
180 cell.borrow_mut().take();
181 });
182 }
183
184 pub fn id(&self) -> Id {
186 Id(self.0.id)
187 }
188
189 pub fn name(&self) -> &str {
191 &self.0.config.name
192 }
193
194 pub fn stop(&self) {
196 self.stop_with_code(0);
197 }
198
199 pub fn stop_with_code(&self, code: i32) {
201 let _ = self.0.sender.try_send(SystemCommand::Exit(code));
202 }
203
204 pub fn stop_on_panic(&self) -> bool {
209 self.0.config.stop_on_panic
210 }
211
212 pub fn signals(&self) -> bool {
214 self.0.signals.load(Ordering::Relaxed)
215 }
216
217 pub fn enable_signals(&self) {
219 if !self.signals() {
220 crate::signals::start(self);
221 self.0.signals.store(true, Ordering::Relaxed);
222 }
223 }
224
225 pub fn disable_signals(&self) {
227 if self.signals() {
228 crate::signals::stop(self);
229 self.0.signals.store(false, Ordering::Relaxed);
230 }
231 }
232
233 pub fn arbiter(&self) -> Arbiter {
239 self.0.arbiter.clone()
240 }
241
242 pub fn list_arbiters<F, R>(&self, f: F) -> R
247 where
248 F: FnOnce(&[Arbiter]) -> R,
249 {
250 f(&self.0.arbiters.lock().list)
251 }
252
253 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
258 where
259 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
260 {
261 PINGS.with(|pings| {
262 if let Some(recs) = pings.borrow().get(&id) {
263 f(Some(recs))
264 } else {
265 f(None)
266 }
267 })
268 }
269
270 pub fn config(&self) -> SystemConfig {
272 self.0.config.clone()
273 }
274
275 #[inline]
276 pub fn handle(&self) -> Handle {
278 self.arbiter().handle().clone()
279 }
280
281 pub fn testing(&self) -> bool {
283 self.0.config.testing()
284 }
285
286 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
290 where
291 F: FnOnce() -> R + Send + 'static,
292 R: Send + 'static,
293 {
294 self.0.pool.execute(f)
295 }
296
297 pub fn get_value<T>(&self, f: impl FnOnce() -> T) -> T
302 where
303 T: Clone + Send + Sync + 'static,
304 {
305 if let Some(boxed) = self.0.storage.read().get(&TypeId::of::<T>())
306 && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
307 {
308 val.clone()
309 } else {
310 let val = f();
311 self.0
312 .storage
313 .write()
314 .insert(TypeId::of::<T>(), Box::new(val.clone()));
315 val
316 }
317 }
318}
319
320impl SystemConfig {
321 #[inline]
322 pub fn testing(&self) -> bool {
324 self.testing
325 }
326}
327
328impl fmt::Debug for System {
329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330 f.debug_struct("System")
331 .field("id", &self.0.id)
332 .field("config", &self.0.config)
333 .field("signals", &self.signals())
334 .field("pool", &self.0.pool)
335 .finish()
336 }
337}
338
339impl fmt::Debug for SystemConfig {
340 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341 f.debug_struct("SystemConfig")
342 .field("name", &self.name)
343 .field("testing", &self.testing)
344 .field("stack_size", &self.stack_size)
345 .field("stop_on_panic", &self.stop_on_panic)
346 .finish()
347 }
348}
349
/// Commands handled by the system support task.
#[derive(Debug)]
pub(super) enum SystemCommand {
    /// Stop every registered arbiter and report the given exit code.
    Exit(i32),
}
354
/// State of the background task that processes system commands.
#[derive(Debug)]
struct SystemSupport {
    /// The system being served.
    sys: System,
    /// Completed with the exit code of the first `Exit` command; `None` afterwards.
    stop: Option<oneshot::Sender<i32>>,
    /// Receiving side of the system command channel.
    commands: Receiver<SystemCommand>,
    /// Interval passed to `ping_arbiters`.
    ping_interval: Duration,
}
362
363impl SystemSupport {
364 fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
365 Self {
366 sys: sys.clone(),
367 stop: Some(stop),
368 commands: sys.0.receiver.clone(),
369 ping_interval: Duration::from_millis(sys.0.config.ping_interval as u64),
370 }
371 }
372
373 async fn run(mut self) {
374 crate::spawn(ping_arbiters(self.sys.clone(), self.ping_interval));
375
376 loop {
377 match self.commands.recv().await {
378 Ok(SystemCommand::Exit(code)) => {
379 log::debug!("Stopping system with {code} code");
380
381 let mut arbiters = self.sys.0.arbiters.lock();
383 for arb in arbiters.list.drain(..) {
384 arb.stop();
385 }
386 arbiters.all.clear();
387
388 if let Some(stop) = self.stop.take() {
390 let _ = stop.send(code);
391 }
392 }
393 Err(_) => {
394 log::debug!("System stopped");
395 return;
396 }
397 }
398 }
399 }
400}
401
/// One ping round for a single arbiter.
#[derive(Copy, Clone, Debug)]
pub struct PingRecord {
    /// Moment the ping round started.
    pub start: Instant,
    /// Round-trip time, or `None` while no pong has been recorded for this round.
    pub rtt: Option<Duration>,
}
409
410async fn ping_arbiters(sys: System, interval: Duration) {
411 let pings = Rc::new(RefCell::new(HashSet::default()));
412
413 loop {
414 {
416 pings.borrow_mut().clear();
417
418 let start = Instant::now();
419 let arbiters = sys.0.arbiters.lock();
420
421 for arb in &arbiters.list {
422 let id = arb.id();
423 let pings = pings.clone();
424 let fut = arb.handle().spawn(async move {
425 yield_to().await;
426 });
427
428 PINGS.with(|pings| {
430 let mut p = pings.borrow_mut();
431 let recs = p.entry(arb.id()).or_default();
432 recs.push_front(PingRecord { start, rtt: None });
433 recs.truncate(10);
434 });
435
436 crate::spawn(async move {
437 if fut.await.is_ok() {
438 pings.borrow_mut().insert(id);
439
440 PINGS.with(|pings| {
441 pings
442 .borrow_mut()
443 .get_mut(&id)
444 .unwrap()
445 .front_mut()
446 .unwrap()
447 .rtt = Some(start.elapsed());
448 });
449 }
450 });
451 }
452 }
453
454 Delay::new(interval).await;
455
456 #[cfg(target_os = "linux")]
458 {
459 const SPIN: Duration = Duration::from_micros(100);
460
461 let mut no_pongs = Vec::new();
462
463 {
464 for arb in &sys.0.arbiters.lock().list {
465 let pong = pings.borrow_mut().remove(&arb.id());
466 if !pong {
467 no_pongs.push(arb.clone());
468 }
469 }
470 }
471
472 for arb in no_pongs {
473 log::error!("Arbiter {}({:?}) did not return pong", arb.name(), arb.id());
475
476 *CAPTURED.lock() = None;
478 EXPECTED_TID.store(arb.tid(), Ordering::Release);
479 unsafe {
480 libc::syscall(
481 libc::SYS_tgkill,
482 libc::getpid(),
483 arb.tid(),
484 libc::SIGUSR2,
485 );
486 }
487
488 for _ in 0..1000 {
490 Delay::new(SPIN).await;
491 if let Some(bt) = CAPTURED.lock().take() {
492 let bt = ntex_error::Backtrace::from(bt);
493 bt.resolver().resolve();
494 log::error!(
495 "Worker does not returned pong within {interval:?} time.\n{bt:?}"
496 );
497 break;
498 }
499 }
500 }
501 }
502 }
503}
504
/// Yields back to the executor exactly once.
///
/// The first poll wakes the current task and returns `Pending`; the next poll
/// completes.
async fn yield_to() {
    use std::future::poll_fn;
    use std::task::Poll;

    let mut yielded = false;
    poll_fn(move |cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await;
}
527
/// Thread id that `sig_usr2` should capture a backtrace for (set by `ping_arbiters`).
#[cfg(target_os = "linux")]
static EXPECTED_TID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
/// Backtrace captured by `sig_usr2`, taken by `ping_arbiters`.
#[cfg(target_os = "linux")]
static CAPTURED: Mutex<Option<ntex_error::BacktraceRaw>> = Mutex::new(None);
532
533#[track_caller]
534#[cfg(target_family = "unix")]
535pub(crate) fn sig_usr2() {
536 #[cfg(target_os = "linux")]
537 #[allow(clippy::cast_possible_truncation)]
538 {
539 let tid = unsafe { libc::syscall(libc::SYS_gettid) } as i32;
540 if EXPECTED_TID.load(Ordering::Acquire) == tid {
541 let bt = ntex_error::BacktraceRaw::new(panic::Location::caller());
544 *CAPTURED.lock() = Some(bt);
545 }
546 }
547}