requiem_rt/
arbiter.rs

1use std::any::{Any, TypeId};
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::task::{Context, Poll};
7use std::{fmt, thread};
8
9use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
10use futures::channel::oneshot::{channel, Canceled, Sender};
11use futures::{future, Future, FutureExt, Stream};
12
13use crate::runtime::Runtime;
14use crate::system::System;
15
16use copyless::BoxHelper;
17
thread_local!(
    // Arbiter bound to the current thread; set by `Arbiter::new_system` and inside the
    // thread started by `Arbiter::new`, read by `Arbiter::current`.
    static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
    // Whether `Arbiter::spawn` may hand futures straight to the executor on this thread.
    static RUNNING: Cell<bool> = Cell::new(false);
    // Futures passed to `Arbiter::spawn` while `RUNNING` was false; drained by
    // `Arbiter::run_system`.
    static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());
    // Per-thread item storage holding at most one value per type, keyed by `TypeId`.
    static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
24
/// Process-wide counter; `Arbiter::new` takes the next value as the id of the arbiter it
/// creates (the id also ends up in the worker thread's name).
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
26
/// Commands delivered to an arbiter's `ArbiterController` over its unbounded channel.
pub(crate) enum ArbiterCommand {
    /// Make the controller fire its stop channel (if it has one) with code `0` and finish.
    Stop,
    /// A future the controller spawns onto the local task set of its thread.
    Execute(Box<dyn Future<Output = ()> + Unpin + Send>),
    /// A closure the controller calls synchronously from within its `poll`.
    ExecuteFn(Box<dyn FnExec>),
}
32
33impl fmt::Debug for ArbiterCommand {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        match self {
36            ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"),
37            ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
38            ArbiterCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"),
39        }
40    }
41}
42
#[derive(Debug)]
/// Arbiters provide an asynchronous execution environment for actors, functions
/// and futures. When an Arbiter is created with `Arbiter::new`, it spawns a new OS
/// thread, and hosts an event loop. Some Arbiter functions execute on the current thread.
///
/// An `Arbiter` value is an address: it carries the sending half of the command
/// channel read by the arbiter's controller.
pub struct Arbiter {
    /// Sending half of the channel on which `ArbiterCommand`s are delivered.
    sender: UnboundedSender<ArbiterCommand>,
    /// Join handle of the worker thread. Only the value returned by `Arbiter::new`
    /// holds `Some`; values built through `with_sender` (including clones) hold `None`.
    thread_handle: Option<thread::JoinHandle<()>>,
}
51
52impl Clone for Arbiter {
53    fn clone(&self) -> Self {
54        Self::with_sender(self.sender.clone())
55    }
56}
57
58impl Default for Arbiter {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64impl Arbiter {
65    pub(crate) fn new_system() -> Self {
66        let (tx, rx) = unbounded();
67
68        let arb = Arbiter::with_sender(tx);
69        ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
70        RUNNING.with(|cell| cell.set(false));
71        STORAGE.with(|cell| cell.borrow_mut().clear());
72        Arbiter::spawn(ArbiterController { stop: None, rx });
73
74        arb
75    }
76
77    /// Returns the current thread's arbiter's address. If no Arbiter is present, then this
78    /// function will panic!
79    pub fn current() -> Arbiter {
80        ADDR.with(|cell| match *cell.borrow() {
81            Some(ref addr) => addr.clone(),
82            None => panic!("Arbiter is not running"),
83        })
84    }
85
86    /// Stop arbiter from continuing it's event loop.
87    pub fn stop(&self) {
88        let _ = self.sender.unbounded_send(ArbiterCommand::Stop);
89    }
90
91    /// Spawn new thread and run event loop in spawned thread.
92    /// Returns address of newly created arbiter.
93    pub fn new() -> Arbiter {
94        let id = COUNT.fetch_add(1, Ordering::Relaxed);
95        let name = format!("actix-rt:worker:{}", id);
96        let sys = System::current();
97        let (arb_tx, arb_rx) = unbounded();
98        let arb_tx2 = arb_tx.clone();
99
100        let handle = thread::Builder::new()
101            .name(name.clone())
102            .spawn(move || {
103                let mut rt = Runtime::new().expect("Can not create Runtime");
104                let arb = Arbiter::with_sender(arb_tx);
105
106                let (stop, stop_rx) = channel();
107                RUNNING.with(|cell| cell.set(true));
108                STORAGE.with(|cell| cell.borrow_mut().clear());
109
110                System::set_current(sys);
111
112                // start arbiter controller
113                rt.spawn(ArbiterController {
114                    stop: Some(stop),
115                    rx: arb_rx,
116                });
117                ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
118
119                // register arbiter
120                let _ = System::current()
121                    .sys()
122                    .unbounded_send(SystemCommand::RegisterArbiter(id, arb));
123
124                // run loop
125                let _ = match rt.block_on(stop_rx) {
126                    Ok(code) => code,
127                    Err(_) => 1,
128                };
129
130                // unregister arbiter
131                let _ = System::current()
132                    .sys()
133                    .unbounded_send(SystemCommand::UnregisterArbiter(id));
134            })
135            .unwrap_or_else(|err| {
136                panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
137            });
138
139        Arbiter {
140            sender: arb_tx2,
141            thread_handle: Some(handle),
142        }
143    }
144
145    pub(crate) fn run_system(rt: Option<&Runtime>) {
146        RUNNING.with(|cell| cell.set(true));
147        Q.with(|cell| {
148            let mut v = cell.borrow_mut();
149            for fut in v.drain(..) {
150                if let Some(rt) = rt {
151                    rt.spawn(fut);
152                } else {
153                    tokio::task::spawn_local(fut);
154                }
155            }
156        });
157    }
158
159    pub(crate) fn stop_system() {
160        RUNNING.with(|cell| cell.set(false));
161    }
162
163    /// Spawn a future on the current thread. This does not create a new Arbiter
164    /// or Arbiter address, it is simply a helper for spawning futures on the current
165    /// thread.
166    pub fn spawn<F>(future: F)
167    where
168        F: Future<Output = ()> + 'static,
169    {
170        RUNNING.with(move |cell| {
171            if cell.get() {
172                // Spawn the future on running executor
173                tokio::task::spawn_local(future);
174            } else {
175                // Box the future and push it to the queue, this results in double boxing
176                // because the executor boxes the future again, but works for now
177                Q.with(move |cell| {
178                    cell.borrow_mut()
179                        .push(unsafe { Pin::new_unchecked(Box::alloc().init(future)) })
180                });
181            }
182        });
183    }
184
185    /// Executes a future on the current thread. This does not create a new Arbiter
186    /// or Arbiter address, it is simply a helper for executing futures on the current
187    /// thread.
188    pub fn spawn_fn<F, R>(f: F)
189    where
190        F: FnOnce() -> R + 'static,
191        R: Future<Output = ()> + 'static,
192    {
193        Arbiter::spawn(future::lazy(|_| f()).flatten())
194    }
195
196    /// Send a future to the Arbiter's thread, and spawn it.
197    pub fn send<F>(&self, future: F)
198    where
199        F: Future<Output = ()> + Send + Unpin + 'static,
200    {
201        let _ = self
202            .sender
203            .unbounded_send(ArbiterCommand::Execute(Box::new(future)));
204    }
205
206    /// Send a function to the Arbiter's thread, and execute it. Any result from the function
207    /// is discarded.
208    pub fn exec_fn<F>(&self, f: F)
209    where
210        F: FnOnce() + Send + 'static,
211    {
212        let _ = self
213            .sender
214            .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
215                f();
216            })));
217    }
218
219    /// Send a function to the Arbiter's thread. This function will be executed asynchronously.
220    /// A future is created, and when resolved will contain the result of the function sent
221    /// to the Arbiters thread.
222    pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, Canceled>>
223    where
224        F: FnOnce() -> R + Send + 'static,
225        R: Send + 'static,
226    {
227        let (tx, rx) = channel();
228        let _ = self
229            .sender
230            .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || {
231                if !tx.is_canceled() {
232                    let _ = tx.send(f());
233                }
234            })));
235        rx
236    }
237
238    /// Set item to arbiter storage
239    pub fn set_item<T: 'static>(item: T) {
240        STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
241    }
242
243    /// Check if arbiter storage contains item
244    pub fn contains_item<T: 'static>() -> bool {
245        STORAGE.with(move |cell| cell.borrow().get(&TypeId::of::<T>()).is_some())
246    }
247
248    /// Get a reference to a type previously inserted on this arbiter's storage.
249    ///
250    /// Panics is item is not inserted
251    pub fn get_item<T: 'static, F, R>(mut f: F) -> R
252    where
253        F: FnMut(&T) -> R,
254    {
255        STORAGE.with(move |cell| {
256            let st = cell.borrow();
257            let item = st
258                .get(&TypeId::of::<T>())
259                .and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref())
260                .unwrap();
261            f(item)
262        })
263    }
264
265    /// Get a mutable reference to a type previously inserted on this arbiter's storage.
266    ///
267    /// Panics is item is not inserted
268    pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
269    where
270        F: FnMut(&mut T) -> R,
271    {
272        STORAGE.with(move |cell| {
273            let mut st = cell.borrow_mut();
274            let item = st
275                .get_mut(&TypeId::of::<T>())
276                .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
277                .unwrap();
278            f(item)
279        })
280    }
281
282    fn with_sender(sender: UnboundedSender<ArbiterCommand>) -> Self {
283        Self {
284            sender,
285            thread_handle: None,
286        }
287    }
288
289    /// Wait for the event loop to stop by joining the underlying thread (if have Some).
290    pub fn join(&mut self) -> thread::Result<()> {
291        if let Some(thread_handle) = self.thread_handle.take() {
292            thread_handle.join()
293        } else {
294            Ok(())
295        }
296    }
297}
298
/// Future that receives `ArbiterCommand`s for one arbiter and acts on them.
struct ArbiterController {
    /// Fired with code `0` when a `Stop` command is processed. `None` for the
    /// controller created by `Arbiter::new_system`.
    stop: Option<Sender<i32>>,
    /// Receiving half of the arbiter's command channel.
    rx: UnboundedReceiver<ArbiterCommand>,
}
303
304impl Drop for ArbiterController {
305    fn drop(&mut self) {
306        if thread::panicking() {
307            if System::current().stop_on_panic() {
308                eprintln!("Panic in Arbiter thread, shutting down system.");
309                System::current().stop_with_code(1)
310            } else {
311                eprintln!("Panic in Arbiter thread.");
312            }
313        }
314    }
315}
316
317impl Future for ArbiterController {
318    type Output = ();
319
320    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
321        loop {
322            match Pin::new(&mut self.rx).poll_next(cx) {
323                Poll::Ready(None) => return Poll::Ready(()),
324                Poll::Ready(Some(item)) => match item {
325                    ArbiterCommand::Stop => {
326                        if let Some(stop) = self.stop.take() {
327                            let _ = stop.send(0);
328                        };
329                        return Poll::Ready(());
330                    }
331                    ArbiterCommand::Execute(fut) => {
332                        tokio::task::spawn_local(fut);
333                    }
334                    ArbiterCommand::ExecuteFn(f) => {
335                        f.call_box();
336                    }
337                },
338                Poll::Pending => return Poll::Pending,
339            }
340        }
341    }
342}
343
/// Commands processed by the `SystemArbiter`.
#[derive(Debug)]
pub(crate) enum SystemCommand {
    /// Stop every registered arbiter and fire the system stop channel with this code.
    Exit(i32),
    /// Record an arbiter under the given id.
    RegisterArbiter(usize, Arbiter),
    /// Forget the arbiter recorded under the given id.
    UnregisterArbiter(usize),
}
350
/// Future that keeps track of registered arbiters and handles `SystemCommand`s.
#[derive(Debug)]
pub(crate) struct SystemArbiter {
    /// Fired with the exit code on the first `Exit` command; `None` afterwards.
    stop: Option<Sender<i32>>,
    /// Incoming system commands.
    commands: UnboundedReceiver<SystemCommand>,
    /// Registered arbiters, keyed by the id they registered with.
    arbiters: HashMap<usize, Arbiter>,
}
357
358impl SystemArbiter {
359    pub(crate) fn new(stop: Sender<i32>, commands: UnboundedReceiver<SystemCommand>) -> Self {
360        SystemArbiter {
361            commands,
362            stop: Some(stop),
363            arbiters: HashMap::new(),
364        }
365    }
366}
367
368impl Future for SystemArbiter {
369    type Output = ();
370
371    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
372        loop {
373            match Pin::new(&mut self.commands).poll_next(cx) {
374                Poll::Ready(None) => return Poll::Ready(()),
375                Poll::Ready(Some(cmd)) => match cmd {
376                    SystemCommand::Exit(code) => {
377                        // stop arbiters
378                        for arb in self.arbiters.values() {
379                            arb.stop();
380                        }
381                        // stop event loop
382                        if let Some(stop) = self.stop.take() {
383                            let _ = stop.send(code);
384                        }
385                    }
386                    SystemCommand::RegisterArbiter(name, hnd) => {
387                        self.arbiters.insert(name, hnd);
388                    }
389                    SystemCommand::UnregisterArbiter(name) => {
390                        self.arbiters.remove(&name);
391                    }
392                },
393                Poll::Pending => return Poll::Pending,
394            }
395        }
396    }
397}
398
/// A sendable, run-once callback that can be invoked through a `Box`.
pub trait FnExec: Send + 'static {
    /// Consume the boxed callback and run it.
    fn call_box(self: Box<Self>);
}

/// Every `FnOnce() + Send + 'static` closure is an `FnExec`.
impl<F> FnExec for F
where
    F: FnOnce() + Send + 'static,
{
    // The receiver has to be a `Box` so the method stays callable on `Box<dyn FnExec>`.
    #[allow(clippy::boxed_local)]
    fn call_box(self: Box<Self>) {
        let callback = *self;
        callback()
    }
}
411}