1use std::{fmt, future::Future, io, marker::PhantomData, rc::Rc, sync::Arc, time};
2
3use crate::driver::Runner;
4use crate::system::{System, SystemConfig};
5
6#[derive(Debug, Clone)]
7pub struct Builder {
13 name: String,
15 stop_on_panic: bool,
17 stack_size: usize,
19 ping_interval: usize,
21 pool_limit: usize,
23 pool_recv_timeout: time::Duration,
24 testing: bool,
26}
27
28impl Builder {
29 pub(super) fn new() -> Self {
30 Builder {
31 name: "ntex".into(),
32 stop_on_panic: false,
33 stack_size: 0,
34 ping_interval: 1000,
35 testing: false,
36 pool_limit: 256,
37 pool_recv_timeout: time::Duration::from_secs(60),
38 }
39 }
40
41 #[must_use]
42 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
44 self.name = name.as_ref().into();
45 self
46 }
47
48 #[must_use]
49 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
56 self.stop_on_panic = stop_on_panic;
57 self
58 }
59
60 #[must_use]
61 pub fn stack_size(mut self, size: usize) -> Self {
63 self.stack_size = size;
64 self
65 }
66
67 #[must_use]
68 pub fn ping_interval(mut self, interval: usize) -> Self {
73 self.ping_interval = interval;
74 self
75 }
76
77 #[must_use]
78 pub fn thread_pool_limit(mut self, value: usize) -> Self {
81 self.pool_limit = value;
82 self
83 }
84
85 #[must_use]
86 pub fn testing(mut self) -> Self {
88 self.testing = true;
89 self
90 }
91
92 #[must_use]
93 pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
96 where
97 time::Duration: From<T>,
98 {
99 self.pool_recv_timeout = timeout.into();
100 self
101 }
102
103 pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
107 let config = SystemConfig {
108 name: self.name.clone(),
109 testing: self.testing,
110 stack_size: self.stack_size,
111 stop_on_panic: self.stop_on_panic,
112 ping_interval: self.ping_interval,
113 pool_limit: self.pool_limit,
114 pool_recv_timeout: self.pool_recv_timeout,
115 runner: Arc::new(runner),
116 };
117 self.build_with(config)
118 }
119
120 pub fn build_with(self, config: SystemConfig) -> SystemRunner {
124 let runner = config.runner.clone();
125 let system = System::construct(config);
126
127 SystemRunner {
129 system,
130 runner,
131 _t: PhantomData,
132 }
133 }
134}
135
136#[must_use = "SystemRunner must be run"]
138pub struct SystemRunner {
139 system: System,
140 runner: Arc<dyn Runner>,
141 _t: PhantomData<Rc<()>>,
142}
143
144impl SystemRunner {
145 pub fn system(&self) -> System {
147 self.system.clone()
148 }
149
150 pub fn run_until_stop(self) -> io::Result<()> {
153 self.run(|| Ok(()))
154 }
155
156 pub fn run<F>(self, f: F) -> io::Result<()>
159 where
160 F: FnOnce() -> io::Result<()> + 'static,
161 {
162 log::info!("Starting {:?} system", self.system.name());
163
164 let SystemRunner {
165 mut system, runner, ..
166 } = self;
167
168 crate::driver::block_on(runner.as_ref(), async move {
170 let stop = system.start();
171
172 f()?;
173
174 match stop.await {
175 Ok(code) => {
176 if code != 0 {
177 Err(io::Error::other(format!("Non-zero exit code: {code}")))
178 } else {
179 Ok(())
180 }
181 }
182 Err(_) => Err(io::Error::other("Closed")),
183 }
184 })
185 }
186
187 pub fn block_on<F, R>(self, fut: F) -> R
189 where
190 F: Future<Output = R> + 'static,
191 R: 'static,
192 {
193 let SystemRunner {
194 mut system, runner, ..
195 } = self;
196
197 crate::driver::block_on(runner.as_ref(), async move {
198 let stop = system.start();
199 drop(stop);
200
201 fut.await
202 })
203 }
204
205 #[cfg(feature = "tokio")]
206 pub async fn run_local<F, R>(self, fut: F) -> R
208 where
209 F: Future<Output = R> + 'static,
210 R: 'static,
211 {
212 let SystemRunner { mut system, .. } = self;
213
214 tok_io::task::LocalSet::new()
216 .run_until(async move {
217 let stop = system.start();
218 drop(stop);
219
220 fut.await
221 })
222 .await
223 }
224}
225
226impl fmt::Debug for SystemRunner {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 f.debug_struct("SystemRunner")
229 .field("system", &self.system)
230 .finish()
231 }
232}