Skip to main content

stage/io/
reactor.rs

1use std::time::{Duration, SystemTime};
2
3use mirai::PollDriver;
4use tracing::error;
5
6use crate::{
7    io::timers::TIMERS,
8    system::SysHandle,
9    traits::{Loop, StageReactor},
10};
11
12pub(crate) struct IoReactor {
13    #[cfg(feature = "use-mirai")]
14    driver: PollDriver,
15}
16
17impl StageReactor for IoReactor {
18    type Config = IoReactorConfig;
19    const THREAD_NAME: &'static str = "IoReactor";
20
21    fn new(config: Self::Config) -> Self {
22        #[cfg(feature = "use-mirai")]
23        {
24            let IoReactorConfig { poll_timeout, event_buffer_capacity } = config;
25            let (driver, reg) = PollDriver::new(poll_timeout, event_buffer_capacity).unwrap();
26            SysHandle::init_mirai(reg);
27
28            Self { driver }
29        }
30    }
31
32    fn iter(&mut self) -> Loop {
33        #[cfg(not(feature = "use-mirai"))]
34        {
35            self.timer_only_iter()
36        }
37
38        #[cfg(feature = "use-mirai")]
39        {
40            self.regular_iter()
41        }
42    }
43}
44
45impl IoReactor {
46    #[cfg(not(feature = "use-mirai"))]
47    fn timer_only_iter(&mut self) -> Loop {
48        let (mutex, condvar) = &*TIMERS;
49
50        let mut timers = mutex.lock().unwrap();
51        let now = SystemTime::now();
52
53        while let Some(timer) = timers.pop() {
54            if let Some(dur) = timer.time_until(now) {
55                let (mut t, w) = condvar.wait_timeout(timers, dur).unwrap();
56                if w.timed_out() {
57                    timer.exec()
58                } else {
59                    t.push(timer);
60                }
61                timers = t;
62            } else {
63                timer.exec();
64            }
65        }
66
67        drop(condvar.wait(timers).unwrap());
68
69        Loop::Continue
70    }
71
72    #[cfg(feature = "use-mirai")]
73    fn regular_iter(&mut self) -> Loop {
74        if let Err(err) = self.driver.iter() {
75            error!("Error polling: {}", err);
76        }
77
78        let mut timers = TIMERS.0.lock().unwrap();
79        let now = SystemTime::now();
80
81        while let Some(timer) = timers.pop() {
82            if timer.time_until(now).is_some() {
83                timers.push(timer);
84                break;
85            } else {
86                timer.exec();
87            }
88        }
89
90        Loop::Continue
91    }
92}
93
94pub struct IoReactorConfig {
95    /// Passed to Mio
96    pub poll_timeout: Duration,
97    /// Passed to Mirai, defines [`mio::Events`] capacity.
98    pub event_buffer_capacity: usize,
99}