Skip to main content

ntex_rt/
system.rs

1use std::any::{Any, TypeId};
2use std::collections::VecDeque;
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::time::{Duration, Instant};
5use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
6
7use async_channel::{Receiver, Sender, unbounded};
8use futures_timer::Delay;
9use parking_lot::{Mutex, RwLock};
10
11use crate::arbiter::Arbiter;
12use crate::pool::ThreadPool;
13use crate::{BlockingResult, Builder, Handle, HashMap, HashSet, Runner, SystemRunner};
14
/// Counter handing out system ids; incremented once per `System::start`.
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
16
17thread_local!(
18    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
19        RefCell::new(HashMap::default());
20);
21
22#[derive(Default)]
23struct Arbiters {
24    all: HashMap<Id, Arbiter>,
25    list: Vec<Arbiter>,
26}
27
/// System id.
///
/// The same type also identifies arbiters (it keys the arbiter registry and
/// the ping history).
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Id(pub(crate) usize);
31
32/// System is a runtime manager
33pub struct System(Arc<SystemInner>);
34
35struct SystemInner {
36    id: usize,
37    arbiter: Arbiter,
38    config: SystemConfig,
39    sender: Sender<SystemCommand>,
40    receiver: Receiver<SystemCommand>,
41    storage: RwLock<HashMap<TypeId, Box<dyn Any + Sync + Send>>>,
42    arbiters: Mutex<Arbiters>,
43    pool: ThreadPool,
44}
45
46#[derive(Clone)]
47pub struct SystemConfig {
48    pub(super) name: String,
49    pub(super) stack_size: usize,
50    pub(super) stop_on_panic: bool,
51    pub(super) ping_interval: usize,
52    pub(super) pool_limit: usize,
53    pub(super) pool_recv_timeout: Duration,
54    pub(super) disable_signals: bool,
55    pub(super) testing: bool,
56    pub(super) runner: Arc<dyn Runner>,
57}
58
59thread_local!(
60    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
61);
62
63impl Clone for System {
64    fn clone(&self) -> Self {
65        Self(self.0.clone())
66    }
67}
68
69impl System {
70    /// Constructs new system and sets it as current
71    pub(super) fn start(config: SystemConfig) -> (Self, oneshot::Receiver<i32>) {
72        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
73        let (sender, receiver) = unbounded();
74
75        let pool =
76            ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
77        let (arbiter, controller) = Arbiter::new_system(id, config.name.clone());
78
79        let mut arbiters = Arbiters::default();
80        arbiters.all.insert(arbiter.id(), arbiter.clone());
81        arbiters.list.push(arbiter.clone());
82
83        let sys = System(Arc::new(SystemInner {
84            id,
85            config,
86            arbiter,
87            sender,
88            receiver,
89            pool,
90            arbiters: Mutex::new(arbiters),
91            storage: RwLock::new(HashMap::default()),
92        }));
93        System::set_current(sys.clone());
94
95        let (stop_tx, stop) = oneshot::channel();
96
97        // system support tasks
98        crate::spawn(SystemSupport::new(&sys, stop_tx).run());
99        crate::spawn(controller.run(sys.clone()));
100
101        (sys, stop)
102    }
103
104    /// Build a new system with a customized runtime
105    ///
106    /// This allows to customize the runtime. See struct level docs on
107    /// `Builder` for more information.
108    pub fn build() -> Builder {
109        Builder::new()
110    }
111
112    #[allow(clippy::new_ret_no_self)]
113    /// Create new system
114    ///
115    /// This method panics if it can not create runtime
116    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
117        Self::build().name(name).build(runner)
118    }
119
120    #[allow(clippy::new_ret_no_self)]
121    /// Create new system
122    ///
123    /// This method panics if it can not create runtime
124    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
125        Self::build().name(name).build_with(config)
126    }
127
128    /// Get current running system
129    ///
130    /// # Panics
131    ///
132    /// Panics if System is not running
133    pub fn current() -> System {
134        CURRENT.with(|cell| match *cell.borrow() {
135            Some(ref sys) => sys.clone(),
136            None => panic!("System is not running"),
137        })
138    }
139
140    /// Runs a function using the system context.
141    pub fn try_current() -> Option<System> {
142        CURRENT.with(|cell| cell.borrow().as_ref().map(Clone::clone))
143    }
144
145    /// Set current running system
146    #[doc(hidden)]
147    pub fn set_current(sys: System) {
148        CURRENT.with(|s| {
149            *s.borrow_mut() = Some(sys);
150        });
151    }
152
153    pub(crate) fn register_arbiter(&self, arb: Arbiter) {
154        CURRENT.with(|s| {
155            *s.borrow_mut() = Some(self.clone());
156        });
157        let mut arbiters = self.0.arbiters.lock();
158        arbiters.all.insert(arb.id(), arb.clone());
159        arbiters.list.push(arb);
160    }
161
162    pub(crate) fn unregister_arbiter(&self, id: Id) {
163        CURRENT.with(|s| {
164            *s.borrow_mut() = None;
165        });
166        let mut arbiters = self.0.arbiters.lock();
167        if let Some(hnd) = arbiters.all.remove(&id) {
168            for (idx, arb) in arbiters.list.iter().enumerate() {
169                if &hnd == arb {
170                    arbiters.list.remove(idx);
171                    break;
172                }
173            }
174        }
175    }
176
177    pub(super) fn remove_current() {
178        CURRENT.with(|cell| {
179            cell.borrow_mut().take();
180        });
181    }
182
183    /// System id
184    pub fn id(&self) -> Id {
185        Id(self.0.id)
186    }
187
188    /// System name
189    pub fn name(&self) -> &str {
190        &self.0.config.name
191    }
192
193    /// Stop the system
194    pub fn stop(&self) {
195        self.stop_with_code(0);
196    }
197
198    /// Stop the system with a particular exit code
199    pub fn stop_with_code(&self, code: i32) {
200        let _ = self.0.sender.try_send(SystemCommand::Exit(code));
201    }
202
203    /// Return status of `stop_on_panic` option
204    ///
205    /// It controls whether the System is stopped when an
206    /// uncaught panic is thrown from a worker thread.
207    pub fn stop_on_panic(&self) -> bool {
208        self.0.config.stop_on_panic
209    }
210
211    /// Return status of `signals` option
212    pub fn signals_disabled(&self) -> bool {
213        self.0.config.disable_signals
214    }
215
216    /// System arbiter
217    ///
218    /// # Panics
219    ///
220    /// Panics if system is not started
221    pub fn arbiter(&self) -> Arbiter {
222        self.0.arbiter.clone()
223    }
224
225    /// Retrieves a list of all arbiters in the system
226    ///
227    /// This method should be called from the thread where the system has been initialized,
228    /// typically the "main" thread.
229    pub fn list_arbiters<F, R>(&self, f: F) -> R
230    where
231        F: FnOnce(&[Arbiter]) -> R,
232    {
233        f(&self.0.arbiters.lock().list)
234    }
235
236    /// Retrieves a list of last pings records for specified arbiter
237    ///
238    /// This method should be called from the thread where the system has been initialized,
239    /// typically the "main" thread.
240    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
241    where
242        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
243    {
244        PINGS.with(|pings| {
245            if let Some(recs) = pings.borrow().get(&id) {
246                f(Some(recs))
247            } else {
248                f(None)
249            }
250        })
251    }
252
253    /// System config
254    pub fn config(&self) -> SystemConfig {
255        self.0.config.clone()
256    }
257
258    #[inline]
259    /// Runtime handle for main thread
260    pub fn handle(&self) -> Handle {
261        self.arbiter().handle().clone()
262    }
263
264    /// Testing flag
265    pub fn testing(&self) -> bool {
266        self.0.config.testing()
267    }
268
269    /// Spawns a blocking task in a new thread, and wait for it
270    ///
271    /// The task will not be cancelled even if the future is dropped.
272    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
273    where
274        F: FnOnce() -> R + Send + 'static,
275        R: Send + 'static,
276    {
277        self.0.pool.execute(f)
278    }
279
280    /// Returns a previously registered type, or inserts and returns a new one.
281    ///
282    /// This method acquires a lock on the internal data structure.
283    /// To avoid repeated locking, prefer storing a cloned value in the arbiter's storage.
284    pub fn get_value<T>(&self, f: impl FnOnce() -> T) -> T
285    where
286        T: Clone + Send + Sync + 'static,
287    {
288        if let Some(boxed) = self.0.storage.read().get(&TypeId::of::<T>())
289            && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
290        {
291            val.clone()
292        } else {
293            let val = f();
294            self.0
295                .storage
296                .write()
297                .insert(TypeId::of::<T>(), Box::new(val.clone()));
298            val
299        }
300    }
301}
302
303impl SystemConfig {
304    #[inline]
305    /// Is current system is testing
306    pub fn testing(&self) -> bool {
307        self.testing
308    }
309}
310
311impl fmt::Debug for System {
312    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313        f.debug_struct("System")
314            .field("id", &self.0.id)
315            .field("config", &self.0.config)
316            .field("pool", &self.0.pool)
317            .finish()
318    }
319}
320
321impl fmt::Debug for SystemConfig {
322    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
323        f.debug_struct("SystemConfig")
324            .field("name", &self.name)
325            .field("testing", &self.testing)
326            .field("stack_size", &self.stack_size)
327            .field("stop_on_panic", &self.stop_on_panic)
328            .field("signals_disabled", &self.disable_signals)
329            .finish()
330    }
331}
332
333#[derive(Debug)]
334pub(super) enum SystemCommand {
335    Exit(i32),
336}
337
338#[derive(Debug)]
339struct SystemSupport {
340    sys: System,
341    stop: Option<oneshot::Sender<i32>>,
342    commands: Receiver<SystemCommand>,
343    ping_interval: Duration,
344}
345
346impl SystemSupport {
347    fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
348        Self {
349            sys: sys.clone(),
350            stop: Some(stop),
351            commands: sys.0.receiver.clone(),
352            ping_interval: Duration::from_millis(sys.0.config.ping_interval as u64),
353        }
354    }
355
356    async fn run(mut self) {
357        crate::spawn(ping_arbiters(self.sys.clone(), self.ping_interval));
358
359        loop {
360            match self.commands.recv().await {
361                Ok(SystemCommand::Exit(code)) => {
362                    log::debug!("Stopping system with {code} code");
363
364                    // stop arbiters
365                    let mut arbiters = self.sys.0.arbiters.lock();
366                    for arb in arbiters.list.drain(..) {
367                        arb.stop();
368                    }
369                    arbiters.all.clear();
370
371                    // stop event loop
372                    if let Some(stop) = self.stop.take() {
373                        let _ = stop.send(code);
374                    }
375                }
376                Err(_) => {
377                    log::debug!("System stopped");
378                    return;
379                }
380            }
381        }
382    }
383}
384
/// One ping sent to an arbiter, as kept in the ping history.
#[derive(Clone, Copy, Debug)]
pub struct PingRecord {
    /// Ping start time
    pub start: Instant,
    /// Round-trip time; `None` while the pong has not arrived
    pub rtt: Option<Duration>,
}
392
393async fn ping_arbiters(sys: System, interval: Duration) {
394    let pings = Rc::new(RefCell::new(HashSet::default()));
395
396    loop {
397        // send pings
398        {
399            pings.borrow_mut().clear();
400
401            let start = Instant::now();
402            let arbiters = sys.0.arbiters.lock();
403
404            for arb in &arbiters.list {
405                let id = arb.id();
406                let pings = pings.clone();
407                let fut = arb.handle().spawn(async move {
408                    yield_to().await;
409                });
410
411                // calc ttl
412                PINGS.with(|pings| {
413                    let mut p = pings.borrow_mut();
414                    let recs = p.entry(arb.id()).or_default();
415                    recs.push_front(PingRecord { start, rtt: None });
416                    recs.truncate(10);
417                });
418
419                crate::spawn(async move {
420                    if fut.await.is_ok() {
421                        pings.borrow_mut().insert(id);
422
423                        PINGS.with(|pings| {
424                            pings
425                                .borrow_mut()
426                                .get_mut(&id)
427                                .unwrap()
428                                .front_mut()
429                                .unwrap()
430                                .rtt = Some(start.elapsed());
431                        });
432                    }
433                });
434            }
435        }
436
437        Delay::new(interval).await;
438
439        // check pings
440        #[cfg(target_os = "linux")]
441        {
442            const SPIN: Duration = Duration::from_micros(100);
443
444            let mut no_pongs = Vec::new();
445
446            {
447                for arb in &sys.0.arbiters.lock().list {
448                    let pong = pings.borrow_mut().remove(&arb.id());
449                    if !pong {
450                        no_pongs.push(arb.clone());
451                    }
452                }
453            }
454
455            for arb in no_pongs {
456                // no response from arbiter
457                log::error!("Arbiter {}({:?}) did not return pong", arb.name(), arb.id());
458
459                // send tgkill to thread id to capture backtrace
460                *CAPTURED.lock() = None;
461                EXPECTED_TID.store(arb.tid(), Ordering::Release);
462                unsafe {
463                    libc::syscall(
464                        libc::SYS_tgkill,
465                        libc::getpid(),
466                        arb.tid(),
467                        libc::SIGUSR2,
468                    );
469                }
470
471                // Spin
472                for _ in 0..1000 {
473                    Delay::new(SPIN).await;
474                    if let Some(bt) = CAPTURED.lock().take() {
475                        let bt = ntex_error::Backtrace::from(bt);
476                        bt.resolver().resolve();
477                        log::error!(
478                            "Worker does not returned pong within {interval:?} time.\n{bt:?}"
479                        );
480                        break;
481                    }
482                }
483            }
484        }
485    }
486}
487
/// Yields control back to the executor exactly once.
///
/// The inner future wakes itself and returns `Pending` on its first poll, and
/// completes on the next one.
async fn yield_to() {
    use std::task::{Context, Poll};

    struct YieldOnce {
        yielded: bool,
    }

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if !self.yielded {
                // reschedule ourselves before giving control back
                self.yielded = true;
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(())
        }
    }

    YieldOnce { yielded: false }.await
}
510
511#[cfg(target_os = "linux")]
512static EXPECTED_TID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
513#[cfg(target_os = "linux")]
514static CAPTURED: Mutex<Option<ntex_error::BacktraceRaw>> = Mutex::new(None);
515
/// Captures an unresolved backtrace of the calling thread if it is the thread
/// named by `EXPECTED_TID`, storing it in `CAPTURED`.
///
/// `ping_arbiters` sends SIGUSR2 to an unresponsive arbiter thread via `tgkill`
/// and then polls `CAPTURED`. NOTE(review): this function is presumably installed
/// as (or called from) the SIGUSR2 handler; the registration is not visible here.
///
/// NOTE(review): taking the `CAPTURED` mutex and any allocation performed by
/// `BacktraceRaw::new` are not async-signal-safe in general (e.g. if the
/// interrupted thread is inside the allocator). Confirm this trade-off is acceptable.
///
/// On unix targets other than Linux this function does nothing.
#[track_caller]
#[cfg(target_family = "unix")]
pub(crate) fn sig_usr2() {
    #[cfg(target_os = "linux")]
    #[allow(clippy::cast_possible_truncation)]
    {
        // SAFETY: the gettid syscall takes no arguments and cannot fail.
        // The result is narrowed to i32 to compare with EXPECTED_TID.
        let tid = unsafe { libc::syscall(libc::SYS_gettid) } as i32;
        // only the thread that ping_arbiters asked for records a backtrace
        if EXPECTED_TID.load(Ordering::Acquire) == tid {
            // Frames are collected without symbol resolution here; the original
            // author notes unresolved capture (libunwind frame walking) is
            // signal-safe. NOTE(review): verify BacktraceRaw::new keeps that property.
            // Symbol resolution is NOT signal-safe — ping_arbiters does it later.
            let bt = ntex_error::BacktraceRaw::new(panic::Location::caller());
            *CAPTURED.lock() = Some(bt);
        }
    }
}