1use std::borrow::Cow;
2use std::io;
3
4use futures::channel::mpsc::unbounded;
5use futures::channel::oneshot::{channel, Receiver};
6use futures::future::{lazy, Future, FutureExt};
7use tokio::task::LocalSet;
8
9use crate::arbiter::{Arbiter, SystemArbiter};
10use crate::runtime::Runtime;
11use crate::system::System;
12
13pub struct Builder {
19 name: Cow<'static, str>,
21
22 stop_on_panic: bool,
24}
25
26impl Builder {
27 pub(crate) fn new() -> Self {
28 Builder {
29 name: Cow::Borrowed("actix"),
30 stop_on_panic: false,
31 }
32 }
33
34 pub fn name<T: Into<String>>(mut self, name: T) -> Self {
36 self.name = Cow::Owned(name.into());
37 self
38 }
39
40 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
45 self.stop_on_panic = stop_on_panic;
46 self
47 }
48
49 pub fn build(self) -> SystemRunner {
53 self.create_runtime(|| {})
54 }
55
56 pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner {
60 self.create_async_runtime(local)
61 }
62
63 pub fn run<F>(self, f: F) -> io::Result<()>
67 where
68 F: FnOnce() + 'static,
69 {
70 self.create_runtime(f).run()
71 }
72
73 fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
74 let (stop_tx, stop) = channel();
75 let (sys_sender, sys_receiver) = unbounded();
76
77 let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
78
79 let arb = SystemArbiter::new(stop_tx, sys_receiver);
81
82 let _ = local.spawn_local(arb);
84
85 AsyncSystemRunner { stop, system }
86 }
87
88 fn create_runtime<F>(self, f: F) -> SystemRunner
89 where
90 F: FnOnce() + 'static,
91 {
92 let (stop_tx, stop) = channel();
93 let (sys_sender, sys_receiver) = unbounded();
94
95 let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
96
97 let arb = SystemArbiter::new(stop_tx, sys_receiver);
99
100 let mut rt = Runtime::new().unwrap();
101 rt.spawn(arb);
102
103 rt.block_on(lazy(move |_| f()));
105
106 SystemRunner { rt, stop, system }
107 }
108}
109
110#[derive(Debug)]
111pub(crate) struct AsyncSystemRunner {
112 stop: Receiver<i32>,
113 system: System,
114}
115
116impl AsyncSystemRunner {
117 pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send {
120 let AsyncSystemRunner { stop, .. } = self;
121
122 lazy(|_| {
124 Arbiter::run_system(None);
125 async {
126 let res = match stop.await {
127 Ok(code) => {
128 if code != 0 {
129 Err(io::Error::new(
130 io::ErrorKind::Other,
131 format!("Non-zero exit code: {}", code),
132 ))
133 } else {
134 Ok(())
135 }
136 }
137 Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
138 };
139 Arbiter::stop_system();
140 return res;
141 }
142 })
143 .flatten()
144 }
145}
146
147#[must_use = "SystemRunner must be run"]
149#[derive(Debug)]
150pub struct SystemRunner {
151 rt: Runtime,
152 stop: Receiver<i32>,
153 system: System,
154}
155
156impl SystemRunner {
157 pub fn run(self) -> io::Result<()> {
160 let SystemRunner { mut rt, stop, .. } = self;
161
162 Arbiter::run_system(Some(&rt));
164 let result = match rt.block_on(stop) {
165 Ok(code) => {
166 if code != 0 {
167 Err(io::Error::new(
168 io::ErrorKind::Other,
169 format!("Non-zero exit code: {}", code),
170 ))
171 } else {
172 Ok(())
173 }
174 }
175 Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
176 };
177 Arbiter::stop_system();
178 result
179 }
180
181 pub fn block_on<F, O>(&mut self, fut: F) -> O
183 where
184 F: Future<Output = O> + 'static,
185 {
186 Arbiter::run_system(Some(&self.rt));
187 let res = self.rt.block_on(fut);
188 Arbiter::stop_system();
189 res
190 }
191}