Skip to main content

ntex_rt/
system.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, RwLock, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::Cell, cell::RefCell, fmt, future::Future, panic, pin::Pin};
5
6use async_channel::{Receiver, Sender, unbounded};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Handle, Runner, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15    static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17        RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22    all: HashMap<Id, Arbiter>,
23    list: Vec<Arbiter>,
24}
25
26/// System id
27#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
28pub struct Id(pub(crate) usize);
29
30/// System is a runtime manager
31pub struct System {
32    id: usize,
33    arbiter: Cell<Option<Arbiter>>,
34    config: SystemConfig,
35    sender: Sender<SystemCommand>,
36    receiver: Receiver<SystemCommand>,
37    rt: Arc<RwLock<Arbiter>>,
38    pool: ThreadPool,
39}
40
41#[derive(Clone)]
42pub struct SystemConfig {
43    pub(super) name: String,
44    pub(super) stack_size: usize,
45    pub(super) stop_on_panic: bool,
46    pub(super) ping_interval: usize,
47    pub(super) pool_limit: usize,
48    pub(super) pool_bounded: bool,
49    pub(super) pool_recv_timeout: Duration,
50    pub(super) testing: bool,
51    pub(super) runner: Arc<dyn Runner>,
52}
53
54thread_local!(
55    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
56);
57
58impl Clone for System {
59    fn clone(&self) -> Self {
60        Self {
61            id: self.id,
62            arbiter: Cell::new(None),
63            config: self.config.clone(),
64            sender: self.sender.clone(),
65            receiver: self.receiver.clone(),
66            rt: self.rt.clone(),
67            pool: self.pool.clone(),
68        }
69    }
70}
71
72impl System {
73    /// Constructs new system and sets it as current
74    pub(super) fn construct(config: SystemConfig) -> Self {
75        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
76        let (sender, receiver) = unbounded();
77
78        let pool = ThreadPool::new(
79            &config.name,
80            config.pool_limit,
81            config.pool_recv_timeout,
82            config.pool_bounded,
83        );
84
85        System {
86            id,
87            config,
88            sender,
89            receiver,
90            pool,
91            rt: Arc::new(RwLock::new(Arbiter::dummy())),
92            arbiter: Cell::new(None),
93        }
94    }
95
96    /// Constructs new system and sets it as current
97    pub(super) fn start(&mut self) -> oneshot::Receiver<i32> {
98        let (stop_tx, stop) = oneshot::channel();
99        let (arb, controller) = Arbiter::new_system(self.id, self.config.name.clone());
100
101        self.arbiter.set(Some(arb.clone()));
102        *self.rt.write().unwrap() = arb.clone();
103        System::set_current(self.clone());
104
105        // system support tasks
106        crate::spawn(SystemSupport::new(self, stop_tx).run(arb.id(), arb));
107        crate::spawn(controller.run());
108
109        stop
110    }
111
112    /// Build a new system with a customized runtime
113    ///
114    /// This allows to customize the runtime. See struct level docs on
115    /// `Builder` for more information.
116    pub fn build() -> Builder {
117        Builder::new()
118    }
119
120    #[allow(clippy::new_ret_no_self)]
121    /// Create new system
122    ///
123    /// This method panics if it can not create runtime
124    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
125        Self::build().name(name).build(runner)
126    }
127
128    #[allow(clippy::new_ret_no_self)]
129    /// Create new system
130    ///
131    /// This method panics if it can not create runtime
132    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
133        Self::build().name(name).build_with(config)
134    }
135
136    /// Get current running system
137    ///
138    /// # Panics
139    ///
140    /// Panics if System is not running
141    pub fn current() -> System {
142        CURRENT.with(|cell| match *cell.borrow() {
143            Some(ref sys) => sys.clone(),
144            None => panic!("System is not running"),
145        })
146    }
147
148    /// Set current running system
149    #[doc(hidden)]
150    pub fn set_current(sys: System) {
151        sys.arbiter().set_current();
152        CURRENT.with(|s| {
153            *s.borrow_mut() = Some(sys);
154        });
155    }
156
157    /// System id
158    pub fn id(&self) -> Id {
159        Id(self.id)
160    }
161
162    /// System name
163    pub fn name(&self) -> &str {
164        &self.config.name
165    }
166
167    /// Stop the system
168    pub fn stop(&self) {
169        self.stop_with_code(0);
170    }
171
172    /// Stop the system with a particular exit code
173    pub fn stop_with_code(&self, code: i32) {
174        let _ = self.sender.try_send(SystemCommand::Exit(code));
175    }
176
177    /// Return status of `stop_on_panic` option
178    ///
179    /// It controls whether the System is stopped when an
180    /// uncaught panic is thrown from a worker thread.
181    pub fn stop_on_panic(&self) -> bool {
182        self.config.stop_on_panic
183    }
184
185    /// System arbiter
186    ///
187    /// # Panics
188    ///
189    /// Panics if system is not started
190    pub fn arbiter(&self) -> Arbiter {
191        if let Some(arb) = self.arbiter.take() {
192            self.arbiter.set(Some(arb.clone()));
193            if arb.hnd.is_some() {
194                return arb;
195            }
196        }
197
198        let arb = self.rt.read().unwrap().clone();
199        self.arbiter.set(Some(arb.clone()));
200        arb
201    }
202
203    /// Retrieves a list of all arbiters in the system
204    ///
205    /// This method should be called from the thread where the system has been initialized,
206    /// typically the "main" thread.
207    pub fn list_arbiters<F, R>(f: F) -> R
208    where
209        F: FnOnce(&[Arbiter]) -> R,
210    {
211        ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
212    }
213
214    /// Retrieves a list of last pings records for specified arbiter
215    ///
216    /// This method should be called from the thread where the system has been initialized,
217    /// typically the "main" thread.
218    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
219    where
220        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
221    {
222        PINGS.with(|pings| {
223            if let Some(recs) = pings.borrow().get(&id) {
224                f(Some(recs))
225            } else {
226                f(None)
227            }
228        })
229    }
230
231    pub(super) fn sys(&self) -> &Sender<SystemCommand> {
232        &self.sender
233    }
234
235    /// System config
236    pub fn config(&self) -> SystemConfig {
237        self.config.clone()
238    }
239
240    #[inline]
241    /// Runtime handle for main thread
242    pub fn handle(&self) -> Handle {
243        self.arbiter().handle().clone()
244    }
245
246    /// Testing flag
247    pub fn testing(&self) -> bool {
248        self.config.testing()
249    }
250
251    /// Spawns a blocking task in a new thread, and wait for it
252    ///
253    /// The task will not be cancelled even if the future is dropped.
254    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
255    where
256        F: FnOnce() -> R + Send + 'static,
257        R: Send + 'static,
258    {
259        self.pool.execute(f)
260    }
261}
262
263impl SystemConfig {
264    #[inline]
265    /// Is current system is testing
266    pub fn testing(&self) -> bool {
267        self.testing
268    }
269}
270
271impl fmt::Debug for System {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        f.debug_struct("System")
274            .field("id", &self.id)
275            .field("config", &self.config)
276            .field("pool", &self.pool)
277            .finish()
278    }
279}
280
281impl fmt::Debug for SystemConfig {
282    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283        f.debug_struct("SystemConfig")
284            .field("name", &self.name)
285            .field("testing", &self.testing)
286            .field("stack_size", &self.stack_size)
287            .field("stop_on_panic", &self.stop_on_panic)
288            .finish()
289    }
290}
291
292#[derive(Debug)]
293pub(super) enum SystemCommand {
294    Exit(i32),
295    RegisterArbiter(Id, Arbiter),
296    UnregisterArbiter(Id),
297}
298
299#[derive(Debug)]
300struct SystemSupport {
301    stop: Option<oneshot::Sender<i32>>,
302    commands: Receiver<SystemCommand>,
303    ping_interval: Duration,
304}
305
306impl SystemSupport {
307    fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
308        Self {
309            stop: Some(stop),
310            commands: sys.receiver.clone(),
311            ping_interval: Duration::from_millis(sys.config.ping_interval as u64),
312        }
313    }
314
315    async fn run(mut self, id: Id, arb: Arbiter) {
316        ARBITERS.with(move |arbs| {
317            let mut arbiters = arbs.borrow_mut();
318            arbiters.all.clear();
319            arbiters.list.clear();
320
321            // system arbiter
322            arbiters.all.insert(id, arb.clone());
323            arbiters.list.push(arb.clone());
324            crate::spawn(ping_arbiter(arb, self.ping_interval));
325        });
326
327        loop {
328            match self.commands.recv().await {
329                Ok(SystemCommand::Exit(code)) => {
330                    log::debug!("Stopping system with {code} code");
331
332                    // stop arbiters
333                    ARBITERS.with(move |arbs| {
334                        let mut arbiters = arbs.borrow_mut();
335                        for arb in arbiters.list.drain(..) {
336                            arb.stop();
337                        }
338                        arbiters.all.clear();
339                    });
340
341                    // stop event loop
342                    if let Some(stop) = self.stop.take() {
343                        let _ = stop.send(code);
344                    }
345                }
346                Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
347                    crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
348                    ARBITERS.with(move |arbs| {
349                        let mut arbiters = arbs.borrow_mut();
350                        arbiters.all.insert(id, hnd.clone());
351                        arbiters.list.push(hnd);
352                    });
353                }
354                Ok(SystemCommand::UnregisterArbiter(id)) => {
355                    ARBITERS.with(move |arbs| {
356                        let mut arbiters = arbs.borrow_mut();
357                        if let Some(hnd) = arbiters.all.remove(&id) {
358                            for (idx, arb) in arbiters.list.iter().enumerate() {
359                                if &hnd == arb {
360                                    arbiters.list.remove(idx);
361                                    break;
362                                }
363                            }
364                        }
365                    });
366                }
367                Err(_) => {
368                    log::debug!("System stopped");
369                    return;
370                }
371            }
372        }
373    }
374}
375
376#[derive(Copy, Clone, Debug)]
377pub struct PingRecord {
378    /// Ping start time
379    pub start: Instant,
380    /// Round-trip time, if value is not set then ping is in process
381    pub rtt: Option<Duration>,
382}
383
384async fn ping_arbiter(arb: Arbiter, interval: Duration) {
385    loop {
386        Delay::new(interval).await;
387
388        // check if arbiter is still active
389        let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
390
391        if !is_alive {
392            PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
393            break;
394        }
395
396        // calc ttl
397        let start = Instant::now();
398        PINGS.with(|pings| {
399            let mut p = pings.borrow_mut();
400            let recs = p.entry(arb.id()).or_default();
401            recs.push_front(PingRecord { start, rtt: None });
402            recs.truncate(10);
403        });
404
405        let result = arb
406            .handle()
407            .spawn(async {
408                yield_to().await;
409            })
410            .await;
411
412        if result.is_err() {
413            break;
414        }
415
416        PINGS.with(|pings| {
417            pings
418                .borrow_mut()
419                .get_mut(&arb.id())
420                .unwrap()
421                .front_mut()
422                .unwrap()
423                .rtt = Some(start.elapsed());
424        });
425    }
426}
427
428async fn yield_to() {
429    use std::task::{Context, Poll};
430
431    struct Yield {
432        completed: bool,
433    }
434
435    impl Future for Yield {
436        type Output = ();
437
438        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
439            if self.completed {
440                return Poll::Ready(());
441            }
442            self.completed = true;
443            cx.waker().wake_by_ref();
444            Poll::Pending
445        }
446    }
447
448    Yield { completed: false }.await;
449}
450
451pub(super) trait FnExec: Send + 'static {
452    fn call_box(self: Box<Self>);
453}
454
455impl<F> FnExec for F
456where
457    F: FnOnce() + Send + 'static,
458{
459    #[allow(clippy::boxed_local)]
460    fn call_box(self: Box<Self>) {
461        (*self)();
462    }
463}