1use std::collections::VecDeque;
27use std::io::{self, Error};
28use std::os::unix::io::{AsRawFd, RawFd};
29use std::sync::Arc;
30use std::time::Duration;
31
32use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
33use crate::{ResourceId, ResourceIdGenerator};
34
35pub struct Poller {
38 poll: popol::Sources<ResourceId>,
39 events: VecDeque<popol::Event<ResourceId>>,
40 id_gen: ResourceIdGenerator,
41}
42
43impl Default for Poller {
44 fn default() -> Self { Self::new() }
45}
46
47impl Poller {
48 pub fn new() -> Self {
50 Self {
51 poll: popol::Sources::new(),
52 events: empty!(),
53 id_gen: ResourceIdGenerator::default(),
54 }
55 }
56
57 pub fn with_capacity(capacity: usize) -> Self {
60 Self {
61 poll: popol::Sources::with_capacity(capacity),
62 events: VecDeque::with_capacity(capacity),
63 id_gen: ResourceIdGenerator::default(),
64 }
65 }
66}
67
68impl Poll for Poller {
69 type Waker = PopolWaker;
70
71 fn register_waker(&mut self, fd: &impl AsRawFd) {
72 let id = ResourceId::WAKER;
73 if self.poll.get(&id).is_some() {
74 #[cfg(feature = "log")]
75 log::error!(target: "popol", "Reactor waker is already registered, terminating");
76 panic!("Reactor waker is already registered");
77 }
78
79 self.poll.register(id, fd, popol::interest::READ);
80 }
81
82 fn register(&mut self, fd: &impl AsRawFd, interest: IoType) -> ResourceId {
83 let id = self.id_gen.next();
84
85 #[cfg(feature = "log")]
86 log::trace!(target: "popol", "Registering file descriptor {} as resource with id {}", fd.as_raw_fd(), id);
87
88 self.poll.register(id, fd, interest.into());
89 id
90 }
91
92 fn unregister(&mut self, id: ResourceId) {
93 #[cfg(feature = "log")]
94 log::trace!(target: "popol", "Unregistering {}", id);
95 self.poll.unregister(&id);
96 }
97
98 fn set_interest(&mut self, id: ResourceId, interest: IoType) -> bool {
99 #[cfg(feature = "log")]
100 log::trace!(target: "popol", "Setting interest `{interest}` on {}", id);
101
102 self.poll.unset(&id, (!interest).into());
103 self.poll.set(&id, interest.into())
104 }
105
106 fn poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
107 #[cfg(feature = "log")]
108 log::trace!(target: "popol",
109 "Polling {} reactor resources with timeout {timeout:?} (pending event queue is {})",
110 self.poll.len(), self.events.len()
111 );
112
113 match self.poll.poll(&mut self.events, timeout) {
115 Ok(count) => {
116 #[cfg(feature = "log")]
117 log::trace!(target: "popol", "Poll resulted in {} new event(s)", count);
118 Ok(count)
119 }
120 Err(err) if err.kind() == io::ErrorKind::TimedOut => {
121 #[cfg(feature = "log")]
122 log::trace!(target: "popol", "Poll timed out with zero events generated");
123 Ok(0)
124 }
125 Err(err) => {
126 #[cfg(feature = "log")]
127 log::trace!(target: "popol", "Poll resulted in error: {err}");
128 Err(err)
129 }
130 }
131 }
132}
133
134impl Iterator for Poller {
135 type Item = (ResourceId, Result<IoType, IoFail>);
136
137 fn next(&mut self) -> Option<Self::Item> {
138 let event = self.events.pop_front()?;
139
140 let id = event.key;
141 let fired = event.raw_events();
142 let res = if event.is_hangup() {
143 #[cfg(feature = "log")]
144 log::trace!(target: "popol", "Hangup on {id}");
145
146 Err(IoFail::Connectivity(fired))
147 } else if event.is_error() || event.is_invalid() {
148 #[cfg(feature = "log")]
149 log::trace!(target: "popol", "OS error on {id} (fired events {fired:#b})");
150
151 Err(IoFail::Os(fired))
152 } else {
153 let io = IoType {
154 read: event.is_readable(),
155 write: event.is_writable(),
156 };
157
158 #[cfg(feature = "log")]
159 log::trace!(target: "popol", "I/O event on {id}: {io}");
160
161 Ok(io)
162 };
163 Some((id, res))
164 }
165}
166
167impl From<IoType> for popol::Interest {
168 fn from(ev: IoType) -> Self {
169 let mut e = popol::interest::NONE;
170 if ev.read {
171 e |= popol::interest::READ;
172 }
173 if ev.write {
174 e |= popol::interest::WRITE;
175 }
176 e
177 }
178}
179
180#[derive(Clone)]
182pub struct PopolWaker(Arc<popol::Waker>);
183
184impl Waker for PopolWaker {
185 type Send = Self;
186 type Recv = Self;
187
188 fn pair() -> Result<(Self::Send, Self::Recv), Error> {
189 let waker = Arc::new(popol::Waker::new()?);
190 Ok((PopolWaker(waker.clone()), PopolWaker(waker)))
191 }
192}
193
194impl io::Read for PopolWaker {
195 fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
196 self.reset();
197 Ok(0)
200 }
201}
202
203impl AsRawFd for PopolWaker {
204 fn as_raw_fd(&self) -> RawFd { self.0.as_ref().as_raw_fd() }
205}
206
207impl WakerRecv for PopolWaker {
208 fn reset(&self) {
209 if let Err(e) = popol::Waker::reset(self.0.as_ref()) {
210 #[cfg(feature = "log")]
211 log::error!(target: "reactor-controller", "Unable to reset waker queue: {e}");
212 panic!("unable to reset waker queue. Details: {e}");
213 }
214 }
215}
216
217impl WakerSend for PopolWaker {
218 fn wake(&self) -> io::Result<()> { self.0.wake() }
219}