1use std::any::{Any, TypeId};
2use std::collections::{HashMap, VecDeque};
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::time::{Duration, Instant};
5use std::{cell::Cell, cell::RefCell, fmt, future::Future, panic, pin::Pin};
6
7use async_channel::{Receiver, Sender, unbounded};
8use futures_timer::Delay;
9use parking_lot::RwLock;
10
11use crate::pool::ThreadPool;
12use crate::{Arbiter, BlockingResult, Builder, Handle, Runner, SystemRunner};
13
14static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
15
16thread_local!(
17 static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
18 static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
19 RefCell::new(HashMap::default());
20);
21
22#[derive(Default)]
23struct Arbiters {
24 all: HashMap<Id, Arbiter>,
25 list: Vec<Arbiter>,
26}
27
/// Opaque numeric identifier, used in this module for systems and as the key
/// under which arbiters and their ping records are stored.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Id(pub(crate) usize);
31
32pub struct System {
34 id: usize,
35 arbiter: Cell<Option<Arbiter>>,
36 config: SystemConfig,
37 sender: Sender<SystemCommand>,
38 receiver: Receiver<SystemCommand>,
39 rt: Arc<RwLock<Arbiter>>,
40 storage: Arc<RwLock<HashMap<TypeId, Box<dyn Any + Sync + Send>>>>,
41 pool: ThreadPool,
42}
43
44#[derive(Clone)]
45pub struct SystemConfig {
46 pub(super) name: String,
47 pub(super) stack_size: usize,
48 pub(super) stop_on_panic: bool,
49 pub(super) ping_interval: usize,
50 pub(super) pool_limit: usize,
51 pub(super) pool_recv_timeout: Duration,
52 pub(super) testing: bool,
53 pub(super) runner: Arc<dyn Runner>,
54}
55
56thread_local!(
57 static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
58);
59
60impl Clone for System {
61 fn clone(&self) -> Self {
62 Self {
63 id: self.id,
64 arbiter: Cell::new(None),
65 config: self.config.clone(),
66 sender: self.sender.clone(),
67 receiver: self.receiver.clone(),
68 rt: self.rt.clone(),
69 storage: self.storage.clone(),
70 pool: self.pool.clone(),
71 }
72 }
73}
74
75impl System {
76 pub(super) fn construct(config: SystemConfig) -> Self {
78 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
79 let (sender, receiver) = unbounded();
80
81 let pool =
82 ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
83
84 System {
85 id,
86 config,
87 sender,
88 receiver,
89 pool,
90 rt: Arc::new(RwLock::new(Arbiter::dummy())),
91 storage: Arc::new(RwLock::new(HashMap::default())),
92 arbiter: Cell::new(None),
93 }
94 }
95
96 pub(super) fn start(&mut self) -> oneshot::Receiver<i32> {
98 let (stop_tx, stop) = oneshot::channel();
99 let (arb, controller) = Arbiter::new_system(self.clone(), self.config.name.clone());
100
101 self.arbiter.set(Some(arb.clone()));
102 *self.rt.write() = arb.clone();
103 System::set_current(self.clone());
104
105 crate::spawn(SystemSupport::new(self, stop_tx).run(arb.id(), arb));
107 crate::spawn(controller.run());
108
109 stop
110 }
111
112 pub fn build() -> Builder {
117 Builder::new()
118 }
119
120 #[allow(clippy::new_ret_no_self)]
121 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
125 Self::build().name(name).build(runner)
126 }
127
128 #[allow(clippy::new_ret_no_self)]
129 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
133 Self::build().name(name).build_with(config)
134 }
135
136 pub fn current() -> System {
142 CURRENT.with(|cell| match *cell.borrow() {
143 Some(ref sys) => sys.clone(),
144 None => panic!("System is not running"),
145 })
146 }
147
148 #[doc(hidden)]
150 pub fn set_current(sys: System) {
151 sys.arbiter().set_current();
152 CURRENT.with(|s| {
153 *s.borrow_mut() = Some(sys);
154 });
155 }
156
157 pub(super) fn remove_current() {
158 CURRENT.with(|cell| {
159 cell.borrow_mut().take();
160 });
161 }
162
163 pub fn id(&self) -> Id {
165 Id(self.id)
166 }
167
168 pub fn name(&self) -> &str {
170 &self.config.name
171 }
172
173 pub fn stop(&self) {
175 self.stop_with_code(0);
176 }
177
178 pub fn stop_with_code(&self, code: i32) {
180 let _ = self.sender.try_send(SystemCommand::Exit(code));
181 }
182
183 pub fn stop_on_panic(&self) -> bool {
188 self.config.stop_on_panic
189 }
190
191 pub fn arbiter(&self) -> Arbiter {
197 if let Some(arb) = self.arbiter.take() {
198 self.arbiter.set(Some(arb.clone()));
199 if arb.hnd.is_some() {
200 return arb;
201 }
202 }
203
204 let arb = self.rt.read().clone();
205 self.arbiter.set(Some(arb.clone()));
206 arb
207 }
208
209 pub fn list_arbiters<F, R>(f: F) -> R
214 where
215 F: FnOnce(&[Arbiter]) -> R,
216 {
217 ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
218 }
219
220 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
225 where
226 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
227 {
228 PINGS.with(|pings| {
229 if let Some(recs) = pings.borrow().get(&id) {
230 f(Some(recs))
231 } else {
232 f(None)
233 }
234 })
235 }
236
237 pub(super) fn sys(&self) -> &Sender<SystemCommand> {
238 &self.sender
239 }
240
241 pub fn config(&self) -> SystemConfig {
243 self.config.clone()
244 }
245
246 #[inline]
247 pub fn handle(&self) -> Handle {
249 self.arbiter().handle().clone()
250 }
251
252 pub fn testing(&self) -> bool {
254 self.config.testing()
255 }
256
257 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
261 where
262 F: FnOnce() -> R + Send + 'static,
263 R: Send + 'static,
264 {
265 self.pool.execute(f)
266 }
267
268 pub fn get_value<T>(&self, f: impl FnOnce() -> T) -> T
273 where
274 T: Clone + Send + Sync + 'static,
275 {
276 if let Some(boxed) = self.storage.read().get(&TypeId::of::<T>())
277 && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
278 {
279 val.clone()
280 } else {
281 let val = f();
282 self.storage
283 .write()
284 .insert(TypeId::of::<T>(), Box::new(val.clone()));
285 val
286 }
287 }
288}
289
290impl SystemConfig {
291 #[inline]
292 pub fn testing(&self) -> bool {
294 self.testing
295 }
296}
297
298impl fmt::Debug for System {
299 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300 f.debug_struct("System")
301 .field("id", &self.id)
302 .field("config", &self.config)
303 .field("pool", &self.pool)
304 .finish()
305 }
306}
307
308impl fmt::Debug for SystemConfig {
309 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310 f.debug_struct("SystemConfig")
311 .field("name", &self.name)
312 .field("testing", &self.testing)
313 .field("stack_size", &self.stack_size)
314 .field("stop_on_panic", &self.stop_on_panic)
315 .finish()
316 }
317}
318
319#[derive(Debug)]
320pub(super) enum SystemCommand {
321 Exit(i32),
322 RegisterArbiter(Id, Arbiter),
323 UnregisterArbiter(Id),
324}
325
326#[derive(Debug)]
327struct SystemSupport {
328 stop: Option<oneshot::Sender<i32>>,
329 commands: Receiver<SystemCommand>,
330 ping_interval: Duration,
331}
332
333impl SystemSupport {
334 fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
335 Self {
336 stop: Some(stop),
337 commands: sys.receiver.clone(),
338 ping_interval: Duration::from_millis(sys.config.ping_interval as u64),
339 }
340 }
341
342 async fn run(mut self, id: Id, arb: Arbiter) {
343 ARBITERS.with(move |arbs| {
344 let mut arbiters = arbs.borrow_mut();
345 arbiters.all.clear();
346 arbiters.list.clear();
347
348 arbiters.all.insert(id, arb.clone());
350 arbiters.list.push(arb.clone());
351 crate::spawn(ping_arbiter(arb, self.ping_interval));
352 });
353
354 loop {
355 match self.commands.recv().await {
356 Ok(SystemCommand::Exit(code)) => {
357 log::debug!("Stopping system with {code} code");
358
359 ARBITERS.with(move |arbs| {
361 let mut arbiters = arbs.borrow_mut();
362 for arb in arbiters.list.drain(..) {
363 arb.stop();
364 }
365 arbiters.all.clear();
366 });
367
368 if let Some(stop) = self.stop.take() {
370 let _ = stop.send(code);
371 }
372 }
373 Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
374 crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
375 ARBITERS.with(move |arbs| {
376 let mut arbiters = arbs.borrow_mut();
377 arbiters.all.insert(id, hnd.clone());
378 arbiters.list.push(hnd);
379 });
380 }
381 Ok(SystemCommand::UnregisterArbiter(id)) => {
382 ARBITERS.with(move |arbs| {
383 let mut arbiters = arbs.borrow_mut();
384 if let Some(hnd) = arbiters.all.remove(&id) {
385 for (idx, arb) in arbiters.list.iter().enumerate() {
386 if &hnd == arb {
387 arbiters.list.remove(idx);
388 break;
389 }
390 }
391 }
392 });
393 }
394 Err(_) => {
395 log::debug!("System stopped");
396 return;
397 }
398 }
399 }
400 }
401}
402
/// One ping measurement for an arbiter.
#[derive(Debug, Clone, Copy)]
pub struct PingRecord {
    /// Instant at which the ping was started.
    pub start: Instant,
    /// Measured round-trip time; `None` while no result has been recorded.
    pub rtt: Option<Duration>,
}
410
411async fn ping_arbiter(arb: Arbiter, interval: Duration) {
412 loop {
413 Delay::new(interval).await;
414
415 let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
417
418 if !is_alive {
419 PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
420 break;
421 }
422
423 let start = Instant::now();
425 PINGS.with(|pings| {
426 let mut p = pings.borrow_mut();
427 let recs = p.entry(arb.id()).or_default();
428 recs.push_front(PingRecord { start, rtt: None });
429 recs.truncate(10);
430 });
431
432 let result = arb
433 .handle()
434 .spawn(async {
435 yield_to().await;
436 })
437 .await;
438
439 if result.is_err() {
440 break;
441 }
442
443 PINGS.with(|pings| {
444 pings
445 .borrow_mut()
446 .get_mut(&arb.id())
447 .unwrap()
448 .front_mut()
449 .unwrap()
450 .rtt = Some(start.elapsed());
451 });
452 }
453}
454
/// Yields to the executor exactly once.
///
/// The first poll wakes the task's waker and returns `Pending`; every later
/// poll returns `Ready`.
async fn yield_to() {
    use std::task::{Context, Poll};

    /// Future whose flag records whether it has already been polled.
    struct YieldOnce(bool);

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            // Mark as polled and look at the previous state in one step.
            let polled_before = std::mem::replace(&mut self.0, true);
            if polled_before {
                Poll::Ready(())
            } else {
                // Ask to be polled again right away, then hand control back.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    YieldOnce(false).await;
}
477
478pub(super) trait FnExec: Send + 'static {
479 fn call_box(self: Box<Self>);
480}
481
482impl<F> FnExec for F
483where
484 F: FnOnce() + Send + 'static,
485{
486 #[allow(clippy::boxed_local)]
487 fn call_box(self: Box<Self>) {
488 (*self)();
489 }
490}