Skip to main content

ntex_rt/
system.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, RwLock, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::Cell, cell::RefCell, fmt, future::Future, panic, pin::Pin};
5
6use async_channel::{Receiver, Sender, unbounded};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Handle, Runner, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15    static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17        RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22    all: HashMap<Id, Arbiter>,
23    list: Vec<Arbiter>,
24}
25
/// Numeric identifier used for systems (see `System::id`) and for the
/// arbiters registered with them.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct Id(pub(crate) usize);
29
30/// System is a runtime manager
31pub struct System {
32    id: usize,
33    arbiter: Cell<Option<Arbiter>>,
34    config: SystemConfig,
35    sender: Sender<SystemCommand>,
36    receiver: Receiver<SystemCommand>,
37    rt: Arc<RwLock<Arbiter>>,
38    pool: ThreadPool,
39}
40
41#[derive(Clone)]
42pub struct SystemConfig {
43    pub(super) name: String,
44    pub(super) stack_size: usize,
45    pub(super) stop_on_panic: bool,
46    pub(super) ping_interval: usize,
47    pub(super) pool_limit: usize,
48    pub(super) pool_recv_timeout: Duration,
49    pub(super) testing: bool,
50    pub(super) runner: Arc<dyn Runner>,
51}
52
53thread_local!(
54    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
55);
56
57impl Clone for System {
58    fn clone(&self) -> Self {
59        Self {
60            id: self.id,
61            arbiter: Cell::new(None),
62            config: self.config.clone(),
63            sender: self.sender.clone(),
64            receiver: self.receiver.clone(),
65            rt: self.rt.clone(),
66            pool: self.pool.clone(),
67        }
68    }
69}
70
71impl System {
72    /// Constructs new system and sets it as current
73    pub(super) fn construct(config: SystemConfig) -> Self {
74        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
75        let (sender, receiver) = unbounded();
76
77        let pool =
78            ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
79
80        System {
81            id,
82            config,
83            sender,
84            receiver,
85            pool,
86            rt: Arc::new(RwLock::new(Arbiter::dummy())),
87            arbiter: Cell::new(None),
88        }
89    }
90
91    /// Constructs new system and sets it as current
92    pub(super) fn start(&mut self) -> oneshot::Receiver<i32> {
93        let (stop_tx, stop) = oneshot::channel();
94        let (arb, controller) = Arbiter::new_system(self.id, self.config.name.clone());
95
96        self.arbiter.set(Some(arb.clone()));
97        *self.rt.write().unwrap() = arb.clone();
98        System::set_current(self.clone());
99
100        // system support tasks
101        crate::spawn(SystemSupport::new(self, stop_tx).run(arb.id(), arb));
102        crate::spawn(controller.run());
103
104        stop
105    }
106
107    /// Build a new system with a customized runtime
108    ///
109    /// This allows to customize the runtime. See struct level docs on
110    /// `Builder` for more information.
111    pub fn build() -> Builder {
112        Builder::new()
113    }
114
115    #[allow(clippy::new_ret_no_self)]
116    /// Create new system
117    ///
118    /// This method panics if it can not create runtime
119    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
120        Self::build().name(name).build(runner)
121    }
122
123    #[allow(clippy::new_ret_no_self)]
124    /// Create new system
125    ///
126    /// This method panics if it can not create runtime
127    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
128        Self::build().name(name).build_with(config)
129    }
130
131    /// Get current running system
132    ///
133    /// # Panics
134    ///
135    /// Panics if System is not running
136    pub fn current() -> System {
137        CURRENT.with(|cell| match *cell.borrow() {
138            Some(ref sys) => sys.clone(),
139            None => panic!("System is not running"),
140        })
141    }
142
143    /// Set current running system
144    #[doc(hidden)]
145    pub fn set_current(sys: System) {
146        sys.arbiter().set_current();
147        CURRENT.with(|s| {
148            *s.borrow_mut() = Some(sys);
149        });
150    }
151
152    /// System id
153    pub fn id(&self) -> Id {
154        Id(self.id)
155    }
156
157    /// System name
158    pub fn name(&self) -> &str {
159        &self.config.name
160    }
161
162    /// Stop the system
163    pub fn stop(&self) {
164        self.stop_with_code(0);
165    }
166
167    /// Stop the system with a particular exit code
168    pub fn stop_with_code(&self, code: i32) {
169        let _ = self.sender.try_send(SystemCommand::Exit(code));
170    }
171
172    /// Return status of `stop_on_panic` option
173    ///
174    /// It controls whether the System is stopped when an
175    /// uncaught panic is thrown from a worker thread.
176    pub fn stop_on_panic(&self) -> bool {
177        self.config.stop_on_panic
178    }
179
180    /// System arbiter
181    ///
182    /// # Panics
183    ///
184    /// Panics if system is not started
185    pub fn arbiter(&self) -> Arbiter {
186        if let Some(arb) = self.arbiter.take() {
187            self.arbiter.set(Some(arb.clone()));
188            if arb.hnd.is_some() {
189                return arb;
190            }
191        }
192
193        let arb = self.rt.read().unwrap().clone();
194        self.arbiter.set(Some(arb.clone()));
195        arb
196    }
197
198    /// Retrieves a list of all arbiters in the system
199    ///
200    /// This method should be called from the thread where the system has been initialized,
201    /// typically the "main" thread.
202    pub fn list_arbiters<F, R>(f: F) -> R
203    where
204        F: FnOnce(&[Arbiter]) -> R,
205    {
206        ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
207    }
208
209    /// Retrieves a list of last pings records for specified arbiter
210    ///
211    /// This method should be called from the thread where the system has been initialized,
212    /// typically the "main" thread.
213    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
214    where
215        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
216    {
217        PINGS.with(|pings| {
218            if let Some(recs) = pings.borrow().get(&id) {
219                f(Some(recs))
220            } else {
221                f(None)
222            }
223        })
224    }
225
226    pub(super) fn sys(&self) -> &Sender<SystemCommand> {
227        &self.sender
228    }
229
230    /// System config
231    pub fn config(&self) -> SystemConfig {
232        self.config.clone()
233    }
234
235    #[inline]
236    /// Runtime handle for main thread
237    pub fn handle(&self) -> Handle {
238        self.arbiter().handle().clone()
239    }
240
241    /// Testing flag
242    pub fn testing(&self) -> bool {
243        self.config.testing()
244    }
245
246    /// Spawns a blocking task in a new thread, and wait for it
247    ///
248    /// The task will not be cancelled even if the future is dropped.
249    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
250    where
251        F: FnOnce() -> R + Send + 'static,
252        R: Send + 'static,
253    {
254        self.pool.dispatch(f)
255    }
256}
257
258impl SystemConfig {
259    #[inline]
260    /// Is current system is testing
261    pub fn testing(&self) -> bool {
262        self.testing
263    }
264}
265
266impl fmt::Debug for System {
267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268        f.debug_struct("System")
269            .field("id", &self.id)
270            .field("config", &self.config)
271            .field("pool", &self.pool)
272            .finish()
273    }
274}
275
276impl fmt::Debug for SystemConfig {
277    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278        f.debug_struct("SystemConfig")
279            .field("name", &self.name)
280            .field("testing", &self.testing)
281            .field("stack_size", &self.stack_size)
282            .field("stop_on_panic", &self.stop_on_panic)
283            .finish()
284    }
285}
286
287#[derive(Debug)]
288pub(super) enum SystemCommand {
289    Exit(i32),
290    RegisterArbiter(Id, Arbiter),
291    UnregisterArbiter(Id),
292}
293
294#[derive(Debug)]
295struct SystemSupport {
296    stop: Option<oneshot::Sender<i32>>,
297    commands: Receiver<SystemCommand>,
298    ping_interval: Duration,
299}
300
301impl SystemSupport {
302    fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
303        Self {
304            stop: Some(stop),
305            commands: sys.receiver.clone(),
306            ping_interval: Duration::from_millis(sys.config.ping_interval as u64),
307        }
308    }
309
310    async fn run(mut self, id: Id, arb: Arbiter) {
311        ARBITERS.with(move |arbs| {
312            let mut arbiters = arbs.borrow_mut();
313            arbiters.all.clear();
314            arbiters.list.clear();
315
316            // system arbiter
317            arbiters.all.insert(id, arb.clone());
318            arbiters.list.push(arb.clone());
319            crate::spawn(ping_arbiter(arb, self.ping_interval));
320        });
321
322        loop {
323            match self.commands.recv().await {
324                Ok(SystemCommand::Exit(code)) => {
325                    log::debug!("Stopping system with {code} code");
326
327                    // stop arbiters
328                    ARBITERS.with(move |arbs| {
329                        let mut arbiters = arbs.borrow_mut();
330                        for arb in arbiters.list.drain(..) {
331                            arb.stop();
332                        }
333                        arbiters.all.clear();
334                    });
335
336                    // stop event loop
337                    if let Some(stop) = self.stop.take() {
338                        let _ = stop.send(code);
339                    }
340                }
341                Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
342                    crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
343                    ARBITERS.with(move |arbs| {
344                        let mut arbiters = arbs.borrow_mut();
345                        arbiters.all.insert(id, hnd.clone());
346                        arbiters.list.push(hnd);
347                    });
348                }
349                Ok(SystemCommand::UnregisterArbiter(id)) => {
350                    ARBITERS.with(move |arbs| {
351                        let mut arbiters = arbs.borrow_mut();
352                        if let Some(hnd) = arbiters.all.remove(&id) {
353                            for (idx, arb) in arbiters.list.iter().enumerate() {
354                                if &hnd == arb {
355                                    arbiters.list.remove(idx);
356                                    break;
357                                }
358                            }
359                        }
360                    });
361                }
362                Err(_) => {
363                    log::debug!("System stopped");
364                    return;
365                }
366            }
367        }
368    }
369}
370
/// A single ping of an arbiter, as returned by `System::list_arbiter_pings`.
#[derive(Clone, Copy, Debug)]
pub struct PingRecord {
    /// Moment the ping was sent
    pub start: Instant,
    /// Round-trip time; `None` until the round trip has completed
    pub rtt: Option<Duration>,
}
378
379async fn ping_arbiter(arb: Arbiter, interval: Duration) {
380    loop {
381        Delay::new(interval).await;
382
383        // check if arbiter is still active
384        let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
385
386        if !is_alive {
387            PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
388            break;
389        }
390
391        // calc ttl
392        let start = Instant::now();
393        PINGS.with(|pings| {
394            let mut p = pings.borrow_mut();
395            let recs = p.entry(arb.id()).or_default();
396            recs.push_front(PingRecord { start, rtt: None });
397            recs.truncate(10);
398        });
399
400        let result = arb
401            .handle()
402            .spawn(async {
403                yield_to().await;
404            })
405            .await;
406
407        if result.is_err() {
408            break;
409        }
410
411        PINGS.with(|pings| {
412            pings
413                .borrow_mut()
414                .get_mut(&arb.id())
415                .unwrap()
416                .front_mut()
417                .unwrap()
418                .rtt = Some(start.elapsed());
419        });
420    }
421}
422
/// Yields control to the executor exactly once.
///
/// The first poll wakes the current task and returns `Pending`; the next
/// poll completes.
async fn yield_to() {
    use std::task::Poll;

    let mut yielded = false;
    std::future::poll_fn(move |cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await
}
445
446pub(super) trait FnExec: Send + 'static {
447    fn call_box(self: Box<Self>);
448}
449
450impl<F> FnExec for F
451where
452    F: FnOnce() + Send + 'static,
453{
454    #[allow(clippy::boxed_local)]
455    fn call_box(self: Box<Self>) {
456        (*self)();
457    }
458}