Skip to main content

ntex_rt/
system.rs

1use std::any::{Any, TypeId};
2use std::collections::VecDeque;
3use std::sync::{Arc, atomic::AtomicBool, atomic::AtomicUsize, atomic::Ordering};
4use std::time::{Duration, Instant};
5use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
6
7use async_channel::{Receiver, Sender, unbounded};
8use futures_timer::Delay;
9use parking_lot::{Mutex, RwLock};
10
11use crate::arbiter::Arbiter;
12use crate::pool::ThreadPool;
13use crate::{BlockingResult, Builder, Handle, HashMap, HashSet, Runner, SystemRunner};
14
15static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
16
17thread_local!(
18    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
19        RefCell::new(HashMap::default());
20);
21
22#[derive(Default)]
23struct Arbiters {
24    all: HashMap<Id, Arbiter>,
25    list: Vec<Arbiter>,
26}
27
/// System id
///
/// The same type also identifies arbiters within a system.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Id(pub(crate) usize);
31
32/// System is a runtime manager
33pub struct System(Arc<SystemInner>);
34
35struct SystemInner {
36    id: usize,
37    arbiter: Arbiter,
38    config: SystemConfig,
39    sender: Sender<SystemCommand>,
40    receiver: Receiver<SystemCommand>,
41    storage: RwLock<HashMap<TypeId, Box<dyn Any + Sync + Send>>>,
42    arbiters: Mutex<Arbiters>,
43    signals: AtomicBool,
44    pool: ThreadPool,
45}
46
47#[derive(Clone)]
48pub struct SystemConfig {
49    pub(super) name: String,
50    pub(super) stack_size: usize,
51    pub(super) stop_on_panic: bool,
52    pub(super) ping_interval: usize,
53    pub(super) pool_limit: usize,
54    pub(super) pool_recv_timeout: Duration,
55    pub(super) testing: bool,
56    pub(super) runner: Arc<dyn Runner>,
57}
58
59thread_local!(
60    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
61);
62
63impl Clone for System {
64    fn clone(&self) -> Self {
65        Self(self.0.clone())
66    }
67}
68
69impl System {
70    /// Constructs new system and sets it as current
71    pub(super) fn start(config: SystemConfig) -> (Self, oneshot::Receiver<i32>) {
72        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
73        let (sender, receiver) = unbounded();
74
75        let pool =
76            ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
77        let (arbiter, controller) = Arbiter::new_system(id, config.name.clone());
78
79        let mut arbiters = Arbiters::default();
80        arbiters.all.insert(arbiter.id(), arbiter.clone());
81        arbiters.list.push(arbiter.clone());
82
83        let sys = System(Arc::new(SystemInner {
84            id,
85            config,
86            arbiter,
87            sender,
88            receiver,
89            pool,
90            arbiters: Mutex::new(arbiters),
91            storage: RwLock::new(HashMap::default()),
92            signals: AtomicBool::new(false),
93        }));
94        System::set_current(sys.clone());
95
96        let (stop_tx, stop) = oneshot::channel();
97
98        // system support tasks
99        crate::spawn(SystemSupport::new(&sys, stop_tx).run());
100        crate::spawn(controller.run(sys.clone()));
101
102        (sys, stop)
103    }
104
105    /// Build a new system with a customized runtime
106    ///
107    /// This allows to customize the runtime. See struct level docs on
108    /// `Builder` for more information.
109    pub fn build() -> Builder {
110        Builder::new()
111    }
112
113    #[allow(clippy::new_ret_no_self)]
114    /// Create new system
115    ///
116    /// This method panics if it can not create runtime
117    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
118        Self::build().name(name).build(runner)
119    }
120
121    #[allow(clippy::new_ret_no_self)]
122    /// Create new system
123    ///
124    /// This method panics if it can not create runtime
125    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
126        Self::build().name(name).build_with(config)
127    }
128
129    /// Get current running system
130    ///
131    /// # Panics
132    ///
133    /// Panics if System is not running
134    pub fn current() -> System {
135        CURRENT.with(|cell| match *cell.borrow() {
136            Some(ref sys) => sys.clone(),
137            None => panic!("System is not running"),
138        })
139    }
140
141    /// Runs a function using the system context.
142    pub fn try_current() -> Option<System> {
143        CURRENT.with(|cell| cell.borrow().as_ref().map(Clone::clone))
144    }
145
146    /// Set current running system
147    #[doc(hidden)]
148    pub fn set_current(sys: System) {
149        CURRENT.with(|s| {
150            *s.borrow_mut() = Some(sys);
151        });
152    }
153
154    pub(crate) fn register_arbiter(&self, arb: Arbiter) {
155        CURRENT.with(|s| {
156            *s.borrow_mut() = Some(self.clone());
157        });
158        let mut arbiters = self.0.arbiters.lock();
159        arbiters.all.insert(arb.id(), arb.clone());
160        arbiters.list.push(arb);
161    }
162
163    pub(crate) fn unregister_arbiter(&self, id: Id) {
164        CURRENT.with(|s| {
165            *s.borrow_mut() = None;
166        });
167        let mut arbiters = self.0.arbiters.lock();
168        if let Some(hnd) = arbiters.all.remove(&id) {
169            for (idx, arb) in arbiters.list.iter().enumerate() {
170                if &hnd == arb {
171                    arbiters.list.remove(idx);
172                    break;
173                }
174            }
175        }
176    }
177
178    pub(super) fn remove_current() {
179        CURRENT.with(|cell| {
180            cell.borrow_mut().take();
181        });
182    }
183
184    /// System id
185    pub fn id(&self) -> Id {
186        Id(self.0.id)
187    }
188
189    /// System name
190    pub fn name(&self) -> &str {
191        &self.0.config.name
192    }
193
194    /// Stop the system
195    pub fn stop(&self) {
196        self.stop_with_code(0);
197    }
198
199    /// Stop the system with a particular exit code
200    pub fn stop_with_code(&self, code: i32) {
201        let _ = self.0.sender.try_send(SystemCommand::Exit(code));
202    }
203
204    /// Return status of `stop_on_panic` option
205    ///
206    /// It controls whether the System is stopped when an
207    /// uncaught panic is thrown from a worker thread.
208    pub fn stop_on_panic(&self) -> bool {
209        self.0.config.stop_on_panic
210    }
211
212    /// Return status of `signals` option
213    pub fn signals(&self) -> bool {
214        self.0.signals.load(Ordering::Relaxed)
215    }
216
217    /// Enable `signals` handling
218    pub fn enable_signals(&self) {
219        if !self.signals() {
220            crate::signals::start(self);
221            self.0.signals.store(true, Ordering::Relaxed);
222        }
223    }
224
225    /// Disable `signals` handling
226    pub fn disable_signals(&self) {
227        if self.signals() {
228            crate::signals::stop(self);
229            self.0.signals.store(false, Ordering::Relaxed);
230        }
231    }
232
233    /// System arbiter
234    ///
235    /// # Panics
236    ///
237    /// Panics if system is not started
238    pub fn arbiter(&self) -> Arbiter {
239        self.0.arbiter.clone()
240    }
241
242    /// Retrieves a list of all arbiters in the system
243    ///
244    /// This method should be called from the thread where the system has been initialized,
245    /// typically the "main" thread.
246    pub fn list_arbiters<F, R>(&self, f: F) -> R
247    where
248        F: FnOnce(&[Arbiter]) -> R,
249    {
250        f(&self.0.arbiters.lock().list)
251    }
252
253    /// Retrieves a list of last pings records for specified arbiter
254    ///
255    /// This method should be called from the thread where the system has been initialized,
256    /// typically the "main" thread.
257    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
258    where
259        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
260    {
261        PINGS.with(|pings| {
262            if let Some(recs) = pings.borrow().get(&id) {
263                f(Some(recs))
264            } else {
265                f(None)
266            }
267        })
268    }
269
270    #[cfg(target_os = "linux")]
271    #[doc(hidden)]
272    /// Set arbiter latency callback.
273    ///
274    /// This callback is called when the arbiter response latency exceeds the
275    /// configured threshold. The provided backtrace is not resolved.
276    ///
277    /// Note: This callback is not thread-safe.
278    pub fn set_latency_callback<F: Fn(ntex_error::Backtrace) + 'static>(f: F) {
279        unsafe {
280            ARB_CB = Some(Box::new(f));
281        }
282    }
283
284    /// System config
285    pub fn config(&self) -> SystemConfig {
286        self.0.config.clone()
287    }
288
289    #[inline]
290    /// Runtime handle for main thread
291    pub fn handle(&self) -> Handle {
292        self.arbiter().handle().clone()
293    }
294
295    /// Testing flag
296    pub fn testing(&self) -> bool {
297        self.0.config.testing()
298    }
299
300    /// Spawns a blocking task in a new thread, and wait for it
301    ///
302    /// The task will not be cancelled even if the future is dropped.
303    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
304    where
305        F: FnOnce() -> R + Send + 'static,
306        R: Send + 'static,
307    {
308        self.0.pool.execute(f)
309    }
310
311    /// Returns a previously registered type, or inserts and returns a new one.
312    ///
313    /// This method acquires a lock on the internal data structure.
314    /// To avoid repeated locking, prefer storing a cloned value in the arbiter's storage.
315    pub fn get_value<T>(&self, f: impl FnOnce() -> T) -> T
316    where
317        T: Clone + Send + Sync + 'static,
318    {
319        if let Some(boxed) = self.0.storage.read().get(&TypeId::of::<T>())
320            && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
321        {
322            val.clone()
323        } else {
324            let val = f();
325            self.0
326                .storage
327                .write()
328                .insert(TypeId::of::<T>(), Box::new(val.clone()));
329            val
330        }
331    }
332}
333
334impl SystemConfig {
335    #[inline]
336    /// Is current system is testing
337    pub fn testing(&self) -> bool {
338        self.testing
339    }
340}
341
342impl fmt::Debug for System {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        f.debug_struct("System")
345            .field("id", &self.0.id)
346            .field("config", &self.0.config)
347            .field("signals", &self.signals())
348            .field("pool", &self.0.pool)
349            .finish()
350    }
351}
352
353impl fmt::Debug for SystemConfig {
354    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355        f.debug_struct("SystemConfig")
356            .field("name", &self.name)
357            .field("testing", &self.testing)
358            .field("stack_size", &self.stack_size)
359            .field("stop_on_panic", &self.stop_on_panic)
360            .finish()
361    }
362}
363
364#[derive(Debug)]
365pub(super) enum SystemCommand {
366    Exit(i32),
367}
368
369#[derive(Debug)]
370struct SystemSupport {
371    sys: System,
372    stop: Option<oneshot::Sender<i32>>,
373    commands: Receiver<SystemCommand>,
374    ping_interval: Duration,
375}
376
377impl SystemSupport {
378    fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
379        Self {
380            sys: sys.clone(),
381            stop: Some(stop),
382            commands: sys.0.receiver.clone(),
383            ping_interval: Duration::from_millis(sys.0.config.ping_interval as u64),
384        }
385    }
386
387    async fn run(mut self) {
388        crate::spawn(ping_arbiters(self.sys.clone(), self.ping_interval));
389
390        loop {
391            match self.commands.recv().await {
392                Ok(SystemCommand::Exit(code)) => {
393                    log::debug!("Stopping system with {code} code");
394
395                    // stop arbiters
396                    let mut arbiters = self.sys.0.arbiters.lock();
397                    for arb in arbiters.list.drain(..) {
398                        arb.stop();
399                    }
400                    arbiters.all.clear();
401
402                    // stop event loop
403                    if let Some(stop) = self.stop.take() {
404                        let _ = stop.send(code);
405                    }
406                }
407                Err(_) => {
408                    log::debug!("System stopped");
409                    return;
410                }
411            }
412        }
413    }
414}
415
/// One ping sent to an arbiter by the system's ping task.
#[derive(Clone, Copy, Debug)]
pub struct PingRecord {
    /// Ping start time
    pub start: Instant,
    /// Round-trip time; `None` while the ping is still in flight.
    pub rtt: Option<Duration>,
}
423
424async fn ping_arbiters(sys: System, interval: Duration) {
425    let pings = Rc::new(RefCell::new(HashSet::default()));
426
427    loop {
428        // send pings
429        {
430            pings.borrow_mut().clear();
431
432            let start = Instant::now();
433            let arbiters = sys.0.arbiters.lock();
434
435            for arb in &arbiters.list {
436                let id = arb.id();
437                let pings = pings.clone();
438                let fut = arb.handle().spawn(async move {
439                    yield_to().await;
440                });
441
442                // calc ttl
443                PINGS.with(|pings| {
444                    let mut p = pings.borrow_mut();
445                    let recs = p.entry(arb.id()).or_default();
446                    recs.push_front(PingRecord { start, rtt: None });
447                    recs.truncate(10);
448                });
449
450                crate::spawn(async move {
451                    if fut.await.is_ok() {
452                        pings.borrow_mut().insert(id);
453
454                        PINGS.with(|pings| {
455                            pings
456                                .borrow_mut()
457                                .get_mut(&id)
458                                .unwrap()
459                                .front_mut()
460                                .unwrap()
461                                .rtt = Some(start.elapsed());
462                        });
463                    }
464                });
465            }
466        }
467
468        Delay::new(interval).await;
469
470        // check pings
471        #[cfg(target_os = "linux")]
472        {
473            const SPIN: Duration = Duration::from_micros(100);
474
475            let mut no_pongs = Vec::new();
476
477            {
478                for arb in &sys.0.arbiters.lock().list {
479                    let pong = pings.borrow_mut().remove(&arb.id());
480                    if !pong {
481                        no_pongs.push(arb.clone());
482                    }
483                }
484            }
485
486            for arb in no_pongs {
487                // no response from arbiter
488                log::error!("Arbiter {}({:?}) did not return pong", arb.name(), arb.id());
489
490                // send tgkill to thread id to capture backtrace
491                *CAPTURED.lock() = None;
492                EXPECTED_TID.store(arb.tid(), Ordering::Release);
493                unsafe {
494                    libc::syscall(
495                        libc::SYS_tgkill,
496                        libc::getpid(),
497                        arb.tid(),
498                        libc::SIGUSR2,
499                    );
500                }
501
502                // Spin
503                for _ in 0..1000 {
504                    Delay::new(SPIN).await;
505                    if let Some(bt) = CAPTURED.lock().take() {
506                        let bt = ntex_error::Backtrace::from(bt);
507                        #[allow(static_mut_refs)]
508                        if let Some(f) = unsafe { ARB_CB.as_ref() } {
509                            f(bt);
510                        } else {
511                            bt.resolver().resolve();
512                            log::error!(
513                                "Worker does not returned pong within {interval:?} time.\n{bt:?}"
514                            );
515                        }
516                        break;
517                    }
518                }
519            }
520        }
521    }
522}
523
/// Yields control back to the executor exactly once.
///
/// The first poll wakes the current task and returns `Pending`, so the executor
/// reschedules it. The second poll completes.
async fn yield_to() {
    use std::task::Poll;

    let mut yielded = false;
    std::future::poll_fn(move |cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await;
}
546
547#[cfg(target_os = "linux")]
548static mut ARB_CB: Option<Box<dyn Fn(ntex_error::Backtrace)>> = None;
549
550#[cfg(target_os = "linux")]
551static EXPECTED_TID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
552#[cfg(target_os = "linux")]
553static CAPTURED: Mutex<Option<ntex_error::BacktraceRaw>> = Mutex::new(None);
554
555#[track_caller]
556#[cfg(target_family = "unix")]
557pub(crate) fn sig_usr2() {
558    #[cfg(target_os = "linux")]
559    #[allow(clippy::cast_possible_truncation)]
560    {
561        let tid = unsafe { libc::syscall(libc::SYS_gettid) } as i32;
562        if EXPECTED_TID.load(Ordering::Acquire) == tid {
563            // backtrace::Backtrace::new_unresolved uses libunwind frame walking,
564            // which is signal-safe. Symbol resolution is NOT — do it later.
565            let bt = ntex_error::BacktraceRaw::new(panic::Location::caller());
566            *CAPTURED.lock() = Some(bt);
567        }
568    }
569}