ntex_rt/
builder.rs

1use std::{future::Future, io, marker::PhantomData, pin::Pin, rc::Rc, sync::Arc};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
7
8/// Builder struct for a ntex runtime.
9///
10/// Either use `Builder::build` to create a system and start actors.
11/// Alternatively, use `Builder::run` to start the tokio runtime and
12/// run a function in its context.
13pub struct Builder {
14    /// Name of the System. Defaults to "ntex" if unset.
15    name: String,
16    /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
17    stop_on_panic: bool,
18    /// New thread stack size
19    stack_size: usize,
20    /// Arbiters ping interval
21    ping_interval: usize,
22    /// Block on fn
23    block_on: Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>>,
24}
25
26impl Builder {
27    pub(super) fn new() -> Self {
28        Builder {
29            name: "ntex".into(),
30            stop_on_panic: false,
31            stack_size: 0,
32            block_on: None,
33            ping_interval: 1000,
34        }
35    }
36
37    /// Sets the name of the System.
38    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
39        self.name = name.as_ref().into();
40        self
41    }
42
43    /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
44    /// uncaught panic is thrown from a worker thread.
45    ///
46    /// Defaults to false.
47    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
48        self.stop_on_panic = stop_on_panic;
49        self
50    }
51
52    /// Sets the size of the stack (in bytes) for the new thread.
53    pub fn stack_size(mut self, size: usize) -> Self {
54        self.stack_size = size;
55        self
56    }
57
58    /// Sets ping interval for spawned arbiters.
59    ///
60    /// Interval is in milliseconds. By default 5000 milliseconds is set.
61    /// To disable pings set value to zero.
62    pub fn ping_interval(mut self, interval: usize) -> Self {
63        self.ping_interval = interval;
64        self
65    }
66
67    /// Use custom block_on function
68    pub fn block_on<F>(mut self, block_on: F) -> Self
69    where
70        F: Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send + 'static,
71    {
72        self.block_on = Some(Arc::new(block_on));
73        self
74    }
75
76    /// Create new System.
77    ///
78    /// This method panics if it can not create tokio runtime
79    pub fn finish(self) -> SystemRunner {
80        let (stop_tx, stop) = oneshot::channel();
81        let (sys_sender, sys_receiver) = unbounded();
82
83        let config = SystemConfig {
84            block_on: self.block_on,
85            stack_size: self.stack_size,
86            stop_on_panic: self.stop_on_panic,
87        };
88
89        let (arb, controller) = Arbiter::new_system(self.name.clone());
90        let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
91        let system = System::construct(sys_sender, arb.clone(), config);
92
93        // system arbiter
94        let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
95
96        // init system arbiter and run configuration method
97        SystemRunner {
98            stop,
99            support,
100            controller,
101            system,
102            _t: PhantomData,
103        }
104    }
105}
106
107/// Helper object that runs System's event loop
108#[must_use = "SystemRunner must be run"]
109pub struct SystemRunner {
110    stop: oneshot::Receiver<i32>,
111    support: SystemSupport,
112    controller: ArbiterController,
113    system: System,
114    _t: PhantomData<Rc<()>>,
115}
116
117impl SystemRunner {
118    /// Get current system.
119    pub fn system(&self) -> System {
120        self.system.clone()
121    }
122
123    /// This function will start event loop and will finish once the
124    /// `System::stop()` function is called.
125    pub fn run_until_stop(self) -> io::Result<()> {
126        self.run(|| Ok(()))
127    }
128
129    /// This function will start event loop and will finish once the
130    /// `System::stop()` function is called.
131    pub fn run<F>(self, f: F) -> io::Result<()>
132    where
133        F: FnOnce() -> io::Result<()> + 'static,
134    {
135        let SystemRunner {
136            controller,
137            stop,
138            support,
139            system,
140            ..
141        } = self;
142
143        // run loop
144        system.config().block_on(async move {
145            f()?;
146
147            let _ = crate::spawn(support.run());
148            let _ = crate::spawn(controller.run());
149            match stop.await {
150                Ok(code) => {
151                    if code != 0 {
152                        Err(io::Error::other(format!("Non-zero exit code: {}", code)))
153                    } else {
154                        Ok(())
155                    }
156                }
157                Err(_) => Err(io::Error::other("Closed")),
158            }
159        })
160    }
161
162    /// Execute a future and wait for result.
163    pub fn block_on<F, R>(self, fut: F) -> R
164    where
165        F: Future<Output = R> + 'static,
166        R: 'static,
167    {
168        let SystemRunner {
169            controller,
170            support,
171            system,
172            ..
173        } = self;
174
175        system.config().block_on(async move {
176            let _ = crate::spawn(support.run());
177            let _ = crate::spawn(controller.run());
178            fut.await
179        })
180    }
181
182    #[cfg(feature = "tokio")]
183    /// Execute a future and wait for result.
184    pub async fn run_local<F, R>(self, fut: F) -> R
185    where
186        F: Future<Output = R> + 'static,
187        R: 'static,
188    {
189        let SystemRunner {
190            controller,
191            support,
192            ..
193        } = self;
194
195        // run loop
196        tok_io::task::LocalSet::new()
197            .run_until(async move {
198                let _ = crate::spawn(support.run());
199                let _ = crate::spawn(controller.run());
200                fut.await
201            })
202            .await
203    }
204}
205
206#[cfg(test)]
207mod tests {
208    use std::sync::mpsc;
209    use std::thread;
210
211    use super::*;
212
213    #[test]
214    fn test_async() {
215        let (tx, rx) = mpsc::channel();
216
217        thread::spawn(move || {
218            let runner = crate::System::build().stop_on_panic(true).finish();
219
220            tx.send(runner.system()).unwrap();
221            let _ = runner.run_until_stop();
222        });
223        let s = System::new("test");
224
225        let sys = rx.recv().unwrap();
226        let id = sys.id();
227        let (tx, rx) = mpsc::channel();
228        sys.arbiter().exec_fn(move || {
229            let _ = tx.send(System::current().id());
230        });
231        let id2 = rx.recv().unwrap();
232        assert_eq!(id, id2);
233
234        let id2 = s
235            .block_on(sys.arbiter().exec(|| System::current().id()))
236            .unwrap();
237        assert_eq!(id, id2);
238
239        let (tx, rx) = mpsc::channel();
240        sys.arbiter().spawn(Box::pin(async move {
241            let _ = tx.send(System::current().id());
242        }));
243        let id2 = rx.recv().unwrap();
244        assert_eq!(id, id2);
245    }
246
247    #[cfg(feature = "tokio")]
248    #[test]
249    fn test_block_on() {
250        let (tx, rx) = mpsc::channel();
251
252        thread::spawn(move || {
253            let runner = crate::System::build()
254                .stop_on_panic(true)
255                .ping_interval(25)
256                .block_on(|fut| {
257                    let rt = tok_io::runtime::Builder::new_current_thread()
258                        .enable_all()
259                        .build()
260                        .unwrap();
261                    tok_io::task::LocalSet::new().block_on(&rt, fut);
262                })
263                .finish();
264
265            tx.send(runner.system()).unwrap();
266            let _ = runner.run_until_stop();
267        });
268        let s = System::new("test");
269
270        let sys = rx.recv().unwrap();
271        let id = sys.id();
272        let (tx, rx) = mpsc::channel();
273        sys.arbiter().exec_fn(move || {
274            let _ = tx.send(System::current().id());
275        });
276        let id2 = rx.recv().unwrap();
277        assert_eq!(id, id2);
278
279        let id2 = s
280            .block_on(sys.arbiter().exec(|| System::current().id()))
281            .unwrap();
282        assert_eq!(id, id2);
283
284        let (tx, rx) = mpsc::channel();
285        sys.arbiter().spawn(async move {
286            futures_timer::Delay::new(std::time::Duration::from_millis(100)).await;
287
288            let recs = System::list_arbiter_pings(Arbiter::current().id(), |recs| {
289                recs.unwrap().clone()
290            });
291            let _ = tx.send(recs);
292        });
293        let recs = rx.recv().unwrap();
294
295        assert!(!recs.is_empty());
296        sys.stop();
297    }
298}