futmio/
lib.rs

1use std::{
2    collections::HashMap,
3    io::Result as IoResult,
4    sync::{
5        atomic::{AtomicUsize, Ordering},
6        mpsc::{Receiver, Sender},
7        Arc, Mutex,
8    },
9    task::Waker,
10    time::Duration,
11};
12
13use futures::io::Error as FutIoError;
14use log::error;
15use mio::{Evented, Events, PollOpt, Ready, Token as MioToken};
16
17type FutIoResult<T> = Result<T, FutIoError>;
18
19pub mod tcp;
20
21#[derive(Clone)]
22pub struct PollBundle {
23    events: Arc<Mutex<Events>>,
24    poll: Arc<mio::Poll>,
25    timeout: Option<Duration>,
26    // We use an atomic counter to ensure a unique backing value per Token. usize should be large
27    // enough.
28    token_counter: Arc<AtomicUsize>,
29    token_freed: Arc<Mutex<Receiver<usize>>>,
30    token_drop_box: Sender<usize>,
31    wakers: Arc<Mutex<HashMap<usize, Arc<Mutex<Option<Waker>>>>>>,
32}
33
34/// Token returned by the PollBundle on registration. Keep it with the registered handle, drop it
35/// *after* the handle. This ensures that, internally, the corresponding [`mio::Token`] will be
36/// freed when the handle is dropped.
37///
38/// # Contract: You must keep this Token alive just as long as the [`mio::Evented`] handle.
39pub struct Token {
40    val: usize,
41    drop_box: Sender<usize>,
42    bundle: PollBundle,
43}
44
45impl Token {
46    pub fn get_mio(&self) -> MioToken {
47        MioToken(self.val)
48    }
49}
50
51impl PartialEq<MioToken> for Token {
52    fn eq(&self, other: &MioToken) -> bool {
53        self.val == other.0
54    }
55}
56
57impl PartialEq<Token> for MioToken {
58    fn eq(&self, other: &Token) -> bool {
59        self.0 == other.val
60    }
61}
62
63impl Drop for Token {
64    fn drop(&mut self) {
65        // We don't care if it fails. We just need to try if it is possible.
66        let _ = self.bundle.wakers.lock().map(|mut g| g.remove(&self.val));
67        let _ = self.drop_box.send(self.val);
68    }
69}
70
71impl PollBundle {
72    pub fn new(
73        timeout: impl Into<Option<Duration>>,
74        event_buf_size: usize,
75    ) -> IoResult<PollBundle> {
76        let (tx, rx) = std::sync::mpsc::channel();
77        Ok(PollBundle {
78            events: Arc::new(Mutex::new(Events::with_capacity(event_buf_size))),
79            poll: Arc::new(mio::Poll::new()?),
80            timeout: timeout.into(),
81            token_counter: Arc::new(AtomicUsize::new(0)),
82            token_freed: Arc::new(Mutex::new(rx)),
83            token_drop_box: tx,
84            wakers: Default::default(),
85        })
86    }
87
88    fn get_token(&self) -> Token {
89        // First we check the inbox for a freed token. If we have one, reuse it. However, most
90        // likely we won't, so we catch the error and just get a fresh value.
91        let val = match self
92            .token_freed
93            .lock()
94            .expect("Poisoned token channel")
95            .try_recv()
96        {
97            Err(_) => self.token_counter.fetch_add(1, Ordering::AcqRel),
98            Ok(val) => val,
99        };
100
101        Token {
102            val,
103            drop_box: self.token_drop_box.clone(),
104            bundle: self.clone(),
105        }
106    }
107
108    /// If threading in a threadpool or other kind of scheduler, this is the function that should be
109    /// called in a loop. For error details, see [`mio::Poll::poll`].
110    pub fn iter(&self) -> IoResult<()> {
111        // Lock on events, for mutable, synchronous execution.
112        let events = &mut *self.events.lock().expect("Poisoned PollBundle");
113        // Do the poll
114        self.poll.poll(events, self.timeout)?;
115
116        // We lock on this *now* because we want minimal contention with Futures updating their
117        // wakers.
118        let wakers = self.wakers.lock().expect("Poisoned PollBundle");
119        for event in events.iter() {
120            // We register the waker at the same time as the token, so this is basically guaranteed
121            // to have a value.
122            if let Some(waker) = wakers.get(&event.token().0) {
123                match waker.lock() {
124                    // Wakey-wakey!!
125                    Ok(w) => {
126                        w.as_ref().map(Waker::wake_by_ref);
127                    }
128                    Err(_) => {
129                        // If the Future is poisoned, we don't want to touch that.
130                        error!("Ignoring panicked waker. This should be handled by the future.")
131                    }
132                }
133            } else {
134                error!("Registered handler does not have a corresponding Waker. This is a bug.")
135            }
136        }
137        Ok(())
138    }
139
140    /// For internal registration of
141    pub fn register<E: ?Sized>(
142        &self,
143        handle: &E,
144        interest: Ready,
145        opts: PollOpt,
146        waker_ptr: Arc<Mutex<Option<Waker>>>,
147    ) -> IoResult<Token>
148    where
149        E: Evented,
150    {
151        let token = self.get_token();
152        self.poll
153            .register(handle, token.get_mio(), interest, opts)?;
154        self.wakers
155            .lock()
156            .expect("Poisoned PollBundle")
157            .insert(token.val, waker_ptr);
158        Ok(token)
159    }
160}
161
162#[cfg(test)]
163mod tests {}