1use std::{fmt, future::Future, io, marker::PhantomData, rc::Rc, sync::Arc, time};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::driver::Runner;
7use crate::pool::ThreadPool;
8use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
9
10#[derive(Debug, Clone)]
11pub struct Builder {
17 name: String,
19 stop_on_panic: bool,
21 stack_size: usize,
23 ping_interval: usize,
25 pool_limit: usize,
27 pool_recv_timeout: time::Duration,
28 testing: bool,
30}
31
32impl Builder {
33 pub(super) fn new() -> Self {
34 Builder {
35 name: "ntex".into(),
36 stop_on_panic: false,
37 stack_size: 0,
38 ping_interval: 1000,
39 testing: false,
40 pool_limit: 256,
41 pool_recv_timeout: time::Duration::from_secs(60),
42 }
43 }
44
45 #[must_use]
46 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
48 self.name = name.as_ref().into();
49 self
50 }
51
52 #[must_use]
53 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
60 self.stop_on_panic = stop_on_panic;
61 self
62 }
63
64 #[must_use]
65 pub fn stack_size(mut self, size: usize) -> Self {
67 self.stack_size = size;
68 self
69 }
70
71 #[must_use]
72 pub fn ping_interval(mut self, interval: usize) -> Self {
77 self.ping_interval = interval;
78 self
79 }
80
81 #[must_use]
82 pub fn thread_pool_limit(mut self, value: usize) -> Self {
85 self.pool_limit = value;
86 self
87 }
88
89 #[must_use]
90 pub fn testing(mut self) -> Self {
92 self.testing = true;
93 self
94 }
95
96 #[must_use]
97 pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
100 where
101 time::Duration: From<T>,
102 {
103 self.pool_recv_timeout = timeout.into();
104 self
105 }
106
107 pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
111 let name = self.name.clone();
112 let testing = self.testing;
113 let stack_size = self.stack_size;
114 let stop_on_panic = self.stop_on_panic;
115
116 self.build_with(SystemConfig {
117 name,
118 testing,
119 stack_size,
120 stop_on_panic,
121 runner: Arc::new(runner),
122 })
123 }
124
125 pub fn build_with(self, config: SystemConfig) -> SystemRunner {
129 let (stop_tx, stop) = oneshot::channel();
130 let (sys_sender, sys_receiver) = unbounded();
131
132 let pool = ThreadPool::new(&self.name, self.pool_limit, self.pool_recv_timeout);
134
135 let (arb, controller) = Arbiter::new_system(self.name.clone());
136 let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
137 let system = System::construct(sys_sender, arb, config.clone(), pool);
138
139 let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
141
142 SystemRunner {
144 stop,
145 support,
146 controller,
147 system,
148 config,
149 _t: PhantomData,
150 }
151 }
152}
153
154#[must_use = "SystemRunner must be run"]
156pub struct SystemRunner {
157 stop: oneshot::Receiver<i32>,
158 support: SystemSupport,
159 controller: ArbiterController,
160 system: System,
161 config: SystemConfig,
162 _t: PhantomData<Rc<()>>,
163}
164
165impl SystemRunner {
166 pub fn system(&self) -> System {
168 self.system.clone()
169 }
170
171 pub fn run_until_stop(self) -> io::Result<()> {
174 self.run(|| Ok(()))
175 }
176
177 pub fn run<F>(self, f: F) -> io::Result<()>
180 where
181 F: FnOnce() -> io::Result<()> + 'static,
182 {
183 log::info!("Starting {:?} system", self.config.name);
184
185 let SystemRunner {
186 controller,
187 stop,
188 support,
189 config,
190 ..
191 } = self;
192
193 config.block_on(async move {
195 f()?;
196
197 crate::spawn(support.run());
198 crate::spawn(controller.run());
199 match stop.await {
200 Ok(code) => {
201 if code != 0 {
202 Err(io::Error::other(format!("Non-zero exit code: {code}")))
203 } else {
204 Ok(())
205 }
206 }
207 Err(_) => Err(io::Error::other("Closed")),
208 }
209 })
210 }
211
212 pub fn block_on<F, R>(self, fut: F) -> R
214 where
215 F: Future<Output = R> + 'static,
216 R: 'static,
217 {
218 let SystemRunner {
219 controller,
220 support,
221 config,
222 ..
223 } = self;
224
225 config.block_on(async move {
226 crate::spawn(support.run());
227 crate::spawn(controller.run());
228 fut.await
229 })
230 }
231
232 #[cfg(feature = "tokio")]
233 pub async fn run_local<F, R>(self, fut: F) -> R
235 where
236 F: Future<Output = R> + 'static,
237 R: 'static,
238 {
239 let SystemRunner {
240 controller,
241 support,
242 ..
243 } = self;
244
245 tok_io::task::LocalSet::new()
247 .run_until(async move {
248 crate::spawn(support.run());
249 crate::spawn(controller.run());
250 fut.await
251 })
252 .await
253 }
254}
255
256impl fmt::Debug for SystemRunner {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 f.debug_struct("SystemRunner")
259 .field("system", &self.system)
260 .field("support", &self.support)
261 .field("config", &self.config)
262 .finish()
263 }
264}