1use std::time::{Duration, SystemTime};
2
3use mirai::PollDriver;
4use tracing::error;
5
6use crate::{
7 io::timers::TIMERS,
8 system::SysHandle,
9 traits::{Loop, StageReactor},
10};
11
/// Reactor stage that fires timers queued in `TIMERS` and, when the
/// `use-mirai` feature is enabled, also polls I/O through a [`PollDriver`].
pub(crate) struct IoReactor {
    /// Poll driver created in `new`; the field only exists in `use-mirai`
    /// builds, leaving the struct empty otherwise.
    #[cfg(feature = "use-mirai")]
    driver: PollDriver,
}
16
17impl StageReactor for IoReactor {
18 type Config = IoReactorConfig;
19 const THREAD_NAME: &'static str = "IoReactor";
20
21 fn new(config: Self::Config) -> Self {
22 #[cfg(feature = "use-mirai")]
23 {
24 let IoReactorConfig { poll_timeout, event_buffer_capacity } = config;
25 let (driver, reg) = PollDriver::new(poll_timeout, event_buffer_capacity).unwrap();
26 SysHandle::init_mirai(reg);
27
28 Self { driver }
29 }
30 }
31
32 fn iter(&mut self) -> Loop {
33 #[cfg(not(feature = "use-mirai"))]
34 {
35 self.timer_only_iter()
36 }
37
38 #[cfg(feature = "use-mirai")]
39 {
40 self.regular_iter()
41 }
42 }
43}
44
45impl IoReactor {
46 #[cfg(not(feature = "use-mirai"))]
47 fn timer_only_iter(&mut self) -> Loop {
48 let (mutex, condvar) = &*TIMERS;
49
50 let mut timers = mutex.lock().unwrap();
51 let now = SystemTime::now();
52
53 while let Some(timer) = timers.pop() {
54 if let Some(dur) = timer.time_until(now) {
55 let (mut t, w) = condvar.wait_timeout(timers, dur).unwrap();
56 if w.timed_out() {
57 timer.exec()
58 } else {
59 t.push(timer);
60 }
61 timers = t;
62 } else {
63 timer.exec();
64 }
65 }
66
67 drop(condvar.wait(timers).unwrap());
68
69 Loop::Continue
70 }
71
72 #[cfg(feature = "use-mirai")]
73 fn regular_iter(&mut self) -> Loop {
74 if let Err(err) = self.driver.iter() {
75 error!("Error polling: {}", err);
76 }
77
78 let mut timers = TIMERS.0.lock().unwrap();
79 let now = SystemTime::now();
80
81 while let Some(timer) = timers.pop() {
82 if timer.time_until(now).is_some() {
83 timers.push(timer);
84 break;
85 } else {
86 timer.exec();
87 }
88 }
89
90 Loop::Continue
91 }
92}
93
/// Configuration consumed by `IoReactor::new`.
///
/// In `use-mirai` builds both fields are forwarded, in declaration order, to
/// `PollDriver::new`.
#[derive(Debug, Clone)]
pub struct IoReactorConfig {
    /// First argument passed to `PollDriver::new`; presumably the maximum
    /// time a single poll may block — confirm against the `mirai` docs.
    pub poll_timeout: Duration,
    /// Second argument passed to `PollDriver::new`; presumably the capacity
    /// of the driver's event buffer — confirm against the `mirai` docs.
    pub event_buffer_capacity: usize,
}