// ntex_rt/arbiter.rs

1#![allow(clippy::missing_panics_doc)]
2use std::any::{Any, TypeId};
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
5
6use async_channel::{Receiver, Sender, unbounded};
7
8use crate::Handle;
9use crate::system::{FnExec, Id, System, SystemCommand};
10
11thread_local!(
12    static ADDR: RefCell<Option<Arbiter>> = const { RefCell::new(None) };
13    static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
14);
15
16pub(super) static COUNT: AtomicUsize = AtomicUsize::new(0);
17
18pub(super) enum ArbiterCommand {
19    Stop,
20    Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
21    ExecuteFn(Box<dyn FnExec>),
22}
23
24/// Arbiters provide an asynchronous execution environment for actors, functions
25/// and futures.
26///
27/// When an Arbiter is created, it spawns a new OS thread, and
28/// hosts an event loop. Some Arbiter functions execute on the current thread.
29pub struct Arbiter {
30    id: usize,
31    pub(crate) sys_id: usize,
32    name: Arc<String>,
33    pub(crate) hnd: Option<Handle>,
34    pub(crate) sender: Sender<ArbiterCommand>,
35    thread_handle: Option<thread::JoinHandle<()>>,
36}
37
38impl fmt::Debug for Arbiter {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        write!(f, "Arbiter({:?})", self.name.as_ref())
41    }
42}
43
44impl Default for Arbiter {
45    fn default() -> Arbiter {
46        Arbiter::new()
47    }
48}
49
50impl Clone for Arbiter {
51    fn clone(&self) -> Self {
52        Self {
53            id: self.id,
54            sys_id: self.sys_id,
55            name: self.name.clone(),
56            sender: self.sender.clone(),
57            hnd: self.hnd.clone(),
58            thread_handle: None,
59        }
60    }
61}
62
63impl Arbiter {
64    #[allow(clippy::borrowed_box)]
65    pub(super) fn new_system(sys_id: usize, name: String) -> (Self, ArbiterController) {
66        let (tx, rx) = unbounded();
67
68        let arb = Arbiter::with_sender(sys_id, 0, Arc::new(name), tx);
69        ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
70        STORAGE.with(|cell| cell.borrow_mut().clear());
71
72        (arb, ArbiterController { rx, stop: None })
73    }
74
75    pub(super) fn dummy() -> Self {
76        Arbiter {
77            id: 0,
78            hnd: None,
79            name: String::new().into(),
80            sys_id: 0,
81            sender: unbounded().0,
82            thread_handle: None,
83        }
84    }
85
86    /// Returns the current thread's arbiter's address
87    ///
88    /// # Panics
89    ///
90    /// Panics if Arbiter is not running
91    pub fn current() -> Arbiter {
92        ADDR.with(|cell| match *cell.borrow() {
93            Some(ref addr) => addr.clone(),
94            None => panic!("Arbiter is not running"),
95        })
96    }
97
98    pub(crate) fn set_current(&self) {
99        ADDR.with(|cell| {
100            *cell.borrow_mut() = Some(self.clone());
101        });
102    }
103
104    /// Stop arbiter from continuing it's event loop.
105    pub fn stop(&self) {
106        let _ = self.sender.try_send(ArbiterCommand::Stop);
107    }
108
109    /// Spawn new thread and run runtime in spawned thread.
110    /// Returns address of newly created arbiter.
111    pub fn new() -> Arbiter {
112        let id = COUNT.load(Ordering::Relaxed) + 1;
113        Arbiter::with_name(format!("{}:arb:{}", System::current().name(), id))
114    }
115
116    /// Spawn new thread and run runtime in spawned thread
117    ///
118    /// Returns address of newly created arbiter.
119    pub fn with_name(name: String) -> Arbiter {
120        let id = COUNT.fetch_add(1, Ordering::Relaxed);
121        let sys = System::current();
122        let name2 = Arc::new(name.clone());
123        let config = sys.config();
124        let (arb_tx, arb_rx) = unbounded();
125        let arb_tx2 = arb_tx.clone();
126
127        let builder = if sys.config().stack_size > 0 {
128            thread::Builder::new()
129                .name(name)
130                .stack_size(sys.config().stack_size)
131        } else {
132            thread::Builder::new().name(name)
133        };
134
135        let name = name2.clone();
136        let sys_id = sys.id();
137        let (arb_hnd_tx, arb_hnd_rx) = oneshot::channel();
138
139        let handle = builder
140            .spawn(move || {
141                log::info!("Starting {name2:?} arbiter");
142
143                let (stop, stop_rx) = oneshot::channel();
144                STORAGE.with(|cell| cell.borrow_mut().clear());
145
146                System::set_current(sys.clone());
147
148                crate::driver::block_on(config.runner.as_ref(), async move {
149                    let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx);
150                    arb_hnd_tx
151                        .send(arb.hnd.clone())
152                        .expect("Controller thread has gone");
153
154                    // start arbiter controller
155                    crate::spawn(
156                        ArbiterController {
157                            stop: Some(stop),
158                            rx: arb_rx,
159                        }
160                        .run(),
161                    );
162                    ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
163
164                    // register arbiter
165                    let _ = sys
166                        .sys()
167                        .try_send(SystemCommand::RegisterArbiter(Id(id), arb));
168
169                    // run loop
170                    let _ = stop_rx.await;
171                });
172
173                // unregister arbiter
174                let _ = System::current()
175                    .sys()
176                    .try_send(SystemCommand::UnregisterArbiter(Id(id)));
177
178                STORAGE.with(|cell| cell.borrow_mut().clear());
179            })
180            .unwrap_or_else(|err| {
181                panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
182            });
183
184        let hnd = arb_hnd_rx.recv().expect("Could not start new arbiter");
185
186        Arbiter {
187            id,
188            hnd,
189            name,
190            sys_id: sys_id.0,
191            sender: arb_tx2,
192            thread_handle: Some(handle),
193        }
194    }
195
196    fn with_sender(
197        sys_id: usize,
198        id: usize,
199        name: Arc<String>,
200        sender: Sender<ArbiterCommand>,
201    ) -> Self {
202        #[cfg(feature = "tokio")]
203        let hnd = { Handle::new(sender.clone()) };
204
205        #[cfg(feature = "compio")]
206        let hnd = { Handle::new(sender.clone()) };
207
208        #[cfg(all(not(feature = "compio"), not(feature = "tokio")))]
209        let hnd = { Handle::current() };
210
211        Self {
212            id,
213            sys_id,
214            name,
215            sender,
216            hnd: Some(hnd),
217            thread_handle: None,
218        }
219    }
220
221    /// Id of the arbiter
222    pub fn id(&self) -> Id {
223        Id(self.id)
224    }
225
226    /// Name of the arbiter
227    pub fn name(&self) -> &str {
228        self.name.as_ref()
229    }
230
231    #[inline]
232    /// Handle to a runtime
233    pub fn handle(&self) -> &Handle {
234        self.hnd.as_ref().unwrap()
235    }
236
237    #[doc(hidden)]
238    #[deprecated(since = "3.8.0", note = "use `ntex_rt::spawn()`")]
239    /// Send a future to the Arbiter's thread, and spawn it.
240    pub fn spawn<F>(&self, future: F)
241    where
242        F: Future<Output = ()> + Send + 'static,
243    {
244        let _ = self
245            .sender
246            .try_send(ArbiterCommand::Execute(Box::pin(future)));
247    }
248
249    #[doc(hidden)]
250    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
251    /// Send a function to the Arbiter's thread and spawns it's resulting future.
252    /// This can be used to spawn non-send futures on the arbiter thread.
253    pub fn spawn_with<F, R, O>(
254        &self,
255        f: F,
256    ) -> impl Future<Output = Result<O, oneshot::RecvError>> + Send + 'static
257    where
258        F: FnOnce() -> R + Send + 'static,
259        R: Future<Output = O> + 'static,
260        O: Send + 'static,
261    {
262        let (tx, rx) = oneshot::async_channel();
263        let _ = self
264            .sender
265            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
266                crate::spawn(async move {
267                    let _ = tx.send(f().await);
268                });
269            })));
270        rx
271    }
272
273    #[doc(hidden)]
274    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
275    /// Send a function to the Arbiter's thread. This function will be executed asynchronously.
276    /// A future is created, and when resolved will contain the result of the function sent
277    /// to the Arbiters thread.
278    pub fn exec<F, R>(
279        &self,
280        f: F,
281    ) -> impl Future<Output = Result<R, oneshot::RecvError>> + Send + 'static
282    where
283        F: FnOnce() -> R + Send + 'static,
284        R: Send + 'static,
285    {
286        let (tx, rx) = oneshot::async_channel();
287        let _ = self
288            .sender
289            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
290                let _ = tx.send(f());
291            })));
292        rx
293    }
294
295    #[doc(hidden)]
296    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
297    /// Send a function to the Arbiter's thread, and execute it. Any result from the function
298    /// is discarded.
299    pub fn exec_fn<F>(&self, f: F)
300    where
301        F: FnOnce() + Send + 'static,
302    {
303        let _ = self
304            .sender
305            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
306                f();
307            })));
308    }
309
310    #[doc(hidden)]
311    #[deprecated(since = "3.8.0", note = "use `ntex_rt::set_item()`")]
312    /// Set item to current arbiter's storage
313    pub fn set_item<T: 'static>(item: T) {
314        set_item(item);
315    }
316
317    #[doc(hidden)]
318    #[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
319    /// Check if arbiter storage contains item
320    pub fn contains_item<T: 'static>() -> bool {
321        STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
322    }
323
324    #[doc(hidden)]
325    #[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
326    /// Get a reference to a type previously inserted on this arbiter's storage
327    ///
328    /// # Panics
329    ///
330    /// Panics if item is not inserted
331    pub fn get_item<T: 'static, F, R>(f: F) -> R
332    where
333        F: FnOnce(&T) -> R,
334    {
335        STORAGE.with(move |cell| {
336            let mut st = cell.borrow_mut();
337            let item = st
338                .get_mut(&TypeId::of::<T>())
339                .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
340                .unwrap();
341            f(item)
342        })
343    }
344
345    /// Get a type previously inserted to this runtime or create new one.
346    pub fn get_value<T, F>(f: F) -> T
347    where
348        T: Clone + 'static,
349        F: FnOnce() -> T,
350    {
351        STORAGE.with(move |cell| {
352            let mut st = cell.borrow_mut();
353            if let Some(boxed) = st.get(&TypeId::of::<T>())
354                && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
355            {
356                return val.clone();
357            }
358            let val = f();
359            st.insert(TypeId::of::<T>(), Box::new(val.clone()));
360            val
361        })
362    }
363
364    /// Wait for the event loop to stop by joining the underlying thread (if have Some).
365    pub fn join(&mut self) -> thread::Result<()> {
366        if let Some(thread_handle) = self.thread_handle.take() {
367            thread_handle.join()
368        } else {
369            Ok(())
370        }
371    }
372}
373
374impl Eq for Arbiter {}
375
376impl PartialEq for Arbiter {
377    fn eq(&self, other: &Self) -> bool {
378        self.id == other.id && self.sys_id == other.sys_id
379    }
380}
381
382pub(crate) struct ArbiterController {
383    stop: Option<oneshot::Sender<i32>>,
384    rx: Receiver<ArbiterCommand>,
385}
386
387impl Drop for ArbiterController {
388    fn drop(&mut self) {
389        if thread::panicking() {
390            if System::current().stop_on_panic() {
391                eprintln!("Panic in Arbiter thread, shutting down system.");
392                System::current().stop_with_code(1);
393            } else {
394                eprintln!("Panic in Arbiter thread.");
395            }
396        }
397    }
398}
399
400impl ArbiterController {
401    pub(super) async fn run(mut self) {
402        loop {
403            match self.rx.recv().await {
404                Ok(ArbiterCommand::Stop) => {
405                    if let Some(stop) = self.stop.take() {
406                        let _ = stop.send(0);
407                    }
408                    break;
409                }
410                Ok(ArbiterCommand::Execute(fut)) => {
411                    crate::spawn(fut);
412                }
413                Ok(ArbiterCommand::ExecuteFn(f)) => {
414                    f.call_box();
415                }
416                Err(_) => break,
417            }
418        }
419    }
420}
421
422/// Set item to current runtime's storage
423pub fn set_item<T: 'static>(item: T) {
424    STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
425}
426
427/// Get a reference to a type previously inserted on this runtime's storage
428pub fn get_item<T: Clone + 'static>() -> Option<T> {
429    STORAGE.with(move |cell| {
430        cell.borrow()
431            .get(&TypeId::of::<T>())
432            .and_then(|boxed| boxed.downcast_ref())
433            .cloned()
434    })
435}