1use std::{future::Future, io, marker::PhantomData, rc::Rc, sync::Arc, time};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::driver::Runner;
7use crate::pool::ThreadPool;
8use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
9
10#[derive(Debug, Clone)]
11pub struct Builder {
17 name: String,
19 stop_on_panic: bool,
21 stack_size: usize,
23 ping_interval: usize,
25 pool_limit: usize,
27 pool_recv_timeout: time::Duration,
28 testing: bool,
30}
31
32impl Builder {
33 pub(super) fn new() -> Self {
34 Builder {
35 name: "ntex".into(),
36 stop_on_panic: false,
37 stack_size: 0,
38 ping_interval: 1000,
39 testing: false,
40 pool_limit: 256,
41 pool_recv_timeout: time::Duration::from_secs(60),
42 }
43 }
44
45 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
47 self.name = name.as_ref().into();
48 self
49 }
50
51 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
56 self.stop_on_panic = stop_on_panic;
57 self
58 }
59
60 pub fn stack_size(mut self, size: usize) -> Self {
62 self.stack_size = size;
63 self
64 }
65
66 pub fn ping_interval(mut self, interval: usize) -> Self {
71 self.ping_interval = interval;
72 self
73 }
74
75 pub fn thread_pool_limit(mut self, value: usize) -> Self {
78 self.pool_limit = value;
79 self
80 }
81
82 pub fn testing(mut self) -> Self {
84 self.testing = true;
85 self
86 }
87
88 pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
91 where
92 time::Duration: From<T>,
93 {
94 self.pool_recv_timeout = timeout.into();
95 self
96 }
97
98 pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
102 let name = self.name.clone();
103 let testing = self.testing;
104 let stack_size = self.stack_size;
105 let stop_on_panic = self.stop_on_panic;
106
107 self.build_with(SystemConfig {
108 name,
109 testing,
110 stack_size,
111 stop_on_panic,
112 runner: Arc::new(runner),
113 })
114 }
115
116 pub fn build_with(self, config: SystemConfig) -> SystemRunner {
120 let (stop_tx, stop) = oneshot::channel();
121 let (sys_sender, sys_receiver) = unbounded();
122
123 let pool = ThreadPool::new(&self.name, self.pool_limit, self.pool_recv_timeout);
125
126 let (arb, controller) = Arbiter::new_system(self.name.clone());
127 let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
128 let system = System::construct(sys_sender, arb, config.clone(), pool);
129
130 let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
132
133 SystemRunner {
135 stop,
136 support,
137 controller,
138 system,
139 config,
140 _t: PhantomData,
141 }
142 }
143}
144
145#[must_use = "SystemRunner must be run"]
147pub struct SystemRunner {
148 stop: oneshot::Receiver<i32>,
149 support: SystemSupport,
150 controller: ArbiterController,
151 system: System,
152 config: SystemConfig,
153 _t: PhantomData<Rc<()>>,
154}
155
156impl SystemRunner {
157 pub fn system(&self) -> System {
159 self.system.clone()
160 }
161
162 pub fn run_until_stop(self) -> io::Result<()> {
165 self.run(|| Ok(()))
166 }
167
168 pub fn run<F>(self, f: F) -> io::Result<()>
171 where
172 F: FnOnce() -> io::Result<()> + 'static,
173 {
174 log::info!("Starting {:?} system", self.config.name);
175
176 let SystemRunner {
177 controller,
178 stop,
179 support,
180 config,
181 ..
182 } = self;
183
184 config.block_on(async move {
186 f()?;
187
188 let _ = crate::spawn(support.run());
189 let _ = crate::spawn(controller.run());
190 match stop.await {
191 Ok(code) => {
192 if code != 0 {
193 Err(io::Error::other(format!("Non-zero exit code: {code}")))
194 } else {
195 Ok(())
196 }
197 }
198 Err(_) => Err(io::Error::other("Closed")),
199 }
200 })
201 }
202
203 pub fn block_on<F, R>(self, fut: F) -> R
205 where
206 F: Future<Output = R> + 'static,
207 R: 'static,
208 {
209 let SystemRunner {
210 controller,
211 support,
212 config,
213 ..
214 } = self;
215
216 config.block_on(async move {
217 let _ = crate::spawn(support.run());
218 let _ = crate::spawn(controller.run());
219 fut.await
220 })
221 }
222
223 #[cfg(feature = "tokio")]
224 pub async fn run_local<F, R>(self, fut: F) -> R
226 where
227 F: Future<Output = R> + 'static,
228 R: 'static,
229 {
230 let SystemRunner {
231 controller,
232 support,
233 ..
234 } = self;
235
236 tok_io::task::LocalSet::new()
238 .run_until(async move {
239 let _ = crate::spawn(support.run());
240 let _ = crate::spawn(controller.run());
241 fut.await
242 })
243 .await
244 }
245}