ntex_rt/
system.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
5
6use async_channel::{Receiver, Sender};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Runner, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15    static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17        RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22    all: HashMap<Id, Arbiter>,
23    list: Vec<Arbiter>,
24}
25
26#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
27pub struct Id(pub(crate) usize);
28
29/// System is a runtime manager.
30#[derive(Clone, Debug)]
31pub struct System {
32    id: usize,
33    sys: Sender<SystemCommand>,
34    arbiter: Arbiter,
35    config: SystemConfig,
36    pool: ThreadPool,
37}
38
39#[derive(Clone)]
40pub struct SystemConfig {
41    pub(super) runner: Arc<dyn Runner>,
42    pub(super) stack_size: usize,
43    pub(super) stop_on_panic: bool,
44}
45
46thread_local!(
47    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
48);
49
50impl System {
51    /// Constructs new system and sets it as current
52    pub(super) fn construct(
53        sys: Sender<SystemCommand>,
54        mut arbiter: Arbiter,
55        config: SystemConfig,
56        pool: ThreadPool,
57    ) -> Self {
58        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
59        arbiter.sys_id = id;
60
61        let sys = System {
62            id,
63            sys,
64            arbiter,
65            config,
66            pool,
67        };
68        System::set_current(sys.clone());
69        sys
70    }
71
72    /// Build a new system with a customized tokio runtime.
73    ///
74    /// This allows to customize the runtime. See struct level docs on
75    /// `Builder` for more information.
76    pub fn build() -> Builder {
77        Builder::new()
78    }
79
80    #[allow(clippy::new_ret_no_self)]
81    /// Create new system.
82    ///
83    /// This method panics if it can not create tokio runtime
84    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
85        Self::build().name(name).build(runner)
86    }
87
88    #[allow(clippy::new_ret_no_self)]
89    /// Create new system.
90    ///
91    /// This method panics if it can not create tokio runtime
92    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
93        Self::build().name(name).build_with(config)
94    }
95
96    /// Get current running system.
97    pub fn current() -> System {
98        CURRENT.with(|cell| match *cell.borrow() {
99            Some(ref sys) => sys.clone(),
100            None => panic!("System is not running"),
101        })
102    }
103
104    /// Set current running system.
105    #[doc(hidden)]
106    pub fn set_current(sys: System) {
107        CURRENT.with(|s| {
108            *s.borrow_mut() = Some(sys);
109        })
110    }
111
112    /// System id
113    pub fn id(&self) -> Id {
114        Id(self.id)
115    }
116
117    /// Stop the system
118    pub fn stop(&self) {
119        self.stop_with_code(0)
120    }
121
122    /// Stop the system with a particular exit code.
123    pub fn stop_with_code(&self, code: i32) {
124        let _ = self.sys.try_send(SystemCommand::Exit(code));
125    }
126
127    /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an
128    /// uncaught panic is thrown from a worker thread.
129    pub fn stop_on_panic(&self) -> bool {
130        self.config.stop_on_panic
131    }
132
133    /// System arbiter
134    pub fn arbiter(&self) -> &Arbiter {
135        &self.arbiter
136    }
137
138    /// Retrieves a list of all arbiters in the system.
139    ///
140    /// This method should be called from the thread where the system has been initialized,
141    /// typically the "main" thread.
142    pub fn list_arbiters<F, R>(f: F) -> R
143    where
144        F: FnOnce(&[Arbiter]) -> R,
145    {
146        ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
147    }
148
149    /// Retrieves a list of last pings records for specified arbiter.
150    ///
151    /// This method should be called from the thread where the system has been initialized,
152    /// typically the "main" thread.
153    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
154    where
155        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
156    {
157        PINGS.with(|pings| {
158            if let Some(recs) = pings.borrow().get(&id) {
159                f(Some(recs))
160            } else {
161                f(None)
162            }
163        })
164    }
165
166    pub(super) fn sys(&self) -> &Sender<SystemCommand> {
167        &self.sys
168    }
169
170    /// System config
171    pub fn config(&self) -> SystemConfig {
172        self.config.clone()
173    }
174
175    /// Spawns a blocking task in a new thread, and wait for it.
176    ///
177    /// The task will not be cancelled even if the future is dropped.
178    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
179    where
180        F: FnOnce() -> R + Send + 'static,
181        R: Send + 'static,
182    {
183        self.pool.dispatch(f)
184    }
185}
186
187impl SystemConfig {
188    /// Execute a future with custom `block_on` method and wait for result.
189    #[inline]
190    pub(super) fn block_on<F, R>(&self, fut: F) -> R
191    where
192        F: Future<Output = R> + 'static,
193        R: 'static,
194    {
195        // run loop
196        let result = Rc::new(RefCell::new(None));
197        let result_inner = result.clone();
198
199        self.runner.block_on(Box::pin(async move {
200            let r = fut.await;
201            *result_inner.borrow_mut() = Some(r);
202        }));
203        result.borrow_mut().take().unwrap()
204    }
205}
206
207impl fmt::Debug for SystemConfig {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        f.debug_struct("SystemConfig")
210            .field("stack_size", &self.stack_size)
211            .field("stop_on_panic", &self.stop_on_panic)
212            .finish()
213    }
214}
215
216#[derive(Debug)]
217pub(super) enum SystemCommand {
218    Exit(i32),
219    RegisterArbiter(Id, Arbiter),
220    UnregisterArbiter(Id),
221}
222
223pub(super) struct SystemSupport {
224    stop: Option<oneshot::Sender<i32>>,
225    commands: Receiver<SystemCommand>,
226    ping_interval: Duration,
227}
228
229impl SystemSupport {
230    pub(super) fn new(
231        stop: oneshot::Sender<i32>,
232        commands: Receiver<SystemCommand>,
233        ping_interval: usize,
234    ) -> Self {
235        Self {
236            commands,
237            stop: Some(stop),
238            ping_interval: Duration::from_millis(ping_interval as u64),
239        }
240    }
241
242    pub(super) async fn run(mut self) {
243        ARBITERS.with(move |arbs| {
244            let mut arbiters = arbs.borrow_mut();
245            arbiters.all.clear();
246            arbiters.list.clear();
247        });
248
249        loop {
250            match self.commands.recv().await {
251                Ok(SystemCommand::Exit(code)) => {
252                    log::debug!("Stopping system with {code} code");
253
254                    // stop arbiters
255                    ARBITERS.with(move |arbs| {
256                        let mut arbiters = arbs.borrow_mut();
257                        for arb in arbiters.list.drain(..) {
258                            arb.stop();
259                        }
260                        arbiters.all.clear();
261                    });
262
263                    // stop event loop
264                    if let Some(stop) = self.stop.take() {
265                        let _ = stop.send(code);
266                    }
267                }
268                Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
269                    crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
270                    ARBITERS.with(move |arbs| {
271                        let mut arbiters = arbs.borrow_mut();
272                        arbiters.all.insert(id, hnd.clone());
273                        arbiters.list.push(hnd);
274                    });
275                }
276                Ok(SystemCommand::UnregisterArbiter(id)) => {
277                    ARBITERS.with(move |arbs| {
278                        let mut arbiters = arbs.borrow_mut();
279                        if let Some(hnd) = arbiters.all.remove(&id) {
280                            for (idx, arb) in arbiters.list.iter().enumerate() {
281                                if &hnd == arb {
282                                    arbiters.list.remove(idx);
283                                    break;
284                                }
285                            }
286                        }
287                    });
288                }
289                Err(_) => {
290                    log::debug!("System stopped");
291                    return;
292                }
293            }
294        }
295    }
296}
297
298#[derive(Copy, Clone, Debug)]
299pub struct PingRecord {
300    /// Ping start time
301    pub start: Instant,
302    /// Round-trip time, if value is not set then ping is in process
303    pub rtt: Option<Duration>,
304}
305
306async fn ping_arbiter(arb: Arbiter, interval: Duration) {
307    loop {
308        Delay::new(interval).await;
309
310        // check if arbiter is still active
311        let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
312
313        if !is_alive {
314            PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
315            break;
316        }
317
318        // calc ttl
319        let start = Instant::now();
320        PINGS.with(|pings| {
321            let mut p = pings.borrow_mut();
322            let recs = p.entry(arb.id()).or_default();
323            recs.push_front(PingRecord { start, rtt: None });
324            recs.truncate(10);
325        });
326
327        let result = arb
328            .spawn_with(|| async {
329                yield_to().await;
330            })
331            .await;
332
333        if result.is_err() {
334            break;
335        }
336
337        PINGS.with(|pings| {
338            pings
339                .borrow_mut()
340                .get_mut(&arb.id())
341                .unwrap()
342                .front_mut()
343                .unwrap()
344                .rtt = Some(Instant::now() - start);
345        });
346    }
347}
348
349async fn yield_to() {
350    use std::task::{Context, Poll};
351
352    struct Yield {
353        completed: bool,
354    }
355
356    impl Future for Yield {
357        type Output = ();
358
359        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
360            if self.completed {
361                return Poll::Ready(());
362            }
363            self.completed = true;
364            cx.waker().wake_by_ref();
365            Poll::Pending
366        }
367    }
368
369    Yield { completed: false }.await;
370}
371
372pub(super) trait FnExec: Send + 'static {
373    fn call_box(self: Box<Self>);
374}
375
376impl<F> FnExec for F
377where
378    F: FnOnce() + Send + 'static,
379{
380    #[allow(clippy::boxed_local)]
381    fn call_box(self: Box<Self>) {
382        (*self)()
383    }
384}