ntex_rt/
builder.rs

1use std::{future::Future, io, pin::Pin, sync::Arc};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter};
6use crate::{system::SystemConfig, System};
7
/// Shared, thread-safe handle to a user-supplied `block_on` implementation.
///
/// The callable is handed the boxed root future (`Output = ()`).
type BlockOnFn = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>;

/// Builder for a ntex [`System`].
///
/// Adjust the options with the chained setter methods, then call
/// [`Builder::finish`] to obtain the [`SystemRunner`] that drives the
/// system's event loop.
pub struct Builder {
    /// Name of the System. Defaults to "ntex" if unset.
    name: String,
    /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
    stop_on_panic: bool,
    /// Stack size (in bytes) for new threads. Defaults to 0.
    stack_size: usize,
    /// Custom `block_on` function; `None` until one is supplied through the
    /// `block_on` setter.
    block_on: Option<BlockOnFn>,
}
23
24impl Builder {
25    pub(super) fn new() -> Self {
26        Builder {
27            name: "ntex".into(),
28            stop_on_panic: false,
29            stack_size: 0,
30            block_on: None,
31        }
32    }
33
34    /// Sets the name of the System.
35    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
36        self.name = name.as_ref().into();
37        self
38    }
39
40    /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
41    /// uncaught panic is thrown from a worker thread.
42    ///
43    /// Defaults to false.
44    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
45        self.stop_on_panic = stop_on_panic;
46        self
47    }
48
49    /// Sets the size of the stack (in bytes) for the new thread.
50    pub fn stack_size(mut self, size: usize) -> Self {
51        self.stack_size = size;
52        self
53    }
54
55    /// Use custom block_on function
56    pub fn block_on<F>(mut self, block_on: F) -> Self
57    where
58        F: Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send + 'static,
59    {
60        self.block_on = Some(Arc::new(block_on));
61        self
62    }
63
64    /// Create new System.
65    ///
66    /// This method panics if it can not create tokio runtime
67    pub fn finish(self) -> SystemRunner {
68        let (stop_tx, stop) = oneshot::channel();
69        let (sys_sender, sys_receiver) = unbounded();
70
71        let config = SystemConfig {
72            block_on: self.block_on,
73            stack_size: self.stack_size,
74            stop_on_panic: self.stop_on_panic,
75        };
76
77        let (arb, arb_controller) = Arbiter::new_system();
78        let system = System::construct(sys_sender, arb, config);
79
80        // system arbiter
81        let arb = SystemArbiter::new(stop_tx, sys_receiver);
82
83        // init system arbiter and run configuration method
84        SystemRunner {
85            stop,
86            arb,
87            arb_controller,
88            system,
89        }
90    }
91}
92
93/// Helper object that runs System's event loop
94#[must_use = "SystemRunner must be run"]
95pub struct SystemRunner {
96    stop: oneshot::Receiver<i32>,
97    arb: SystemArbiter,
98    arb_controller: ArbiterController,
99    system: System,
100}
101
102impl SystemRunner {
103    /// Get current system.
104    pub fn system(&self) -> System {
105        self.system.clone()
106    }
107
108    /// This function will start event loop and will finish once the
109    /// `System::stop()` function is called.
110    pub fn run_until_stop(self) -> io::Result<()> {
111        self.run(|| Ok(()))
112    }
113
114    /// This function will start event loop and will finish once the
115    /// `System::stop()` function is called.
116    #[inline]
117    pub fn run<F>(self, f: F) -> io::Result<()>
118    where
119        F: FnOnce() -> io::Result<()> + 'static,
120    {
121        let SystemRunner {
122            stop,
123            arb,
124            arb_controller,
125            system,
126            ..
127        } = self;
128
129        // run loop
130        system.config().block_on(async move {
131            f()?;
132
133            let _ = crate::spawn(arb);
134            let _ = crate::spawn(arb_controller);
135            match stop.await {
136                Ok(code) => {
137                    if code != 0 {
138                        Err(io::Error::new(
139                            io::ErrorKind::Other,
140                            format!("Non-zero exit code: {}", code),
141                        ))
142                    } else {
143                        Ok(())
144                    }
145                }
146                Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")),
147            }
148        })
149    }
150
151    /// Execute a future and wait for result.
152    #[inline]
153    pub fn block_on<F, R>(self, fut: F) -> R
154    where
155        F: Future<Output = R> + 'static,
156        R: 'static,
157    {
158        let SystemRunner {
159            arb,
160            arb_controller,
161            system,
162            ..
163        } = self;
164
165        system.config().block_on(async move {
166            let _ = crate::spawn(arb);
167            let _ = crate::spawn(arb_controller);
168            fut.await
169        })
170    }
171
172    #[cfg(feature = "tokio")]
173    /// Execute a future and wait for result.
174    pub async fn run_local<F, R>(self, fut: F) -> R
175    where
176        F: Future<Output = R> + 'static,
177        R: 'static,
178    {
179        let SystemRunner {
180            arb,
181            arb_controller,
182            ..
183        } = self;
184
185        // run loop
186        tok_io::task::LocalSet::new()
187            .run_until(async move {
188                let _ = crate::spawn(arb);
189                let _ = crate::spawn(arb_controller);
190                fut.await
191            })
192            .await
193    }
194}
195
#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::thread;

    use super::*;

    /// Work submitted to a system's arbiter from another thread must run
    /// with that system as `System::current()`.
    #[test]
    fn test_async() {
        let (tx, rx) = mpsc::channel();

        // Start a system on a background thread and hand its handle back.
        thread::spawn(move || {
            let runner = crate::System::build().stop_on_panic(true).finish();

            tx.send(runner.system()).unwrap();
            let _ = runner.run_until_stop();
        });
        let s = System::new("test");

        let sys = rx.recv().unwrap();
        let id = sys.id();

        // plain closure executed on the remote arbiter
        let (tx, rx) = mpsc::channel();
        sys.arbiter().exec_fn(move || {
            let _ = tx.send(System::current().id());
        });
        let id2 = rx.recv().unwrap();
        assert_eq!(id, id2);

        // closure with a result, awaited from the local runner
        let id2 = s
            .block_on(sys.arbiter().exec(|| System::current().id()))
            .unwrap();
        assert_eq!(id, id2);

        // future spawned on the remote arbiter
        let (tx, rx) = mpsc::channel();
        sys.arbiter().spawn(Box::pin(async move {
            let _ = tx.send(System::current().id());
        }));
        let id2 = rx.recv().unwrap();
        assert_eq!(id, id2);

        // Stop the background system so its thread leaves `run_until_stop`,
        // matching `test_block_on`.
        sys.stop();
    }

    /// Same check as `test_async`, with the system running on a
    /// user-supplied `block_on` built on a tokio current-thread runtime.
    #[cfg(feature = "tokio")]
    #[test]
    fn test_block_on() {
        let (tx, rx) = mpsc::channel();

        thread::spawn(move || {
            let runner = crate::System::build()
                .stop_on_panic(true)
                .block_on(|fut| {
                    let rt = tok_io::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .unwrap();
                    tok_io::task::LocalSet::new().block_on(&rt, fut);
                })
                .finish();

            tx.send(runner.system()).unwrap();
            let _ = runner.run_until_stop();
        });
        let s = System::new("test");

        let sys = rx.recv().unwrap();
        let id = sys.id();

        // plain closure executed on the remote arbiter
        let (tx, rx) = mpsc::channel();
        sys.arbiter().exec_fn(move || {
            let _ = tx.send(System::current().id());
        });
        let id2 = rx.recv().unwrap();
        assert_eq!(id, id2);

        // closure with a result, awaited from the local runner
        let id2 = s
            .block_on(sys.arbiter().exec(|| System::current().id()))
            .unwrap();
        assert_eq!(id, id2);

        sys.stop();
    }
}