// ntex_rt/arbiter.rs

1#![allow(clippy::missing_panics_doc)]
2use std::any::{Any, TypeId};
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
5
6use async_channel::{Receiver, Sender, unbounded};
7
8use crate::system::{FnExec, Id, System, SystemCommand};
9
10thread_local!(
11    static ADDR: RefCell<Option<Arbiter>> = const { RefCell::new(None) };
12    static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
13);
14
15pub(super) static COUNT: AtomicUsize = AtomicUsize::new(0);
16
17pub(super) enum ArbiterCommand {
18    Stop,
19    Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
20    ExecuteFn(Box<dyn FnExec>),
21}
22
23/// Arbiters provide an asynchronous execution environment for actors, functions
24/// and futures.
25///
26/// When an Arbiter is created, it spawns a new OS thread, and
27/// hosts an event loop. Some Arbiter functions execute on the current thread.
28pub struct Arbiter {
29    id: usize,
30    pub(crate) sys_id: usize,
31    name: Arc<String>,
32    sender: Sender<ArbiterCommand>,
33    thread_handle: Option<thread::JoinHandle<()>>,
34}
35
36impl fmt::Debug for Arbiter {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        write!(f, "Arbiter({:?})", self.name.as_ref())
39    }
40}
41
42impl Default for Arbiter {
43    fn default() -> Arbiter {
44        Arbiter::new()
45    }
46}
47
48impl Clone for Arbiter {
49    fn clone(&self) -> Self {
50        Self::with_sender(self.sys_id, self.id, self.name.clone(), self.sender.clone())
51    }
52}
53
54impl Arbiter {
55    #[allow(clippy::borrowed_box)]
56    pub(super) fn new_system(name: String) -> (Self, ArbiterController) {
57        let (tx, rx) = unbounded();
58
59        let arb = Arbiter::with_sender(0, 0, Arc::new(name), tx);
60        ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
61        STORAGE.with(|cell| cell.borrow_mut().clear());
62
63        (arb, ArbiterController { rx, stop: None })
64    }
65
66    /// Returns the current thread's arbiter's address
67    ///
68    /// # Panics
69    ///
70    /// Panics if Arbiter is not running
71    pub fn current() -> Arbiter {
72        ADDR.with(|cell| match *cell.borrow() {
73            Some(ref addr) => addr.clone(),
74            None => panic!("Arbiter is not running"),
75        })
76    }
77
78    /// Stop arbiter from continuing it's event loop.
79    pub fn stop(&self) {
80        let _ = self.sender.try_send(ArbiterCommand::Stop);
81    }
82
83    /// Spawn new thread and run runtime in spawned thread.
84    /// Returns address of newly created arbiter.
85    pub fn new() -> Arbiter {
86        let id = COUNT.load(Ordering::Relaxed) + 1;
87        Arbiter::with_name(format!("{}:arb:{}", System::current().name(), id))
88    }
89
90    /// Spawn new thread and run runtime in spawned thread
91    ///
92    /// Returns address of newly created arbiter.
93    pub fn with_name(name: String) -> Arbiter {
94        let id = COUNT.fetch_add(1, Ordering::Relaxed);
95        let sys = System::current();
96        let name2 = Arc::new(name.clone());
97        let config = sys.config();
98        let (arb_tx, arb_rx) = unbounded();
99        let arb_tx2 = arb_tx.clone();
100
101        let builder = if sys.config().stack_size > 0 {
102            thread::Builder::new()
103                .name(name)
104                .stack_size(sys.config().stack_size)
105        } else {
106            thread::Builder::new().name(name)
107        };
108
109        let name = name2.clone();
110        let sys_id = sys.id();
111
112        let handle = builder
113            .spawn(move || {
114                log::info!("Starting {name2:?} arbiter");
115
116                let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx);
117
118                let (stop, stop_rx) = oneshot::channel();
119                STORAGE.with(|cell| cell.borrow_mut().clear());
120
121                System::set_current(sys);
122
123                config.block_on(async move {
124                    // start arbiter controller
125                    crate::spawn(
126                        ArbiterController {
127                            stop: Some(stop),
128                            rx: arb_rx,
129                        }
130                        .run(),
131                    );
132                    ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
133
134                    // register arbiter
135                    let _ = System::current()
136                        .sys()
137                        .try_send(SystemCommand::RegisterArbiter(Id(id), arb));
138
139                    // run loop
140                    let _ = stop_rx.await;
141                });
142
143                // unregister arbiter
144                let _ = System::current()
145                    .sys()
146                    .try_send(SystemCommand::UnregisterArbiter(Id(id)));
147            })
148            .unwrap_or_else(|err| {
149                panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
150            });
151
152        Arbiter {
153            id,
154            name,
155            sys_id: sys_id.0,
156            sender: arb_tx2,
157            thread_handle: Some(handle),
158        }
159    }
160
161    fn with_sender(
162        sys_id: usize,
163        id: usize,
164        name: Arc<String>,
165        sender: Sender<ArbiterCommand>,
166    ) -> Self {
167        Self {
168            id,
169            sys_id,
170            name,
171            sender,
172            thread_handle: None,
173        }
174    }
175
176    /// Id of the arbiter
177    pub fn id(&self) -> Id {
178        Id(self.id)
179    }
180
181    /// Name of the arbiter
182    pub fn name(&self) -> &str {
183        self.name.as_ref()
184    }
185
186    /// Send a future to the Arbiter's thread, and spawn it.
187    pub fn spawn<F>(&self, future: F)
188    where
189        F: Future<Output = ()> + Send + 'static,
190    {
191        let _ = self
192            .sender
193            .try_send(ArbiterCommand::Execute(Box::pin(future)));
194    }
195
196    #[rustfmt::skip]
197    /// Send a function to the Arbiter's thread and spawns it's resulting future.
198    /// This can be used to spawn non-send futures on the arbiter thread.
199    pub fn spawn_with<F, R, O>(
200        &self,
201        f: F
202    ) -> impl Future<Output = Result<O, oneshot::RecvError>> + Send + 'static
203    where
204        F: FnOnce() -> R + Send + 'static,
205        R: Future<Output = O> + 'static,
206        O: Send + 'static,
207    {
208        let (tx, rx) = oneshot::channel();
209        let _ = self
210            .sender
211            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
212                crate::spawn(async move {
213                    let _ = tx.send(f().await);
214                });
215            })));
216        rx
217    }
218
219    #[rustfmt::skip]
220    /// Send a function to the Arbiter's thread. This function will be executed asynchronously.
221    /// A future is created, and when resolved will contain the result of the function sent
222    /// to the Arbiters thread.
223    pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, oneshot::RecvError>> + Send + 'static
224    where
225        F: FnOnce() -> R + Send + 'static,
226        R: Send + 'static,
227    {
228        let (tx, rx) = oneshot::channel();
229        let _ = self
230            .sender
231            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
232                let _ = tx.send(f());
233            })));
234        rx
235    }
236
237    /// Send a function to the Arbiter's thread, and execute it. Any result from the function
238    /// is discarded.
239    pub fn exec_fn<F>(&self, f: F)
240    where
241        F: FnOnce() + Send + 'static,
242    {
243        let _ = self
244            .sender
245            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
246                f();
247            })));
248    }
249
250    /// Set item to current arbiter's storage
251    pub fn set_item<T: 'static>(item: T) {
252        STORAGE
253            .with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
254    }
255
256    /// Check if arbiter storage contains item
257    pub fn contains_item<T: 'static>() -> bool {
258        STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
259    }
260
261    /// Get a reference to a type previously inserted on this arbiter's storage
262    ///
263    /// # Panics
264    ///
265    /// Panics if item is not inserted
266    pub fn get_item<T: 'static, F, R>(f: F) -> R
267    where
268        F: FnOnce(&T) -> R,
269    {
270        STORAGE.with(move |cell| {
271            let mut st = cell.borrow_mut();
272            let item = st
273                .get_mut(&TypeId::of::<T>())
274                .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
275                .unwrap();
276            f(item)
277        })
278    }
279
280    /// Get a type previously inserted to this runtime or create new one.
281    pub fn get_value<T, F>(f: F) -> T
282    where
283        T: Clone + 'static,
284        F: FnOnce() -> T,
285    {
286        STORAGE.with(move |cell| {
287            let mut st = cell.borrow_mut();
288            if let Some(boxed) = st.get(&TypeId::of::<T>())
289                && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
290            {
291                return val.clone();
292            }
293            let val = f();
294            st.insert(TypeId::of::<T>(), Box::new(val.clone()));
295            val
296        })
297    }
298
299    /// Wait for the event loop to stop by joining the underlying thread (if have Some).
300    pub fn join(&mut self) -> thread::Result<()> {
301        if let Some(thread_handle) = self.thread_handle.take() {
302            thread_handle.join()
303        } else {
304            Ok(())
305        }
306    }
307}
308
309impl Eq for Arbiter {}
310
311impl PartialEq for Arbiter {
312    fn eq(&self, other: &Self) -> bool {
313        self.id == other.id && self.sys_id == other.sys_id
314    }
315}
316
317pub(crate) struct ArbiterController {
318    stop: Option<oneshot::Sender<i32>>,
319    rx: Receiver<ArbiterCommand>,
320}
321
322impl Drop for ArbiterController {
323    fn drop(&mut self) {
324        if thread::panicking() {
325            if System::current().stop_on_panic() {
326                eprintln!("Panic in Arbiter thread, shutting down system.");
327                System::current().stop_with_code(1);
328            } else {
329                eprintln!("Panic in Arbiter thread.");
330            }
331        }
332    }
333}
334
335impl ArbiterController {
336    pub(super) async fn run(mut self) {
337        loop {
338            match self.rx.recv().await {
339                Ok(ArbiterCommand::Stop) => {
340                    if let Some(stop) = self.stop.take() {
341                        let _ = stop.send(0);
342                    }
343                    break;
344                }
345                Ok(ArbiterCommand::Execute(fut)) => {
346                    crate::spawn(fut);
347                }
348                Ok(ArbiterCommand::ExecuteFn(f)) => {
349                    f.call_box();
350                }
351                Err(_) => break,
352            }
353        }
354    }
355}