1use std::{future::Future, io, pin::Pin, sync::Arc};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter};
6use crate::{system::SystemConfig, System};
7
/// Builder that collects the options for a new system and turns them into a
/// [`SystemRunner`] via `finish`.
///
/// Every option is set through a chainable, by-value setter.
pub struct Builder {
    /// System name; `new` initialises it to `"ntex"`.
    // NOTE(review): `finish` does not forward this value anywhere in the
    // visible code — confirm whether it is consumed elsewhere.
    name: String,
    /// Flag copied into `SystemConfig::stop_on_panic` by `finish`; defaults to
    /// `false`. Presumably decides whether a panic stops the whole system —
    /// confirm against the users of `SystemConfig`.
    stop_on_panic: bool,
    /// Value copied into `SystemConfig::stack_size` by `finish`; defaults to
    /// `0`. Presumably the stack size for spawned threads — TODO confirm.
    stack_size: usize,
    /// Optional shared callback that receives a pinned, boxed
    /// `Future<Output = ()>`; handed to `SystemConfig::block_on` by `finish`.
    /// `None` unless `Builder::block_on` was called.
    block_on: Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>>,
}
23
24impl Builder {
25 pub(super) fn new() -> Self {
26 Builder {
27 name: "ntex".into(),
28 stop_on_panic: false,
29 stack_size: 0,
30 block_on: None,
31 }
32 }
33
34 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
36 self.name = name.as_ref().into();
37 self
38 }
39
40 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
45 self.stop_on_panic = stop_on_panic;
46 self
47 }
48
49 pub fn stack_size(mut self, size: usize) -> Self {
51 self.stack_size = size;
52 self
53 }
54
55 pub fn block_on<F>(mut self, block_on: F) -> Self
57 where
58 F: Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send + 'static,
59 {
60 self.block_on = Some(Arc::new(block_on));
61 self
62 }
63
64 pub fn finish(self) -> SystemRunner {
68 let (stop_tx, stop) = oneshot::channel();
69 let (sys_sender, sys_receiver) = unbounded();
70
71 let config = SystemConfig {
72 block_on: self.block_on,
73 stack_size: self.stack_size,
74 stop_on_panic: self.stop_on_panic,
75 };
76
77 let (arb, arb_controller) = Arbiter::new_system();
78 let system = System::construct(sys_sender, arb, config);
79
80 let arb = SystemArbiter::new(stop_tx, sys_receiver);
82
83 SystemRunner {
85 stop,
86 arb,
87 arb_controller,
88 system,
89 }
90 }
91}
92
/// Bundle of everything needed to drive a system that `Builder::finish`
/// has constructed.
///
/// The value does nothing until one of its consuming `run*` / `block_on`
/// methods is called, hence the `must_use` marker.
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
    /// Receiving half of the stop channel; `run` awaits it and treats the
    /// received `i32` as the exit code.
    stop: oneshot::Receiver<i32>,
    /// System arbiter, spawned onto the runtime when the runner is started.
    arb: SystemArbiter,
    /// Controller paired with the arbiter created by `Arbiter::new_system`;
    /// spawned alongside `arb` when the runner is started.
    arb_controller: ArbiterController,
    /// Handle of the system this runner drives; cloned out by `system()`.
    system: System,
}
101
102impl SystemRunner {
103 pub fn system(&self) -> System {
105 self.system.clone()
106 }
107
108 pub fn run_until_stop(self) -> io::Result<()> {
111 self.run(|| Ok(()))
112 }
113
114 #[inline]
117 pub fn run<F>(self, f: F) -> io::Result<()>
118 where
119 F: FnOnce() -> io::Result<()> + 'static,
120 {
121 let SystemRunner {
122 stop,
123 arb,
124 arb_controller,
125 system,
126 ..
127 } = self;
128
129 system.config().block_on(async move {
131 f()?;
132
133 let _ = crate::spawn(arb);
134 let _ = crate::spawn(arb_controller);
135 match stop.await {
136 Ok(code) => {
137 if code != 0 {
138 Err(io::Error::new(
139 io::ErrorKind::Other,
140 format!("Non-zero exit code: {}", code),
141 ))
142 } else {
143 Ok(())
144 }
145 }
146 Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")),
147 }
148 })
149 }
150
151 #[inline]
153 pub fn block_on<F, R>(self, fut: F) -> R
154 where
155 F: Future<Output = R> + 'static,
156 R: 'static,
157 {
158 let SystemRunner {
159 arb,
160 arb_controller,
161 system,
162 ..
163 } = self;
164
165 system.config().block_on(async move {
166 let _ = crate::spawn(arb);
167 let _ = crate::spawn(arb_controller);
168 fut.await
169 })
170 }
171
172 #[cfg(feature = "tokio")]
173 pub async fn run_local<F, R>(self, fut: F) -> R
175 where
176 F: Future<Output = R> + 'static,
177 R: 'static,
178 {
179 let SystemRunner {
180 arb,
181 arb_controller,
182 ..
183 } = self;
184
185 tok_io::task::LocalSet::new()
187 .run_until(async move {
188 let _ = crate::spawn(arb);
189 let _ = crate::spawn(arb_controller);
190 fut.await
191 })
192 .await
193 }
194}
195
#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::thread;

    use super::*;

    /// Starts a system on a background thread and checks that work submitted
    /// through its arbiter (`exec_fn`, `exec`, `spawn`) observes that system's
    /// id as `System::current().id()`.
    #[test]
    fn test_async() {
        let (tx, rx) = mpsc::channel();

        thread::spawn(move || {
            let runner = crate::System::build().stop_on_panic(true).finish();

            // Hand the system handle to the test thread before blocking.
            tx.send(runner.system()).unwrap();
            let _ = runner.run_until_stop();
        });
        let s = System::new("test");

        let sys = rx.recv().unwrap();
        let id = sys.id();
        let (tx, rx) = mpsc::channel();
        sys.arbiter().exec_fn(move || {
            let _ = tx.send(System::current().id());
        });
        let id2 = rx.recv().unwrap();
        assert_eq!(id, id2);

        let id2 = s
            .block_on(sys.arbiter().exec(|| System::current().id()))
            .unwrap();
        assert_eq!(id, id2);

        let (tx, rx) = mpsc::channel();
        sys.arbiter().spawn(Box::pin(async move {
            let _ = tx.send(System::current().id());
        }));
        let id2 = rx.recv().unwrap();
        assert_eq!(id, id2);

        // Stop the background system, as `test_block_on` does, so its thread
        // does not stay blocked in `run_until_stop` after this test finishes.
        sys.stop();
    }

    /// Same scenario as `test_async`, but the background system is built with
    /// a custom `block_on` callback that drives the future on a
    /// current-thread tokio runtime inside a `LocalSet`.
    #[cfg(feature = "tokio")]
    #[test]
    fn test_block_on() {
        let (tx, rx) = mpsc::channel();

        thread::spawn(move || {
            let runner = crate::System::build()
                .stop_on_panic(true)
                .block_on(|fut| {
                    let rt = tok_io::runtime::Builder::new_current_thread()
                        .enable_all()
                        .build()
                        .unwrap();
                    tok_io::task::LocalSet::new().block_on(&rt, fut);
                })
                .finish();

            // Hand the system handle to the test thread before blocking.
            tx.send(runner.system()).unwrap();
            let _ = runner.run_until_stop();
        });
        let s = System::new("test");

        let sys = rx.recv().unwrap();
        let id = sys.id();
        let (tx, rx) = mpsc::channel();
        sys.arbiter().exec_fn(move || {
            let _ = tx.send(System::current().id());
        });
        let id2 = rx.recv().unwrap();
        assert_eq!(id, id2);

        let id2 = s
            .block_on(sys.arbiter().exec(|| System::current().id()))
            .unwrap();
        assert_eq!(id, id2);

        // Ask the background system to shut down so its thread can exit.
        sys.stop();
    }
}