ntex_rt/
system.rs

1use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
2use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc};
3
4use async_channel::Sender;
5
6use super::arbiter::{Arbiter, SystemCommand};
7use super::builder::{Builder, SystemRunner};
8
9static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
10
11/// System is a runtime manager.
12#[derive(Clone, Debug)]
13pub struct System {
14    id: usize,
15    sys: Sender<SystemCommand>,
16    arbiter: Arbiter,
17    config: SystemConfig,
18}
19
20#[derive(Clone)]
21pub(super) struct SystemConfig {
22    pub(super) stack_size: usize,
23    pub(super) stop_on_panic: bool,
24    pub(super) block_on:
25        Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>>,
26}
27
28thread_local!(
29    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
30);
31
32impl System {
33    /// Constructs new system and sets it as current
34    pub(super) fn construct(
35        sys: Sender<SystemCommand>,
36        arbiter: Arbiter,
37        config: SystemConfig,
38    ) -> Self {
39        let sys = System {
40            sys,
41            config,
42            arbiter,
43            id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
44        };
45        System::set_current(sys.clone());
46        sys
47    }
48
49    /// Build a new system with a customized tokio runtime.
50    ///
51    /// This allows to customize the runtime. See struct level docs on
52    /// `Builder` for more information.
53    pub fn build() -> Builder {
54        Builder::new()
55    }
56
57    #[allow(clippy::new_ret_no_self)]
58    /// Create new system.
59    ///
60    /// This method panics if it can not create tokio runtime
61    pub fn new(name: &str) -> SystemRunner {
62        Self::build().name(name).finish()
63    }
64
65    /// Get current running system.
66    pub fn current() -> System {
67        CURRENT.with(|cell| match *cell.borrow() {
68            Some(ref sys) => sys.clone(),
69            None => panic!("System is not running"),
70        })
71    }
72
73    /// Set current running system.
74    #[doc(hidden)]
75    pub fn set_current(sys: System) {
76        CURRENT.with(|s| {
77            *s.borrow_mut() = Some(sys);
78        })
79    }
80
81    /// System id
82    pub fn id(&self) -> usize {
83        self.id
84    }
85
86    /// Stop the system
87    pub fn stop(&self) {
88        self.stop_with_code(0)
89    }
90
91    /// Stop the system with a particular exit code.
92    pub fn stop_with_code(&self, code: i32) {
93        let _ = self.sys.try_send(SystemCommand::Exit(code));
94    }
95
96    /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an
97    /// uncaught panic is thrown from a worker thread.
98    pub fn stop_on_panic(&self) -> bool {
99        self.config.stop_on_panic
100    }
101
102    /// System arbiter
103    pub fn arbiter(&self) -> &Arbiter {
104        &self.arbiter
105    }
106
107    pub(super) fn sys(&self) -> &Sender<SystemCommand> {
108        &self.sys
109    }
110
111    /// System config
112    pub(super) fn config(&self) -> SystemConfig {
113        self.config.clone()
114    }
115}
116
117impl SystemConfig {
118    /// Execute a future with custom `block_on` method and wait for result.
119    #[inline]
120    pub(super) fn block_on<F, R>(&self, fut: F) -> R
121    where
122        F: Future<Output = R> + 'static,
123        R: 'static,
124    {
125        // run loop
126        let result = Rc::new(RefCell::new(None));
127        let result_inner = result.clone();
128
129        if let Some(block_on) = &self.block_on {
130            (*block_on)(Box::pin(async move {
131                let r = fut.await;
132                *result_inner.borrow_mut() = Some(r);
133            }));
134        } else {
135            crate::block_on(Box::pin(async move {
136                let r = fut.await;
137                *result_inner.borrow_mut() = Some(r);
138            }));
139        }
140        let res = result.borrow_mut().take().unwrap();
141        res
142    }
143}
144
145impl fmt::Debug for SystemConfig {
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        f.debug_struct("SystemConfig")
148            .field("stack_size", &self.stack_size)
149            .field("stop_on_panic", &self.stop_on_panic)
150            .finish()
151    }
152}