1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::RefCell, fmt, future::Future, panic, pin::Pin, rc::Rc};
5
6use async_channel::{Receiver, Sender};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Runner, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15 static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16 static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17 RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22 all: HashMap<Id, Arbiter>,
23 list: Vec<Arbiter>,
24}
25
26#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
27pub struct Id(pub(crate) usize);
28
29#[derive(Clone, Debug)]
31pub struct System {
32 id: usize,
33 sys: Sender<SystemCommand>,
34 arbiter: Arbiter,
35 config: SystemConfig,
36 pool: ThreadPool,
37}
38
39#[derive(Clone)]
40pub struct SystemConfig {
41 pub(super) name: String,
42 pub(super) runner: Arc<dyn Runner>,
43 pub(super) stack_size: usize,
44 pub(super) stop_on_panic: bool,
45 pub(super) testing: bool,
46}
47
48thread_local!(
49 static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
50);
51
52impl System {
53 pub(super) fn construct(
55 sys: Sender<SystemCommand>,
56 mut arbiter: Arbiter,
57 config: SystemConfig,
58 pool: ThreadPool,
59 ) -> Self {
60 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
61 arbiter.sys_id = id;
62
63 let sys = System {
64 id,
65 sys,
66 arbiter,
67 config,
68 pool,
69 };
70 System::set_current(sys.clone());
71 sys
72 }
73
74 pub fn build() -> Builder {
79 Builder::new()
80 }
81
82 #[allow(clippy::new_ret_no_self)]
83 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
87 Self::build().name(name).build(runner)
88 }
89
90 #[allow(clippy::new_ret_no_self)]
91 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
95 Self::build().name(name).build_with(config)
96 }
97
98 pub fn current() -> System {
100 CURRENT.with(|cell| match *cell.borrow() {
101 Some(ref sys) => sys.clone(),
102 None => panic!("System is not running"),
103 })
104 }
105
106 #[doc(hidden)]
108 pub fn set_current(sys: System) {
109 CURRENT.with(|s| {
110 *s.borrow_mut() = Some(sys);
111 })
112 }
113
114 pub fn id(&self) -> Id {
116 Id(self.id)
117 }
118
119 pub fn name(&self) -> &str {
121 &self.config.name
122 }
123
124 pub fn stop(&self) {
126 self.stop_with_code(0)
127 }
128
129 pub fn stop_with_code(&self, code: i32) {
131 let _ = self.sys.try_send(SystemCommand::Exit(code));
132 }
133
134 pub fn stop_on_panic(&self) -> bool {
137 self.config.stop_on_panic
138 }
139
140 pub fn arbiter(&self) -> &Arbiter {
142 &self.arbiter
143 }
144
145 pub fn list_arbiters<F, R>(f: F) -> R
150 where
151 F: FnOnce(&[Arbiter]) -> R,
152 {
153 ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
154 }
155
156 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
161 where
162 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
163 {
164 PINGS.with(|pings| {
165 if let Some(recs) = pings.borrow().get(&id) {
166 f(Some(recs))
167 } else {
168 f(None)
169 }
170 })
171 }
172
173 pub(super) fn sys(&self) -> &Sender<SystemCommand> {
174 &self.sys
175 }
176
177 pub fn config(&self) -> SystemConfig {
179 self.config.clone()
180 }
181
182 pub fn testing(&self) -> bool {
184 self.config.testing()
185 }
186
187 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
191 where
192 F: FnOnce() -> R + Send + 'static,
193 R: Send + 'static,
194 {
195 self.pool.dispatch(f)
196 }
197}
198
199impl SystemConfig {
200 #[inline]
201 pub fn testing(&self) -> bool {
203 self.testing
204 }
205
206 #[inline]
208 pub(super) fn block_on<F, R>(&self, fut: F) -> R
209 where
210 F: Future<Output = R> + 'static,
211 R: 'static,
212 {
213 let result = Rc::new(RefCell::new(None));
215 let result_inner = result.clone();
216
217 self.runner.block_on(Box::pin(async move {
218 let r = fut.await;
219 *result_inner.borrow_mut() = Some(r);
220 }));
221 result.borrow_mut().take().unwrap()
222 }
223}
224
225impl fmt::Debug for SystemConfig {
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 f.debug_struct("SystemConfig")
228 .field("testing", &self.testing)
229 .field("stack_size", &self.stack_size)
230 .field("stop_on_panic", &self.stop_on_panic)
231 .finish()
232 }
233}
234
235#[derive(Debug)]
236pub(super) enum SystemCommand {
237 Exit(i32),
238 RegisterArbiter(Id, Arbiter),
239 UnregisterArbiter(Id),
240}
241
242pub(super) struct SystemSupport {
243 stop: Option<oneshot::Sender<i32>>,
244 commands: Receiver<SystemCommand>,
245 ping_interval: Duration,
246}
247
248impl SystemSupport {
249 pub(super) fn new(
250 stop: oneshot::Sender<i32>,
251 commands: Receiver<SystemCommand>,
252 ping_interval: usize,
253 ) -> Self {
254 Self {
255 commands,
256 stop: Some(stop),
257 ping_interval: Duration::from_millis(ping_interval as u64),
258 }
259 }
260
261 pub(super) async fn run(mut self) {
262 ARBITERS.with(move |arbs| {
263 let mut arbiters = arbs.borrow_mut();
264 arbiters.all.clear();
265 arbiters.list.clear();
266 });
267
268 loop {
269 match self.commands.recv().await {
270 Ok(SystemCommand::Exit(code)) => {
271 log::debug!("Stopping system with {code} code");
272
273 ARBITERS.with(move |arbs| {
275 let mut arbiters = arbs.borrow_mut();
276 for arb in arbiters.list.drain(..) {
277 arb.stop();
278 }
279 arbiters.all.clear();
280 });
281
282 if let Some(stop) = self.stop.take() {
284 let _ = stop.send(code);
285 }
286 }
287 Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
288 crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
289 ARBITERS.with(move |arbs| {
290 let mut arbiters = arbs.borrow_mut();
291 arbiters.all.insert(id, hnd.clone());
292 arbiters.list.push(hnd);
293 });
294 }
295 Ok(SystemCommand::UnregisterArbiter(id)) => {
296 ARBITERS.with(move |arbs| {
297 let mut arbiters = arbs.borrow_mut();
298 if let Some(hnd) = arbiters.all.remove(&id) {
299 for (idx, arb) in arbiters.list.iter().enumerate() {
300 if &hnd == arb {
301 arbiters.list.remove(idx);
302 break;
303 }
304 }
305 }
306 });
307 }
308 Err(_) => {
309 log::debug!("System stopped");
310 return;
311 }
312 }
313 }
314 }
315}
316
317#[derive(Copy, Clone, Debug)]
318pub struct PingRecord {
319 pub start: Instant,
321 pub rtt: Option<Duration>,
323}
324
325async fn ping_arbiter(arb: Arbiter, interval: Duration) {
326 loop {
327 Delay::new(interval).await;
328
329 let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
331
332 if !is_alive {
333 PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
334 break;
335 }
336
337 let start = Instant::now();
339 PINGS.with(|pings| {
340 let mut p = pings.borrow_mut();
341 let recs = p.entry(arb.id()).or_default();
342 recs.push_front(PingRecord { start, rtt: None });
343 recs.truncate(10);
344 });
345
346 let result = arb
347 .spawn_with(|| async {
348 yield_to().await;
349 })
350 .await;
351
352 if result.is_err() {
353 break;
354 }
355
356 PINGS.with(|pings| {
357 pings
358 .borrow_mut()
359 .get_mut(&arb.id())
360 .unwrap()
361 .front_mut()
362 .unwrap()
363 .rtt = Some(Instant::now() - start);
364 });
365 }
366}
367
368async fn yield_to() {
369 use std::task::{Context, Poll};
370
371 struct Yield {
372 completed: bool,
373 }
374
375 impl Future for Yield {
376 type Output = ();
377
378 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
379 if self.completed {
380 return Poll::Ready(());
381 }
382 self.completed = true;
383 cx.waker().wake_by_ref();
384 Poll::Pending
385 }
386 }
387
388 Yield { completed: false }.await;
389}
390
391pub(super) trait FnExec: Send + 'static {
392 fn call_box(self: Box<Self>);
393}
394
395impl<F> FnExec for F
396where
397 F: FnOnce() + Send + 'static,
398{
399 #[allow(clippy::boxed_local)]
400 fn call_box(self: Box<Self>) {
401 (*self)()
402 }
403}