1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
5
6use async_channel::{Receiver, Sender};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Runner, SystemRunner};
11
/// Process-wide counter that `System::construct` reads and increments to
/// obtain the numeric id of each newly constructed system.
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15 static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16 static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17 RefCell::new(HashMap::default());
18);
19
/// Bookkeeping for the arbiters registered on the current thread.
///
/// The same arbiters are stored twice: by id for lookup, and as a list so
/// they can be handed out as a slice.
#[derive(Default)]
struct Arbiters {
    /// Registered arbiters keyed by the id they were registered under.
    all: HashMap<Id, Arbiter>,
    /// Registered arbiters in registration order.
    list: Vec<Arbiter>,
}
25
/// Opaque numeric identifier.
///
/// `System::id` wraps the system's counter value in it, and it is also the
/// key type of the arbiter registry and of the ping history map.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Id(pub(crate) usize);
28
/// Cloneable handle to a running system.
#[derive(Clone, Debug)]
pub struct System {
    /// Numeric id taken from the process-wide `SYSTEM_COUNT` counter.
    id: usize,
    /// Channel used to send `SystemCommand`s (e.g. `Exit`) to the system.
    sys: Sender<SystemCommand>,
    /// Arbiter handed to `System::construct`; its `sys_id` is set to `id`.
    arbiter: Arbiter,
    /// Configuration this system was constructed with.
    config: SystemConfig,
    /// Thread pool that `spawn_blocking` dispatches closures to.
    pool: ThreadPool,
}
38
/// Settings shared by a system and its clones.
#[derive(Clone)]
pub struct SystemConfig {
    /// Runner used by `SystemConfig::block_on` to drive futures.
    pub(super) runner: Arc<dyn Runner>,
    /// NOTE(review): presumably the stack size for threads started by the
    /// system; it is not read in this file, confirm against the builder.
    pub(super) stack_size: usize,
    /// Flag returned by `System::stop_on_panic`.
    /// NOTE(review): presumably "stop the system when a task panics"; confirm
    /// where it is consumed.
    pub(super) stop_on_panic: bool,
}
45
46thread_local!(
47 static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
48);
49
50impl System {
51 pub(super) fn construct(
53 sys: Sender<SystemCommand>,
54 mut arbiter: Arbiter,
55 config: SystemConfig,
56 pool: ThreadPool,
57 ) -> Self {
58 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
59 arbiter.sys_id = id;
60
61 let sys = System {
62 id,
63 sys,
64 arbiter,
65 config,
66 pool,
67 };
68 System::set_current(sys.clone());
69 sys
70 }
71
72 pub fn build() -> Builder {
77 Builder::new()
78 }
79
80 #[allow(clippy::new_ret_no_self)]
81 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
85 Self::build().name(name).build(runner)
86 }
87
88 #[allow(clippy::new_ret_no_self)]
89 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
93 Self::build().name(name).build_with(config)
94 }
95
96 pub fn current() -> System {
98 CURRENT.with(|cell| match *cell.borrow() {
99 Some(ref sys) => sys.clone(),
100 None => panic!("System is not running"),
101 })
102 }
103
104 #[doc(hidden)]
106 pub fn set_current(sys: System) {
107 CURRENT.with(|s| {
108 *s.borrow_mut() = Some(sys);
109 })
110 }
111
112 pub fn id(&self) -> Id {
114 Id(self.id)
115 }
116
117 pub fn stop(&self) {
119 self.stop_with_code(0)
120 }
121
122 pub fn stop_with_code(&self, code: i32) {
124 let _ = self.sys.try_send(SystemCommand::Exit(code));
125 }
126
127 pub fn stop_on_panic(&self) -> bool {
130 self.config.stop_on_panic
131 }
132
133 pub fn arbiter(&self) -> &Arbiter {
135 &self.arbiter
136 }
137
138 pub fn list_arbiters<F, R>(f: F) -> R
143 where
144 F: FnOnce(&[Arbiter]) -> R,
145 {
146 ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
147 }
148
149 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
154 where
155 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
156 {
157 PINGS.with(|pings| {
158 if let Some(recs) = pings.borrow().get(&id) {
159 f(Some(recs))
160 } else {
161 f(None)
162 }
163 })
164 }
165
166 pub(super) fn sys(&self) -> &Sender<SystemCommand> {
167 &self.sys
168 }
169
170 pub fn config(&self) -> SystemConfig {
172 self.config.clone()
173 }
174
175 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
179 where
180 F: FnOnce() -> R + Send + 'static,
181 R: Send + 'static,
182 {
183 self.pool.dispatch(f)
184 }
185}
186
187impl SystemConfig {
188 #[inline]
190 pub(super) fn block_on<F, R>(&self, fut: F) -> R
191 where
192 F: Future<Output = R> + 'static,
193 R: 'static,
194 {
195 let result = Rc::new(RefCell::new(None));
197 let result_inner = result.clone();
198
199 self.runner.block_on(Box::pin(async move {
200 let r = fut.await;
201 *result_inner.borrow_mut() = Some(r);
202 }));
203 result.borrow_mut().take().unwrap()
204 }
205}
206
207impl fmt::Debug for SystemConfig {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 f.debug_struct("SystemConfig")
210 .field("stack_size", &self.stack_size)
211 .field("stop_on_panic", &self.stop_on_panic)
212 .finish()
213 }
214}
215
/// Commands processed by `SystemSupport::run`.
#[derive(Debug)]
pub(super) enum SystemCommand {
    /// Stop every registered arbiter and report the given exit code.
    Exit(i32),
    /// Register an arbiter under the given id and start pinging it.
    RegisterArbiter(Id, Arbiter),
    /// Remove the arbiter registered under the given id.
    UnregisterArbiter(Id),
}
222
/// State of the task that processes `SystemCommand`s for a system.
pub(super) struct SystemSupport {
    /// One-shot sender for the exit code; taken on the first `Exit` command,
    /// so later `Exit` commands do not send again.
    stop: Option<oneshot::Sender<i32>>,
    /// Incoming commands.
    commands: Receiver<SystemCommand>,
    /// Delay between two pings of a registered arbiter.
    ping_interval: Duration,
}
228
229impl SystemSupport {
230 pub(super) fn new(
231 stop: oneshot::Sender<i32>,
232 commands: Receiver<SystemCommand>,
233 ping_interval: usize,
234 ) -> Self {
235 Self {
236 commands,
237 stop: Some(stop),
238 ping_interval: Duration::from_millis(ping_interval as u64),
239 }
240 }
241
242 pub(super) async fn run(mut self) {
243 ARBITERS.with(move |arbs| {
244 let mut arbiters = arbs.borrow_mut();
245 arbiters.all.clear();
246 arbiters.list.clear();
247 });
248
249 loop {
250 match self.commands.recv().await {
251 Ok(SystemCommand::Exit(code)) => {
252 log::debug!("Stopping system with {code} code");
253
254 ARBITERS.with(move |arbs| {
256 let mut arbiters = arbs.borrow_mut();
257 for arb in arbiters.list.drain(..) {
258 arb.stop();
259 }
260 arbiters.all.clear();
261 });
262
263 if let Some(stop) = self.stop.take() {
265 let _ = stop.send(code);
266 }
267 }
268 Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
269 crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
270 ARBITERS.with(move |arbs| {
271 let mut arbiters = arbs.borrow_mut();
272 arbiters.all.insert(id, hnd.clone());
273 arbiters.list.push(hnd);
274 });
275 }
276 Ok(SystemCommand::UnregisterArbiter(id)) => {
277 ARBITERS.with(move |arbs| {
278 let mut arbiters = arbs.borrow_mut();
279 if let Some(hnd) = arbiters.all.remove(&id) {
280 for (idx, arb) in arbiters.list.iter().enumerate() {
281 if &hnd == arb {
282 arbiters.list.remove(idx);
283 break;
284 }
285 }
286 }
287 });
288 }
289 Err(_) => {
290 log::debug!("System stopped");
291 return;
292 }
293 }
294 }
295 }
296}
297
/// One ping measurement for an arbiter.
#[derive(Copy, Clone, Debug)]
pub struct PingRecord {
    /// Instant at which the ping was started.
    pub start: Instant,
    /// Round-trip time; `None` until the pinged arbiter has answered.
    pub rtt: Option<Duration>,
}
305
306async fn ping_arbiter(arb: Arbiter, interval: Duration) {
307 loop {
308 Delay::new(interval).await;
309
310 let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
312
313 if !is_alive {
314 PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
315 break;
316 }
317
318 let start = Instant::now();
320 PINGS.with(|pings| {
321 let mut p = pings.borrow_mut();
322 let recs = p.entry(arb.id()).or_default();
323 recs.push_front(PingRecord { start, rtt: None });
324 recs.truncate(10);
325 });
326
327 let result = arb
328 .spawn_with(|| async {
329 yield_to().await;
330 })
331 .await;
332
333 if result.is_err() {
334 break;
335 }
336
337 PINGS.with(|pings| {
338 pings
339 .borrow_mut()
340 .get_mut(&arb.id())
341 .unwrap()
342 .front_mut()
343 .unwrap()
344 .rtt = Some(Instant::now() - start);
345 });
346 }
347}
348
/// Yields to the executor exactly once.
///
/// The first poll wakes the task's own waker and returns `Pending`; the next
/// poll completes.
async fn yield_to() {
    use std::task::{Context, Poll};

    /// Future that is pending on its first poll and ready afterwards.
    struct YieldOnce {
        yielded: bool,
    }

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if !self.yielded {
                self.yielded = true;
                // Ask to be polled again right away so the task is rescheduled.
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        }
    }

    YieldOnce { yielded: false }.await;
}
371
/// A boxed, sendable unit of work that can be run exactly once.
pub(super) trait FnExec: Send + 'static {
    /// Consumes the box and runs the work.
    fn call_box(self: Box<Self>);
}
375
376impl<F> FnExec for F
377where
378 F: FnOnce() + Send + 'static,
379{
380 #[allow(clippy::boxed_local)]
381 fn call_box(self: Box<Self>) {
382 (*self)()
383 }
384}