requiem_rt/
system.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::io;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6use futures::channel::mpsc::UnboundedSender;
7use tokio::task::LocalSet;
8
9use crate::arbiter::{Arbiter, SystemCommand};
10use crate::builder::{Builder, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14/// System is a runtime manager.
15#[derive(Clone, Debug)]
16pub struct System {
17    id: usize,
18    sys: UnboundedSender<SystemCommand>,
19    arbiter: Arbiter,
20    stop_on_panic: bool,
21}
22
23thread_local!(
24    static CURRENT: RefCell<Option<System>> = RefCell::new(None);
25);
26
27impl System {
28    /// Constructs new system and sets it as current
29    pub(crate) fn construct(
30        sys: UnboundedSender<SystemCommand>,
31        arbiter: Arbiter,
32        stop_on_panic: bool,
33    ) -> Self {
34        let sys = System {
35            sys,
36            arbiter,
37            stop_on_panic,
38            id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
39        };
40        System::set_current(sys.clone());
41        sys
42    }
43
44    /// Build a new system with a customized tokio runtime.
45    ///
46    /// This allows to customize the runtime. See struct level docs on
47    /// `Builder` for more information.
48    pub fn builder() -> Builder {
49        Builder::new()
50    }
51
52    #[allow(clippy::new_ret_no_self)]
53    /// Create new system.
54    ///
55    /// This method panics if it can not create tokio runtime
56    pub fn new<T: Into<String>>(name: T) -> SystemRunner {
57        Self::builder().name(name).build()
58    }
59
60    #[allow(clippy::new_ret_no_self)]
61    /// Create new system using provided tokio Handle.
62    ///
63    /// This method panics if it can not spawn system arbiter
64    pub fn run_in_tokio<T: Into<String>>(
65        name: T,
66        local: &LocalSet,
67    ) -> impl Future<Output = io::Result<()>> {
68        Self::builder()
69            .name(name)
70            .build_async(local)
71            .run_nonblocking()
72    }
73
74    /// Get current running system.
75    pub fn current() -> System {
76        CURRENT.with(|cell| match *cell.borrow() {
77            Some(ref sys) => sys.clone(),
78            None => panic!("System is not running"),
79        })
80    }
81
82    /// Set current running system.
83    pub(crate) fn is_set() -> bool {
84        CURRENT.with(|cell| cell.borrow().is_some())
85    }
86
87    /// Set current running system.
88    #[doc(hidden)]
89    pub fn set_current(sys: System) {
90        CURRENT.with(|s| {
91            *s.borrow_mut() = Some(sys);
92        })
93    }
94
95    /// Execute function with system reference.
96    pub fn with_current<F, R>(f: F) -> R
97    where
98        F: FnOnce(&System) -> R,
99    {
100        CURRENT.with(|cell| match *cell.borrow() {
101            Some(ref sys) => f(sys),
102            None => panic!("System is not running"),
103        })
104    }
105
106    /// System id
107    pub fn id(&self) -> usize {
108        self.id
109    }
110
111    /// Stop the system
112    pub fn stop(&self) {
113        self.stop_with_code(0)
114    }
115
116    /// Stop the system with a particular exit code.
117    pub fn stop_with_code(&self, code: i32) {
118        let _ = self.sys.unbounded_send(SystemCommand::Exit(code));
119    }
120
121    pub(crate) fn sys(&self) -> &UnboundedSender<SystemCommand> {
122        &self.sys
123    }
124
125    /// Return status of 'stop_on_panic' option which controls whether the System is stopped when an
126    /// uncaught panic is thrown from a worker thread.
127    pub fn stop_on_panic(&self) -> bool {
128        self.stop_on_panic
129    }
130
131    /// System arbiter
132    pub fn arbiter(&self) -> &Arbiter {
133        &self.arbiter
134    }
135
136    /// This function will start tokio runtime and will finish once the
137    /// `System::stop()` message get called.
138    /// Function `f` get called within tokio runtime context.
139    pub fn run<F>(f: F) -> io::Result<()>
140    where
141        F: FnOnce() + 'static,
142    {
143        Self::builder().run(f)
144    }
145}