1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, RwLock, atomic::AtomicUsize, atomic::Ordering};
3use std::time::{Duration, Instant};
4use std::{cell::Cell, cell::RefCell, fmt, future::Future, panic, pin::Pin};
5
6use async_channel::{Receiver, Sender, unbounded};
7use futures_timer::Delay;
8
9use crate::pool::ThreadPool;
10use crate::{Arbiter, BlockingResult, Builder, Handle, Runner, SystemRunner};
11
12static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
13
14thread_local!(
15 static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
16 static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
17 RefCell::new(HashMap::default());
18);
19
20#[derive(Default)]
21struct Arbiters {
22 all: HashMap<Id, Arbiter>,
23 list: Vec<Arbiter>,
24}
25
26#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
28pub struct Id(pub(crate) usize);
29
30pub struct System {
32 id: usize,
33 arbiter: Cell<Option<Arbiter>>,
34 config: SystemConfig,
35 sender: Sender<SystemCommand>,
36 receiver: Receiver<SystemCommand>,
37 rt: Arc<RwLock<Arbiter>>,
38 pool: ThreadPool,
39}
40
41#[derive(Clone)]
42pub struct SystemConfig {
43 pub(super) name: String,
44 pub(super) stack_size: usize,
45 pub(super) stop_on_panic: bool,
46 pub(super) ping_interval: usize,
47 pub(super) pool_limit: usize,
48 pub(super) pool_recv_timeout: Duration,
49 pub(super) testing: bool,
50 pub(super) runner: Arc<dyn Runner>,
51}
52
53thread_local!(
54 static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
55);
56
57impl Clone for System {
58 fn clone(&self) -> Self {
59 Self {
60 id: self.id,
61 arbiter: Cell::new(None),
62 config: self.config.clone(),
63 sender: self.sender.clone(),
64 receiver: self.receiver.clone(),
65 rt: self.rt.clone(),
66 pool: self.pool.clone(),
67 }
68 }
69}
70
71impl System {
72 pub(super) fn construct(config: SystemConfig) -> Self {
74 let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
75 let (sender, receiver) = unbounded();
76
77 let pool =
78 ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
79
80 System {
81 id,
82 config,
83 sender,
84 receiver,
85 pool,
86 rt: Arc::new(RwLock::new(Arbiter::dummy())),
87 arbiter: Cell::new(None),
88 }
89 }
90
91 pub(super) fn start(&mut self) -> oneshot::Receiver<i32> {
93 let (stop_tx, stop) = oneshot::channel();
94 let (arb, controller) = Arbiter::new_system(self.id, self.config.name.clone());
95
96 self.arbiter.set(Some(arb.clone()));
97 *self.rt.write().unwrap() = arb.clone();
98 System::set_current(self.clone());
99
100 crate::spawn(SystemSupport::new(self, stop_tx).run(arb.id(), arb));
102 crate::spawn(controller.run());
103
104 stop
105 }
106
107 pub fn build() -> Builder {
112 Builder::new()
113 }
114
115 #[allow(clippy::new_ret_no_self)]
116 pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
120 Self::build().name(name).build(runner)
121 }
122
123 #[allow(clippy::new_ret_no_self)]
124 pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
128 Self::build().name(name).build_with(config)
129 }
130
131 pub fn current() -> System {
137 CURRENT.with(|cell| match *cell.borrow() {
138 Some(ref sys) => sys.clone(),
139 None => panic!("System is not running"),
140 })
141 }
142
143 #[doc(hidden)]
145 pub fn set_current(sys: System) {
146 sys.arbiter().set_current();
147 CURRENT.with(|s| {
148 *s.borrow_mut() = Some(sys);
149 });
150 }
151
152 pub fn id(&self) -> Id {
154 Id(self.id)
155 }
156
157 pub fn name(&self) -> &str {
159 &self.config.name
160 }
161
162 pub fn stop(&self) {
164 self.stop_with_code(0);
165 }
166
167 pub fn stop_with_code(&self, code: i32) {
169 let _ = self.sender.try_send(SystemCommand::Exit(code));
170 }
171
172 pub fn stop_on_panic(&self) -> bool {
177 self.config.stop_on_panic
178 }
179
180 pub fn arbiter(&self) -> Arbiter {
186 if let Some(arb) = self.arbiter.take() {
187 self.arbiter.set(Some(arb.clone()));
188 if arb.hnd.is_some() {
189 return arb;
190 }
191 }
192
193 let arb = self.rt.read().unwrap().clone();
194 self.arbiter.set(Some(arb.clone()));
195 arb
196 }
197
198 pub fn list_arbiters<F, R>(f: F) -> R
203 where
204 F: FnOnce(&[Arbiter]) -> R,
205 {
206 ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
207 }
208
209 pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
214 where
215 F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
216 {
217 PINGS.with(|pings| {
218 if let Some(recs) = pings.borrow().get(&id) {
219 f(Some(recs))
220 } else {
221 f(None)
222 }
223 })
224 }
225
226 pub(super) fn sys(&self) -> &Sender<SystemCommand> {
227 &self.sender
228 }
229
230 pub fn config(&self) -> SystemConfig {
232 self.config.clone()
233 }
234
235 #[inline]
236 pub fn handle(&self) -> Handle {
238 self.arbiter().handle().clone()
239 }
240
241 pub fn testing(&self) -> bool {
243 self.config.testing()
244 }
245
246 pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
250 where
251 F: FnOnce() -> R + Send + 'static,
252 R: Send + 'static,
253 {
254 self.pool.dispatch(f)
255 }
256}
257
258impl SystemConfig {
259 #[inline]
260 pub fn testing(&self) -> bool {
262 self.testing
263 }
264}
265
266impl fmt::Debug for System {
267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268 f.debug_struct("System")
269 .field("id", &self.id)
270 .field("config", &self.config)
271 .field("pool", &self.pool)
272 .finish()
273 }
274}
275
276impl fmt::Debug for SystemConfig {
277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278 f.debug_struct("SystemConfig")
279 .field("name", &self.name)
280 .field("testing", &self.testing)
281 .field("stack_size", &self.stack_size)
282 .field("stop_on_panic", &self.stop_on_panic)
283 .finish()
284 }
285}
286
287#[derive(Debug)]
288pub(super) enum SystemCommand {
289 Exit(i32),
290 RegisterArbiter(Id, Arbiter),
291 UnregisterArbiter(Id),
292}
293
294#[derive(Debug)]
295struct SystemSupport {
296 stop: Option<oneshot::Sender<i32>>,
297 commands: Receiver<SystemCommand>,
298 ping_interval: Duration,
299}
300
301impl SystemSupport {
302 fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
303 Self {
304 stop: Some(stop),
305 commands: sys.receiver.clone(),
306 ping_interval: Duration::from_millis(sys.config.ping_interval as u64),
307 }
308 }
309
310 async fn run(mut self, id: Id, arb: Arbiter) {
311 ARBITERS.with(move |arbs| {
312 let mut arbiters = arbs.borrow_mut();
313 arbiters.all.clear();
314 arbiters.list.clear();
315
316 arbiters.all.insert(id, arb.clone());
318 arbiters.list.push(arb.clone());
319 crate::spawn(ping_arbiter(arb, self.ping_interval));
320 });
321
322 loop {
323 match self.commands.recv().await {
324 Ok(SystemCommand::Exit(code)) => {
325 log::debug!("Stopping system with {code} code");
326
327 ARBITERS.with(move |arbs| {
329 let mut arbiters = arbs.borrow_mut();
330 for arb in arbiters.list.drain(..) {
331 arb.stop();
332 }
333 arbiters.all.clear();
334 });
335
336 if let Some(stop) = self.stop.take() {
338 let _ = stop.send(code);
339 }
340 }
341 Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
342 crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
343 ARBITERS.with(move |arbs| {
344 let mut arbiters = arbs.borrow_mut();
345 arbiters.all.insert(id, hnd.clone());
346 arbiters.list.push(hnd);
347 });
348 }
349 Ok(SystemCommand::UnregisterArbiter(id)) => {
350 ARBITERS.with(move |arbs| {
351 let mut arbiters = arbs.borrow_mut();
352 if let Some(hnd) = arbiters.all.remove(&id) {
353 for (idx, arb) in arbiters.list.iter().enumerate() {
354 if &hnd == arb {
355 arbiters.list.remove(idx);
356 break;
357 }
358 }
359 }
360 });
361 }
362 Err(_) => {
363 log::debug!("System stopped");
364 return;
365 }
366 }
367 }
368 }
369}
370
371#[derive(Copy, Clone, Debug)]
372pub struct PingRecord {
373 pub start: Instant,
375 pub rtt: Option<Duration>,
377}
378
379async fn ping_arbiter(arb: Arbiter, interval: Duration) {
380 loop {
381 Delay::new(interval).await;
382
383 let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
385
386 if !is_alive {
387 PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
388 break;
389 }
390
391 let start = Instant::now();
393 PINGS.with(|pings| {
394 let mut p = pings.borrow_mut();
395 let recs = p.entry(arb.id()).or_default();
396 recs.push_front(PingRecord { start, rtt: None });
397 recs.truncate(10);
398 });
399
400 let result = arb
401 .handle()
402 .spawn(async {
403 yield_to().await;
404 })
405 .await;
406
407 if result.is_err() {
408 break;
409 }
410
411 PINGS.with(|pings| {
412 pings
413 .borrow_mut()
414 .get_mut(&arb.id())
415 .unwrap()
416 .front_mut()
417 .unwrap()
418 .rtt = Some(start.elapsed());
419 });
420 }
421}
422
423async fn yield_to() {
424 use std::task::{Context, Poll};
425
426 struct Yield {
427 completed: bool,
428 }
429
430 impl Future for Yield {
431 type Output = ();
432
433 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
434 if self.completed {
435 return Poll::Ready(());
436 }
437 self.completed = true;
438 cx.waker().wake_by_ref();
439 Poll::Pending
440 }
441 }
442
443 Yield { completed: false }.await;
444}
445
446pub(super) trait FnExec: Send + 'static {
447 fn call_box(self: Box<Self>);
448}
449
450impl<F> FnExec for F
451where
452 F: FnOnce() + Send + 'static,
453{
454 #[allow(clippy::boxed_local)]
455 fn call_box(self: Box<Self>) {
456 (*self)();
457 }
458}