1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, RwLock, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::Cell, cell::RefCell, fmt, future::Future, panic, pin::Pin};
5
6use async_channel::{Receiver, Sender, unbounded};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Handle, Runner, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15 static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16 static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17 RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22 all: HashMap<Id, Arbiter>,
23 list: Vec<Arbiter>,
24}
25
26#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
28pub struct Id(pub(crate) usize);
29
30pub struct System {
32 id: usize,
33 arbiter: Cell<Option<Arbiter>>,
34 config: SystemConfig,
35 sender: Sender<SystemCommand>,
36 receiver: Receiver<SystemCommand>,
37 rt: Arc<RwLock<Arbiter>>,
38 pool: ThreadPool,
39}
40
41#[derive(Clone)]
42pub struct SystemConfig {
43 pub(super) name: String,
44 pub(super) stack_size: usize,
45 pub(super) stop_on_panic: bool,
46 pub(super) ping_interval: usize,
47 pub(super) pool_limit: usize,
48 pub(super) pool_bounded: bool,
49 pub(super) pool_recv_timeout: Duration,
50 pub(super) testing: bool,
51 pub(super) runner: Arc<dyn Runner>,
52}
53
54thread_local!(
55 static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
56);
57
58impl Clone for System {
59 fn clone(&self) -> Self {
60 Self {
61 id: self.id,
62 arbiter: Cell::new(None),
63 config: self.config.clone(),
64 sender: self.sender.clone(),
65 receiver: self.receiver.clone(),
66 rt: self.rt.clone(),
67 pool: self.pool.clone(),
68 }
69 }
70}
71
72impl System {
73 pub(super) fn construct(config: SystemConfig) -> Self {
75 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
76 let (sender, receiver) = unbounded();
77
78 let pool = ThreadPool::new(
79 &config.name,
80 config.pool_limit,
81 config.pool_recv_timeout,
82 config.pool_bounded,
83 );
84
85 System {
86 id,
87 config,
88 sender,
89 receiver,
90 pool,
91 rt: Arc::new(RwLock::new(Arbiter::dummy())),
92 arbiter: Cell::new(None),
93 }
94 }
95
96 pub(super) fn start(&mut self) -> oneshot::Receiver<i32> {
98 let (stop_tx, stop) = oneshot::channel();
99 let (arb, controller) = Arbiter::new_system(self.id, self.config.name.clone());
100
101 self.arbiter.set(Some(arb.clone()));
102 *self.rt.write().unwrap() = arb.clone();
103 System::set_current(self.clone());
104
105 crate::spawn(SystemSupport::new(self, stop_tx).run(arb.id(), arb));
107 crate::spawn(controller.run());
108
109 stop
110 }
111
112 pub fn build() -> Builder {
117 Builder::new()
118 }
119
120 #[allow(clippy::new_ret_no_self)]
121 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
125 Self::build().name(name).build(runner)
126 }
127
128 #[allow(clippy::new_ret_no_self)]
129 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
133 Self::build().name(name).build_with(config)
134 }
135
136 pub fn current() -> System {
142 CURRENT.with(|cell| match *cell.borrow() {
143 Some(ref sys) => sys.clone(),
144 None => panic!("System is not running"),
145 })
146 }
147
148 #[doc(hidden)]
150 pub fn set_current(sys: System) {
151 sys.arbiter().set_current();
152 CURRENT.with(|s| {
153 *s.borrow_mut() = Some(sys);
154 });
155 }
156
157 pub fn id(&self) -> Id {
159 Id(self.id)
160 }
161
162 pub fn name(&self) -> &str {
164 &self.config.name
165 }
166
167 pub fn stop(&self) {
169 self.stop_with_code(0);
170 }
171
172 pub fn stop_with_code(&self, code: i32) {
174 let _ = self.sender.try_send(SystemCommand::Exit(code));
175 }
176
177 pub fn stop_on_panic(&self) -> bool {
182 self.config.stop_on_panic
183 }
184
185 pub fn arbiter(&self) -> Arbiter {
191 if let Some(arb) = self.arbiter.take() {
192 self.arbiter.set(Some(arb.clone()));
193 if arb.hnd.is_some() {
194 return arb;
195 }
196 }
197
198 let arb = self.rt.read().unwrap().clone();
199 self.arbiter.set(Some(arb.clone()));
200 arb
201 }
202
203 pub fn list_arbiters<F, R>(f: F) -> R
208 where
209 F: FnOnce(&[Arbiter]) -> R,
210 {
211 ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
212 }
213
214 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
219 where
220 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
221 {
222 PINGS.with(|pings| {
223 if let Some(recs) = pings.borrow().get(&id) {
224 f(Some(recs))
225 } else {
226 f(None)
227 }
228 })
229 }
230
231 pub(super) fn sys(&self) -> &Sender<SystemCommand> {
232 &self.sender
233 }
234
235 pub fn config(&self) -> SystemConfig {
237 self.config.clone()
238 }
239
240 #[inline]
241 pub fn handle(&self) -> Handle {
243 self.arbiter().handle().clone()
244 }
245
246 pub fn testing(&self) -> bool {
248 self.config.testing()
249 }
250
251 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
255 where
256 F: FnOnce() -> R + Send + 'static,
257 R: Send + 'static,
258 {
259 self.pool.execute(f)
260 }
261}
262
263impl SystemConfig {
264 #[inline]
265 pub fn testing(&self) -> bool {
267 self.testing
268 }
269}
270
271impl fmt::Debug for System {
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 f.debug_struct("System")
274 .field("id", &self.id)
275 .field("config", &self.config)
276 .field("pool", &self.pool)
277 .finish()
278 }
279}
280
281impl fmt::Debug for SystemConfig {
282 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
283 f.debug_struct("SystemConfig")
284 .field("name", &self.name)
285 .field("testing", &self.testing)
286 .field("stack_size", &self.stack_size)
287 .field("stop_on_panic", &self.stop_on_panic)
288 .finish()
289 }
290}
291
292#[derive(Debug)]
293pub(super) enum SystemCommand {
294 Exit(i32),
295 RegisterArbiter(Id, Arbiter),
296 UnregisterArbiter(Id),
297}
298
299#[derive(Debug)]
300struct SystemSupport {
301 stop: Option<oneshot::Sender<i32>>,
302 commands: Receiver<SystemCommand>,
303 ping_interval: Duration,
304}
305
306impl SystemSupport {
307 fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
308 Self {
309 stop: Some(stop),
310 commands: sys.receiver.clone(),
311 ping_interval: Duration::from_millis(sys.config.ping_interval as u64),
312 }
313 }
314
315 async fn run(mut self, id: Id, arb: Arbiter) {
316 ARBITERS.with(move |arbs| {
317 let mut arbiters = arbs.borrow_mut();
318 arbiters.all.clear();
319 arbiters.list.clear();
320
321 arbiters.all.insert(id, arb.clone());
323 arbiters.list.push(arb.clone());
324 crate::spawn(ping_arbiter(arb, self.ping_interval));
325 });
326
327 loop {
328 match self.commands.recv().await {
329 Ok(SystemCommand::Exit(code)) => {
330 log::debug!("Stopping system with {code} code");
331
332 ARBITERS.with(move |arbs| {
334 let mut arbiters = arbs.borrow_mut();
335 for arb in arbiters.list.drain(..) {
336 arb.stop();
337 }
338 arbiters.all.clear();
339 });
340
341 if let Some(stop) = self.stop.take() {
343 let _ = stop.send(code);
344 }
345 }
346 Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
347 crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
348 ARBITERS.with(move |arbs| {
349 let mut arbiters = arbs.borrow_mut();
350 arbiters.all.insert(id, hnd.clone());
351 arbiters.list.push(hnd);
352 });
353 }
354 Ok(SystemCommand::UnregisterArbiter(id)) => {
355 ARBITERS.with(move |arbs| {
356 let mut arbiters = arbs.borrow_mut();
357 if let Some(hnd) = arbiters.all.remove(&id) {
358 for (idx, arb) in arbiters.list.iter().enumerate() {
359 if &hnd == arb {
360 arbiters.list.remove(idx);
361 break;
362 }
363 }
364 }
365 });
366 }
367 Err(_) => {
368 log::debug!("System stopped");
369 return;
370 }
371 }
372 }
373 }
374}
375
376#[derive(Copy, Clone, Debug)]
377pub struct PingRecord {
378 pub start: Instant,
380 pub rtt: Option<Duration>,
382}
383
384async fn ping_arbiter(arb: Arbiter, interval: Duration) {
385 loop {
386 Delay::new(interval).await;
387
388 let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
390
391 if !is_alive {
392 PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
393 break;
394 }
395
396 let start = Instant::now();
398 PINGS.with(|pings| {
399 let mut p = pings.borrow_mut();
400 let recs = p.entry(arb.id()).or_default();
401 recs.push_front(PingRecord { start, rtt: None });
402 recs.truncate(10);
403 });
404
405 let result = arb
406 .handle()
407 .spawn(async {
408 yield_to().await;
409 })
410 .await;
411
412 if result.is_err() {
413 break;
414 }
415
416 PINGS.with(|pings| {
417 pings
418 .borrow_mut()
419 .get_mut(&arb.id())
420 .unwrap()
421 .front_mut()
422 .unwrap()
423 .rtt = Some(start.elapsed());
424 });
425 }
426}
427
428async fn yield_to() {
429 use std::task::{Context, Poll};
430
431 struct Yield {
432 completed: bool,
433 }
434
435 impl Future for Yield {
436 type Output = ();
437
438 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
439 if self.completed {
440 return Poll::Ready(());
441 }
442 self.completed = true;
443 cx.waker().wake_by_ref();
444 Poll::Pending
445 }
446 }
447
448 Yield { completed: false }.await;
449}
450
451pub(super) trait FnExec: Send + 'static {
452 fn call_box(self: Box<Self>);
453}
454
455impl<F> FnExec for F
456where
457 F: FnOnce() + Send + 'static,
458{
459 #[allow(clippy::boxed_local)]
460 fn call_box(self: Box<Self>) {
461 (*self)();
462 }
463}