1use std::{future::Future, io, marker::PhantomData, rc::Rc, sync::Arc, time};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::driver::Runner;
7use crate::pool::ThreadPool;
8use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
9
10pub struct Builder {
16 name: String,
18 stop_on_panic: bool,
20 stack_size: usize,
22 ping_interval: usize,
24 pool_limit: usize,
26 pool_recv_timeout: time::Duration,
27}
28
29impl Builder {
30 pub(super) fn new() -> Self {
31 Builder {
32 name: "ntex".into(),
33 stop_on_panic: false,
34 stack_size: 0,
35 ping_interval: 1000,
36 pool_limit: 256,
37 pool_recv_timeout: time::Duration::from_secs(60),
38 }
39 }
40
41 pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
43 self.name = name.as_ref().into();
44 self
45 }
46
47 pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
52 self.stop_on_panic = stop_on_panic;
53 self
54 }
55
56 pub fn stack_size(mut self, size: usize) -> Self {
58 self.stack_size = size;
59 self
60 }
61
62 pub fn ping_interval(mut self, interval: usize) -> Self {
67 self.ping_interval = interval;
68 self
69 }
70
71 pub fn thread_pool_limit(mut self, value: usize) -> Self {
74 self.pool_limit = value;
75 self
76 }
77
78 pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
81 where
82 time::Duration: From<T>,
83 {
84 self.pool_recv_timeout = timeout.into();
85 self
86 }
87
88 pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
92 let (stop_tx, stop) = oneshot::channel();
93 let (sys_sender, sys_receiver) = unbounded();
94
95 let config = SystemConfig {
96 runner: Arc::new(runner),
97 stack_size: self.stack_size,
98 stop_on_panic: self.stop_on_panic,
99 };
100
101 let pool = ThreadPool::new(self.pool_limit, self.pool_recv_timeout);
103
104 let (arb, controller) = Arbiter::new_system(self.name.clone());
105 let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
106 let system = System::construct(sys_sender, arb, config.clone(), pool);
107
108 let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
110
111 SystemRunner {
113 stop,
114 support,
115 controller,
116 system,
117 config,
118 _t: PhantomData,
119 }
120 }
121
122 pub fn build_with(self, config: SystemConfig) -> SystemRunner {
126 let (stop_tx, stop) = oneshot::channel();
127 let (sys_sender, sys_receiver) = unbounded();
128
129 let pool = ThreadPool::new(self.pool_limit, self.pool_recv_timeout);
131
132 let (arb, controller) = Arbiter::new_system(self.name.clone());
133 let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
134 let system = System::construct(sys_sender, arb, config.clone(), pool);
135
136 let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
138
139 SystemRunner {
141 stop,
142 support,
143 controller,
144 system,
145 config,
146 _t: PhantomData,
147 }
148 }
149}
150
151#[must_use = "SystemRunner must be run"]
153pub struct SystemRunner {
154 stop: oneshot::Receiver<i32>,
155 support: SystemSupport,
156 controller: ArbiterController,
157 system: System,
158 config: SystemConfig,
159 _t: PhantomData<Rc<()>>,
160}
161
162impl SystemRunner {
163 pub fn system(&self) -> System {
165 self.system.clone()
166 }
167
168 pub fn run_until_stop(self) -> io::Result<()> {
171 self.run(|| Ok(()))
172 }
173
174 pub fn run<F>(self, f: F) -> io::Result<()>
177 where
178 F: FnOnce() -> io::Result<()> + 'static,
179 {
180 let SystemRunner {
181 controller,
182 stop,
183 support,
184 config,
185 ..
186 } = self;
187
188 config.block_on(async move {
190 f()?;
191
192 let _ = crate::spawn(support.run());
193 let _ = crate::spawn(controller.run());
194 match stop.await {
195 Ok(code) => {
196 if code != 0 {
197 Err(io::Error::other(format!("Non-zero exit code: {code}")))
198 } else {
199 Ok(())
200 }
201 }
202 Err(_) => Err(io::Error::other("Closed")),
203 }
204 })
205 }
206
207 pub fn block_on<F, R>(self, fut: F) -> R
209 where
210 F: Future<Output = R> + 'static,
211 R: 'static,
212 {
213 let SystemRunner {
214 controller,
215 support,
216 config,
217 ..
218 } = self;
219
220 config.block_on(async move {
221 let _ = crate::spawn(support.run());
222 let _ = crate::spawn(controller.run());
223 fut.await
224 })
225 }
226
227 #[cfg(feature = "tokio")]
228 pub async fn run_local<F, R>(self, fut: F) -> R
230 where
231 F: Future<Output = R> + 'static,
232 R: 'static,
233 {
234 let SystemRunner {
235 controller,
236 support,
237 ..
238 } = self;
239
240 tok_io::task::LocalSet::new()
242 .run_until(async move {
243 let _ = crate::spawn(support.run());
244 let _ = crate::spawn(controller.run());
245 fut.await
246 })
247 .await
248 }
249}