message_io/network/
poll.rs

1use super::resource_id::{ResourceId, ResourceType, ResourceIdGenerator};
2
3use mio::{Poll as MioPoll, Interest, Token, Events, Registry, Waker};
4use mio::event::{Source};
5
6use std::time::{Duration};
7use std::sync::{Arc};
8use std::io::{ErrorKind};
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq)]
11/// Used for the adapter implementation.
12/// Specify the kind of event that is available for a resource.
13pub enum Readiness {
14    /// The resource is available to write
15    Write,
16
17    /// The resource is available to read (has any content to read).
18    Read,
19}
20
21pub enum PollEvent {
22    Network(ResourceId, Readiness),
23    Waker,
24}
25
26impl From<Token> for ResourceId {
27    fn from(token: Token) -> Self {
28        (token.0 >> Poll::RESERVED_BITS).into()
29    }
30}
31
32impl From<ResourceId> for Token {
33    fn from(id: ResourceId) -> Self {
34        Token((id.raw() << Poll::RESERVED_BITS) | 1)
35    }
36}
37
38pub struct Poll {
39    mio_poll: MioPoll,
40    events: Events,
41    #[allow(dead_code)] //TODO: remove it with poll native event support
42    waker: Arc<Waker>,
43}
44
45impl Default for Poll {
46    fn default() -> Self {
47        let mio_poll = MioPoll::new().unwrap();
48        Self {
49            waker: Arc::new(Waker::new(mio_poll.registry(), Self::WAKER_TOKEN).unwrap()),
50            mio_poll,
51            events: Events::with_capacity(Self::EVENTS_SIZE),
52        }
53    }
54}
55
56impl Poll {
57    const EVENTS_SIZE: usize = 1024;
58    const RESERVED_BITS: usize = 1;
59    const WAKER_TOKEN: Token = Token(0);
60
61    pub fn process_event<C>(&mut self, timeout: Option<Duration>, mut event_callback: C)
62    where C: FnMut(PollEvent) {
63        loop {
64            match self.mio_poll.poll(&mut self.events, timeout) {
65                Ok(()) => {
66                    for mio_event in &self.events {
67                        if Self::WAKER_TOKEN == mio_event.token() {
68                            log::trace!("POLL WAKER EVENT");
69                            event_callback(PollEvent::Waker);
70                        }
71                        else {
72                            let id = ResourceId::from(mio_event.token());
73                            if mio_event.is_readable() {
74                                log::trace!("POLL EVENT (R): {}", id);
75                                event_callback(PollEvent::Network(id, Readiness::Read));
76                            }
77                            if mio_event.is_writable() {
78                                log::trace!("POLL EVENT (W): {}", id);
79                                event_callback(PollEvent::Network(id, Readiness::Write));
80                            }
81                        }
82                    }
83                    break;
84                }
85                Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
86                Err(ref err) => panic!("{}: No error here", err),
87            }
88        }
89    }
90
91    pub fn create_registry(&mut self, adapter_id: u8, resource_type: ResourceType) -> PollRegistry {
92        PollRegistry::new(adapter_id, resource_type, self.mio_poll.registry().try_clone().unwrap())
93    }
94
95    #[allow(dead_code)] //TODO: remove it with poll native event support
96    pub fn create_waker(&mut self) -> PollWaker {
97        PollWaker::new(self.waker.clone())
98    }
99}
100
101pub struct PollRegistry {
102    id_generator: Arc<ResourceIdGenerator>,
103    registry: Registry,
104}
105
106impl PollRegistry {
107    fn new(adapter_id: u8, resource_type: ResourceType, registry: Registry) -> Self {
108        Self {
109            id_generator: Arc::new(ResourceIdGenerator::new(adapter_id, resource_type)),
110            registry,
111        }
112    }
113
114    pub fn add(&self, source: &mut dyn Source, write_readiness: bool) -> ResourceId {
115        let id = self.id_generator.generate();
116        let interest = match write_readiness {
117            true => Interest::READABLE | Interest::WRITABLE,
118            false => Interest::READABLE,
119        };
120        self.registry.register(source, id.into(), interest).unwrap();
121        id
122    }
123
124    pub fn remove(&self, source: &mut dyn Source) {
125        self.registry.deregister(source).unwrap()
126    }
127}
128
129impl Clone for PollRegistry {
130    fn clone(&self) -> Self {
131        Self {
132            id_generator: self.id_generator.clone(),
133            registry: self.registry.try_clone().unwrap(),
134        }
135    }
136}
137
138#[allow(dead_code)] //TODO: remove it with poll native event support
139pub struct PollWaker {
140    waker: Arc<Waker>,
141}
142
143impl PollWaker {
144    #[allow(dead_code)] //TODO: remove it with poll native event support
145    fn new(waker: Arc<Waker>) -> Self {
146        Self { waker }
147    }
148
149    #[allow(dead_code)] //TODO: remove it with poll native event support
150    pub fn wake(&self) {
151        self.waker.wake().unwrap();
152        log::trace!("Wake poll...");
153    }
154}
155
156impl Clone for PollWaker {
157    fn clone(&self) -> Self {
158        Self { waker: self.waker.clone() }
159    }
160}