requiem_rt/
builder.rs

1use std::borrow::Cow;
2use std::io;
3
4use futures::channel::mpsc::unbounded;
5use futures::channel::oneshot::{channel, Receiver};
6use futures::future::{lazy, Future, FutureExt};
7use tokio::task::LocalSet;
8
9use crate::arbiter::{Arbiter, SystemArbiter};
10use crate::runtime::Runtime;
11use crate::system::System;
12
13/// Builder struct for a actix runtime.
14///
15/// Either use `Builder::build` to create a system and start actors.
16/// Alternatively, use `Builder::run` to start the tokio runtime and
17/// run a function in its context.
18pub struct Builder {
19    /// Name of the System. Defaults to "actix" if unset.
20    name: Cow<'static, str>,
21
22    /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
23    stop_on_panic: bool,
24}
25
26impl Builder {
27    pub(crate) fn new() -> Self {
28        Builder {
29            name: Cow::Borrowed("actix"),
30            stop_on_panic: false,
31        }
32    }
33
34    /// Sets the name of the System.
35    pub fn name<T: Into<String>>(mut self, name: T) -> Self {
36        self.name = Cow::Owned(name.into());
37        self
38    }
39
40    /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
41    /// uncaught panic is thrown from a worker thread.
42    ///
43    /// Defaults to false.
44    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
45        self.stop_on_panic = stop_on_panic;
46        self
47    }
48
49    /// Create new System.
50    ///
51    /// This method panics if it can not create tokio runtime
52    pub fn build(self) -> SystemRunner {
53        self.create_runtime(|| {})
54    }
55
56    /// Create new System that can run asynchronously.
57    ///
58    /// This method panics if it cannot start the system arbiter
59    pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner {
60        self.create_async_runtime(local)
61    }
62
63    /// This function will start tokio runtime and will finish once the
64    /// `System::stop()` message get called.
65    /// Function `f` get called within tokio runtime context.
66    pub fn run<F>(self, f: F) -> io::Result<()>
67    where
68        F: FnOnce() + 'static,
69    {
70        self.create_runtime(f).run()
71    }
72
73    fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
74        let (stop_tx, stop) = channel();
75        let (sys_sender, sys_receiver) = unbounded();
76
77        let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
78
79        // system arbiter
80        let arb = SystemArbiter::new(stop_tx, sys_receiver);
81
82        // start the system arbiter
83        let _ = local.spawn_local(arb);
84
85        AsyncSystemRunner { stop, system }
86    }
87
88    fn create_runtime<F>(self, f: F) -> SystemRunner
89    where
90        F: FnOnce() + 'static,
91    {
92        let (stop_tx, stop) = channel();
93        let (sys_sender, sys_receiver) = unbounded();
94
95        let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
96
97        // system arbiter
98        let arb = SystemArbiter::new(stop_tx, sys_receiver);
99
100        let mut rt = Runtime::new().unwrap();
101        rt.spawn(arb);
102
103        // init system arbiter and run configuration method
104        rt.block_on(lazy(move |_| f()));
105
106        SystemRunner { rt, stop, system }
107    }
108}
109
110#[derive(Debug)]
111pub(crate) struct AsyncSystemRunner {
112    stop: Receiver<i32>,
113    system: System,
114}
115
116impl AsyncSystemRunner {
117    /// This function will start event loop and returns a future that
118    /// resolves once the `System::stop()` function is called.
119    pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send {
120        let AsyncSystemRunner { stop, .. } = self;
121
122        // run loop
123        lazy(|_| {
124            Arbiter::run_system(None);
125            async {
126                let res = match stop.await {
127                    Ok(code) => {
128                        if code != 0 {
129                            Err(io::Error::new(
130                                io::ErrorKind::Other,
131                                format!("Non-zero exit code: {}", code),
132                            ))
133                        } else {
134                            Ok(())
135                        }
136                    }
137                    Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
138                };
139                Arbiter::stop_system();
140                return res;
141            }
142        })
143        .flatten()
144    }
145}
146
147/// Helper object that runs System's event loop
148#[must_use = "SystemRunner must be run"]
149#[derive(Debug)]
150pub struct SystemRunner {
151    rt: Runtime,
152    stop: Receiver<i32>,
153    system: System,
154}
155
156impl SystemRunner {
157    /// This function will start event loop and will finish once the
158    /// `System::stop()` function is called.
159    pub fn run(self) -> io::Result<()> {
160        let SystemRunner { mut rt, stop, .. } = self;
161
162        // run loop
163        Arbiter::run_system(Some(&rt));
164        let result = match rt.block_on(stop) {
165            Ok(code) => {
166                if code != 0 {
167                    Err(io::Error::new(
168                        io::ErrorKind::Other,
169                        format!("Non-zero exit code: {}", code),
170                    ))
171                } else {
172                    Ok(())
173                }
174            }
175            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
176        };
177        Arbiter::stop_system();
178        result
179    }
180
181    /// Execute a future and wait for result.
182    pub fn block_on<F, O>(&mut self, fut: F) -> O
183    where
184        F: Future<Output = O> + 'static,
185    {
186        Arbiter::run_system(Some(&self.rt));
187        let res = self.rt.block_on(fut);
188        Arbiter::stop_system();
189        res
190    }
191}