1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
5
6use async_channel::{Receiver, Sender};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Runner, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15 static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16 static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17 RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22 all: HashMap<Id, Arbiter>,
23 list: Vec<Arbiter>,
24}
25
26#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
27pub struct Id(pub(crate) usize);
28
29#[derive(Clone, Debug)]
31pub struct System {
32 id: usize,
33 sys: Sender<SystemCommand>,
34 arbiter: Arbiter,
35 config: SystemConfig,
36 pool: ThreadPool,
37}
38
39#[derive(Clone)]
40pub struct SystemConfig {
41 pub(super) name: String,
42 pub(super) runner: Arc<dyn Runner>,
43 pub(super) stack_size: usize,
44 pub(super) stop_on_panic: bool,
45 pub(super) testing: bool,
46}
47
48thread_local!(
49 static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
50);
51
52impl System {
53 pub(super) fn construct(
55 sys: Sender<SystemCommand>,
56 mut arbiter: Arbiter,
57 config: SystemConfig,
58 pool: ThreadPool,
59 ) -> Self {
60 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
61 arbiter.sys_id = id;
62
63 let sys = System {
64 id,
65 sys,
66 arbiter,
67 config,
68 pool,
69 };
70 System::set_current(sys.clone());
71 sys
72 }
73
74 pub fn build() -> Builder {
79 Builder::new()
80 }
81
82 #[allow(clippy::new_ret_no_self)]
83 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
87 Self::build().name(name).build(runner)
88 }
89
90 #[allow(clippy::new_ret_no_self)]
91 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
95 Self::build().name(name).build_with(config)
96 }
97
98 pub fn current() -> System {
104 CURRENT.with(|cell| match *cell.borrow() {
105 Some(ref sys) => sys.clone(),
106 None => panic!("System is not running"),
107 })
108 }
109
110 #[doc(hidden)]
112 pub fn set_current(sys: System) {
113 CURRENT.with(|s| {
114 *s.borrow_mut() = Some(sys);
115 });
116 }
117
118 pub fn id(&self) -> Id {
120 Id(self.id)
121 }
122
123 pub fn name(&self) -> &str {
125 &self.config.name
126 }
127
128 pub fn stop(&self) {
130 self.stop_with_code(0);
131 }
132
133 pub fn stop_with_code(&self, code: i32) {
135 let _ = self.sys.try_send(SystemCommand::Exit(code));
136 }
137
138 pub fn stop_on_panic(&self) -> bool {
143 self.config.stop_on_panic
144 }
145
146 pub fn arbiter(&self) -> &Arbiter {
148 &self.arbiter
149 }
150
151 pub fn list_arbiters<F, R>(f: F) -> R
156 where
157 F: FnOnce(&[Arbiter]) -> R,
158 {
159 ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
160 }
161
162 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
167 where
168 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
169 {
170 PINGS.with(|pings| {
171 if let Some(recs) = pings.borrow().get(&id) {
172 f(Some(recs))
173 } else {
174 f(None)
175 }
176 })
177 }
178
179 pub(super) fn sys(&self) -> &Sender<SystemCommand> {
180 &self.sys
181 }
182
183 pub fn config(&self) -> SystemConfig {
185 self.config.clone()
186 }
187
188 pub fn testing(&self) -> bool {
190 self.config.testing()
191 }
192
193 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
197 where
198 F: FnOnce() -> R + Send + 'static,
199 R: Send + 'static,
200 {
201 self.pool.dispatch(f)
202 }
203}
204
205impl SystemConfig {
206 #[inline]
207 pub fn testing(&self) -> bool {
209 self.testing
210 }
211
212 #[inline]
214 pub(super) fn block_on<F, R>(&self, fut: F) -> R
215 where
216 F: Future<Output = R> + 'static,
217 R: 'static,
218 {
219 let result = Rc::new(RefCell::new(None));
221 let result_inner = result.clone();
222
223 self.runner.block_on(Box::pin(async move {
224 let r = fut.await;
225 *result_inner.borrow_mut() = Some(r);
226 }));
227 result.borrow_mut().take().unwrap()
228 }
229}
230
231impl fmt::Debug for SystemConfig {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 f.debug_struct("SystemConfig")
234 .field("name", &self.name)
235 .field("testing", &self.testing)
236 .field("stack_size", &self.stack_size)
237 .field("stop_on_panic", &self.stop_on_panic)
238 .finish()
239 }
240}
241
242#[derive(Debug)]
243pub(super) enum SystemCommand {
244 Exit(i32),
245 RegisterArbiter(Id, Arbiter),
246 UnregisterArbiter(Id),
247}
248
249#[derive(Debug)]
250pub(super) struct SystemSupport {
251 stop: Option<oneshot::Sender<i32>>,
252 commands: Receiver<SystemCommand>,
253 ping_interval: Duration,
254}
255
256impl SystemSupport {
257 pub(super) fn new(
258 stop: oneshot::Sender<i32>,
259 commands: Receiver<SystemCommand>,
260 ping_interval: usize,
261 ) -> Self {
262 Self {
263 commands,
264 stop: Some(stop),
265 ping_interval: Duration::from_millis(ping_interval as u64),
266 }
267 }
268
269 pub(super) async fn run(mut self) {
270 ARBITERS.with(move |arbs| {
271 let mut arbiters = arbs.borrow_mut();
272 arbiters.all.clear();
273 arbiters.list.clear();
274 });
275
276 loop {
277 match self.commands.recv().await {
278 Ok(SystemCommand::Exit(code)) => {
279 log::debug!("Stopping system with {code} code");
280
281 ARBITERS.with(move |arbs| {
283 let mut arbiters = arbs.borrow_mut();
284 for arb in arbiters.list.drain(..) {
285 arb.stop();
286 }
287 arbiters.all.clear();
288 });
289
290 if let Some(stop) = self.stop.take() {
292 let _ = stop.send(code);
293 }
294 }
295 Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
296 crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
297 ARBITERS.with(move |arbs| {
298 let mut arbiters = arbs.borrow_mut();
299 arbiters.all.insert(id, hnd.clone());
300 arbiters.list.push(hnd);
301 });
302 }
303 Ok(SystemCommand::UnregisterArbiter(id)) => {
304 ARBITERS.with(move |arbs| {
305 let mut arbiters = arbs.borrow_mut();
306 if let Some(hnd) = arbiters.all.remove(&id) {
307 for (idx, arb) in arbiters.list.iter().enumerate() {
308 if &hnd == arb {
309 arbiters.list.remove(idx);
310 break;
311 }
312 }
313 }
314 });
315 }
316 Err(_) => {
317 log::debug!("System stopped");
318 return;
319 }
320 }
321 }
322 }
323}
324
325#[derive(Copy, Clone, Debug)]
326pub struct PingRecord {
327 pub start: Instant,
329 pub rtt: Option<Duration>,
331}
332
333async fn ping_arbiter(arb: Arbiter, interval: Duration) {
334 loop {
335 Delay::new(interval).await;
336
337 let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
339
340 if !is_alive {
341 PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
342 break;
343 }
344
345 let start = Instant::now();
347 PINGS.with(|pings| {
348 let mut p = pings.borrow_mut();
349 let recs = p.entry(arb.id()).or_default();
350 recs.push_front(PingRecord { start, rtt: None });
351 recs.truncate(10);
352 });
353
354 let result = arb
355 .spawn_with(|| async {
356 yield_to().await;
357 })
358 .await;
359
360 if result.is_err() {
361 break;
362 }
363
364 PINGS.with(|pings| {
365 pings
366 .borrow_mut()
367 .get_mut(&arb.id())
368 .unwrap()
369 .front_mut()
370 .unwrap()
371 .rtt = Some(start.elapsed());
372 });
373 }
374}
375
376async fn yield_to() {
377 use std::task::{Context, Poll};
378
379 struct Yield {
380 completed: bool,
381 }
382
383 impl Future for Yield {
384 type Output = ();
385
386 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
387 if self.completed {
388 return Poll::Ready(());
389 }
390 self.completed = true;
391 cx.waker().wake_by_ref();
392 Poll::Pending
393 }
394 }
395
396 Yield { completed: false }.await;
397}
398
399pub(super) trait FnExec: Send + 'static {
400 fn call_box(self: Box<Self>);
401}
402
403impl<F> FnExec for F
404where
405 F: FnOnce() + Send + 'static,
406{
407 #[allow(clippy::boxed_local)]
408 fn call_box(self: Box<Self>) {
409 (*self)();
410 }
411}