1use std::{future::Future, io, marker::PhantomData, pin::Pin, rc::Rc, sync::Arc};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
7
8pub struct Builder {
14 name: String,
16 stop_on_panic: bool,
18 stack_size: usize,
20 ping_interval: usize,
22 block_on: Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>>,
24}
25
26impl Builder {
27 pub(super) fn new() -> Self {
28 Builder {
29 name: "ntex".into(),
30 stop_on_panic: false,
31 stack_size: 0,
32 block_on: None,
33 ping_interval: 1000,
34 }
35 }
36
37 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
39 self.name = name.as_ref().into();
40 self
41 }
42
43 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
48 self.stop_on_panic = stop_on_panic;
49 self
50 }
51
52 pub fn stack_size(mut self, size: usize) -> Self {
54 self.stack_size = size;
55 self
56 }
57
58 pub fn ping_interval(mut self, interval: usize) -> Self {
63 self.ping_interval = interval;
64 self
65 }
66
67 pub fn block_on<F>(mut self, block_on: F) -> Self
69 where
70 F: Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send + 'static,
71 {
72 self.block_on = Some(Arc::new(block_on));
73 self
74 }
75
76 pub fn finish(self) -> SystemRunner {
80 let (stop_tx, stop) = oneshot::channel();
81 let (sys_sender, sys_receiver) = unbounded();
82
83 let config = SystemConfig {
84 block_on: self.block_on,
85 stack_size: self.stack_size,
86 stop_on_panic: self.stop_on_panic,
87 };
88
89 let (arb, controller) = Arbiter::new_system(self.name.clone());
90 let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
91 let system = System::construct(sys_sender, arb.clone(), config);
92
93 let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
95
96 SystemRunner {
98 stop,
99 support,
100 controller,
101 system,
102 _t: PhantomData,
103 }
104 }
105}
106
107#[must_use = "SystemRunner must be run"]
109pub struct SystemRunner {
110 stop: oneshot::Receiver<i32>,
111 support: SystemSupport,
112 controller: ArbiterController,
113 system: System,
114 _t: PhantomData<Rc<()>>,
115}
116
117impl SystemRunner {
118 pub fn system(&self) -> System {
120 self.system.clone()
121 }
122
123 pub fn run_until_stop(self) -> io::Result<()> {
126 self.run(|| Ok(()))
127 }
128
129 pub fn run<F>(self, f: F) -> io::Result<()>
132 where
133 F: FnOnce() -> io::Result<()> + 'static,
134 {
135 let SystemRunner {
136 controller,
137 stop,
138 support,
139 system,
140 ..
141 } = self;
142
143 system.config().block_on(async move {
145 f()?;
146
147 let _ = crate::spawn(support.run());
148 let _ = crate::spawn(controller.run());
149 match stop.await {
150 Ok(code) => {
151 if code != 0 {
152 Err(io::Error::new(
153 io::ErrorKind::Other,
154 format!("Non-zero exit code: {}", code),
155 ))
156 } else {
157 Ok(())
158 }
159 }
160 Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")),
161 }
162 })
163 }
164
165 pub fn block_on<F, R>(self, fut: F) -> R
167 where
168 F: Future<Output = R> + 'static,
169 R: 'static,
170 {
171 let SystemRunner {
172 controller,
173 support,
174 system,
175 ..
176 } = self;
177
178 system.config().block_on(async move {
179 let _ = crate::spawn(support.run());
180 let _ = crate::spawn(controller.run());
181 fut.await
182 })
183 }
184
185 #[cfg(feature = "tokio")]
186 pub async fn run_local<F, R>(self, fut: F) -> R
188 where
189 F: Future<Output = R> + 'static,
190 R: 'static,
191 {
192 let SystemRunner {
193 controller,
194 support,
195 ..
196 } = self;
197
198 tok_io::task::LocalSet::new()
200 .run_until(async move {
201 let _ = crate::spawn(support.run());
202 let _ = crate::spawn(controller.run());
203 fut.await
204 })
205 .await
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use std::sync::mpsc;
212 use std::thread;
213
214 use super::*;
215
216 #[test]
217 fn test_async() {
218 let (tx, rx) = mpsc::channel();
219
220 thread::spawn(move || {
221 let runner = crate::System::build().stop_on_panic(true).finish();
222
223 tx.send(runner.system()).unwrap();
224 let _ = runner.run_until_stop();
225 });
226 let s = System::new("test");
227
228 let sys = rx.recv().unwrap();
229 let id = sys.id();
230 let (tx, rx) = mpsc::channel();
231 sys.arbiter().exec_fn(move || {
232 let _ = tx.send(System::current().id());
233 });
234 let id2 = rx.recv().unwrap();
235 assert_eq!(id, id2);
236
237 let id2 = s
238 .block_on(sys.arbiter().exec(|| System::current().id()))
239 .unwrap();
240 assert_eq!(id, id2);
241
242 let (tx, rx) = mpsc::channel();
243 sys.arbiter().spawn(Box::pin(async move {
244 let _ = tx.send(System::current().id());
245 }));
246 let id2 = rx.recv().unwrap();
247 assert_eq!(id, id2);
248 }
249
250 #[cfg(feature = "tokio")]
251 #[test]
252 fn test_block_on() {
253 let (tx, rx) = mpsc::channel();
254
255 thread::spawn(move || {
256 let runner = crate::System::build()
257 .stop_on_panic(true)
258 .ping_interval(25)
259 .block_on(|fut| {
260 let rt = tok_io::runtime::Builder::new_current_thread()
261 .enable_all()
262 .build()
263 .unwrap();
264 tok_io::task::LocalSet::new().block_on(&rt, fut);
265 })
266 .finish();
267
268 tx.send(runner.system()).unwrap();
269 let _ = runner.run_until_stop();
270 });
271 let s = System::new("test");
272
273 let sys = rx.recv().unwrap();
274 let id = sys.id();
275 let (tx, rx) = mpsc::channel();
276 sys.arbiter().exec_fn(move || {
277 let _ = tx.send(System::current().id());
278 });
279 let id2 = rx.recv().unwrap();
280 assert_eq!(id, id2);
281
282 let id2 = s
283 .block_on(sys.arbiter().exec(|| System::current().id()))
284 .unwrap();
285 assert_eq!(id, id2);
286
287 let (tx, rx) = mpsc::channel();
288 sys.arbiter().spawn(async move {
289 futures_timer::Delay::new(std::time::Duration::from_millis(100)).await;
290
291 let recs = System::list_arbiter_pings(Arbiter::current().id(), |recs| {
292 recs.unwrap().clone()
293 });
294 let _ = tx.send(recs);
295 });
296 let recs = rx.recv().unwrap();
297
298 assert!(!recs.is_empty());
299 sys.stop();
300 }
301}