ntex_rt/
builder.rs

1use std::{future::Future, io, marker::PhantomData, pin::Pin, rc::Rc, sync::Arc};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
7
/// Shape of a user-supplied `block_on` function: a thread-safe callable that
/// is handed a boxed, pinned future with unit output.
type BlockOnFn = dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send;

/// Configuration builder for a ntex runtime system.
///
/// Adjust the settings through the chainable setter methods, then call
/// `Builder::finish` to obtain the `SystemRunner` that drives the system.
pub struct Builder {
    /// Name given to the System; `Builder::new` initialises it to "ntex".
    name: String,
    /// Whether the Arbiter will stop the whole System on uncaught panic;
    /// `Builder::new` initialises it to `false`.
    stop_on_panic: bool,
    /// Stack size for newly created threads; `Builder::new` initialises it to 0.
    stack_size: usize,
    /// Ping interval for arbiters; `Builder::new` initialises it to 1000.
    ping_interval: usize,
    /// Optional custom `block_on` function; `None` unless
    /// `Builder::block_on` was called.
    block_on: Option<Arc<BlockOnFn>>,
}
25
26impl Builder {
27    pub(super) fn new() -> Self {
28        Builder {
29            name: "ntex".into(),
30            stop_on_panic: false,
31            stack_size: 0,
32            block_on: None,
33            ping_interval: 1000,
34        }
35    }
36
37    /// Sets the name of the System.
38    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
39        self.name = name.as_ref().into();
40        self
41    }
42
43    /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
44    /// uncaught panic is thrown from a worker thread.
45    ///
46    /// Defaults to false.
47    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
48        self.stop_on_panic = stop_on_panic;
49        self
50    }
51
52    /// Sets the size of the stack (in bytes) for the new thread.
53    pub fn stack_size(mut self, size: usize) -> Self {
54        self.stack_size = size;
55        self
56    }
57
58    /// Sets ping interval for spawned arbiters.
59    ///
60    /// Interval is in milliseconds. By default 5000 milliseconds is set.
61    /// To disable pings set value to zero.
62    pub fn ping_interval(mut self, interval: usize) -> Self {
63        self.ping_interval = interval;
64        self
65    }
66
67    /// Use custom block_on function
68    pub fn block_on<F>(mut self, block_on: F) -> Self
69    where
70        F: Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send + 'static,
71    {
72        self.block_on = Some(Arc::new(block_on));
73        self
74    }
75
76    /// Create new System.
77    ///
78    /// This method panics if it can not create tokio runtime
79    pub fn finish(self) -> SystemRunner {
80        let (stop_tx, stop) = oneshot::channel();
81        let (sys_sender, sys_receiver) = unbounded();
82
83        let config = SystemConfig {
84            block_on: self.block_on,
85            stack_size: self.stack_size,
86            stop_on_panic: self.stop_on_panic,
87        };
88
89        let (arb, controller) = Arbiter::new_system(self.name.clone());
90        let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
91        let system = System::construct(sys_sender, arb.clone(), config);
92
93        // system arbiter
94        let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
95
96        // init system arbiter and run configuration method
97        SystemRunner {
98            stop,
99            support,
100            controller,
101            system,
102            _t: PhantomData,
103        }
104    }
105}
106
/// Helper object that runs System's event loop
///
/// Returned by `Builder::finish`. The runner is consumed by whichever of its
/// run methods is called (`run`, `run_until_stop`, `block_on` or, with the
/// `tokio` feature, `run_local`).
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
    /// Receiving half of the stop channel; `run` waits on it for the exit code.
    stop: oneshot::Receiver<i32>,
    /// Support object built from the stop sender, the system command receiver
    /// and the ping interval; its `run()` future is spawned when the runner starts.
    support: SystemSupport,
    /// Controller returned together with the system arbiter; its `run()`
    /// future is spawned when the runner starts.
    controller: ArbiterController,
    /// Handle to the system this runner drives.
    system: System,
    /// Zero-sized `Rc` marker; it keeps the runner from being `Send` or `Sync`.
    _t: PhantomData<Rc<()>>,
}
116
117impl SystemRunner {
118    /// Get current system.
119    pub fn system(&self) -> System {
120        self.system.clone()
121    }
122
123    /// This function will start event loop and will finish once the
124    /// `System::stop()` function is called.
125    pub fn run_until_stop(self) -> io::Result<()> {
126        self.run(|| Ok(()))
127    }
128
129    /// This function will start event loop and will finish once the
130    /// `System::stop()` function is called.
131    pub fn run<F>(self, f: F) -> io::Result<()>
132    where
133        F: FnOnce() -> io::Result<()> + 'static,
134    {
135        let SystemRunner {
136            controller,
137            stop,
138            support,
139            system,
140            ..
141        } = self;
142
143        // run loop
144        system.config().block_on(async move {
145            f()?;
146
147            let _ = crate::spawn(support.run());
148            let _ = crate::spawn(controller.run());
149            match stop.await {
150                Ok(code) => {
151                    if code != 0 {
152                        Err(io::Error::new(
153                            io::ErrorKind::Other,
154                            format!("Non-zero exit code: {}", code),
155                        ))
156                    } else {
157                        Ok(())
158                    }
159                }
160                Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")),
161            }
162        })
163    }
164
165    /// Execute a future and wait for result.
166    pub fn block_on<F, R>(self, fut: F) -> R
167    where
168        F: Future<Output = R> + 'static,
169        R: 'static,
170    {
171        let SystemRunner {
172            controller,
173            support,
174            system,
175            ..
176        } = self;
177
178        system.config().block_on(async move {
179            let _ = crate::spawn(support.run());
180            let _ = crate::spawn(controller.run());
181            fut.await
182        })
183    }
184
185    #[cfg(feature = "tokio")]
186    /// Execute a future and wait for result.
187    pub async fn run_local<F, R>(self, fut: F) -> R
188    where
189        F: Future<Output = R> + 'static,
190        R: 'static,
191    {
192        let SystemRunner {
193            controller,
194            support,
195            ..
196        } = self;
197
198        // run loop
199        tok_io::task::LocalSet::new()
200            .run_until(async move {
201                let _ = crate::spawn(support.run());
202                let _ = crate::spawn(controller.run());
203                fut.await
204            })
205            .await
206    }
207}
208
#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::thread;

    use super::*;

    /// Work sent to the arbiter of a system running on another thread must
    /// observe that system as the current one.
    #[test]
    fn test_async() {
        let (sys_tx, sys_rx) = mpsc::channel();

        // run a second system on its own thread and hand its handle back
        thread::spawn(move || {
            let runner = crate::System::build().stop_on_panic(true).finish();

            sys_tx.send(runner.system()).unwrap();
            let _ = runner.run_until_stop();
        });
        let local = System::new("test");

        let remote = sys_rx.recv().unwrap();
        let expected = remote.id();

        // fire-and-forget closure
        let (fn_tx, fn_rx) = mpsc::channel();
        remote.arbiter().exec_fn(move || {
            let _ = fn_tx.send(System::current().id());
        });
        let from_exec_fn = fn_rx.recv().unwrap();
        assert_eq!(expected, from_exec_fn);

        // closure whose result is awaited from the local system
        let from_exec = local
            .block_on(remote.arbiter().exec(|| System::current().id()))
            .unwrap();
        assert_eq!(expected, from_exec);

        // spawned future
        let (fut_tx, fut_rx) = mpsc::channel();
        remote.arbiter().spawn(Box::pin(async move {
            let _ = fut_tx.send(System::current().id());
        }));
        let from_spawn = fut_rx.recv().unwrap();
        assert_eq!(expected, from_spawn);
    }

    /// Same checks as `test_async`, but the remote system uses a custom
    /// `block_on` function and a 25 ms ping interval; ping records must exist
    /// for its arbiter afterwards.
    #[cfg(feature = "tokio")]
    #[test]
    fn test_block_on() {
        let (sys_tx, sys_rx) = mpsc::channel();

        thread::spawn(move || {
            let runner = crate::System::build()
                .stop_on_panic(true)
                .ping_interval(25)
                .block_on(|fut| {
                    let rt = tok_io::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .unwrap();
                    tok_io::task::LocalSet::new().block_on(&rt, fut);
                })
                .finish();

            sys_tx.send(runner.system()).unwrap();
            let _ = runner.run_until_stop();
        });
        let local = System::new("test");

        let remote = sys_rx.recv().unwrap();
        let expected = remote.id();

        // fire-and-forget closure
        let (fn_tx, fn_rx) = mpsc::channel();
        remote.arbiter().exec_fn(move || {
            let _ = fn_tx.send(System::current().id());
        });
        let from_exec_fn = fn_rx.recv().unwrap();
        assert_eq!(expected, from_exec_fn);

        // closure whose result is awaited from the local system
        let from_exec = local
            .block_on(remote.arbiter().exec(|| System::current().id()))
            .unwrap();
        assert_eq!(expected, from_exec);

        // wait 100 ms (presumably long enough for several 25 ms pings), then
        // read the ping records of the arbiter running this future
        let (pings_tx, pings_rx) = mpsc::channel();
        remote.arbiter().spawn(async move {
            futures_timer::Delay::new(std::time::Duration::from_millis(100)).await;

            let pings = System::list_arbiter_pings(Arbiter::current().id(), |found| {
                found.unwrap().clone()
            });
            let _ = pings_tx.send(pings);
        });
        let pings = pings_rx.recv().unwrap();

        assert!(!pings.is_empty());
        remote.stop();
    }
}