1use lockfree::map::Map;
3use mio::{Events, Interest, Poll, Token};
4use std::sync::{Arc, RwLock};
5
6use crate::{
7 error::Result,
8 handler::{EventHandler, HandlerEntry},
9};
10
/// Shared map from a `mio` [`Token`] to the [`HandlerEntry`] registered under
/// that token. Wrapped in an [`Arc`] so it can be handed out by cloning the
/// pointer rather than the map.
type Registry = Arc<Map<Token, HandlerEntry>>;
12
/// Bundles a `mio` poll instance with the table of handlers registered on it.
pub struct PollHandle {
    /// The underlying `mio::Poll`; write-locked for the duration of each
    /// `poll` call.
    poller: Arc<RwLock<mio::Poll>>,
    /// Clone of the poll's registry, used to register and deregister sources
    /// without taking the `poller` lock.
    mio_registry: mio::Registry,
    /// Handler entries keyed by the token they were registered with.
    registery: Registry,
    /// Waker created on `mio_registry` under `Token(0)`; triggered by `wake`.
    waker: Arc<mio::Waker>,
}
19
20impl PollHandle {
21 pub fn new() -> Result<Self> {
22 let poll = Poll::new()?;
23 let mio_registry = poll.registry().try_clone()?;
24 let waker = mio::Waker::new(&mio_registry, Token(0))?;
25 let poller = Arc::new(RwLock::new(poll));
26 let registery: Registry = Arc::new(Map::new());
27 Ok(PollHandle {
28 poller,
29 mio_registry,
30 registery,
31 waker: Arc::new(waker),
32 })
33 }
34
35 pub fn register<H, S>(
36 &self,
37 src: &mut S,
38 token: Token,
39 interest: Interest,
40 handler: H,
41 ) -> Result<()>
42 where
43 H: EventHandler + Send + Sync + 'static,
44 S: mio::event::Source + ?Sized,
45 {
46 let handler_entry = HandlerEntry::new(handler, interest);
47
48 src.register(&self.mio_registry, token, interest)?;
49
50 self.registery.insert(token, handler_entry);
51 Ok(())
52 }
53
54 pub fn deregister<S>(&self, source: &mut S, token: Token) -> Result<()>
55 where
56 S: mio::event::Source + ?Sized,
57 {
58 self.mio_registry.deregister(source)?;
59
60 self.registery.remove(&token);
61
62 Ok(())
63 }
64
65 pub fn poll(&self, events: &mut Events, timeout: Option<std::time::Duration>) -> Result<usize> {
66 let mut poller = self
67 .poller
68 .write()
69 .map_err(|_| "Failed to acquire poller write lock")?;
70 poller.poll(events, timeout)?;
71 Ok(events.iter().count())
72 }
73
74 pub fn wake(&self) -> Result<()> {
75 Ok(self.waker.wake()?)
76 }
77
78 pub fn get_registery(&self) -> Registry {
79 self.registery.clone()
80 }
81}
#[cfg(test)]
mod tests {
    use super::*;
    use mio::event::{Event, Source};
    use mio::Events;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    /// Event source whose registration hooks all succeed without touching the
    /// OS, so the handler-table bookkeeping can be tested without real I/O.
    struct TestSource;

    impl Source for TestSource {
        fn register(
            &mut self,
            _registry: &mio::Registry,
            _token: Token,
            _interests: Interest,
        ) -> std::io::Result<()> {
            Ok(())
        }

        fn reregister(
            &mut self,
            _registry: &mio::Registry,
            _token: Token,
            _interests: Interest,
        ) -> std::io::Result<()> {
            Ok(())
        }

        fn deregister(&mut self, _registry: &mio::Registry) -> std::io::Result<()> {
            Ok(())
        }
    }

    impl TestSource {
        fn new() -> Self {
            TestSource
        }
    }

    /// Reports whether the handler table currently holds an entry for `token`.
    fn has_token(handle: &PollHandle, token: Token) -> bool {
        handle.registery.iter().any(|entry| entry.0 == token)
    }

    #[test]
    fn test_poll() {
        let poller = PollHandle::new().unwrap();
        let mut events = Events::with_capacity(1024);

        // Nothing is registered and nobody wakes the poller, so a short
        // timeout is enough and keeps the test fast.
        let ready = poller
            .poll(&mut events, Some(Duration::from_millis(10)))
            .unwrap();
        assert_eq!(ready, 0, "No events expected on an idle poller");
    }

    #[test]
    fn test_wake() {
        let poller = PollHandle::new().unwrap();
        assert!(poller.wake().is_ok());

        // The wake-up must be observable through `poll` as an event carrying
        // the waker's token. The timeout is only a safety net; a working
        // waker makes `poll` return immediately.
        let mut events = Events::with_capacity(8);
        poller
            .poll(&mut events, Some(Duration::from_secs(1)))
            .unwrap();
        assert!(
            events.iter().any(|event| event.token() == Token(0)),
            "Waker event not delivered"
        );
    }

    #[test]
    fn test_register_unregister() {
        let poller = PollHandle::new().unwrap();
        let mut source = TestSource::new();
        let token = Token(1);

        struct TestHandler {
            called: Arc<AtomicBool>,
        }

        impl EventHandler for TestHandler {
            fn handle_event(&self, _event: &Event) {
                self.called.store(true, Ordering::SeqCst);
            }
        }

        let called = Arc::new(AtomicBool::new(false));
        let handler = TestHandler {
            called: Arc::clone(&called),
        };

        assert!(
            poller
                .register(&mut source, token, Interest::READABLE, handler)
                .is_ok(),
            "Failed to register source"
        );

        assert!(has_token(&poller, token), "Token not found in registry");

        assert!(
            poller.deregister(&mut source, token).is_ok(),
            "Failed to unregister source"
        );

        assert!(
            !has_token(&poller, token),
            "Token should have been removed from registry"
        );

        // No event was ever dispatched, so the handler must not have run.
        assert!(
            !called.load(Ordering::SeqCst),
            "Handler should not run without an event"
        );
    }

    #[test]
    fn test_multiple_handlers() {
        let poller = PollHandle::new().unwrap();
        let mut src1 = TestSource::new();
        let mut src2 = TestSource::new();

        struct NoopHandler;
        impl EventHandler for NoopHandler {
            fn handle_event(&self, _event: &Event) {}
        }

        assert!(
            poller
                .register(&mut src1, Token(1), Interest::READABLE, NoopHandler)
                .is_ok(),
            "Failed to register src1"
        );
        assert!(
            poller
                .register(&mut src2, Token(2), Interest::WRITABLE, NoopHandler)
                .is_ok(),
            "Failed to register src2"
        );

        assert_eq!(poller.registery.iter().count(), 2);
        assert!(has_token(&poller, Token(1)), "Failed to find src1");
        assert!(has_token(&poller, Token(2)), "Failed to find src2");
    }
}