Skip to main content

ntex_rt/
system.rs

1use std::any::{Any, TypeId};
2use std::collections::{HashMap, VecDeque};
3use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
4use std::time::{Duration, Instant};
5use std::{cell::Cell, cell::RefCell, fmt, future::Future, panic, pin::Pin};
6
7use async_channel::{Receiver, Sender, unbounded};
8use futures_timer::Delay;
9use parking_lot::RwLock;
10
11use crate::pool::ThreadPool;
12use crate::{Arbiter, BlockingResult, Builder, Handle, Runner, SystemRunner};
13
14static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
15
16thread_local!(
17    static ARBITERS: RefCell<Arbiters> = RefCell::new(Arbiters::default());
18    static PINGS: RefCell<HashMap<Id, VecDeque<PingRecord>>> =
19        RefCell::new(HashMap::default());
20);
21
22#[derive(Default)]
23struct Arbiters {
24    all: HashMap<Id, Arbiter>,
25    list: Vec<Arbiter>,
26}
27
/// Identifier of a system or of an arbiter.
///
/// A thin, copyable newtype around `usize`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Id(pub(crate) usize);
31
32/// System is a runtime manager
33pub struct System {
34    id: usize,
35    arbiter: Cell<Option<Arbiter>>,
36    config: SystemConfig,
37    sender: Sender<SystemCommand>,
38    receiver: Receiver<SystemCommand>,
39    rt: Arc<RwLock<Arbiter>>,
40    storage: Arc<RwLock<HashMap<TypeId, Box<dyn Any + Sync + Send>>>>,
41    pool: ThreadPool,
42}
43
44#[derive(Clone)]
45pub struct SystemConfig {
46    pub(super) name: String,
47    pub(super) stack_size: usize,
48    pub(super) stop_on_panic: bool,
49    pub(super) ping_interval: usize,
50    pub(super) pool_limit: usize,
51    pub(super) pool_recv_timeout: Duration,
52    pub(super) testing: bool,
53    pub(super) runner: Arc<dyn Runner>,
54}
55
56thread_local!(
57    static CURRENT: RefCell<Option<System>> = const { RefCell::new(None) };
58);
59
60impl Clone for System {
61    fn clone(&self) -> Self {
62        Self {
63            id: self.id,
64            arbiter: Cell::new(None),
65            config: self.config.clone(),
66            sender: self.sender.clone(),
67            receiver: self.receiver.clone(),
68            rt: self.rt.clone(),
69            storage: self.storage.clone(),
70            pool: self.pool.clone(),
71        }
72    }
73}
74
75impl System {
76    /// Constructs new system and sets it as current
77    pub(super) fn construct(config: SystemConfig) -> Self {
78        let id = SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst);
79        let (sender, receiver) = unbounded();
80
81        let pool =
82            ThreadPool::new(&config.name, config.pool_limit, config.pool_recv_timeout);
83
84        System {
85            id,
86            config,
87            sender,
88            receiver,
89            pool,
90            rt: Arc::new(RwLock::new(Arbiter::dummy())),
91            storage: Arc::new(RwLock::new(HashMap::default())),
92            arbiter: Cell::new(None),
93        }
94    }
95
96    /// Constructs new system and sets it as current
97    pub(super) fn start(&mut self) -> oneshot::Receiver<i32> {
98        let (stop_tx, stop) = oneshot::channel();
99        let (arb, controller) = Arbiter::new_system(self.clone(), self.config.name.clone());
100
101        self.arbiter.set(Some(arb.clone()));
102        *self.rt.write() = arb.clone();
103        System::set_current(self.clone());
104
105        // system support tasks
106        crate::spawn(SystemSupport::new(self, stop_tx).run(arb.id(), arb));
107        crate::spawn(controller.run());
108
109        stop
110    }
111
112    /// Build a new system with a customized runtime
113    ///
114    /// This allows to customize the runtime. See struct level docs on
115    /// `Builder` for more information.
116    pub fn build() -> Builder {
117        Builder::new()
118    }
119
120    #[allow(clippy::new_ret_no_self)]
121    /// Create new system
122    ///
123    /// This method panics if it can not create runtime
124    pub fn new<R: Runner>(name: &str, runner: R) -> SystemRunner {
125        Self::build().name(name).build(runner)
126    }
127
128    #[allow(clippy::new_ret_no_self)]
129    /// Create new system
130    ///
131    /// This method panics if it can not create runtime
132    pub fn with_config(name: &str, config: SystemConfig) -> SystemRunner {
133        Self::build().name(name).build_with(config)
134    }
135
136    /// Get current running system
137    ///
138    /// # Panics
139    ///
140    /// Panics if System is not running
141    pub fn current() -> System {
142        CURRENT.with(|cell| match *cell.borrow() {
143            Some(ref sys) => sys.clone(),
144            None => panic!("System is not running"),
145        })
146    }
147
148    /// Set current running system
149    #[doc(hidden)]
150    pub fn set_current(sys: System) {
151        sys.arbiter().set_current();
152        CURRENT.with(|s| {
153            *s.borrow_mut() = Some(sys);
154        });
155    }
156
157    pub(super) fn remove_current() {
158        CURRENT.with(|cell| {
159            cell.borrow_mut().take();
160        });
161    }
162
163    /// System id
164    pub fn id(&self) -> Id {
165        Id(self.id)
166    }
167
168    /// System name
169    pub fn name(&self) -> &str {
170        &self.config.name
171    }
172
173    /// Stop the system
174    pub fn stop(&self) {
175        self.stop_with_code(0);
176    }
177
178    /// Stop the system with a particular exit code
179    pub fn stop_with_code(&self, code: i32) {
180        let _ = self.sender.try_send(SystemCommand::Exit(code));
181    }
182
183    /// Return status of `stop_on_panic` option
184    ///
185    /// It controls whether the System is stopped when an
186    /// uncaught panic is thrown from a worker thread.
187    pub fn stop_on_panic(&self) -> bool {
188        self.config.stop_on_panic
189    }
190
191    /// System arbiter
192    ///
193    /// # Panics
194    ///
195    /// Panics if system is not started
196    pub fn arbiter(&self) -> Arbiter {
197        if let Some(arb) = self.arbiter.take() {
198            self.arbiter.set(Some(arb.clone()));
199            if arb.hnd.is_some() {
200                return arb;
201            }
202        }
203
204        let arb = self.rt.read().clone();
205        self.arbiter.set(Some(arb.clone()));
206        arb
207    }
208
209    /// Retrieves a list of all arbiters in the system
210    ///
211    /// This method should be called from the thread where the system has been initialized,
212    /// typically the "main" thread.
213    pub fn list_arbiters<F, R>(f: F) -> R
214    where
215        F: FnOnce(&[Arbiter]) -> R,
216    {
217        ARBITERS.with(|arbs| f(arbs.borrow().list.as_ref()))
218    }
219
220    /// Retrieves a list of last pings records for specified arbiter
221    ///
222    /// This method should be called from the thread where the system has been initialized,
223    /// typically the "main" thread.
224    pub fn list_arbiter_pings<F, R>(id: Id, f: F) -> R
225    where
226        F: FnOnce(Option<&VecDeque<PingRecord>>) -> R,
227    {
228        PINGS.with(|pings| {
229            if let Some(recs) = pings.borrow().get(&id) {
230                f(Some(recs))
231            } else {
232                f(None)
233            }
234        })
235    }
236
237    pub(super) fn sys(&self) -> &Sender<SystemCommand> {
238        &self.sender
239    }
240
241    /// System config
242    pub fn config(&self) -> SystemConfig {
243        self.config.clone()
244    }
245
246    #[inline]
247    /// Runtime handle for main thread
248    pub fn handle(&self) -> Handle {
249        self.arbiter().handle().clone()
250    }
251
252    /// Testing flag
253    pub fn testing(&self) -> bool {
254        self.config.testing()
255    }
256
257    /// Spawns a blocking task in a new thread, and wait for it
258    ///
259    /// The task will not be cancelled even if the future is dropped.
260    pub fn spawn_blocking<F, R>(&self, f: F) -> BlockingResult<R>
261    where
262        F: FnOnce() -> R + Send + 'static,
263        R: Send + 'static,
264    {
265        self.pool.execute(f)
266    }
267
268    /// Returns a previously registered type, or inserts and returns a new one.
269    ///
270    /// This method acquires a lock on the internal data structure.
271    /// To avoid repeated locking, prefer storing a cloned value in the arbiter's storage.
272    pub fn get_value<T>(&self, f: impl FnOnce() -> T) -> T
273    where
274        T: Clone + Send + Sync + 'static,
275    {
276        if let Some(boxed) = self.storage.read().get(&TypeId::of::<T>())
277            && let Some(val) = (&**boxed as &(dyn Any + 'static)).downcast_ref::<T>()
278        {
279            val.clone()
280        } else {
281            let val = f();
282            self.storage
283                .write()
284                .insert(TypeId::of::<T>(), Box::new(val.clone()));
285            val
286        }
287    }
288}
289
290impl SystemConfig {
291    #[inline]
292    /// Is current system is testing
293    pub fn testing(&self) -> bool {
294        self.testing
295    }
296}
297
298impl fmt::Debug for System {
299    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300        f.debug_struct("System")
301            .field("id", &self.id)
302            .field("config", &self.config)
303            .field("pool", &self.pool)
304            .finish()
305    }
306}
307
308impl fmt::Debug for SystemConfig {
309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310        f.debug_struct("SystemConfig")
311            .field("name", &self.name)
312            .field("testing", &self.testing)
313            .field("stack_size", &self.stack_size)
314            .field("stop_on_panic", &self.stop_on_panic)
315            .finish()
316    }
317}
318
319#[derive(Debug)]
320pub(super) enum SystemCommand {
321    Exit(i32),
322    RegisterArbiter(Id, Arbiter),
323    UnregisterArbiter(Id),
324}
325
326#[derive(Debug)]
327struct SystemSupport {
328    stop: Option<oneshot::Sender<i32>>,
329    commands: Receiver<SystemCommand>,
330    ping_interval: Duration,
331}
332
333impl SystemSupport {
334    fn new(sys: &System, stop: oneshot::Sender<i32>) -> Self {
335        Self {
336            stop: Some(stop),
337            commands: sys.receiver.clone(),
338            ping_interval: Duration::from_millis(sys.config.ping_interval as u64),
339        }
340    }
341
342    async fn run(mut self, id: Id, arb: Arbiter) {
343        ARBITERS.with(move |arbs| {
344            let mut arbiters = arbs.borrow_mut();
345            arbiters.all.clear();
346            arbiters.list.clear();
347
348            // system arbiter
349            arbiters.all.insert(id, arb.clone());
350            arbiters.list.push(arb.clone());
351            crate::spawn(ping_arbiter(arb, self.ping_interval));
352        });
353
354        loop {
355            match self.commands.recv().await {
356                Ok(SystemCommand::Exit(code)) => {
357                    log::debug!("Stopping system with {code} code");
358
359                    // stop arbiters
360                    ARBITERS.with(move |arbs| {
361                        let mut arbiters = arbs.borrow_mut();
362                        for arb in arbiters.list.drain(..) {
363                            arb.stop();
364                        }
365                        arbiters.all.clear();
366                    });
367
368                    // stop event loop
369                    if let Some(stop) = self.stop.take() {
370                        let _ = stop.send(code);
371                    }
372                }
373                Ok(SystemCommand::RegisterArbiter(id, hnd)) => {
374                    crate::spawn(ping_arbiter(hnd.clone(), self.ping_interval));
375                    ARBITERS.with(move |arbs| {
376                        let mut arbiters = arbs.borrow_mut();
377                        arbiters.all.insert(id, hnd.clone());
378                        arbiters.list.push(hnd);
379                    });
380                }
381                Ok(SystemCommand::UnregisterArbiter(id)) => {
382                    ARBITERS.with(move |arbs| {
383                        let mut arbiters = arbs.borrow_mut();
384                        if let Some(hnd) = arbiters.all.remove(&id) {
385                            for (idx, arb) in arbiters.list.iter().enumerate() {
386                                if &hnd == arb {
387                                    arbiters.list.remove(idx);
388                                    break;
389                                }
390                            }
391                        }
392                    });
393                }
394                Err(_) => {
395                    log::debug!("System stopped");
396                    return;
397                }
398            }
399        }
400    }
401}
402
/// One ping measurement of an arbiter.
#[derive(Clone, Copy, Debug)]
pub struct PingRecord {
    /// Moment the ping was sent
    pub start: Instant,
    /// Measured round-trip time; `None` while the ping is still in flight
    pub rtt: Option<Duration>,
}
410
411async fn ping_arbiter(arb: Arbiter, interval: Duration) {
412    loop {
413        Delay::new(interval).await;
414
415        // check if arbiter is still active
416        let is_alive = ARBITERS.with(|arbs| arbs.borrow().all.contains_key(&arb.id()));
417
418        if !is_alive {
419            PINGS.with(|pings| pings.borrow_mut().remove(&arb.id()));
420            break;
421        }
422
423        // calc ttl
424        let start = Instant::now();
425        PINGS.with(|pings| {
426            let mut p = pings.borrow_mut();
427            let recs = p.entry(arb.id()).or_default();
428            recs.push_front(PingRecord { start, rtt: None });
429            recs.truncate(10);
430        });
431
432        let result = arb
433            .handle()
434            .spawn(async {
435                yield_to().await;
436            })
437            .await;
438
439        if result.is_err() {
440            break;
441        }
442
443        PINGS.with(|pings| {
444            pings
445                .borrow_mut()
446                .get_mut(&arb.id())
447                .unwrap()
448                .front_mut()
449                .unwrap()
450                .rtt = Some(start.elapsed());
451        });
452    }
453}
454
/// Yields control back to the executor exactly once.
///
/// The first poll wakes the current task and returns `Pending`; the next poll
/// completes.
async fn yield_to() {
    let mut yielded = false;
    std::future::poll_fn(move |cx| {
        if yielded {
            std::task::Poll::Ready(())
        } else {
            yielded = true;
            cx.waker().wake_by_ref();
            std::task::Poll::Pending
        }
    })
    .await
}
477
478pub(super) trait FnExec: Send + 'static {
479    fn call_box(self: Box<Self>);
480}
481
482impl<F> FnExec for F
483where
484    F: FnOnce() + Send + 'static,
485{
486    #[allow(clippy::boxed_local)]
487    fn call_box(self: Box<Self>) {
488        (*self)();
489    }
490}