1use std::{fmt, future::Future, io, marker::PhantomData, panic, rc::Rc, sync::Arc, time};
2
3use crate::driver::Runner;
4use crate::system::{System, SystemConfig};
5
/// Collects the settings used to create a [`SystemRunner`].
///
/// Instances are created by `Builder::new` and customised through the
/// chainable setter methods, each of which consumes and returns the builder.
#[derive(Debug, Clone)]
pub struct Builder {
    /// Name given to the system (defaults to `"ntex"`).
    name: String,
    /// Value forwarded to `SystemConfig::stop_on_panic`.
    stop_on_panic: bool,
    /// Value forwarded to `SystemConfig::stack_size` (defaults to `0`).
    stack_size: usize,
    /// Value forwarded to `SystemConfig::ping_interval` (defaults to `1000`).
    ping_interval: usize,
    /// Whether the runner enables signal handling once the system has started.
    signals: bool,
    /// Value forwarded to `SystemConfig::pool_limit` (defaults to `256`).
    pool_limit: usize,
    /// Value forwarded to `SystemConfig::pool_recv_timeout` (defaults to 60 seconds).
    pool_recv_timeout: time::Duration,
    /// Value forwarded to `SystemConfig::testing`.
    testing: bool,
}
29
30impl Builder {
31 pub(super) fn new() -> Self {
32 Builder {
33 name: "ntex".into(),
34 stop_on_panic: false,
35 stack_size: 0,
36 ping_interval: 1000,
37 signals: false,
38 testing: false,
39 pool_limit: 256,
40 pool_recv_timeout: time::Duration::from_secs(60),
41 }
42 }
43
44 #[must_use]
45 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
47 self.name = name.as_ref().into();
48 self
49 }
50
51 #[must_use]
52 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
59 self.stop_on_panic = stop_on_panic;
60 self
61 }
62
63 #[must_use]
64 pub fn disable_signals(mut self) -> Self {
68 self.signals = false;
69 self
70 }
71
72 #[must_use]
73 pub fn enable_signals(mut self) -> Self {
77 self.signals = true;
78 self
79 }
80
81 #[must_use]
82 pub fn stack_size(mut self, size: usize) -> Self {
84 self.stack_size = size;
85 self
86 }
87
88 #[must_use]
89 pub fn ping_interval(mut self, interval: usize) -> Self {
94 self.ping_interval = interval;
95 self
96 }
97
98 #[must_use]
99 pub fn thread_pool_limit(mut self, value: usize) -> Self {
102 self.pool_limit = value;
103 self
104 }
105
106 #[must_use]
107 pub fn testing(mut self) -> Self {
109 self.testing = true;
110 self.signals = false;
111 self
112 }
113
114 #[must_use]
115 pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
118 where
119 time::Duration: From<T>,
120 {
121 self.pool_recv_timeout = timeout.into();
122 self
123 }
124
125 pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
129 let config = SystemConfig {
130 name: self.name.clone(),
131 testing: self.testing,
132 stack_size: self.stack_size,
133 stop_on_panic: self.stop_on_panic,
134 ping_interval: self.ping_interval,
135 pool_limit: self.pool_limit,
136 pool_recv_timeout: self.pool_recv_timeout,
137 runner: Arc::new(runner),
138 };
139 self.build_with(config)
140 }
141
142 pub fn build_with(self, config: SystemConfig) -> SystemRunner {
146 let runner = config.runner.clone();
147
148 SystemRunner {
150 config,
151 runner,
152 signals: self.signals,
153 _t: PhantomData,
154 }
155 }
156}
157
158#[must_use = "SystemRunner must be run"]
160pub struct SystemRunner {
161 config: SystemConfig,
162 runner: Arc<dyn Runner>,
163 signals: bool,
164 _t: PhantomData<Rc<()>>,
165}
166
167impl SystemRunner {
168 pub fn run_until_stop(self) -> io::Result<()> {
171 self.run(|| Ok(()))
172 }
173
174 pub fn run<F>(self, f: F) -> io::Result<()>
177 where
178 F: FnOnce() -> io::Result<()> + 'static,
179 {
180 log::info!("Starting {:?} system", self.config.name);
181
182 let SystemRunner {
183 config,
184 runner,
185 signals,
186 ..
187 } = self;
188
189 crate::driver::block_on(runner.as_ref(), async move {
191 let (system, stop) = System::start(config);
192 if signals {
193 system.enable_signals();
194 }
195
196 f()?;
197
198 match stop.await {
199 Ok(code) => {
200 if code != 0 {
201 Err(io::Error::other(format!("Non-zero exit code: {code}")))
202 } else {
203 Ok(())
204 }
205 }
206 Err(_) => Err(io::Error::other("Closed")),
207 }
208 })
209 }
210
211 pub fn block_on<F, R>(self, fut: F) -> R
213 where
214 F: Future<Output = R> + 'static,
215 R: 'static,
216 {
217 let SystemRunner {
218 config,
219 runner,
220 signals,
221 ..
222 } = self;
223
224 crate::driver::block_on(runner.as_ref(), async move {
225 let (system, _) = System::start(config);
226 if signals {
227 system.enable_signals();
228 }
229
230 let loc = current_location();
231 ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
232 fut.await
233 })
234 }
235
236 #[cfg(feature = "tokio")]
237 pub async fn run_local<F, R>(self, fut: F) -> R
239 where
240 F: Future<Output = R> + 'static,
241 R: 'static,
242 {
243 let SystemRunner { config, .. } = self;
244
245 tok_io::task::LocalSet::new()
247 .run_until(async move {
248 _ = System::start(config);
249
250 let loc = current_location();
251 ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
252 fut.await
253 })
254 .await
255 }
256}
257
/// Returns the source location of the code that called this function.
///
/// Because the function is `#[track_caller]`, the reported file and line are
/// those of the call site, not of this function's body.
#[track_caller]
pub(crate) fn current_location() -> &'static panic::Location<'static> {
    <panic::Location<'static>>::caller()
}
262
263impl fmt::Debug for SystemRunner {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 f.debug_struct("SystemRunner")
266 .field("config", &self.config)
267 .finish()
268 }
269}