// message_io/network/poll.rs
use super::resource_id::{ResourceId, ResourceType, ResourceIdGenerator};
2
3use mio::{Poll as MioPoll, Interest, Token, Events, Registry, Waker};
4use mio::event::{Source};
5
6use std::time::{Duration};
7use std::sync::{Arc};
8use std::io::{ErrorKind};
9
/// Kind of readiness reported for a registered resource.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Readiness {
    /// The resource was reported as writable.
    Write,

    /// The resource was reported as readable.
    Read,
}
20
21pub enum PollEvent {
22 Network(ResourceId, Readiness),
23 Waker,
24}
25
26impl From<Token> for ResourceId {
27 fn from(token: Token) -> Self {
28 (token.0 >> Poll::RESERVED_BITS).into()
29 }
30}
31
32impl From<ResourceId> for Token {
33 fn from(id: ResourceId) -> Self {
34 Token((id.raw() << Poll::RESERVED_BITS) | 1)
35 }
36}
37
38pub struct Poll {
39 mio_poll: MioPoll,
40 events: Events,
41 #[allow(dead_code)] waker: Arc<Waker>,
43}
44
45impl Default for Poll {
46 fn default() -> Self {
47 let mio_poll = MioPoll::new().unwrap();
48 Self {
49 waker: Arc::new(Waker::new(mio_poll.registry(), Self::WAKER_TOKEN).unwrap()),
50 mio_poll,
51 events: Events::with_capacity(Self::EVENTS_SIZE),
52 }
53 }
54}
55
56impl Poll {
57 const EVENTS_SIZE: usize = 1024;
58 const RESERVED_BITS: usize = 1;
59 const WAKER_TOKEN: Token = Token(0);
60
61 pub fn process_event<C>(&mut self, timeout: Option<Duration>, mut event_callback: C)
62 where C: FnMut(PollEvent) {
63 loop {
64 match self.mio_poll.poll(&mut self.events, timeout) {
65 Ok(()) => {
66 for mio_event in &self.events {
67 if Self::WAKER_TOKEN == mio_event.token() {
68 log::trace!("POLL WAKER EVENT");
69 event_callback(PollEvent::Waker);
70 }
71 else {
72 let id = ResourceId::from(mio_event.token());
73 if mio_event.is_readable() {
74 log::trace!("POLL EVENT (R): {}", id);
75 event_callback(PollEvent::Network(id, Readiness::Read));
76 }
77 if mio_event.is_writable() {
78 log::trace!("POLL EVENT (W): {}", id);
79 event_callback(PollEvent::Network(id, Readiness::Write));
80 }
81 }
82 }
83 break;
84 }
85 Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
86 Err(ref err) => panic!("{}: No error here", err),
87 }
88 }
89 }
90
91 pub fn create_registry(&mut self, adapter_id: u8, resource_type: ResourceType) -> PollRegistry {
92 PollRegistry::new(adapter_id, resource_type, self.mio_poll.registry().try_clone().unwrap())
93 }
94
95 #[allow(dead_code)] pub fn create_waker(&mut self) -> PollWaker {
97 PollWaker::new(self.waker.clone())
98 }
99}
100
101pub struct PollRegistry {
102 id_generator: Arc<ResourceIdGenerator>,
103 registry: Registry,
104}
105
106impl PollRegistry {
107 fn new(adapter_id: u8, resource_type: ResourceType, registry: Registry) -> Self {
108 Self {
109 id_generator: Arc::new(ResourceIdGenerator::new(adapter_id, resource_type)),
110 registry,
111 }
112 }
113
114 pub fn add(&self, source: &mut dyn Source, write_readiness: bool) -> ResourceId {
115 let id = self.id_generator.generate();
116 let interest = match write_readiness {
117 true => Interest::READABLE | Interest::WRITABLE,
118 false => Interest::READABLE,
119 };
120 self.registry.register(source, id.into(), interest).unwrap();
121 id
122 }
123
124 pub fn remove(&self, source: &mut dyn Source) {
125 self.registry.deregister(source).unwrap()
126 }
127}
128
129impl Clone for PollRegistry {
130 fn clone(&self) -> Self {
131 Self {
132 id_generator: self.id_generator.clone(),
133 registry: self.registry.try_clone().unwrap(),
134 }
135 }
136}
137
138#[allow(dead_code)] pub struct PollWaker {
140 waker: Arc<Waker>,
141}
142
143impl PollWaker {
144 #[allow(dead_code)] fn new(waker: Arc<Waker>) -> Self {
146 Self { waker }
147 }
148
149 #[allow(dead_code)] pub fn wake(&self) {
151 self.waker.wake().unwrap();
152 log::trace!("Wake poll...");
153 }
154}
155
156impl Clone for PollWaker {
157 fn clone(&self) -> Self {
158 Self { waker: self.waker.clone() }
159 }
160}