Skip to main content

ntex_rt/
system.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
5
6use async_channel::{Receiver, Sender};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Runner, SystemRunner};
11
/// Process-wide counter of constructed systems.
///
/// `System::construct` takes the next value with `fetch_add` and uses it as
/// the id of the new system, so ids start at 0 and grow by one per system.
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15    static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17        RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22    all: HashMap<Id, Arbiter>,
23    list: Vec<Arbiter>,
24}
25
/// Opaque numeric identifier.
///
/// Returned by `System::id` and used as the key of the arbiter registry and
/// of the ping history.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Id(pub(crate) usize);
28
29/// System is a runtime manager.
30#[derive(Clone, Debug)]
31pub struct System {
32    id: usize,
33    sys: Sender<SystemCommand>,
34    arbiter: Arbiter,
35    config: SystemConfig,
36    pool: ThreadPool,
37}
38
39#[derive(Clone)]
40pub struct SystemConfig {
41    pub(super) name: String,
42    pub(super) runner: Arc<dyn Runner>,
43    pub(super) stack_size: usize,
44    pub(super) stop_on_panic: bool,
45    pub(super) testing: bool,
46}
47
48thread_local!(
49    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
50);
51
52impl System {
53    /// Constructs new system and sets it as current
54    pub(super) fn construct(
55        sys: Sender<SystemCommand>,
56        mut arbiter: Arbiter,
57        config: SystemConfig,
58        pool: ThreadPool,
59    ) -> Self {
60        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
61        arbiter.sys_id = id;
62
63        let sys = System {
64            id,
65            sys,
66            arbiter,
67            config,
68            pool,
69        };
70        System::set_current(sys.clone());
71        sys
72    }
73
74    /// Build a new system with a customized tokio runtime.
75    ///
76    /// This allows to customize the runtime. See struct level docs on
77    /// `Builder` for more information.
78    pub fn build() -> Builder {
79        Builder::new()
80    }
81
82    #[allow(clippy::new_ret_no_self)]
83    /// Create new system.
84    ///
85    /// This method panics if it can not create tokio runtime
86    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
87        Self::build().name(name).build(runner)
88    }
89
90    #[allow(clippy::new_ret_no_self)]
91    /// Create new system.
92    ///
93    /// This method panics if it can not create tokio runtime
94    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
95        Self::build().name(name).build_with(config)
96    }
97
98    /// Get current running system.
99    pub fn current() -> System {
100        CURRENT.with(|cell| match *cell.borrow() {
101            Some(ref sys) => sys.clone(),
102            None => panic!("System is not running"),
103        })
104    }
105
106    /// Set current running system.
107    #[doc(hidden)]
108    pub fn set_current(sys: System) {
109        CURRENT.with(|s| {
110            *s.borrow_mut() = Some(sys);
111        })
112    }
113
114    /// System id
115    pub fn id(&self) -> Id {
116        Id(self.id)
117    }
118
119    /// System name
120    pub fn name(&self) -> &str {
121        &self.config.name
122    }
123
124    /// Stop the system
125    pub fn stop(&self) {
126        self.stop_with_code(0)
127    }
128
129    /// Stop the system with a particular exit code.
130    pub fn stop_with_code(&self, code: i32) {
131        let _ = self.sys.try_send(SystemCommand::Exit(code));
132    }
133
134    /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an
135    /// uncaught panic is thrown from a worker thread.
136    pub fn stop_on_panic(&self) -> bool {
137        self.config.stop_on_panic
138    }
139
140    /// System arbiter
141    pub fn arbiter(&self) -> &Arbiter {
142        &self.arbiter
143    }
144
145    /// Retrieves a list of all arbiters in the system.
146    ///
147    /// This method should be called from the thread where the system has been initialized,
148    /// typically the "main" thread.
149    pub fn list_arbiters<F, R>(f: F) -> R
150    where
151        F: FnOnce(&[Arbiter]) -> R,
152    {
153        ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
154    }
155
156    /// Retrieves a list of last pings records for specified arbiter.
157    ///
158    /// This method should be called from the thread where the system has been initialized,
159    /// typically the "main" thread.
160    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
161    where
162        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
163    {
164        PINGS.with(|pings| {
165            if let Some(recs) = pings.borrow().get(&id) {
166                f(Some(recs))
167            } else {
168                f(None)
169            }
170        })
171    }
172
173    pub(super) fn sys(&self) -> &Sender<SystemCommand> {
174        &self.sys
175    }
176
177    /// System config
178    pub fn config(&self) -> SystemConfig {
179        self.config.clone()
180    }
181
182    /// Testing flag
183    pub fn testing(&self) -> bool {
184        self.config.testing()
185    }
186
187    /// Spawns a blocking task in a new thread, and wait for it.
188    ///
189    /// The task will not be cancelled even if the future is dropped.
190    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
191    where
192        F: FnOnce() -> R + Send + 'static,
193        R: Send + 'static,
194    {
195        self.pool.dispatch(f)
196    }
197}
198
199impl SystemConfig {
200    #[inline]
201    /// Is current system is testing
202    pub fn testing(&self) -> bool {
203        self.testing
204    }
205
206    /// Execute a future with custom `block_on` method and wait for result.
207    #[inline]
208    pub(super) fn block_on<F, R>(&self, fut: F) -> R
209    where
210        F: Future<Output = R> + 'static,
211        R: 'static,
212    {
213        // run loop
214        let result = Rc::new(RefCell::new(None));
215        let result_inner = result.clone();
216
217        self.runner.block_on(Box::pin(async move {
218            let r = fut.await;
219            *result_inner.borrow_mut() = Some(r);
220        }));
221        result.borrow_mut().take().unwrap()
222    }
223}
224
225impl fmt::Debug for SystemConfig {
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        f.debug_struct("SystemConfig")
228            .field("testing", &self.testing)
229            .field("stack_size", &self.stack_size)
230            .field("stop_on_panic", &self.stop_on_panic)
231            .finish()
232    }
233}
234
235#[derive(Debug)]
236pub(super) enum SystemCommand {
237    Exit(i32),
238    RegisterArbiter(Id, Arbiter),
239    UnregisterArbiter(Id),
240}
241
242pub(super) struct SystemSupport {
243    stop: Option<oneshot::Sender<i32>>,
244    commands: Receiver<SystemCommand>,
245    ping_interval: Duration,
246}
247
248impl SystemSupport {
249    pub(super) fn new(
250        stop: oneshot::Sender<i32>,
251        commands: Receiver<SystemCommand>,
252        ping_interval: usize,
253    ) -> Self {
254        Self {
255            commands,
256            stop: Some(stop),
257            ping_interval: Duration::from_millis(ping_interval as u64),
258        }
259    }
260
261    pub(super) async fn run(mut self) {
262        ARBITERS.with(move |arbs| {
263            let mut arbiters = arbs.borrow_mut();
264            arbiters.all.clear();
265            arbiters.list.clear();
266        });
267
268        loop {
269            match self.commands.recv().await {
270                Ok(SystemCommand::Exit(code)) => {
271                    log::debug!("Stopping system with {code} code");
272
273                    // stop arbiters
274                    ARBITERS.with(move |arbs| {
275                        let mut arbiters = arbs.borrow_mut();
276                        for arb in arbiters.list.drain(..) {
277                            arb.stop();
278                        }
279                        arbiters.all.clear();
280                    });
281
282                    // stop event loop
283                    if let Some(stop) = self.stop.take() {
284                        let _ = stop.send(code);
285                    }
286                }
287                Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
288                    crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
289                    ARBITERS.with(move |arbs| {
290                        let mut arbiters = arbs.borrow_mut();
291                        arbiters.all.insert(id, hnd.clone());
292                        arbiters.list.push(hnd);
293                    });
294                }
295                Ok(SystemCommand::UnregisterArbiter(id)) => {
296                    ARBITERS.with(move |arbs| {
297                        let mut arbiters = arbs.borrow_mut();
298                        if let Some(hnd) = arbiters.all.remove(&id) {
299                            for (idx, arb) in arbiters.list.iter().enumerate() {
300                                if &hnd == arb {
301                                    arbiters.list.remove(idx);
302                                    break;
303                                }
304                            }
305                        }
306                    });
307                }
308                Err(_) => {
309                    log::debug!("System stopped");
310                    return;
311                }
312            }
313        }
314    }
315}
316
/// Outcome of a single arbiter ping.
#[derive(Clone, Copy, Debug)]
pub struct PingRecord {
    /// Moment the ping was started.
    pub start: Instant,
    /// Round-trip time; `None` while the ping is still in progress.
    pub rtt: Option<Duration>,
}
324
325async fn ping_arbiter(arb: Arbiter, interval: Duration) {
326    loop {
327        Delay::new(interval).await;
328
329        // check if arbiter is still active
330        let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
331
332        if !is_alive {
333            PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
334            break;
335        }
336
337        // calc ttl
338        let start = Instant::now();
339        PINGS.with(|pings| {
340            let mut p = pings.borrow_mut();
341            let recs = p.entry(arb.id()).or_default();
342            recs.push_front(PingRecord { start, rtt: None });
343            recs.truncate(10);
344        });
345
346        let result = arb
347            .spawn_with(|| async {
348                yield_to().await;
349            })
350            .await;
351
352        if result.is_err() {
353            break;
354        }
355
356        PINGS.with(|pings| {
357            pings
358                .borrow_mut()
359                .get_mut(&arb.id())
360                .unwrap()
361                .front_mut()
362                .unwrap()
363                .rtt = Some(Instant::now() - start);
364        });
365    }
366}
367
/// Yields to the executor exactly once.
///
/// The first poll wakes the task's own waker and returns `Pending`; every
/// later poll returns `Ready`.
async fn yield_to() {
    use std::task::{Context, Poll};

    /// Future behind `yield_to`; the flag records whether it has already
    /// been polled.
    struct YieldOnce(bool);

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Mark the future as polled and branch on the previous state.
            let polled_before = std::mem::replace(&mut self.0, true);
            if polled_before {
                Poll::Ready(())
            } else {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    YieldOnce(false).await
}
390
391pub(super) trait FnExec: Send + 'static {
392    fn call_box(self: Box<Self>);
393}
394
395impl<F> FnExec for F
396where
397    F: FnOnce() + Send + 'static,
398{
399    #[allow(clippy::boxed_local)]
400    fn call_box(self: Box<Self>) {
401        (*self)()
402    }
403}