Skip to main content

ntex_rt/
arbiter.rs

1#![allow(clippy::missing_panics_doc)]
2use std::any::{Any, TypeId};
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::{cell::RefCell, collections::HashMap, fmt, future::Future, pin::Pin, thread};
5
6use async_channel::{Receiver, Sender, unbounded};
7
8use crate::Handle;
9use crate::system::{FnExec, Id, System, SystemCommand};
10
thread_local!(
    // Arbiter bound to the current thread. Written by `Arbiter::new_system`,
    // `Arbiter::set_current` and the arbiter thread started in `Arbiter::with_name`;
    // read by `Arbiter::current`, which panics while this is still `None`.
    static ADDR: RefCell<Option<Arbiter>> = const { RefCell::new(None) };
    // Per-thread item storage keyed by the `TypeId` of the stored value, so at most
    // one value of each type is kept per thread.
    static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
15
/// Process-wide counter used to hand out arbiter ids.
///
/// `Arbiter::with_name` takes the pre-increment value as the id of the new arbiter;
/// `Arbiter::new` only peeks at it to build a default thread name.
pub(super) static COUNT: AtomicUsize = AtomicUsize::new(0);
17
/// Commands sent over an arbiter's channel and processed by `ArbiterController::run`.
pub(super) enum ArbiterCommand {
    /// Fire the controller's stop channel (if it has one) and leave the command loop.
    Stop,
    /// Spawn the boxed future on the arbiter's thread via `crate::spawn`.
    Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
    /// Invoke the boxed function on the arbiter's thread.
    ExecuteFn(Box<dyn FnExec>),
}
23
/// Arbiters provide an asynchronous execution environment for actors, functions
/// and futures.
///
/// When an Arbiter is created, it spawns a new OS thread, and
/// hosts an event loop. Some Arbiter functions execute on the current thread.
pub struct Arbiter {
    // Arbiter id, exposed as `Id` through `Arbiter::id`.
    id: usize,
    // Raw id of the owning system (`System::id().0`).
    pub(crate) sys_id: usize,
    // Arbiter name, shared between clones.
    name: Arc<String>,
    // Runtime handle; `None` only for the placeholder built by `Arbiter::dummy`.
    pub(crate) hnd: Option<Handle>,
    // Sending side of the command channel drained by `ArbiterController`.
    pub(crate) sender: Sender<ArbiterCommand>,
    // Join handle of the arbiter thread. Only the value returned by
    // `Arbiter::with_name` carries it; clones always hold `None`.
    thread_handle: Option<thread::JoinHandle<()>>,
}
37
38impl fmt::Debug for Arbiter {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        write!(f, "Arbiter({:?})", self.name.as_ref())
41    }
42}
43
44impl Default for Arbiter {
45    fn default() -> Arbiter {
46        Arbiter::new()
47    }
48}
49
50impl Clone for Arbiter {
51    fn clone(&self) -> Self {
52        Self {
53            id: self.id,
54            sys_id: self.sys_id,
55            name: self.name.clone(),
56            sender: self.sender.clone(),
57            hnd: self.hnd.clone(),
58            thread_handle: None,
59        }
60    }
61}
62
63impl Arbiter {
64    #[allow(clippy::borrowed_box)]
65    pub(super) fn new_system(sys: System, name: String) -> (Self, ArbiterController) {
66        let (tx, rx) = unbounded();
67
68        let arb = Arbiter::with_sender(sys.id().0, 0, Arc::new(name), tx);
69        ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
70        STORAGE.with(|cell| cell.borrow_mut().clear());
71
72        (
73            arb,
74            ArbiterController {
75                sys,
76                rx,
77                stop: None,
78            },
79        )
80    }
81
82    pub(super) fn dummy() -> Self {
83        Arbiter {
84            id: 0,
85            hnd: None,
86            name: String::new().into(),
87            sys_id: 0,
88            sender: unbounded().0,
89            thread_handle: None,
90        }
91    }
92
93    /// Returns the current thread's arbiter's address
94    ///
95    /// # Panics
96    ///
97    /// Panics if Arbiter is not running
98    pub fn current() -> Arbiter {
99        ADDR.with(|cell| match *cell.borrow() {
100            Some(ref addr) => addr.clone(),
101            None => panic!("Arbiter is not running"),
102        })
103    }
104
105    pub(crate) fn set_current(&self) {
106        ADDR.with(|cell| {
107            *cell.borrow_mut() = Some(self.clone());
108        });
109    }
110
111    /// Stop arbiter from continuing it's event loop.
112    pub fn stop(&self) {
113        let _ = self.sender.try_send(ArbiterCommand::Stop);
114    }
115
116    /// Spawn new thread and run runtime in spawned thread.
117    /// Returns address of newly created arbiter.
118    pub fn new() -> Arbiter {
119        let id = COUNT.load(Ordering::Relaxed) + 1;
120        Arbiter::with_name(format!("{}:arb:{}", System::current().name(), id))
121    }
122
123    /// Spawn new thread and run runtime in spawned thread
124    ///
125    /// Returns address of newly created arbiter.
126    pub fn with_name(name: String) -> Arbiter {
127        let id = COUNT.fetch_add(1, Ordering::Relaxed);
128        let sys = System::current();
129        let name2 = Arc::new(name.clone());
130        let config = sys.config();
131        let (arb_tx, arb_rx) = unbounded();
132        let arb_tx2 = arb_tx.clone();
133
134        let builder = if sys.config().stack_size > 0 {
135            thread::Builder::new()
136                .name(name)
137                .stack_size(sys.config().stack_size)
138        } else {
139            thread::Builder::new().name(name)
140        };
141
142        let name = name2.clone();
143        let sys_id = sys.id();
144        let (arb_hnd_tx, arb_hnd_rx) = oneshot::channel();
145
146        let handle = builder
147            .spawn(move || {
148                log::info!("Starting {name2:?} arbiter");
149
150                let sys2 = sys.clone();
151                let sys3 = sys.clone();
152                let (stop, stop_rx) = oneshot::channel();
153                STORAGE.with(|cell| cell.borrow_mut().clear());
154
155                System::set_current(sys.clone());
156
157                crate::driver::block_on(config.runner.as_ref(), async move {
158                    let arb = Arbiter::with_sender(sys_id.0, id, name2, arb_tx);
159                    arb_hnd_tx
160                        .send(arb.hnd.clone())
161                        .expect("Controller thread has gone");
162
163                    // start arbiter controller
164                    crate::spawn(
165                        ArbiterController {
166                            sys,
167                            stop: Some(stop),
168                            rx: arb_rx,
169                        }
170                        .run(),
171                    );
172                    ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
173
174                    // register arbiter
175                    let _ = sys2
176                        .sys()
177                        .try_send(SystemCommand::RegisterArbiter(Id(id), arb));
178
179                    // run loop
180                    let _ = stop_rx.await;
181                });
182
183                // unregister arbiter
184                let _ = sys3
185                    .sys()
186                    .try_send(SystemCommand::UnregisterArbiter(Id(id)));
187
188                remove_all_items();
189            })
190            .unwrap_or_else(|err| {
191                panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
192            });
193
194        let hnd = arb_hnd_rx.recv().expect("Could not start new arbiter");
195
196        Arbiter {
197            id,
198            hnd,
199            name,
200            sys_id: sys_id.0,
201            sender: arb_tx2,
202            thread_handle: Some(handle),
203        }
204    }
205
206    fn with_sender(
207        sys_id: usize,
208        id: usize,
209        name: Arc<String>,
210        sender: Sender<ArbiterCommand>,
211    ) -> Self {
212        #[cfg(feature = "tokio")]
213        let hnd = { Handle::new(sender.clone()) };
214
215        #[cfg(feature = "compio")]
216        let hnd = { Handle::new(sender.clone()) };
217
218        #[cfg(all(not(feature = "compio"), not(feature = "tokio")))]
219        let hnd = { Handle::current() };
220
221        Self {
222            id,
223            sys_id,
224            name,
225            sender,
226            hnd: Some(hnd),
227            thread_handle: None,
228        }
229    }
230
231    /// Id of the arbiter
232    pub fn id(&self) -> Id {
233        Id(self.id)
234    }
235
236    /// Name of the arbiter
237    pub fn name(&self) -> &str {
238        self.name.as_ref()
239    }
240
241    #[inline]
242    /// Handle to a runtime
243    pub fn handle(&self) -> &Handle {
244        self.hnd.as_ref().unwrap()
245    }
246
247    #[doc(hidden)]
248    #[deprecated(since = "3.8.0", note = "use `ntex_rt::spawn()`")]
249    /// Send a future to the Arbiter's thread, and spawn it.
250    pub fn spawn<F>(&self, future: F)
251    where
252        F: Future<Output = ()> + Send + 'static,
253    {
254        let _ = self
255            .sender
256            .try_send(ArbiterCommand::Execute(Box::pin(future)));
257    }
258
259    #[doc(hidden)]
260    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
261    /// Send a function to the Arbiter's thread and spawns it's resulting future.
262    /// This can be used to spawn non-send futures on the arbiter thread.
263    pub fn spawn_with<F, R, O>(
264        &self,
265        f: F,
266    ) -> impl Future<Output = Result<O, oneshot::RecvError>> + Send + 'static
267    where
268        F: FnOnce() -> R + Send + 'static,
269        R: Future<Output = O> + 'static,
270        O: Send + 'static,
271    {
272        let (tx, rx) = oneshot::async_channel();
273        let _ = self
274            .sender
275            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
276                crate::spawn(async move {
277                    let _ = tx.send(f().await);
278                });
279            })));
280        rx
281    }
282
283    #[doc(hidden)]
284    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
285    /// Send a function to the Arbiter's thread. This function will be executed asynchronously.
286    /// A future is created, and when resolved will contain the result of the function sent
287    /// to the Arbiters thread.
288    pub fn exec<F, R>(
289        &self,
290        f: F,
291    ) -> impl Future<Output = Result<R, oneshot::RecvError>> + Send + 'static
292    where
293        F: FnOnce() -> R + Send + 'static,
294        R: Send + 'static,
295    {
296        let (tx, rx) = oneshot::async_channel();
297        let _ = self
298            .sender
299            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
300                let _ = tx.send(f());
301            })));
302        rx
303    }
304
305    #[doc(hidden)]
306    #[deprecated(since = "3.8.0", note = "use `ntex_rt::Handle::spawn()`")]
307    /// Send a function to the Arbiter's thread, and execute it. Any result from the function
308    /// is discarded.
309    pub fn exec_fn<F>(&self, f: F)
310    where
311        F: FnOnce() + Send + 'static,
312    {
313        let _ = self
314            .sender
315            .try_send(ArbiterCommand::ExecuteFn(Box::new(move || {
316                f();
317            })));
318    }
319
320    #[doc(hidden)]
321    #[deprecated(since = "3.8.0", note = "use `ntex_rt::set_item()`")]
322    /// Set item to current arbiter's storage
323    pub fn set_item<T: 'static>(item: T) {
324        set_item(item);
325    }
326
327    #[doc(hidden)]
328    #[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
329    /// Check if arbiter storage contains item
330    pub fn contains_item<T: 'static>() -> bool {
331        STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
332    }
333
334    #[doc(hidden)]
335    #[deprecated(since = "3.8.0", note = "use `ntex_rt::get_item()`")]
336    /// Get a reference to a type previously inserted on this arbiter's storage
337    ///
338    /// # Panics
339    ///
340    /// Panics if item is not inserted
341    pub fn get_item<T: 'static, F, R>(f: F) -> R
342    where
343        F: FnOnce(&T) -> R,
344    {
345        STORAGE.with(move |cell| {
346            let mut st = cell.borrow_mut();
347            let item = st
348                .get_mut(&TypeId::of::<T>())
349                .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
350                .unwrap();
351            f(item)
352        })
353    }
354
355    /// Get a type previously inserted to this runtime or create new one.
356    pub fn get_value<T, F>(f: F) -> T
357    where
358        T: Clone + 'static,
359        F: FnOnce() -> T,
360    {
361        STORAGE.with(move |cell| {
362            let mut st = cell.borrow_mut();
363            if let Some(boxed) = st.get(&TypeId::of::<T>())
364                && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
365            {
366                return val.clone();
367            }
368            let val = f();
369            st.insert(TypeId::of::<T>(), Box::new(val.clone()));
370            val
371        })
372    }
373
374    /// Wait for the event loop to stop by joining the underlying thread (if have Some).
375    pub fn join(&mut self) -> thread::Result<()> {
376        if let Some(thread_handle) = self.thread_handle.take() {
377            thread_handle.join()
378        } else {
379            Ok(())
380        }
381    }
382}
383
384impl Eq for Arbiter {}
385
386impl PartialEq for Arbiter {
387    fn eq(&self, other: &Self) -> bool {
388        self.id == other.id && self.sys_id == other.sys_id
389    }
390}
391
/// Owns the receiving side of an arbiter's command channel and executes the
/// commands it receives (see `ArbiterController::run`).
pub(crate) struct ArbiterController {
    // System this arbiter belongs to; consulted on drop when the thread is panicking.
    sys: System,
    // Fired when a `Stop` command arrives. `None` for the controller created by
    // `Arbiter::new_system`.
    stop: Option<oneshot::Sender<i32>>,
    // Incoming commands.
    rx: Receiver<ArbiterCommand>,
}
397
398impl Drop for ArbiterController {
399    fn drop(&mut self) {
400        if thread::panicking() {
401            if self.sys.stop_on_panic() {
402                eprintln!("Panic in Arbiter thread, shutting down system.");
403                self.sys.stop_with_code(1);
404            } else {
405                eprintln!("Panic in Arbiter thread.");
406            }
407        }
408    }
409}
410
411impl ArbiterController {
412    pub(super) async fn run(mut self) {
413        loop {
414            match self.rx.recv().await {
415                Ok(ArbiterCommand::Stop) => {
416                    if let Some(stop) = self.stop.take() {
417                        let _ = stop.send(0);
418                    }
419                    break;
420                }
421                Ok(ArbiterCommand::Execute(fut)) => {
422                    crate::spawn(fut);
423                }
424                Ok(ArbiterCommand::ExecuteFn(f)) => {
425                    f.call_box();
426                }
427                Err(_) => break,
428            }
429        }
430    }
431}
432
433/// Set item to current runtime's storage
434pub fn set_item<T: 'static>(item: T) {
435    STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
436}
437
438/// Get a reference to a type previously inserted on this runtime's storage
439pub fn get_item<T: Clone + 'static>() -> Option<T> {
440    STORAGE.with(move |cell| {
441        cell.borrow()
442            .get(&TypeId::of::<T>())
443            .and_then(|boxed| boxed.downcast_ref())
444            .cloned()
445    })
446}
447
448/// Get a reference to a type or create new if it doesnt exists
449pub fn with_item<T: Default + 'static, F, R>(f: F) -> R
450where
451    F: FnOnce(&T) -> R,
452{
453    STORAGE.with(move |cell| {
454        let mut st = cell.borrow_mut();
455        if let Some(boxed) = st.get(&TypeId::of::<T>()) {
456            f(boxed.downcast_ref().unwrap())
457        } else {
458            let item = T::default();
459            let result = f(&item);
460            st.insert(TypeId::of::<T>(), Box::new(item));
461            result
462        }
463    })
464}
465
466/// Remove all items from storage.
467pub fn remove_all_items() {
468    STORAGE.with(move |cell| cell.borrow_mut().clear());
469    System::remove_current();
470}