mill_io/
poll.rs

1// TODO: add custom error module and use it here
2use lock_freedom::map::Map;
3use mio::{Events, Interest, Poll, Token};
4use std::sync::{Arc, RwLock};
5
6use crate::{
7    error::Result,
8    handler::{EventHandler, HandlerEntry},
9};
10
11type Registry = Arc<Map<Token, HandlerEntry>>;
12
13pub struct PollHandle {
14    poller: Arc<RwLock<mio::Poll>>,
15    mio_registry: mio::Registry,
16    registery: Registry,
17    waker: Arc<mio::Waker>,
18}
19
20impl PollHandle {
21    pub fn new() -> Result<Self> {
22        let poll = Poll::new()?;
23        let mio_registry = poll.registry().try_clone()?;
24        let waker = mio::Waker::new(&mio_registry, Token(0))?;
25        let poller = Arc::new(RwLock::new(poll));
26        let registery: Registry = Arc::new(Map::new());
27        Ok(PollHandle {
28            poller,
29            mio_registry,
30            registery,
31            waker: Arc::new(waker),
32        })
33    }
34
35    pub fn register<H, S>(
36        &self,
37        src: &mut S,
38        token: Token,
39        interest: Interest,
40        handler: H,
41    ) -> Result<()>
42    where
43        H: EventHandler + Send + Sync + 'static,
44        S: mio::event::Source + ?Sized,
45    {
46        let handler_entry = HandlerEntry::new(handler, interest);
47
48        src.register(&self.mio_registry, token, interest)?;
49
50        self.registery.insert(token, handler_entry);
51        Ok(())
52    }
53
54    pub fn deregister<S>(&self, source: &mut S, token: Token) -> Result<()>
55    where
56        S: mio::event::Source + ?Sized,
57    {
58        self.mio_registry.deregister(source)?;
59
60        self.registery.remove(&token);
61
62        Ok(())
63    }
64
65    pub fn poll(&self, events: &mut Events, timeout: Option<std::time::Duration>) -> Result<usize> {
66        let mut poller = self
67            .poller
68            .write()
69            .map_err(|_| "Failed to acquire poller write lock")?;
70        poller.poll(events, timeout)?;
71        Ok(events.iter().count())
72    }
73
74    pub fn wake(&self) -> Result<()> {
75        Ok(self.waker.wake()?)
76    }
77
78    pub fn get_registery(&self) -> Registry {
79        self.registery.clone()
80    }
81}
82#[cfg(test)]
83mod tests {
84    use super::*;
85    use mio::event::{Event, Source};
86    use mio::Events;
87    use std::sync::atomic::{AtomicBool, Ordering};
88    use std::time::Duration;
89
90    struct TestSource;
91    impl Source for TestSource {
92        fn register(
93            &mut self,
94            _registry: &mio::Registry,
95            _token: Token,
96            _interests: Interest,
97        ) -> std::io::Result<()> {
98            Ok(())
99        }
100
101        fn reregister(
102            &mut self,
103            _registry: &mio::Registry,
104            _token: Token,
105            _interests: Interest,
106        ) -> std::io::Result<()> {
107            Ok(())
108        }
109
110        fn deregister(&mut self, _registry: &mio::Registry) -> std::io::Result<()> {
111            Ok(())
112        }
113    }
114    impl TestSource {
115        fn new() -> Self {
116            TestSource
117        }
118    }
119
120    #[test]
121    fn test_poll() {
122        let poller = PollHandle::new().unwrap();
123        let mut events = Events::with_capacity(1024);
124        poller
125            .poll(&mut events, Some(Duration::from_secs(1)))
126            .unwrap();
127    }
128
129    #[test]
130    fn test_wake() {
131        let poller = PollHandle::new().unwrap();
132        assert!(poller.wake().is_ok());
133    }
134
135    #[test]
136    fn test_register_unregister() {
137        let poller = PollHandle::new().unwrap();
138        let mut source = TestSource::new();
139        let token = Token(1);
140
141        struct TestHandler {
142            called: Arc<AtomicBool>,
143        }
144
145        impl EventHandler for TestHandler {
146            fn handle_event(&self, _event: &Event) {
147                self.called.store(true, Ordering::SeqCst);
148            }
149        }
150
151        let handler = TestHandler {
152            called: Arc::new(AtomicBool::new(false)),
153        };
154
155        assert!(
156            poller
157                .register(&mut source, token, Interest::READABLE, handler)
158                .is_ok(),
159            "Failed to register source"
160        );
161
162        assert!(
163            poller.registery.iter().any(|t| t.0 == token),
164            "Token not found in registry"
165        );
166
167        assert!(
168            poller.deregister(&mut source, token).is_ok(),
169            "Failed to unregister source"
170        );
171
172        assert!(
173            !poller.registery.iter().any(|t| t.0 == token),
174            "Token should have been removed from registry"
175        );
176    }
177
178    #[test]
179    fn test_multiple_handlers() {
180        let poller = PollHandle::new().unwrap();
181        let mut src1 = TestSource::new();
182        let mut src2 = TestSource::new();
183
184        struct NoopHandler;
185        impl EventHandler for NoopHandler {
186            fn handle_event(&self, _event: &Event) {}
187        }
188
189        assert!(
190            poller
191                .register(&mut src1, Token(1), Interest::READABLE, NoopHandler)
192                .is_ok(),
193            "Failed to register src1"
194        );
195        assert!(
196            poller
197                .register(&mut src2, Token(2), Interest::WRITABLE, NoopHandler)
198                .is_ok(),
199            "Failed to register src2"
200        );
201
202        assert_eq!(poller.registery.iter().count(), 2);
203        assert!(
204            poller.registery.iter().any(|t| t.0 == Token(1)),
205            "Failed to find src1"
206        );
207        assert!(
208            poller.registery.iter().any(|t| t.0 == Token(2)),
209            "Failed to find src2"
210        );
211    }
212}