1pub mod net;
3
4use crate::{driver::executor_context, Executor};
5use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
6use mio::event::Source;
7use rand::Rng;
8use std::{
9 collections::HashMap,
10 ops::{Deref, DerefMut},
11 sync::Arc,
12 time::Duration,
13};
14
15#[derive(Debug)]
16pub(crate) struct Event(u8);
17impl Event {
18 pub fn is_readable(&self) -> bool {
19 self.0 & 1 != 0
20 }
21 pub fn is_writable(&self) -> bool {
22 self.0 & 2 != 0
23 }
24}
25
26impl From<&mio::event::Event> for Event {
27 fn from(e: &mio::event::Event) -> Self {
28 let mut event = 0;
29 event |= (e.is_readable() as u8) << 1;
30 event |= (e.is_writable() as u8) << 2;
31 Event(event)
32 }
33}
34
35pub(crate) struct Os {
36 pub poll: mio::Poll,
37 pub events: mio::Events,
38 pub tasks: HashMap<mio::Token, UnboundedSender<Event>>,
39}
40
41impl Default for Os {
42 fn default() -> Self {
43 Self {
44 poll: mio::Poll::new().unwrap(),
45 events: mio::Events::with_capacity(128),
46 tasks: HashMap::new(),
47 }
48 }
49}
50
51impl Os {
52 pub(crate) fn process(&mut self) {
54 let Self {
55 poll,
56 events,
57 tasks,
58 } = self;
59 poll.poll(events, Some(Duration::from_millis(10))).unwrap();
60
61 for event in &*events {
62 if let Some(sender) = tasks.get(&event.token()) {
63 sender.unbounded_send(event.into()).unwrap();
64 }
65 }
66 }
67}
68
69pub(crate) struct Registration<S: Source> {
70 pub exec: Arc<Executor>,
71 pub token: mio::Token,
72 pub source: S,
73}
74
75impl<S: Source> Deref for Registration<S> {
77 type Target = S;
78
79 fn deref(&self) -> &Self::Target {
80 &self.source
81 }
82}
83impl<S: Source> DerefMut for Registration<S> {
84 fn deref_mut(&mut self) -> &mut Self::Target {
85 &mut self.source
86 }
87}
88
89impl<S: Source> Registration<S> {
90 pub fn new(mut source: S, interests: mio::Interest) -> std::io::Result<Self> {
91 executor_context(|exec| {
92 let token = mio::Token(rand::thread_rng().gen());
93 let os = exec.os.lock();
94 os.poll.registry().register(&mut source, token, interests)?;
95 Ok(Self {
96 exec: exec.clone(),
97 token,
98 source,
99 })
100 })
101 }
102
103 pub fn events(&self) -> UnboundedReceiver<Event> {
106 let (sender, receiver) = unbounded();
107 self.exec.os.lock().tasks.insert(self.token, sender);
108 receiver
109 }
110}
111
112impl<S: Source> Drop for Registration<S> {
113 fn drop(&mut self) {
114 let mut os = self.exec.os.lock();
115 os.poll.registry().deregister(&mut self.source).unwrap();
117 os.tasks.remove(&self.token);
119 }
120}