1use std::collections::{HashMap, VecDeque};
2use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc};
3use std::time::{Duration, Instant};
4use std::{cell::RefCell, fmt, future::Future, pin::Pin, rc::Rc};
5
6use async_channel::{Receiver, Sender};
7use futures_timer::Delay;
8
9use super::arbiter::Arbiter;
10use super::builder::{Builder, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15 static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16 static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17 RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22 all: HashMap<Id, Arbiter>,
23 list: Vec<Arbiter>,
24}
25
26#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
27pub struct Id(pub(crate) usize);
28
29#[derive(Clone, Debug)]
31pub struct System {
32 id: usize,
33 sys: Sender<SystemCommand>,
34 arbiter: Arbiter,
35 config: SystemConfig,
36}
37
38#[derive(Clone)]
39pub(super) struct SystemConfig {
40 pub(super) stack_size: usize,
41 pub(super) stop_on_panic: bool,
42 pub(super) block_on:
43 Option<Arc<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) + Sync + Send>>,
44}
45
46thread_local!(
47 static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
48);
49
50impl System {
51 pub(super) fn construct(
53 sys: Sender<SystemCommand>,
54 mut arbiter: Arbiter,
55 config: SystemConfig,
56 ) -> Self {
57 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
58 arbiter.sys_id = id;
59
60 let sys = System {
61 id,
62 sys,
63 config,
64 arbiter,
65 };
66 System::set_current(sys.clone());
67 sys
68 }
69
70 pub fn build() -> Builder {
75 Builder::new()
76 }
77
78 #[allow(clippy::new_ret_no_self)]
79 pub fn new(name: &str) -> SystemRunner {
83 Self::build().name(name).finish()
84 }
85
86 pub fn current() -> System {
88 CURRENT.with(|cell| match *cell.borrow() {
89 Some(ref sys) => sys.clone(),
90 None => panic!("System is not running"),
91 })
92 }
93
94 #[doc(hidden)]
96 pub fn set_current(sys: System) {
97 CURRENT.with(|s| {
98 *s.borrow_mut() = Some(sys);
99 })
100 }
101
102 pub fn id(&self) -> Id {
104 Id(self.id)
105 }
106
107 pub fn stop(&self) {
109 self.stop_with_code(0)
110 }
111
112 pub fn stop_with_code(&self, code: i32) {
114 let _ = self.sys.try_send(SystemCommand::Exit(code));
115 }
116
117 pub fn stop_on_panic(&self) -> bool {
120 self.config.stop_on_panic
121 }
122
123 pub fn arbiter(&self) -> &Arbiter {
125 &self.arbiter
126 }
127
128 pub fn list_arbiters<F, R>(f: F) -> R
133 where
134 F: FnOnce(&[Arbiter]) -> R,
135 {
136 ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
137 }
138
139 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
144 where
145 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
146 {
147 PINGS.with(|pings| {
148 if let Some(recs) = pings.borrow().get(&id) {
149 f(Some(recs))
150 } else {
151 f(None)
152 }
153 })
154 }
155
156 pub(super) fn sys(&self) -> &Sender<SystemCommand> {
157 &self.sys
158 }
159
160 pub(super) fn config(&self) -> SystemConfig {
162 self.config.clone()
163 }
164}
165
166impl SystemConfig {
167 #[inline]
169 pub(super) fn block_on<F, R>(&self, fut: F) -> R
170 where
171 F: Future<Output = R> + 'static,
172 R: 'static,
173 {
174 let result = Rc::new(RefCell::new(None));
176 let result_inner = result.clone();
177
178 if let Some(block_on) = &self.block_on {
179 (*block_on)(Box::pin(async move {
180 let r = fut.await;
181 *result_inner.borrow_mut() = Some(r);
182 }));
183 } else {
184 crate::block_on(Box::pin(async move {
185 let r = fut.await;
186 *result_inner.borrow_mut() = Some(r);
187 }));
188 }
189 let res = result.borrow_mut().take().unwrap();
190 res
191 }
192}
193
194impl fmt::Debug for SystemConfig {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 f.debug_struct("SystemConfig")
197 .field("stack_size", &self.stack_size)
198 .field("stop_on_panic", &self.stop_on_panic)
199 .finish()
200 }
201}
202
203#[derive(Debug)]
204pub(super) enum SystemCommand {
205 Exit(i32),
206 RegisterArbiter(Id, Arbiter),
207 UnregisterArbiter(Id),
208}
209
210pub(super) struct SystemSupport {
211 stop: Option<oneshot::Sender<i32>>,
212 commands: Receiver<SystemCommand>,
213 ping_interval: Duration,
214}
215
216impl SystemSupport {
217 pub(super) fn new(
218 stop: oneshot::Sender<i32>,
219 commands: Receiver<SystemCommand>,
220 ping_interval: usize,
221 ) -> Self {
222 Self {
223 commands,
224 stop: Some(stop),
225 ping_interval: Duration::from_millis(ping_interval as u64),
226 }
227 }
228
229 pub(super) async fn run(mut self) {
230 ARBITERS.with(move |arbs| {
231 let mut arbiters = arbs.borrow_mut();
232 arbiters.all.clear();
233 arbiters.list.clear();
234 });
235
236 loop {
237 match self.commands.recv().await {
238 Ok(SystemCommand::Exit(code)) => {
239 log::debug!("Stopping system with {} code", code);
240
241 ARBITERS.with(move |arbs| {
243 let mut arbiters = arbs.borrow_mut();
244 for arb in arbiters.list.drain(..) {
245 arb.stop();
246 }
247 arbiters.all.clear();
248 });
249
250 if let Some(stop) = self.stop.take() {
252 let _ = stop.send(code);
253 }
254 }
255 Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
256 crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
257 ARBITERS.with(move |arbs| {
258 let mut arbiters = arbs.borrow_mut();
259 arbiters.all.insert(id, hnd.clone());
260 arbiters.list.push(hnd);
261 });
262 }
263 Ok(SystemCommand::UnregisterArbiter(id)) => {
264 ARBITERS.with(move |arbs| {
265 let mut arbiters = arbs.borrow_mut();
266 if let Some(hnd) = arbiters.all.remove(&id) {
267 for (idx, arb) in arbiters.list.iter().enumerate() {
268 if &hnd == arb {
269 arbiters.list.remove(idx);
270 break;
271 }
272 }
273 }
274 });
275 }
276 Err(_) => {
277 log::debug!("System stopped");
278 return;
279 }
280 }
281 }
282 }
283}
284
285#[derive(Copy, Clone, Debug)]
286pub struct PingRecord {
287 pub start: Instant,
289 pub rtt: Option<Duration>,
291}
292
293async fn ping_arbiter(arb: Arbiter, interval: Duration) {
294 loop {
295 Delay::new(interval).await;
296
297 let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
299
300 if !is_alive {
301 PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
302 break;
303 }
304
305 let start = Instant::now();
307 PINGS.with(|pings| {
308 let mut p = pings.borrow_mut();
309 let recs = p.entry(arb.id()).or_default();
310 recs.push_front(PingRecord { start, rtt: None });
311 recs.truncate(10);
312 });
313
314 let result = arb
315 .spawn_with(|| async {
316 yield_to().await;
317 })
318 .await;
319
320 if result.is_err() {
321 break;
322 }
323
324 PINGS.with(|pings| {
325 pings
326 .borrow_mut()
327 .get_mut(&arb.id())
328 .unwrap()
329 .front_mut()
330 .unwrap()
331 .rtt = Some(Instant::now() - start);
332 });
333 }
334}
335
336async fn yield_to() {
337 use std::task::{Context, Poll};
338
339 struct Yield {
340 completed: bool,
341 }
342
343 impl Future for Yield {
344 type Output = ();
345
346 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
347 if self.completed {
348 return Poll::Ready(());
349 }
350 self.completed = true;
351 cx.waker().wake_by_ref();
352 Poll::Pending
353 }
354 }
355
356 Yield { completed: false }.await;
357}
358
359pub(super) trait FnExec: Send + 'static {
360 fn call_box(self: Box<Self>);
361}
362
363impl<F> FnExec for F
364where
365 F: FnOnce() + Send + 'static,
366{
367 #[allow(clippy::boxed_local)]
368 fn call_box(self: Box<Self>) {
369 (*self)()
370 }
371}