// ntex_rt/system.rs

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
use std::time::{Duration, Instant};
use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};

use async_channel::{Receiver, Sender};
use futures_timer::Delay;

use crate::pool::ThreadPool;
use crate::{Arbiter, BlockingResult, Builder, Runner, SystemRunner};

12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15    static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17        RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22    all: HashMap<Id, Arbiter>,
23    list: Vec<Arbiter>,
24}
25
/// Numeric identifier.
///
/// `System::id` returns the system id wrapped in this type, and arbiter
/// registrations and ping records are keyed by it.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Id(pub(crate) usize);

29/// System is a runtime manager
30#[derive(Clone, Debug)]
31pub struct System {
32    id: usize,
33    sys: Sender<SystemCommand>,
34    arbiter: Arbiter,
35    config: SystemConfig,
36    pool: ThreadPool,
37}
38
39#[derive(Clone)]
40pub struct SystemConfig {
41    pub(super) name: String,
42    pub(super) runner: Arc<dyn Runner>,
43    pub(super) stack_size: usize,
44    pub(super) stop_on_panic: bool,
45    pub(super) testing: bool,
46}
47
48thread_local!(
49    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
50);
51
52impl System {
53    /// Constructs new system and sets it as current
54    pub(super) fn construct(
55        sys: Sender<SystemCommand>,
56        mut arbiter: Arbiter,
57        config: SystemConfig,
58        pool: ThreadPool,
59    ) -> Self {
60        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
61        arbiter.sys_id = id;
62
63        let sys = System {
64            id,
65            sys,
66            arbiter,
67            config,
68            pool,
69        };
70        System::set_current(sys.clone());
71        sys
72    }
73
74    /// Build a new system with a customized tokio runtime
75    ///
76    /// This allows to customize the runtime. See struct level docs on
77    /// `Builder` for more information.
78    pub fn build() -> Builder {
79        Builder::new()
80    }
81
82    #[allow(clippy::new_ret_no_self)]
83    /// Create new system
84    ///
85    /// This method panics if it can not create tokio runtime
86    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
87        Self::build().name(name).build(runner)
88    }
89
90    #[allow(clippy::new_ret_no_self)]
91    /// Create new system
92    ///
93    /// This method panics if it can not create tokio runtime
94    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
95        Self::build().name(name).build_with(config)
96    }
97
98    /// Get current running system
99    ///
100    /// # Panics
101    ///
102    /// Panics if System is not running
103    pub fn current() -> System {
104        CURRENT.with(|cell| match *cell.borrow() {
105            Some(ref sys) => sys.clone(),
106            None => panic!("System is not running"),
107        })
108    }
109
110    /// Set current running system
111    #[doc(hidden)]
112    pub fn set_current(sys: System) {
113        CURRENT.with(|s| {
114            *s.borrow_mut() = Some(sys);
115        });
116    }
117
118    /// System id
119    pub fn id(&self) -> Id {
120        Id(self.id)
121    }
122
123    /// System name
124    pub fn name(&self) -> &str {
125        &self.config.name
126    }
127
128    /// Stop the system
129    pub fn stop(&self) {
130        self.stop_with_code(0);
131    }
132
133    /// Stop the system with a particular exit code
134    pub fn stop_with_code(&self, code: i32) {
135        let _ = self.sys.try_send(SystemCommand::Exit(code));
136    }
137
138    /// Return status of `stop_on_panic` option
139    ///
140    /// It controls whether the System is stopped when an
141    /// uncaught panic is thrown from a worker thread.
142    pub fn stop_on_panic(&self) -> bool {
143        self.config.stop_on_panic
144    }
145
146    /// System arbiter
147    pub fn arbiter(&self) -> &Arbiter {
148        &self.arbiter
149    }
150
151    /// Retrieves a list of all arbiters in the system
152    ///
153    /// This method should be called from the thread where the system has been initialized,
154    /// typically the "main" thread.
155    pub fn list_arbiters<F, R>(f: F) -> R
156    where
157        F: FnOnce(&[Arbiter]) -> R,
158    {
159        ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
160    }
161
162    /// Retrieves a list of last pings records for specified arbiter
163    ///
164    /// This method should be called from the thread where the system has been initialized,
165    /// typically the "main" thread.
166    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
167    where
168        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
169    {
170        PINGS.with(|pings| {
171            if let Some(recs) = pings.borrow().get(&id) {
172                f(Some(recs))
173            } else {
174                f(None)
175            }
176        })
177    }
178
179    pub(super) fn sys(&self) -> &Sender<SystemCommand> {
180        &self.sys
181    }
182
183    /// System config
184    pub fn config(&self) -> SystemConfig {
185        self.config.clone()
186    }
187
188    /// Testing flag
189    pub fn testing(&self) -> bool {
190        self.config.testing()
191    }
192
193    /// Spawns a blocking task in a new thread, and wait for it
194    ///
195    /// The task will not be cancelled even if the future is dropped.
196    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
197    where
198        F: FnOnce() -> R + Send + 'static,
199        R: Send + 'static,
200    {
201        self.pool.dispatch(f)
202    }
203}
204
205impl SystemConfig {
206    #[inline]
207    /// Is current system is testing
208    pub fn testing(&self) -> bool {
209        self.testing
210    }
211
212    /// Execute a future with custom `block_on` method and wait for result
213    #[inline]
214    pub(super) fn block_on<F, R>(&self, fut: F) -> R
215    where
216        F: Future<Output = R> + 'static,
217        R: 'static,
218    {
219        // run loop
220        let result = Rc::new(RefCell::new(None));
221        let result_inner = result.clone();
222
223        self.runner.block_on(Box::pin(async move {
224            let r = fut.await;
225            *result_inner.borrow_mut() = Some(r);
226        }));
227        result.borrow_mut().take().unwrap()
228    }
229}
230
231impl fmt::Debug for SystemConfig {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        f.debug_struct("SystemConfig")
234            .field("name", &self.name)
235            .field("testing", &self.testing)
236            .field("stack_size", &self.stack_size)
237            .field("stop_on_panic", &self.stop_on_panic)
238            .finish()
239    }
240}
241
242#[derive(Debug)]
243pub(super) enum SystemCommand {
244    Exit(i32),
245    RegisterArbiter(Id, Arbiter),
246    UnregisterArbiter(Id),
247}
248
249#[derive(Debug)]
250pub(super) struct SystemSupport {
251    stop: Option<oneshot::Sender<i32>>,
252    commands: Receiver<SystemCommand>,
253    ping_interval: Duration,
254}
255
256impl SystemSupport {
257    pub(super) fn new(
258        stop: oneshot::Sender<i32>,
259        commands: Receiver<SystemCommand>,
260        ping_interval: usize,
261    ) -> Self {
262        Self {
263            commands,
264            stop: Some(stop),
265            ping_interval: Duration::from_millis(ping_interval as u64),
266        }
267    }
268
269    pub(super) async fn run(mut self) {
270        ARBITERS.with(move |arbs| {
271            let mut arbiters = arbs.borrow_mut();
272            arbiters.all.clear();
273            arbiters.list.clear();
274        });
275
276        loop {
277            match self.commands.recv().await {
278                Ok(SystemCommand::Exit(code)) => {
279                    log::debug!("Stopping system with {code} code");
280
281                    // stop arbiters
282                    ARBITERS.with(move |arbs| {
283                        let mut arbiters = arbs.borrow_mut();
284                        for arb in arbiters.list.drain(..) {
285                            arb.stop();
286                        }
287                        arbiters.all.clear();
288                    });
289
290                    // stop event loop
291                    if let Some(stop) = self.stop.take() {
292                        let _ = stop.send(code);
293                    }
294                }
295                Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
296                    crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
297                    ARBITERS.with(move |arbs| {
298                        let mut arbiters = arbs.borrow_mut();
299                        arbiters.all.insert(id, hnd.clone());
300                        arbiters.list.push(hnd);
301                    });
302                }
303                Ok(SystemCommand::UnregisterArbiter(id)) => {
304                    ARBITERS.with(move |arbs| {
305                        let mut arbiters = arbs.borrow_mut();
306                        if let Some(hnd) = arbiters.all.remove(&id) {
307                            for (idx, arb) in arbiters.list.iter().enumerate() {
308                                if &hnd == arb {
309                                    arbiters.list.remove(idx);
310                                    break;
311                                }
312                            }
313                        }
314                    });
315                }
316                Err(_) => {
317                    log::debug!("System stopped");
318                    return;
319                }
320            }
321        }
322    }
323}
324
/// Outcome of a single arbiter ping.
#[derive(Copy, Clone, Debug)]
pub struct PingRecord {
    /// Ping start time
    pub start: Instant,
    /// Round-trip time, if value is not set then ping is in process
    pub rtt: Option<Duration>,
}

333async fn ping_arbiter(arb: Arbiter, interval: Duration) {
334    loop {
335        Delay::new(interval).await;
336
337        // check if arbiter is still active
338        let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
339
340        if !is_alive {
341            PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
342            break;
343        }
344
345        // calc ttl
346        let start = Instant::now();
347        PINGS.with(|pings| {
348            let mut p = pings.borrow_mut();
349            let recs = p.entry(arb.id()).or_default();
350            recs.push_front(PingRecord { start, rtt: None });
351            recs.truncate(10);
352        });
353
354        let result = arb
355            .spawn_with(|| async {
356                yield_to().await;
357            })
358            .await;
359
360        if result.is_err() {
361            break;
362        }
363
364        PINGS.with(|pings| {
365            pings
366                .borrow_mut()
367                .get_mut(&arb.id())
368                .unwrap()
369                .front_mut()
370                .unwrap()
371                .rtt = Some(start.elapsed());
372        });
373    }
374}
375
/// Yields control back to the executor exactly once.
///
/// The first poll wakes the task's own waker and returns `Pending`; the next
/// poll returns `Ready`.
async fn yield_to() {
    use std::task::{Context, Poll};

    /// Future that is pending on its first poll and ready afterwards.
    struct Yield {
        completed: bool,
    }

    impl Future for Yield {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            if self.completed {
                return Poll::Ready(());
            }
            self.completed = true;
            // request another poll right away so the task is rescheduled
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    Yield { completed: false }.await;
}

399pub(super) trait FnExec: Send + 'static {
400    fn call_box(self: Box<Self>);
401}
402
403impl<F> FnExec for F
404where
405    F: FnOnce() + Send + 'static,
406{
407    #[allow(clippy::boxed_local)]
408    fn call_box(self: Box<Self>) {
409        (*self)();
410    }
411}