1use std::{
2 collections::HashMap,
3 io::Result as IoResult,
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 mpsc::{Receiver, Sender},
7 Arc, Mutex,
8 },
9 task::Waker,
10 time::Duration,
11};
12
13use futures::io::Error as FutIoError;
14use log::error;
15use mio::{Evented, Events, PollOpt, Ready, Token as MioToken};
16
17type FutIoResult<T> = Result<T, FutIoError>;
18
19pub mod tcp;
20
21#[derive(Clone)]
22pub struct PollBundle {
23 events: Arc<Mutex<Events>>,
24 poll: Arc<mio::Poll>,
25 timeout: Option<Duration>,
26 token_counter: Arc<AtomicUsize>,
29 token_freed: Arc<Mutex<Receiver<usize>>>,
30 token_drop_box: Sender<usize>,
31 wakers: Arc<Mutex<HashMap<usize, Arc<Mutex<Option<Waker>>>>>>,
32}
33
34pub struct Token {
40 val: usize,
41 drop_box: Sender<usize>,
42 bundle: PollBundle,
43}
44
45impl Token {
46 pub fn get_mio(&self) -> MioToken {
47 MioToken(self.val)
48 }
49}
50
51impl PartialEq<MioToken> for Token {
52 fn eq(&self, other: &MioToken) -> bool {
53 self.val == other.0
54 }
55}
56
57impl PartialEq<Token> for MioToken {
58 fn eq(&self, other: &Token) -> bool {
59 self.0 == other.val
60 }
61}
62
63impl Drop for Token {
64 fn drop(&mut self) {
65 let _ = self.bundle.wakers.lock().map(|mut g| g.remove(&self.val));
67 let _ = self.drop_box.send(self.val);
68 }
69}
70
71impl PollBundle {
72 pub fn new(
73 timeout: impl Into<Option<Duration>>,
74 event_buf_size: usize,
75 ) -> IoResult<PollBundle> {
76 let (tx, rx) = std::sync::mpsc::channel();
77 Ok(PollBundle {
78 events: Arc::new(Mutex::new(Events::with_capacity(event_buf_size))),
79 poll: Arc::new(mio::Poll::new()?),
80 timeout: timeout.into(),
81 token_counter: Arc::new(AtomicUsize::new(0)),
82 token_freed: Arc::new(Mutex::new(rx)),
83 token_drop_box: tx,
84 wakers: Default::default(),
85 })
86 }
87
88 fn get_token(&self) -> Token {
89 let val = match self
92 .token_freed
93 .lock()
94 .expect("Poisoned token channel")
95 .try_recv()
96 {
97 Err(_) => self.token_counter.fetch_add(1, Ordering::AcqRel),
98 Ok(val) => val,
99 };
100
101 Token {
102 val,
103 drop_box: self.token_drop_box.clone(),
104 bundle: self.clone(),
105 }
106 }
107
108 pub fn iter(&self) -> IoResult<()> {
111 let events = &mut *self.events.lock().expect("Poisoned PollBundle");
113 self.poll.poll(events, self.timeout)?;
115
116 let wakers = self.wakers.lock().expect("Poisoned PollBundle");
119 for event in events.iter() {
120 if let Some(waker) = wakers.get(&event.token().0) {
123 match waker.lock() {
124 Ok(w) => {
126 w.as_ref().map(Waker::wake_by_ref);
127 }
128 Err(_) => {
129 error!("Ignoring panicked waker. This should be handled by the future.")
131 }
132 }
133 } else {
134 error!("Registered handler does not have a corresponding Waker. This is a bug.")
135 }
136 }
137 Ok(())
138 }
139
140 pub fn register<E: ?Sized>(
142 &self,
143 handle: &E,
144 interest: Ready,
145 opts: PollOpt,
146 waker_ptr: Arc<Mutex<Option<Waker>>>,
147 ) -> IoResult<Token>
148 where
149 E: Evented,
150 {
151 let token = self.get_token();
152 self.poll
153 .register(handle, token.get_mio(), interest, opts)?;
154 self.wakers
155 .lock()
156 .expect("Poisoned PollBundle")
157 .insert(token.val, waker_ptr);
158 Ok(token)
159 }
160}
161
#[cfg(test)]
mod tests {
    // Placeholder: this module currently contains no tests.
}