Skip to main content

ntex_rt/
system.rs

1use std::any::{Any, TypeId};
2use std::collections::VecDeque;
3use std::sync::{Arc, atomic::AtomicBool, atomic::AtomicUsize, atomic::Ordering};
4use std::time::{Duration, Instant};
5use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
6
7use async_channel::{Receiver, Sender, unbounded};
8use futures_timer::Delay;
9use parking_lot::{Mutex, RwLock};
10
11use crate::arbiter::Arbiter;
12use crate::pool::ThreadPool;
13use crate::{BlockingResult, Builder, Handle, HashMap, HashSet, Runner, SystemRunner};
14
/// Process-wide counter; each started system takes the next value as its id.
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
16
17thread_local!(
18    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
19        RefCell::new(HashMap::default());
20);
21
22#[derive(Default)]
23struct Arbiters {
24    all: HashMap<Id, Arbiter>,
25    list: Vec<Arbiter>,
26}
27
/// Numeric identifier.
///
/// Returned by `System::id` and also used in this module as the key for
/// arbiters and their ping records.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Id(pub(crate) usize);
31
32/// System is a runtime manager
33pub struct System(Arc<SystemInner>);
34
35struct SystemInner {
36    id: usize,
37    arbiter: Arbiter,
38    config: SystemConfig,
39    sender: Sender<SystemCommand>,
40    receiver: Receiver<SystemCommand>,
41    storage: RwLock<HashMap<TypeId, Box<dyn Any + Sync + Send>>>,
42    arbiters: Mutex<Arbiters>,
43    signals: AtomicBool,
44    pool: ThreadPool,
45}
46
47#[derive(Clone)]
48pub struct SystemConfig {
49    pub(super) name: String,
50    pub(super) stack_size: usize,
51    pub(super) stop_on_panic: bool,
52    pub(super) ping_interval: usize,
53    pub(super) pool_limit: usize,
54    pub(super) pool_recv_timeout: Duration,
55    pub(super) testing: bool,
56    pub(super) runner: Arc<dyn Runner>,
57}
58
59thread_local!(
60    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
61);
62
63impl Clone for System {
64    fn clone(&self) -> Self {
65        Self(self.0.clone())
66    }
67}
68
69impl System {
70    /// Constructs new system and sets it as current
71    pub(super) fn start(config: SystemConfig) -> (Self, oneshot::Receiver<i32>) {
72        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
73        let (sender, receiver) = unbounded();
74
75        let pool =
76            ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
77        let (arbiter, controller) = Arbiter::new_system(id, config.name.clone());
78
79        let mut arbiters = Arbiters::default();
80        arbiters.all.insert(arbiter.id(), arbiter.clone());
81        arbiters.list.push(arbiter.clone());
82
83        let sys = System(Arc::new(SystemInner {
84            id,
85            config,
86            arbiter,
87            sender,
88            receiver,
89            pool,
90            arbiters: Mutex::new(arbiters),
91            storage: RwLock::new(HashMap::default()),
92            signals: AtomicBool::new(false),
93        }));
94        System::set_current(sys.clone());
95
96        let (stop_tx, stop) = oneshot::channel();
97
98        // system support tasks
99        crate::spawn(SystemSupport::new(&sys, stop_tx).run());
100        crate::spawn(controller.run(sys.clone()));
101
102        (sys, stop)
103    }
104
105    /// Build a new system with a customized runtime
106    ///
107    /// This allows to customize the runtime. See struct level docs on
108    /// `Builder` for more information.
109    pub fn build() -> Builder {
110        Builder::new()
111    }
112
113    #[allow(clippy::new_ret_no_self)]
114    /// Create new system
115    ///
116    /// This method panics if it can not create runtime
117    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
118        Self::build().name(name).build(runner)
119    }
120
121    #[allow(clippy::new_ret_no_self)]
122    /// Create new system
123    ///
124    /// This method panics if it can not create runtime
125    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
126        Self::build().name(name).build_with(config)
127    }
128
129    /// Get current running system
130    ///
131    /// # Panics
132    ///
133    /// Panics if System is not running
134    pub fn current() -> System {
135        CURRENT.with(|cell| match *cell.borrow() {
136            Some(ref sys) => sys.clone(),
137            None => panic!("System is not running"),
138        })
139    }
140
141    /// Runs a function using the system context.
142    pub fn try_current() -> Option<System> {
143        CURRENT.with(|cell| cell.borrow().as_ref().map(Clone::clone))
144    }
145
146    /// Set current running system
147    #[doc(hidden)]
148    pub fn set_current(sys: System) {
149        CURRENT.with(|s| {
150            *s.borrow_mut() = Some(sys);
151        });
152    }
153
154    pub(crate) fn register_arbiter(&self, arb: Arbiter) {
155        CURRENT.with(|s| {
156            *s.borrow_mut() = Some(self.clone());
157        });
158        let mut arbiters = self.0.arbiters.lock();
159        arbiters.all.insert(arb.id(), arb.clone());
160        arbiters.list.push(arb);
161    }
162
163    pub(crate) fn unregister_arbiter(&self, id: Id) {
164        CURRENT.with(|s| {
165            *s.borrow_mut() = None;
166        });
167        let mut arbiters = self.0.arbiters.lock();
168        if let Some(hnd) = arbiters.all.remove(&id) {
169            for (idx, arb) in arbiters.list.iter().enumerate() {
170                if &hnd == arb {
171                    arbiters.list.remove(idx);
172                    break;
173                }
174            }
175        }
176    }
177
178    pub(super) fn remove_current() {
179        CURRENT.with(|cell| {
180            cell.borrow_mut().take();
181        });
182    }
183
184    /// System id
185    pub fn id(&self) -> Id {
186        Id(self.0.id)
187    }
188
189    /// System name
190    pub fn name(&self) -> &str {
191        &self.0.config.name
192    }
193
194    /// Stop the system
195    pub fn stop(&self) {
196        self.stop_with_code(0);
197    }
198
199    /// Stop the system with a particular exit code
200    pub fn stop_with_code(&self, code: i32) {
201        let _ = self.0.sender.try_send(SystemCommand::Exit(code));
202    }
203
204    /// Return status of `stop_on_panic` option
205    ///
206    /// It controls whether the System is stopped when an
207    /// uncaught panic is thrown from a worker thread.
208    pub fn stop_on_panic(&self) -> bool {
209        self.0.config.stop_on_panic
210    }
211
212    /// Return status of `signals` option
213    pub fn signals(&self) -> bool {
214        self.0.signals.load(Ordering::Relaxed)
215    }
216
217    /// Enable `signals` handling
218    pub fn enable_signals(&self) {
219        if !self.signals() {
220            crate::signals::start(self);
221            self.0.signals.store(true, Ordering::Relaxed);
222        }
223    }
224
225    /// Disable `signals` handling
226    pub fn disable_signals(&self) {
227        if self.signals() {
228            crate::signals::stop(self);
229            self.0.signals.store(false, Ordering::Relaxed);
230        }
231    }
232
233    /// System arbiter
234    ///
235    /// # Panics
236    ///
237    /// Panics if system is not started
238    pub fn arbiter(&self) -> Arbiter {
239        self.0.arbiter.clone()
240    }
241
242    /// Retrieves a list of all arbiters in the system
243    ///
244    /// This method should be called from the thread where the system has been initialized,
245    /// typically the "main" thread.
246    pub fn list_arbiters<F, R>(&self, f: F) -> R
247    where
248        F: FnOnce(&[Arbiter]) -> R,
249    {
250        f(&self.0.arbiters.lock().list)
251    }
252
253    /// Retrieves a list of last pings records for specified arbiter
254    ///
255    /// This method should be called from the thread where the system has been initialized,
256    /// typically the "main" thread.
257    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
258    where
259        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
260    {
261        PINGS.with(|pings| {
262            if let Some(recs) = pings.borrow().get(&id) {
263                f(Some(recs))
264            } else {
265                f(None)
266            }
267        })
268    }
269
270    /// System config
271    pub fn config(&self) -> SystemConfig {
272        self.0.config.clone()
273    }
274
275    #[inline]
276    /// Runtime handle for main thread
277    pub fn handle(&self) -> Handle {
278        self.arbiter().handle().clone()
279    }
280
281    /// Testing flag
282    pub fn testing(&self) -> bool {
283        self.0.config.testing()
284    }
285
286    /// Spawns a blocking task in a new thread, and wait for it
287    ///
288    /// The task will not be cancelled even if the future is dropped.
289    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
290    where
291        F: FnOnce() -> R + Send + 'static,
292        R: Send + 'static,
293    {
294        self.0.pool.execute(f)
295    }
296
297    /// Returns a previously registered type, or inserts and returns a new one.
298    ///
299    /// This method acquires a lock on the internal data structure.
300    /// To avoid repeated locking, prefer storing a cloned value in the arbiter's storage.
301    pub fn get_value<T>(&self, f: impl FnOnce() -> T) -> T
302    where
303        T: Clone + Send + Sync + 'static,
304    {
305        if let Some(boxed) = self.0.storage.read().get(&TypeId::of::<T>())
306            && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
307        {
308            val.clone()
309        } else {
310            let val = f();
311            self.0
312                .storage
313                .write()
314                .insert(TypeId::of::<T>(), Box::new(val.clone()));
315            val
316        }
317    }
318}
319
320impl SystemConfig {
321    #[inline]
322    /// Is current system is testing
323    pub fn testing(&self) -> bool {
324        self.testing
325    }
326}
327
328impl fmt::Debug for System {
329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330        f.debug_struct("System")
331            .field("id", &self.0.id)
332            .field("config", &self.0.config)
333            .field("signals", &self.signals())
334            .field("pool", &self.0.pool)
335            .finish()
336    }
337}
338
339impl fmt::Debug for SystemConfig {
340    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341        f.debug_struct("SystemConfig")
342            .field("name", &self.name)
343            .field("testing", &self.testing)
344            .field("stack_size", &self.stack_size)
345            .field("stop_on_panic", &self.stop_on_panic)
346            .finish()
347    }
348}
349
350#[derive(Debug)]
351pub(super) enum SystemCommand {
352    Exit(i32),
353}
354
355#[derive(Debug)]
356struct SystemSupport {
357    sys: System,
358    stop: Option<oneshot::Sender<i32>>,
359    commands: Receiver<SystemCommand>,
360    ping_interval: Duration,
361}
362
363impl SystemSupport {
364    fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
365        Self {
366            sys: sys.clone(),
367            stop: Some(stop),
368            commands: sys.0.receiver.clone(),
369            ping_interval: Duration::from_millis(sys.0.config.ping_interval as u64),
370        }
371    }
372
373    async fn run(mut self) {
374        crate::spawn(ping_arbiters(self.sys.clone(), self.ping_interval));
375
376        loop {
377            match self.commands.recv().await {
378                Ok(SystemCommand::Exit(code)) => {
379                    log::debug!("Stopping system with {code} code");
380
381                    // stop arbiters
382                    let mut arbiters = self.sys.0.arbiters.lock();
383                    for arb in arbiters.list.drain(..) {
384                        arb.stop();
385                    }
386                    arbiters.all.clear();
387
388                    // stop event loop
389                    if let Some(stop) = self.stop.take() {
390                        let _ = stop.send(code);
391                    }
392                }
393                Err(_) => {
394                    log::debug!("System stopped");
395                    return;
396                }
397            }
398        }
399    }
400}
401
/// A single arbiter ping measurement.
#[derive(Copy, Clone, Debug)]
pub struct PingRecord {
    /// Moment the ping was sent
    pub start: Instant,
    /// Round-trip time; `None` while the ping has not been answered yet
    pub rtt: Option<Duration>,
}
409
410async fn ping_arbiters(sys: System, interval: Duration) {
411    let pings = Rc::new(RefCell::new(HashSet::default()));
412
413    loop {
414        // send pings
415        {
416            pings.borrow_mut().clear();
417
418            let start = Instant::now();
419            let arbiters = sys.0.arbiters.lock();
420
421            for arb in &arbiters.list {
422                let id = arb.id();
423                let pings = pings.clone();
424                let fut = arb.handle().spawn(async move {
425                    yield_to().await;
426                });
427
428                // calc ttl
429                PINGS.with(|pings| {
430                    let mut p = pings.borrow_mut();
431                    let recs = p.entry(arb.id()).or_default();
432                    recs.push_front(PingRecord { start, rtt: None });
433                    recs.truncate(10);
434                });
435
436                crate::spawn(async move {
437                    if fut.await.is_ok() {
438                        pings.borrow_mut().insert(id);
439
440                        PINGS.with(|pings| {
441                            pings
442                                .borrow_mut()
443                                .get_mut(&id)
444                                .unwrap()
445                                .front_mut()
446                                .unwrap()
447                                .rtt = Some(start.elapsed());
448                        });
449                    }
450                });
451            }
452        }
453
454        Delay::new(interval).await;
455
456        // check pings
457        #[cfg(target_os = "linux")]
458        {
459            const SPIN: Duration = Duration::from_micros(100);
460
461            let mut no_pongs = Vec::new();
462
463            {
464                for arb in &sys.0.arbiters.lock().list {
465                    let pong = pings.borrow_mut().remove(&arb.id());
466                    if !pong {
467                        no_pongs.push(arb.clone());
468                    }
469                }
470            }
471
472            for arb in no_pongs {
473                // no response from arbiter
474                log::error!("Arbiter {}({:?}) did not return pong", arb.name(), arb.id());
475
476                // send tgkill to thread id to capture backtrace
477                *CAPTURED.lock() = None;
478                EXPECTED_TID.store(arb.tid(), Ordering::Release);
479                unsafe {
480                    libc::syscall(
481                        libc::SYS_tgkill,
482                        libc::getpid(),
483                        arb.tid(),
484                        libc::SIGUSR2,
485                    );
486                }
487
488                // Spin
489                for _ in 0..1000 {
490                    Delay::new(SPIN).await;
491                    if let Some(bt) = CAPTURED.lock().take() {
492                        let bt = ntex_error::Backtrace::from(bt);
493                        bt.resolver().resolve();
494                        log::error!(
495                            "Worker does not returned pong within {interval:?} time.\n{bt:?}"
496                        );
497                        break;
498                    }
499                }
500            }
501        }
502    }
503}
504
/// Yields to the executor exactly once.
///
/// The first poll wakes the task's own waker and returns `Pending`, so the
/// task is rescheduled right away; the second poll completes.
async fn yield_to() {
    use std::task::Poll;

    let mut yielded = false;

    std::future::poll_fn(move |cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await;
}
527
528#[cfg(target_os = "linux")]
529static EXPECTED_TID: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(0);
530#[cfg(target_os = "linux")]
531static CAPTURED: Mutex<Option<ntex_error::BacktraceRaw>> = Mutex::new(None);
532
533#[track_caller]
534#[cfg(target_family = "unix")]
535pub(crate) fn sig_usr2() {
536    #[cfg(target_os = "linux")]
537    #[allow(clippy::cast_possible_truncation)]
538    {
539        let tid = unsafe { libc::syscall(libc::SYS_gettid) } as i32;
540        if EXPECTED_TID.load(Ordering::Acquire) == tid {
541            // backtrace::Backtrace::new_unresolved uses libunwind frame walking,
542            // which is signal-safe. Symbol resolution is NOT — do it later.
543            let bt = ntex_error::BacktraceRaw::new(panic::Location::caller());
544            *CAPTURED.lock() = Some(bt);
545        }
546    }
547}