1use std::{fmt, future::Future, io, marker::PhantomData, panic, rc::Rc, sync::Arc, time};
2
3use crate::driver::Runner;
4use crate::system::{System, SystemConfig};
5
/// Settings collected before a [`SystemRunner`] is created.
///
/// Each field is copied into a `SystemConfig` by `Builder::build`; the
/// default values listed below are the ones assigned in `Builder::new`.
#[derive(Debug, Clone)]
pub struct Builder {
    /// Name of the system; defaults to `"ntex"` and is logged by `SystemRunner::run`.
    name: String,
    /// Forwarded as `SystemConfig::stop_on_panic`; defaults to `false`.
    // NOTE(review): presumably stops the system when a task panics — confirm in `System`.
    stop_on_panic: bool,
    /// Forwarded as `SystemConfig::stack_size`; defaults to `0`.
    // NOTE(review): unit and the meaning of `0` are not visible here — confirm.
    stack_size: usize,
    /// Forwarded as `SystemConfig::ping_interval`; defaults to `1000`.
    // NOTE(review): unit is not visible here (presumably milliseconds) — confirm.
    ping_interval: usize,
    /// Forwarded as `SystemConfig::disable_signals`; defaults to `false` and is
    /// set to `true` by both `disable_signals()` and `testing()`.
    disable_signals: bool,
    /// Forwarded as `SystemConfig::pool_limit`; defaults to `256`.
    pool_limit: usize,
    /// Forwarded as `SystemConfig::pool_recv_timeout`; defaults to 60 seconds.
    pool_recv_timeout: time::Duration,
    /// Forwarded as `SystemConfig::testing`; defaults to `false`.
    testing: bool,
}
29
30impl Builder {
31 pub(super) fn new() -> Self {
32 Builder {
33 name: "ntex".into(),
34 stop_on_panic: false,
35 stack_size: 0,
36 ping_interval: 1000,
37 disable_signals: false,
38 testing: false,
39 pool_limit: 256,
40 pool_recv_timeout: time::Duration::from_secs(60),
41 }
42 }
43
44 #[must_use]
45 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
47 self.name = name.as_ref().into();
48 self
49 }
50
51 #[must_use]
52 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
59 self.stop_on_panic = stop_on_panic;
60 self
61 }
62
63 #[must_use]
64 pub fn disable_signals(mut self) -> Self {
68 self.disable_signals = true;
69 self
70 }
71
72 #[must_use]
73 pub fn stack_size(mut self, size: usize) -> Self {
75 self.stack_size = size;
76 self
77 }
78
79 #[must_use]
80 pub fn ping_interval(mut self, interval: usize) -> Self {
85 self.ping_interval = interval;
86 self
87 }
88
89 #[must_use]
90 pub fn thread_pool_limit(mut self, value: usize) -> Self {
93 self.pool_limit = value;
94 self
95 }
96
97 #[must_use]
98 pub fn testing(mut self) -> Self {
100 self.testing = true;
101 self.disable_signals = true;
102 self
103 }
104
105 #[must_use]
106 pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
109 where
110 time::Duration: From<T>,
111 {
112 self.pool_recv_timeout = timeout.into();
113 self
114 }
115
116 pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
120 let config = SystemConfig {
121 name: self.name.clone(),
122 testing: self.testing,
123 stack_size: self.stack_size,
124 stop_on_panic: self.stop_on_panic,
125 ping_interval: self.ping_interval,
126 disable_signals: self.disable_signals,
127 pool_limit: self.pool_limit,
128 pool_recv_timeout: self.pool_recv_timeout,
129 runner: Arc::new(runner),
130 };
131 self.build_with(config)
132 }
133
134 pub fn build_with(self, config: SystemConfig) -> SystemRunner {
138 let runner = config.runner.clone();
139
140 SystemRunner {
142 config,
143 runner,
144 _t: PhantomData,
145 }
146 }
147}
148
/// Handle produced by `Builder::build` / `Builder::build_with` that starts a
/// system when one of its `run*` / `block_on` methods is called.
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
    /// Configuration passed to `System::start` when the runner is executed.
    config: SystemConfig,
    /// Runner handed to `crate::driver::block_on`; a clone of `config.runner`.
    runner: Arc<dyn Runner>,
    /// `Rc` is neither `Send` nor `Sync`, so this marker makes `SystemRunner`
    /// neither `Send` nor `Sync` as well.
    _t: PhantomData<Rc<()>>,
}
156
157impl SystemRunner {
158 pub fn run_until_stop(self) -> io::Result<()> {
161 self.run(|| Ok(()))
162 }
163
164 pub fn run<F>(self, f: F) -> io::Result<()>
167 where
168 F: FnOnce() -> io::Result<()> + 'static,
169 {
170 log::info!("Starting {:?} system", self.config.name);
171
172 let SystemRunner { config, runner, .. } = self;
173
174 crate::driver::block_on(runner.as_ref(), async move {
176 let (system, stop) = System::start(config);
177 crate::signals::start(&system);
178
179 f()?;
180
181 match stop.await {
182 Ok(code) => {
183 if code != 0 {
184 Err(io::Error::other(format!("Non-zero exit code: {code}")))
185 } else {
186 Ok(())
187 }
188 }
189 Err(_) => Err(io::Error::other("Closed")),
190 }
191 })
192 }
193
194 pub fn block_on<F, R>(self, fut: F) -> R
196 where
197 F: Future<Output = R> + 'static,
198 R: 'static,
199 {
200 let SystemRunner { config, runner, .. } = self;
201
202 crate::driver::block_on(runner.as_ref(), async move {
203 let (system, _) = System::start(config);
204 crate::signals::start(&system);
205
206 let loc = current_location();
207 ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
208 fut.await
209 })
210 }
211
212 #[cfg(feature = "tokio")]
213 pub async fn run_local<F, R>(self, fut: F) -> R
215 where
216 F: Future<Output = R> + 'static,
217 R: 'static,
218 {
219 let SystemRunner { config, .. } = self;
220
221 tok_io::task::LocalSet::new()
223 .run_until(async move {
224 _ = System::start(config);
225
226 let loc = current_location();
227 ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
228 fut.await
229 })
230 .await
231 }
232}
233
/// Returns the source location from which this function was called.
///
/// Because the function is `#[track_caller]`, `Location::caller()` resolves to
/// the call site of `current_location` rather than to a line inside it.
#[track_caller]
pub(crate) fn current_location() -> &'static panic::Location<'static> {
    panic::Location::<'static>::caller()
}
238
239impl fmt::Debug for SystemRunner {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 f.debug_struct("SystemRunner")
242 .field("config", &self.config)
243 .finish()
244 }
245}