1use std::{future::Future, io, marker::PhantomData, pin::Pin, rc::Rc, sync::Arc};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
7
8pub struct Builder {
14 name: String,
16 stop_on_panic: bool,
18 stack_size: usize,
20 ping_interval: usize,
22 block_on: Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>>,
24}
25
26impl Builder {
27 pub(super) fn new() -> Self {
28 Builder {
29 name: "ntex".into(),
30 stop_on_panic: false,
31 stack_size: 0,
32 block_on: None,
33 ping_interval: 1000,
34 }
35 }
36
37 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
39 self.name = name.as_ref().into();
40 self
41 }
42
43 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
48 self.stop_on_panic = stop_on_panic;
49 self
50 }
51
52 pub fn stack_size(mut self, size: usize) -> Self {
54 self.stack_size = size;
55 self
56 }
57
58 pub fn ping_interval(mut self, interval: usize) -> Self {
63 self.ping_interval = interval;
64 self
65 }
66
67 pub fn block_on<F>(mut self, block_on: F) -> Self
69 where
70 F: Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send + 'static,
71 {
72 self.block_on = Some(Arc::new(block_on));
73 self
74 }
75
76 pub fn finish(self) -> SystemRunner {
80 let (stop_tx, stop) = oneshot::channel();
81 let (sys_sender, sys_receiver) = unbounded();
82
83 let config = SystemConfig {
84 block_on: self.block_on,
85 stack_size: self.stack_size,
86 stop_on_panic: self.stop_on_panic,
87 };
88
89 let (arb, controller) = Arbiter::new_system(self.name.clone());
90 let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
91 let system = System::construct(sys_sender, arb.clone(), config);
92
93 let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
95
96 SystemRunner {
98 stop,
99 support,
100 controller,
101 system,
102 _t: PhantomData,
103 }
104 }
105}
106
107#[must_use = "SystemRunner must be run"]
109pub struct SystemRunner {
110 stop: oneshot::Receiver<i32>,
111 support: SystemSupport,
112 controller: ArbiterController,
113 system: System,
114 _t: PhantomData<Rc<()>>,
115}
116
117impl SystemRunner {
118 pub fn system(&self) -> System {
120 self.system.clone()
121 }
122
123 pub fn run_until_stop(self) -> io::Result<()> {
126 self.run(|| Ok(()))
127 }
128
129 pub fn run<F>(self, f: F) -> io::Result<()>
132 where
133 F: FnOnce() -> io::Result<()> + 'static,
134 {
135 let SystemRunner {
136 controller,
137 stop,
138 support,
139 system,
140 ..
141 } = self;
142
143 system.config().block_on(async move {
145 f()?;
146
147 let _ = crate::spawn(support.run());
148 let _ = crate::spawn(controller.run());
149 match stop.await {
150 Ok(code) => {
151 if code != 0 {
152 Err(io::Error::other(format!("Non-zero exit code: {}", code)))
153 } else {
154 Ok(())
155 }
156 }
157 Err(_) => Err(io::Error::other("Closed")),
158 }
159 })
160 }
161
162 pub fn block_on<F, R>(self, fut: F) -> R
164 where
165 F: Future<Output = R> + 'static,
166 R: 'static,
167 {
168 let SystemRunner {
169 controller,
170 support,
171 system,
172 ..
173 } = self;
174
175 system.config().block_on(async move {
176 let _ = crate::spawn(support.run());
177 let _ = crate::spawn(controller.run());
178 fut.await
179 })
180 }
181
182 #[cfg(feature = "tokio")]
183 pub async fn run_local<F, R>(self, fut: F) -> R
185 where
186 F: Future<Output = R> + 'static,
187 R: 'static,
188 {
189 let SystemRunner {
190 controller,
191 support,
192 ..
193 } = self;
194
195 tok_io::task::LocalSet::new()
197 .run_until(async move {
198 let _ = crate::spawn(support.run());
199 let _ = crate::spawn(controller.run());
200 fut.await
201 })
202 .await
203 }
204}
205
206#[cfg(test)]
207mod tests {
208 use std::sync::mpsc;
209 use std::thread;
210
211 use super::*;
212
213 #[test]
214 fn test_async() {
215 let (tx, rx) = mpsc::channel();
216
217 thread::spawn(move || {
218 let runner = crate::System::build().stop_on_panic(true).finish();
219
220 tx.send(runner.system()).unwrap();
221 let _ = runner.run_until_stop();
222 });
223 let s = System::new("test");
224
225 let sys = rx.recv().unwrap();
226 let id = sys.id();
227 let (tx, rx) = mpsc::channel();
228 sys.arbiter().exec_fn(move || {
229 let _ = tx.send(System::current().id());
230 });
231 let id2 = rx.recv().unwrap();
232 assert_eq!(id, id2);
233
234 let id2 = s
235 .block_on(sys.arbiter().exec(|| System::current().id()))
236 .unwrap();
237 assert_eq!(id, id2);
238
239 let (tx, rx) = mpsc::channel();
240 sys.arbiter().spawn(Box::pin(async move {
241 let _ = tx.send(System::current().id());
242 }));
243 let id2 = rx.recv().unwrap();
244 assert_eq!(id, id2);
245 }
246
247 #[cfg(feature = "tokio")]
248 #[test]
249 fn test_block_on() {
250 let (tx, rx) = mpsc::channel();
251
252 thread::spawn(move || {
253 let runner = crate::System::build()
254 .stop_on_panic(true)
255 .ping_interval(25)
256 .block_on(|fut| {
257 let rt = tok_io::runtime::Builder::new_current_thread()
258 .enable_all()
259 .build()
260 .unwrap();
261 tok_io::task::LocalSet::new().block_on(&rt, fut);
262 })
263 .finish();
264
265 tx.send(runner.system()).unwrap();
266 let _ = runner.run_until_stop();
267 });
268 let s = System::new("test");
269
270 let sys = rx.recv().unwrap();
271 let id = sys.id();
272 let (tx, rx) = mpsc::channel();
273 sys.arbiter().exec_fn(move || {
274 let _ = tx.send(System::current().id());
275 });
276 let id2 = rx.recv().unwrap();
277 assert_eq!(id, id2);
278
279 let id2 = s
280 .block_on(sys.arbiter().exec(|| System::current().id()))
281 .unwrap();
282 assert_eq!(id, id2);
283
284 let (tx, rx) = mpsc::channel();
285 sys.arbiter().spawn(async move {
286 futures_timer::Delay::new(std::time::Duration::from_millis(100)).await;
287
288 let recs = System::list_arbiter_pings(Arbiter::current().id(), |recs| {
289 recs.unwrap().clone()
290 });
291 let _ = tx.send(recs);
292 });
293 let recs = rx.recv().unwrap();
294
295 assert!(!recs.is_empty());
296 sys.stop();
297 }
298}