// ntex_rt/system.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
3use std::time::{Duration, Instant};
4use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc};
5
6use async_channel::{Receiver, Sender};
7use futures_timer::Delay;
8
9use super::arbiter::Arbiter;
10use super::builder::{Builder, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15    static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17        RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22    all: HashMap<Id, Arbiter>,
23    list: Vec<Arbiter>,
24}
25
/// Numeric identifier; returned by `System::id` and used as the key of the
/// arbiter and ping registries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Id(pub(crate) usize);
28
29/// System is a runtime manager.
30#[derive(Clone, Debug)]
31pub struct System {
32    id: usize,
33    sys: Sender<SystemCommand>,
34    arbiter: Arbiter,
35    config: SystemConfig,
36}
37
38#[derive(Clone)]
39pub(super) struct SystemConfig {
40    pub(super) stack_size: usize,
41    pub(super) stop_on_panic: bool,
42    pub(super) block_on:
43        Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>>,
44}
45
46thread_local!(
47    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
48);
49
50impl System {
51    /// Constructs new system and sets it as current
52    pub(super) fn construct(
53        sys: Sender<SystemCommand>,
54        mut arbiter: Arbiter,
55        config: SystemConfig,
56    ) -> Self {
57        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
58        arbiter.sys_id = id;
59
60        let sys = System {
61            id,
62            sys,
63            config,
64            arbiter,
65        };
66        System::set_current(sys.clone());
67        sys
68    }
69
70    /// Build a new system with a customized tokio runtime.
71    ///
72    /// This allows to customize the runtime. See struct level docs on
73    /// `Builder` for more information.
74    pub fn build() -> Builder {
75        Builder::new()
76    }
77
78    #[allow(clippy::new_ret_no_self)]
79    /// Create new system.
80    ///
81    /// This method panics if it can not create tokio runtime
82    pub fn new(name: &str) -> SystemRunner {
83        Self::build().name(name).finish()
84    }
85
86    /// Get current running system.
87    pub fn current() -> System {
88        CURRENT.with(|cell| match *cell.borrow() {
89            Some(ref sys) => sys.clone(),
90            None => panic!("System is not running"),
91        })
92    }
93
94    /// Set current running system.
95    #[doc(hidden)]
96    pub fn set_current(sys: System) {
97        CURRENT.with(|s| {
98            *s.borrow_mut() = Some(sys);
99        })
100    }
101
102    /// System id
103    pub fn id(&self) -> Id {
104        Id(self.id)
105    }
106
107    /// Stop the system
108    pub fn stop(&self) {
109        self.stop_with_code(0)
110    }
111
112    /// Stop the system with a particular exit code.
113    pub fn stop_with_code(&self, code: i32) {
114        let _ = self.sys.try_send(SystemCommand::Exit(code));
115    }
116
117    /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an
118    /// uncaught panic is thrown from a worker thread.
119    pub fn stop_on_panic(&self) -> bool {
120        self.config.stop_on_panic
121    }
122
123    /// System arbiter
124    pub fn arbiter(&self) -> &Arbiter {
125        &self.arbiter
126    }
127
128    /// Retrieves a list of all arbiters in the system.
129    ///
130    /// This method should be called from the thread where the system has been initialized,
131    /// typically the "main" thread.
132    pub fn list_arbiters<F, R>(f: F) -> R
133    where
134        F: FnOnce(&[Arbiter]) -> R,
135    {
136        ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
137    }
138
139    /// Retrieves a list of last pings records for specified arbiter.
140    ///
141    /// This method should be called from the thread where the system has been initialized,
142    /// typically the "main" thread.
143    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
144    where
145        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
146    {
147        PINGS.with(|pings| {
148            if let Some(recs) = pings.borrow().get(&id) {
149                f(Some(recs))
150            } else {
151                f(None)
152            }
153        })
154    }
155
156    pub(super) fn sys(&self) -> &Sender<SystemCommand> {
157        &self.sys
158    }
159
160    /// System config
161    pub(super) fn config(&self) -> SystemConfig {
162        self.config.clone()
163    }
164}
165
166impl SystemConfig {
167    /// Execute a future with custom `block_on` method and wait for result.
168    #[inline]
169    pub(super) fn block_on<F, R>(&self, fut: F) -> R
170    where
171        F: Future<Output = R> + 'static,
172        R: 'static,
173    {
174        // run loop
175        let result = Rc::new(RefCell::new(None));
176        let result_inner = result.clone();
177
178        if let Some(block_on) = &self.block_on {
179            (*block_on)(Box::pin(async move {
180                let r = fut.await;
181                *result_inner.borrow_mut() = Some(r);
182            }));
183        } else {
184            crate::block_on(Box::pin(async move {
185                let r = fut.await;
186                *result_inner.borrow_mut() = Some(r);
187            }));
188        }
189        let res = result.borrow_mut().take().unwrap();
190        res
191    }
192}
193
194impl fmt::Debug for SystemConfig {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        f.debug_struct("SystemConfig")
197            .field("stack_size", &self.stack_size)
198            .field("stop_on_panic", &self.stop_on_panic)
199            .finish()
200    }
201}
202
203#[derive(Debug)]
204pub(super) enum SystemCommand {
205    Exit(i32),
206    RegisterArbiter(Id, Arbiter),
207    UnregisterArbiter(Id),
208}
209
210pub(super) struct SystemSupport {
211    stop: Option<oneshot::Sender<i32>>,
212    commands: Receiver<SystemCommand>,
213    ping_interval: Duration,
214}
215
216impl SystemSupport {
217    pub(super) fn new(
218        stop: oneshot::Sender<i32>,
219        commands: Receiver<SystemCommand>,
220        ping_interval: usize,
221    ) -> Self {
222        Self {
223            commands,
224            stop: Some(stop),
225            ping_interval: Duration::from_millis(ping_interval as u64),
226        }
227    }
228
229    pub(super) async fn run(mut self) {
230        ARBITERS.with(move |arbs| {
231            let mut arbiters = arbs.borrow_mut();
232            arbiters.all.clear();
233            arbiters.list.clear();
234        });
235
236        loop {
237            match self.commands.recv().await {
238                Ok(SystemCommand::Exit(code)) => {
239                    log::debug!("Stopping system with {} code", code);
240
241                    // stop arbiters
242                    ARBITERS.with(move |arbs| {
243                        let mut arbiters = arbs.borrow_mut();
244                        for arb in arbiters.list.drain(..) {
245                            arb.stop();
246                        }
247                        arbiters.all.clear();
248                    });
249
250                    // stop event loop
251                    if let Some(stop) = self.stop.take() {
252                        let _ = stop.send(code);
253                    }
254                }
255                Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
256                    crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
257                    ARBITERS.with(move |arbs| {
258                        let mut arbiters = arbs.borrow_mut();
259                        arbiters.all.insert(id, hnd.clone());
260                        arbiters.list.push(hnd);
261                    });
262                }
263                Ok(SystemCommand::UnregisterArbiter(id)) => {
264                    ARBITERS.with(move |arbs| {
265                        let mut arbiters = arbs.borrow_mut();
266                        if let Some(hnd) = arbiters.all.remove(&id) {
267                            for (idx, arb) in arbiters.list.iter().enumerate() {
268                                if &hnd == arb {
269                                    arbiters.list.remove(idx);
270                                    break;
271                                }
272                            }
273                        }
274                    });
275                }
276                Err(_) => {
277                    log::debug!("System stopped");
278                    return;
279                }
280            }
281        }
282    }
283}
284
/// A single ping of an arbiter.
#[derive(Debug, Clone, Copy)]
pub struct PingRecord {
    /// Ping start time
    pub start: Instant,
    /// Round-trip time; `None` while the ping is still in progress
    pub rtt: Option<Duration>,
}
292
293async fn ping_arbiter(arb: Arbiter, interval: Duration) {
294    loop {
295        Delay::new(interval).await;
296
297        // check if arbiter is still active
298        let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
299
300        if !is_alive {
301            PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
302            break;
303        }
304
305        // calc ttl
306        let start = Instant::now();
307        PINGS.with(|pings| {
308            let mut p = pings.borrow_mut();
309            let recs = p.entry(arb.id()).or_default();
310            recs.push_front(PingRecord { start, rtt: None });
311            recs.truncate(10);
312        });
313
314        let result = arb
315            .spawn_with(|| async {
316                yield_to().await;
317            })
318            .await;
319
320        if result.is_err() {
321            break;
322        }
323
324        PINGS.with(|pings| {
325            pings
326                .borrow_mut()
327                .get_mut(&arb.id())
328                .unwrap()
329                .front_mut()
330                .unwrap()
331                .rtt = Some(Instant::now() - start);
332        });
333    }
334}
335
/// Yields to the executor exactly once.
///
/// The first poll requests an immediate wake-up and returns `Pending`; the
/// next poll completes.
async fn yield_to() {
    use std::task::Poll;

    let mut yielded = false;
    std::future::poll_fn(move |cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            // ask to be polled again right away
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await
}
358
359pub(super) trait FnExec: Send + 'static {
360    fn call_box(self: Box<Self>);
361}
362
363impl<F> FnExec for F
364where
365    F: FnOnce() + Send + 'static,
366{
367    #[allow(clippy::boxed_local)]
368    fn call_box(self: Box<Self>) {
369        (*self)()
370    }
371}