tl_async_runtime/
io.rs

1/// Networking specific handlers
2pub mod net;
3
4use crate::{driver::executor_context, Executor};
5use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
6use mio::event::Source;
7use rand::Rng;
8use std::{
9    collections::HashMap,
10    ops::{Deref, DerefMut},
11    sync::Arc,
12    time::Duration,
13};
14
/// Readiness flags for a single I/O notification, packed into one byte.
///
/// The accessors read bit 0 (`0b01`) as "readable" and bit 1 (`0b10`) as
/// "writable"; any other bits are ignored by them.
#[derive(Debug)]
pub(crate) struct Event(u8);

impl Event {
    /// Mask tested by [`Event::is_readable`].
    const READABLE: u8 = 0b01;
    /// Mask tested by [`Event::is_writable`].
    const WRITABLE: u8 = 0b10;

    /// Returns `true` when the readable bit is set.
    pub fn is_readable(&self) -> bool {
        self.has(Self::READABLE)
    }

    /// Returns `true` when the writable bit is set.
    pub fn is_writable(&self) -> bool {
        self.has(Self::WRITABLE)
    }

    /// Reports whether any bit of `mask` is set in the stored flags.
    fn has(&self, mask: u8) -> bool {
        (self.0 & mask) != 0
    }
}
25
26impl From<&mio::event::Event> for Event {
27    fn from(e: &mio::event::Event) -> Self {
28        let mut event = 0;
29        event |= (e.is_readable() as u8) << 1;
30        event |= (e.is_writable() as u8) << 2;
31        Event(event)
32    }
33}
34
/// State the runtime uses to receive readiness events from the operating system.
pub(crate) struct Os {
    /// mio poll instance that sources are registered with and that `process` polls.
    pub poll: mio::Poll,
    /// Buffer that `process` hands to `poll` to be filled with ready events.
    pub events: mio::Events,
    /// Channel senders keyed by token; `process` forwards each event to the
    /// sender stored under that event's token, if there is one.
    pub tasks: HashMap<mio::Token, UnboundedSender<Event>>,
}
40
41impl Default for Os {
42    fn default() -> Self {
43        Self {
44            poll: mio::Poll::new().unwrap(),
45            events: mio::Events::with_capacity(128),
46            tasks: HashMap::new(),
47        }
48    }
49}
50
51impl Os {
52    /// Polls the OS for new events, and dispatches those to any awaiting tasks
53    pub(crate) fn process(&mut self) {
54        let Self {
55            poll,
56            events,
57            tasks,
58        } = self;
59        poll.poll(events, Some(Duration::from_millis(10))).unwrap();
60
61        for event in &*events {
62            if let Some(sender) = tasks.get(&event.token()) {
63                sender.unbounded_send(event.into()).unwrap();
64            }
65        }
66    }
67}
68
/// An I/O source registered with the executor's poll instance.
///
/// Dropping a `Registration` deregisters the source and removes its event
/// sender from the dispatch table.
pub(crate) struct Registration<S: Source> {
    /// Executor whose `os` state is locked to reach the poll instance and the
    /// `tasks` map.
    pub exec: Arc<Executor>,
    /// Token the source was registered under; also the key used in `tasks`.
    pub token: mio::Token,
    /// The wrapped mio source, reachable through `Deref`/`DerefMut`.
    pub source: S,
}
74
75// allow internal access to the source
76impl<S: Source> Deref for Registration<S> {
77    type Target = S;
78
79    fn deref(&self) -> &Self::Target {
80        &self.source
81    }
82}
83impl<S: Source> DerefMut for Registration<S> {
84    fn deref_mut(&mut self) -> &mut Self::Target {
85        &mut self.source
86    }
87}
88
89impl<S: Source> Registration<S> {
90    pub fn new(mut source: S, interests: mio::Interest) -> std::io::Result<Self> {
91        executor_context(|exec| {
92            let token = mio::Token(rand::thread_rng().gen());
93            let os = exec.os.lock();
94            os.poll.registry().register(&mut source, token, interests)?;
95            Ok(Self {
96                exec: exec.clone(),
97                token,
98                source,
99            })
100        })
101    }
102
103    // register this token on the event dispatcher
104    // and return a receiver to it
105    pub fn events(&self) -> UnboundedReceiver<Event> {
106        let (sender, receiver) = unbounded();
107        self.exec.os.lock().tasks.insert(self.token, sender);
108        receiver
109    }
110}
111
112impl<S: Source> Drop for Registration<S> {
113    fn drop(&mut self) {
114        let mut os = self.exec.os.lock();
115        // deregister the source from the OS
116        os.poll.registry().deregister(&mut self.source).unwrap();
117        // remove the event dispatcher
118        os.tasks.remove(&self.token);
119    }
120}