1use std::{fmt, future::Future, io, marker::PhantomData, panic, rc::Rc, sync::Arc, time};
2
3use crate::driver::Runner;
4use crate::system::{System, SystemConfig};
5
/// Collects the settings from which a [`SystemRunner`] is created.
///
/// Values start at the defaults assigned in `Builder::new` and are adjusted
/// through the chained setter methods before `build` is called.
#[derive(Debug, Clone)]
pub struct Builder {
    /// Name given to the system; defaults to `"ntex"`.
    name: String,
    /// Copied into `SystemConfig::stop_on_panic`; defaults to `false`.
    /// Presumably stops the system when a panic is observed — confirm
    /// against `System`.
    stop_on_panic: bool,
    /// Copied into `SystemConfig::stack_size`; defaults to `0`.
    /// NOTE(review): unit and the meaning of `0` are not visible here.
    stack_size: usize,
    /// Copied into `SystemConfig::ping_interval`; defaults to `1000`.
    /// NOTE(review): unit is not visible here (looks like milliseconds).
    ping_interval: usize,
    /// Copied into `SystemConfig::pool_limit`; defaults to `256`.
    pool_limit: usize,
    /// Copied into `SystemConfig::pool_bounded`; defaults to `true` and is
    /// cleared by `thread_pool_unbounded`.
    pool_bounded: bool,
    /// Copied into `SystemConfig::pool_recv_timeout`; defaults to 60 seconds.
    pool_recv_timeout: time::Duration,
    /// Copied into `SystemConfig::testing`; defaults to `false`.
    testing: bool,
}
29
30impl Builder {
31 pub(super) fn new() -> Self {
32 Builder {
33 name: "ntex".into(),
34 stop_on_panic: false,
35 stack_size: 0,
36 ping_interval: 1000,
37 testing: false,
38 pool_limit: 256,
39 pool_bounded: true,
40 pool_recv_timeout: time::Duration::from_secs(60),
41 }
42 }
43
44 #[must_use]
45 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
47 self.name = name.as_ref().into();
48 self
49 }
50
51 #[must_use]
52 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
59 self.stop_on_panic = stop_on_panic;
60 self
61 }
62
63 #[must_use]
64 pub fn stack_size(mut self, size: usize) -> Self {
66 self.stack_size = size;
67 self
68 }
69
70 #[must_use]
71 pub fn ping_interval(mut self, interval: usize) -> Self {
76 self.ping_interval = interval;
77 self
78 }
79
80 #[must_use]
81 pub fn thread_pool_limit(mut self, value: usize) -> Self {
84 self.pool_limit = value;
85 self
86 }
87
88 #[must_use]
89 pub fn thread_pool_unbounded(mut self) -> Self {
91 self.pool_bounded = false;
92 self
93 }
94
95 #[must_use]
96 pub fn testing(mut self) -> Self {
98 self.testing = true;
99 self
100 }
101
102 #[must_use]
103 pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
106 where
107 time::Duration: From<T>,
108 {
109 self.pool_recv_timeout = timeout.into();
110 self
111 }
112
113 pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
117 let config = SystemConfig {
118 name: self.name.clone(),
119 testing: self.testing,
120 stack_size: self.stack_size,
121 stop_on_panic: self.stop_on_panic,
122 ping_interval: self.ping_interval,
123 pool_limit: self.pool_limit,
124 pool_bounded: self.pool_bounded,
125 pool_recv_timeout: self.pool_recv_timeout,
126 runner: Arc::new(runner),
127 };
128 self.build_with(config)
129 }
130
131 pub fn build_with(self, config: SystemConfig) -> SystemRunner {
135 let runner = config.runner.clone();
136 let system = System::construct(config);
137
138 SystemRunner {
140 system,
141 runner,
142 _t: PhantomData,
143 }
144 }
145}
146
147#[must_use = "SystemRunner must be run"]
149pub struct SystemRunner {
150 system: System,
151 runner: Arc<dyn Runner>,
152 _t: PhantomData<Rc<()>>,
153}
154
155impl SystemRunner {
156 pub fn system(&self) -> System {
158 self.system.clone()
159 }
160
161 pub fn run_until_stop(self) -> io::Result<()> {
164 self.run(|| Ok(()))
165 }
166
167 pub fn run<F>(self, f: F) -> io::Result<()>
170 where
171 F: FnOnce() -> io::Result<()> + 'static,
172 {
173 log::info!("Starting {:?} system", self.system.name());
174
175 let SystemRunner {
176 mut system, runner, ..
177 } = self;
178
179 crate::driver::block_on(runner.as_ref(), async move {
181 let stop = system.start();
182
183 f()?;
184
185 match stop.await {
186 Ok(code) => {
187 if code != 0 {
188 Err(io::Error::other(format!("Non-zero exit code: {code}")))
189 } else {
190 Ok(())
191 }
192 }
193 Err(_) => Err(io::Error::other("Closed")),
194 }
195 })
196 }
197
198 pub fn block_on<F, R>(self, fut: F) -> R
200 where
201 F: Future<Output = R> + 'static,
202 R: 'static,
203 {
204 let SystemRunner {
205 mut system, runner, ..
206 } = self;
207
208 crate::driver::block_on(runner.as_ref(), async move {
209 let stop = system.start();
210 drop(stop);
211
212 let loc = current_location();
213 ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
214 fut.await
215 })
216 }
217
218 #[cfg(feature = "tokio")]
219 pub async fn run_local<F, R>(self, fut: F) -> R
221 where
222 F: Future<Output = R> + 'static,
223 R: 'static,
224 {
225 let SystemRunner { mut system, .. } = self;
226
227 tok_io::task::LocalSet::new()
229 .run_until(async move {
230 let stop = system.start();
231 drop(stop);
232
233 let loc = current_location();
234 ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
235 fut.await
236 })
237 .await
238 }
239}
240
/// Returns the source location of the code that called this function.
///
/// Thanks to `#[track_caller]`, the reported file and line are those of the
/// call site, not of this function's own body.
#[track_caller]
pub(crate) fn current_location() -> &'static panic::Location<'static> {
    panic::Location::caller()
}
245
246impl fmt::Debug for SystemRunner {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 f.debug_struct("SystemRunner")
249 .field("system", &self.system)
250 .finish()
251 }
252}