Skip to main content

ntex_rt/
arbiter.rs

1#![allow(clippy::missing_panics_doc)]
2use std::any::{Any, TypeId};
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
5
6use async_channel::{Receiver, Sender, unbounded};
7
8use crate::Handle;
9use crate::system::{FnExec, Id, System, SystemCommand};
10
thread_local!(
    // Arbiter registered for the current thread. Starts as `None`; it is filled in by
    // `Arbiter::new_system`, `Arbiter::set_current` and by the thread body started in
    // `Arbiter::with_name`.
    static ADDR: RefCell<Option<Arbiter>> = const { RefCell::new(None) };
    // Per-thread item storage keyed by `TypeId`, so it holds at most one boxed value
    // per concrete type.
    static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);

// Counter used to hand out arbiter ids: `Arbiter::with_name` takes its id with
// `fetch_add(1)`, and `Arbiter::new` reads it to build a default thread name.
pub(super) static COUNT: AtomicUsize = AtomicUsize::new(0);
17
/// Commands delivered to an arbiter through its command channel and handled by
/// `ArbiterController::run`.
pub(super) enum ArbiterCommand {
    /// Signal the stop channel (when the controller has one) and end the command loop.
    Stop,
    /// Spawn the boxed future on the arbiter's runtime.
    Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
    /// Invoke the boxed callable from the command loop via `call_box`.
    ExecuteFn(Box<dyn FnExec>),
}
23
/// Arbiters provide an asynchronous execution environment for actors, functions
/// and futures.
///
/// When an Arbiter is created, it spawns a new OS thread, and
/// hosts an event loop. Some Arbiter functions execute on the current thread.
///
/// Values of this type act as addresses: cloning one copies the command sender and
/// identifiers but never the thread join handle.
pub struct Arbiter {
    /// Arbiter id; together with `sys_id` it defines equality.
    id: usize,
    /// Raw id of the system this arbiter was created for.
    pub(crate) sys_id: usize,
    /// Arbiter name, shared between clones; also used as the name of the spawned thread.
    name: Arc<String>,
    /// Runtime handle; `None` only for the placeholder built by `Arbiter::dummy`.
    pub(crate) hnd: Option<Handle>,
    /// Sending half of the arbiter's command channel.
    pub(crate) sender: Sender<ArbiterCommand>,
    /// Join handle of the arbiter thread; set only on the value returned by
    /// `Arbiter::with_name`, every other construction path stores `None`.
    thread_handle: Option<thread::JoinHandle<()>>,
}
37
38impl fmt::Debug for Arbiter {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        write!(f, "Arbiter({:?})", self.name.as_ref())
41    }
42}
43
44impl Default for Arbiter {
45    fn default() -> Arbiter {
46        Arbiter::new()
47    }
48}
49
50impl Clone for Arbiter {
51    fn clone(&self) -> Self {
52        Self {
53            id: self.id,
54            sys_id: self.sys_id,
55            name: self.name.clone(),
56            sender: self.sender.clone(),
57            hnd: self.hnd.clone(),
58            thread_handle: None,
59        }
60    }
61}
62
63impl Arbiter {
64    #[allow(clippy::borrowed_box)]
65    pub(super) fn new_system(sys: System, name: String) -> (Self, ArbiterController) {
66        let (tx, rx) = unbounded();
67
68        let arb = Arbiter::with_sender(sys.id().0, 0, Arc::new(name), tx);
69        ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
70        STORAGE.with(|cell| cell.borrow_mut().clear());
71
72        (
73            arb,
74            ArbiterController {
75                sys,
76                rx,
77                stop: None,
78            },
79        )
80    }
81
82    pub(super) fn dummy() -> Self {
83        Arbiter {
84            id: 0,
85            hnd: None,
86            name: String::new().into(),
87            sys_id: 0,
88            sender: unbounded().0,
89            thread_handle: None,
90        }
91    }
92
93    /// Returns the current thread's arbiter's address
94    ///
95    /// # Panics
96    ///
97    /// Panics if Arbiter is not running
98    pub fn current() -> Arbiter {
99        ADDR.with(|cell| match *cell.borrow() {
100            Some(ref addr) => addr.clone(),
101            None => panic!("Arbiter is not running"),
102        })
103    }
104
105    pub(crate) fn set_current(&self) {
106        ADDR.with(|cell| {
107            *cell.borrow_mut() = Some(self.clone());
108        });
109    }
110
111    /// Stop arbiter from continuing it's event loop.
112    pub fn stop(&self) {
113        let _ = self.sender.try_send(ArbiterCommand::Stop);
114    }
115
116    /// Spawn new thread and run runtime in spawned thread.
117    /// Returns address of newly created arbiter.
118    pub fn new() -> Arbiter {
119        let id = COUNT.load(Ordering::Relaxed) + 1;
120        Arbiter::with_name(format!("{}:arb:{}", System::current().name(), id))
121    }
122
123    /// Spawn new thread and run runtime in spawned thread
124    ///
125    /// Returns address of newly created arbiter.
126    pub fn with_name(name: String) -> Arbiter {
127        let id = COUNT.fetch_add(1, Ordering::Relaxed);
128        let sys = System::current();
129        let name2 = Arc::new(name.clone());
130        let config = sys.config();
131        let (arb_tx, arb_rx) = unbounded();
132        let arb_tx2 = arb_tx.clone();
133
134        let builder = if sys.config().stack_size > 0 {
135            thread::Builder::new()
136                .name(name)
137                .stack_size(sys.config().stack_size)
138        } else {
139            thread::Builder::new().name(name)
140        };
141
142        let name = name2.clone();
143        let sys_id = sys.id();
144        let (arb_hnd_tx, arb_hnd_rx) = oneshot::channel();
145
146        let handle = builder
147            .spawn(move || {
148                log::info!("Starting {name2:?} arbiter");
149
150                let sys2 = sys.clone();
151                let (stop, stop_rx) = oneshot::channel();
152                STORAGE.with(|cell| cell.borrow_mut().clear());
153
154                System::set_current(sys.clone());
155
156                crate::driver::block_on(config.runner.as_ref(), async move {
157                    let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx);
158                    arb_hnd_tx
159                        .send(arb.hnd.clone())
160                        .expect("Controller thread has gone");
161
162                    // start arbiter controller
163                    crate::spawn(
164                        ArbiterController {
165                            sys,
166                            stop: Some(stop),
167                            rx: arb_rx,
168                        }
169                        .run(),
170                    );
171                    ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
172
173                    // register arbiter
174                    let _ = sys2
175                        .sys()
176                        .try_send(SystemCommand::RegisterArbiter(Id(id), arb));
177
178                    // run loop
179                    let _ = stop_rx.await;
180                });
181
182                // unregister arbiter
183                let _ = System::current()
184                    .sys()
185                    .try_send(SystemCommand::UnregisterArbiter(Id(id)));
186
187                remove_all_items();
188            })
189            .unwrap_or_else(|err| {
190                panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
191            });
192
193        let hnd = arb_hnd_rx.recv().expect("Could not start new arbiter");
194
195        Arbiter {
196            id,
197            hnd,
198            name,
199            sys_id: sys_id.0,
200            sender: arb_tx2,
201            thread_handle: Some(handle),
202        }
203    }
204
205    fn with_sender(
206        sys_id: usize,
207        id: usize,
208        name: Arc<String>,
209        sender: Sender<ArbiterCommand>,
210    ) -> Self {
211        #[cfg(feature = "tokio")]
212        let hnd = { Handle::new(sender.clone()) };
213
214        #[cfg(feature = "compio")]
215        let hnd = { Handle::new(sender.clone()) };
216
217        #[cfg(all(not(feature = "compio"), not(feature = "tokio")))]
218        let hnd = { Handle::current() };
219
220        Self {
221            id,
222            sys_id,
223            name,
224            sender,
225            hnd: Some(hnd),
226            thread_handle: None,
227        }
228    }
229
230    /// Id of the arbiter
231    pub fn id(&self) -> Id {
232        Id(self.id)
233    }
234
235    /// Name of the arbiter
236    pub fn name(&self) -> &str {
237        self.name.as_ref()
238    }
239
240    #[inline]
241    /// Handle to a runtime
242    pub fn handle(&self) -> &Handle {
243        self.hnd.as_ref().unwrap()
244    }
245
246    #[doc(hidden)]
247    #[deprecated(since = "3.8.0", note = "use `ntex_rt::spawn()`")]
248    /// Send a future to the Arbiter's thread, and spawn it.
249    pub fn spawn<F>(&self, future: F)
250    where
251        F: Future<Output = ()> + Send + 'static,
252    {
253        let _ = self
254            .sender
255            .try_send(ArbiterCommand::Execute(Box::pin(future)));
256    }
257
258    #[doc(hidden)]
259    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
260    /// Send a function to the Arbiter's thread and spawns it's resulting future.
261    /// This can be used to spawn non-send futures on the arbiter thread.
262    pub fn spawn_with<F, R, O>(
263        &self,
264        f: F,
265    ) -> impl Future<Output = Result<O, oneshot::RecvError>> + Send + 'static
266    where
267        F: FnOnce() -> R + Send + 'static,
268        R: Future<Output = O> + 'static,
269        O: Send + 'static,
270    {
271        let (tx, rx) = oneshot::async_channel();
272        let _ = self
273            .sender
274            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
275                crate::spawn(async move {
276                    let _ = tx.send(f().await);
277                });
278            })));
279        rx
280    }
281
282    #[doc(hidden)]
283    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
284    /// Send a function to the Arbiter's thread. This function will be executed asynchronously.
285    /// A future is created, and when resolved will contain the result of the function sent
286    /// to the Arbiters thread.
287    pub fn exec<F, R>(
288        &self,
289        f: F,
290    ) -> impl Future<Output = Result<R, oneshot::RecvError>> + Send + 'static
291    where
292        F: FnOnce() -> R + Send + 'static,
293        R: Send + 'static,
294    {
295        let (tx, rx) = oneshot::async_channel();
296        let _ = self
297            .sender
298            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
299                let _ = tx.send(f());
300            })));
301        rx
302    }
303
304    #[doc(hidden)]
305    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
306    /// Send a function to the Arbiter's thread, and execute it. Any result from the function
307    /// is discarded.
308    pub fn exec_fn<F>(&self, f: F)
309    where
310        F: FnOnce() + Send + 'static,
311    {
312        let _ = self
313            .sender
314            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
315                f();
316            })));
317    }
318
319    #[doc(hidden)]
320    #[deprecated(since = "3.8.0", note = "use `ntex_rt::set_item()`")]
321    /// Set item to current arbiter's storage
322    pub fn set_item<T: 'static>(item: T) {
323        set_item(item);
324    }
325
326    #[doc(hidden)]
327    #[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
328    /// Check if arbiter storage contains item
329    pub fn contains_item<T: 'static>() -> bool {
330        STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
331    }
332
333    #[doc(hidden)]
334    #[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
335    /// Get a reference to a type previously inserted on this arbiter's storage
336    ///
337    /// # Panics
338    ///
339    /// Panics if item is not inserted
340    pub fn get_item<T: 'static, F, R>(f: F) -> R
341    where
342        F: FnOnce(&T) -> R,
343    {
344        STORAGE.with(move |cell| {
345            let mut st = cell.borrow_mut();
346            let item = st
347                .get_mut(&TypeId::of::<T>())
348                .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
349                .unwrap();
350            f(item)
351        })
352    }
353
354    /// Get a type previously inserted to this runtime or create new one.
355    pub fn get_value<T, F>(f: F) -> T
356    where
357        T: Clone + 'static,
358        F: FnOnce() -> T,
359    {
360        STORAGE.with(move |cell| {
361            let mut st = cell.borrow_mut();
362            if let Some(boxed) = st.get(&TypeId::of::<T>())
363                && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
364            {
365                return val.clone();
366            }
367            let val = f();
368            st.insert(TypeId::of::<T>(), Box::new(val.clone()));
369            val
370        })
371    }
372
373    /// Wait for the event loop to stop by joining the underlying thread (if have Some).
374    pub fn join(&mut self) -> thread::Result<()> {
375        if let Some(thread_handle) = self.thread_handle.take() {
376            thread_handle.join()
377        } else {
378            Ok(())
379        }
380    }
381}
382
383impl Eq for Arbiter {}
384
385impl PartialEq for Arbiter {
386    fn eq(&self, other: &Self) -> bool {
387        self.id == other.id && self.sys_id == other.sys_id
388    }
389}
390
/// Receiving side of an arbiter: owns the command channel and processes commands
/// in `run`.
pub(crate) struct ArbiterController {
    /// System this arbiter belongs to; consulted when the controller is dropped
    /// during a panic.
    sys: System,
    /// Channel used to signal the stop of the arbiter; `None` for the controller
    /// created by `Arbiter::new_system`.
    stop: Option<oneshot::Sender<i32>>,
    /// Receiving half of the arbiter's command channel.
    rx: Receiver<ArbiterCommand>,
}
396
397impl Drop for ArbiterController {
398    fn drop(&mut self) {
399        if thread::panicking() {
400            if self.sys.stop_on_panic() {
401                eprintln!("Panic in Arbiter thread, shutting down system.");
402                self.sys.stop_with_code(1);
403            } else {
404                eprintln!("Panic in Arbiter thread.");
405            }
406        }
407    }
408}
409
410impl ArbiterController {
411    pub(super) async fn run(mut self) {
412        loop {
413            match self.rx.recv().await {
414                Ok(ArbiterCommand::Stop) => {
415                    if let Some(stop) = self.stop.take() {
416                        let _ = stop.send(0);
417                    }
418                    break;
419                }
420                Ok(ArbiterCommand::Execute(fut)) => {
421                    crate::spawn(fut);
422                }
423                Ok(ArbiterCommand::ExecuteFn(f)) => {
424                    f.call_box();
425                }
426                Err(_) => break,
427            }
428        }
429    }
430}
431
432/// Set item to current runtime's storage
433pub fn set_item<T: 'static>(item: T) {
434    STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
435}
436
437/// Get a reference to a type previously inserted on this runtime's storage
438pub fn get_item<T: Clone + 'static>() -> Option<T> {
439    STORAGE.with(move |cell| {
440        cell.borrow()
441            .get(&TypeId::of::<T>())
442            .and_then(|boxed| boxed.downcast_ref())
443            .cloned()
444    })
445}
446
447/// Get a reference to a type or create new if it doesnt exists
448pub fn with_item<T: Default + 'static, F, R>(f: F) -> R
449where
450    F: FnOnce(&T) -> R,
451{
452    STORAGE.with(move |cell| {
453        let mut st = cell.borrow_mut();
454        if let Some(boxed) = st.get(&TypeId::of::<T>()) {
455            f(boxed.downcast_ref().unwrap())
456        } else {
457            let item = T::default();
458            let result = f(&item);
459            st.insert(TypeId::of::<T>(), Box::new(item));
460            result
461        }
462    })
463}
464
465/// Remove all items from storage.
466pub fn remove_all_items() {
467    STORAGE.with(move |cell| cell.borrow_mut().clear());
468    System::remove_current();
469}