async_std/net/driver/
mod.rs

1use std::fmt;
2use std::sync::{Arc, Mutex};
3
4use mio::{self, Evented};
5use once_cell::sync::Lazy;
6use slab::Slab;
7
8use crate::io;
9use crate::task::{Context, Poll, Waker};
10use crate::utils::abort_on_panic;
11
/// Data associated with a registered I/O handle.
///
/// An `Entry` is allocated by `Reactor::register` and shared through an `Arc` between the
/// reactor's entry table and the `Watcher` that owns the I/O source.
#[derive(Debug)]
struct Entry {
    /// A unique identifier.
    ///
    /// The wrapped value is the key of this entry's slot in the reactor's slab.
    token: mio::Token,

    /// Tasks that are blocked on reading from this I/O handle.
    readers: Mutex<Readers>,

    /// Tasks that are blocked on writing to this I/O handle.
    writers: Mutex<Writers>,
}
24
/// The set of `Waker`s interested in read readiness.
#[derive(Debug)]
struct Readers {
    /// Flag indicating read readiness.
    /// (cf. `Watcher::poll_read_ready`)
    ///
    /// Set to `true` by the driver loop when a read-side event arrives, and reset to `false`
    /// by `Watcher::poll_read_with` when the read operation reports `WouldBlock`.
    ready: bool,
    /// The `Waker`s blocked on reading.
    wakers: Vec<Waker>
}
34
/// The set of `Waker`s interested in write readiness.
#[derive(Debug)]
struct Writers {
    /// Flag indicating write readiness.
    /// (cf. `Watcher::poll_write_ready`)
    ///
    /// Set to `true` by the driver loop when a write-side event arrives, and reset to `false`
    /// by `Watcher::poll_write_with` when the write operation reports `WouldBlock`.
    ready: bool,
    /// The `Waker`s blocked on writing.
    wakers: Vec<Waker>
}
44
/// The state of a networking driver.
struct Reactor {
    /// A mio instance that polls for new events.
    poller: mio::Poll,

    /// A collection of registered I/O handles.
    ///
    /// Slab keys double as `mio::Token` values, so the token carried by an event is used
    /// directly to look up the matching entry.
    entries: Mutex<Slab<Arc<Entry>>>,

    /// Dummy I/O handle that is only used to wake up the polling thread.
    ///
    /// The `Registration` half is what gets registered with the poller; the `SetReadiness`
    /// half is used to change the handle's readiness.
    notify_reg: (mio::Registration, mio::SetReadiness),

    /// An identifier for the notification handle.
    notify_token: mio::Token,
}
59
60impl Reactor {
61    /// Creates a new reactor for polling I/O events.
62    fn new() -> io::Result<Reactor> {
63        let poller = mio::Poll::new()?;
64        let notify_reg = mio::Registration::new2();
65
66        let mut reactor = Reactor {
67            poller,
68            entries: Mutex::new(Slab::new()),
69            notify_reg,
70            notify_token: mio::Token(0),
71        };
72
73        // Register a dummy I/O handle for waking up the polling thread.
74        let entry = reactor.register(&reactor.notify_reg.0)?;
75        reactor.notify_token = entry.token;
76
77        Ok(reactor)
78    }
79
80    /// Registers an I/O event source and returns its associated entry.
81    fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
82        let mut entries = self.entries.lock().unwrap();
83
84        // Reserve a vacant spot in the slab and use its key as the token value.
85        let vacant = entries.vacant_entry();
86        let token = mio::Token(vacant.key());
87
88        // Allocate an entry and insert it into the slab.
89        let entry = Arc::new(Entry {
90            token,
91            readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }),
92            writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }),
93        });
94        vacant.insert(entry.clone());
95
96        // Register the I/O event source in the poller.
97        let interest = mio::Ready::all();
98        let opts = mio::PollOpt::edge();
99        self.poller.register(source, token, interest, opts)?;
100
101        Ok(entry)
102    }
103
104    /// Deregisters an I/O event source associated with an entry.
105    fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
106        // Deregister the I/O object from the mio instance.
107        self.poller.deregister(source)?;
108
109        // Remove the entry associated with the I/O object.
110        self.entries.lock().unwrap().remove(entry.token.0);
111
112        Ok(())
113    }
114
115    // fn notify(&self) {
116    //     self.notify_reg
117    //         .1
118    //         .set_readiness(mio::Ready::readable())
119    //         .unwrap();
120    // }
121}
122
123/// The state of the global networking driver.
124static REACTOR: Lazy<Reactor> = Lazy::new(|| {
125    // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
126    // handles.
127    std::thread::Builder::new()
128        .name("async-std/net".to_string())
129        .spawn(move || {
130            // If the driver thread panics, there's not much we can do. It is not a
131            // recoverable error and there is no place to propagate it into so we just abort.
132            abort_on_panic(|| {
133                main_loop().expect("async networking thread has panicked");
134            })
135        })
136        .expect("cannot start a thread driving blocking tasks");
137
138    Reactor::new().expect("cannot initialize reactor")
139});
140
141/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
142fn main_loop() -> io::Result<()> {
143    let reactor = &REACTOR;
144    let mut events = mio::Events::with_capacity(1000);
145
146    loop {
147        // Block on the poller until at least one new event comes in.
148        reactor.poller.poll(&mut events, None)?;
149
150        // Lock the entire entry table while we're processing new events.
151        let entries = reactor.entries.lock().unwrap();
152
153        for event in events.iter() {
154            let token = event.token();
155
156            if token == reactor.notify_token {
157                // If this is the notification token, we just need the notification state.
158                reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
159            } else {
160                // Otherwise, look for the entry associated with this token.
161                if let Some(entry) = entries.get(token.0) {
162                    // Set the readiness flags from this I/O event.
163                    let readiness = event.readiness();
164
165                    // Wake up reader tasks blocked on this I/O handle.
166                    if !(readiness & reader_interests()).is_empty() {
167                        let mut readers = entry.readers.lock().unwrap();
168                        readers.ready = true;
169                        for w in readers.wakers.drain(..) {
170                            w.wake();
171                        }
172                    }
173
174                    // Wake up writer tasks blocked on this I/O handle.
175                    if !(readiness & writer_interests()).is_empty() {
176                        let mut writers = entry.writers.lock().unwrap();
177                        writers.ready = true;
178                        for w in writers.wakers.drain(..) {
179                            w.wake();
180                        }
181                    }
182                }
183            }
184        }
185    }
186}
187
/// An I/O handle powered by the networking driver.
///
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
/// implementing traits `AsyncRead` and `AsyncWrite`.
pub struct Watcher<T: Evented> {
    /// Data associated with the I/O handle.
    entry: Arc<Entry>,

    /// The I/O event source.
    ///
    /// This is `Some` from construction onwards; `into_inner` takes the value out, which
    /// tells `Drop` that deregistration has already been done.
    source: Option<T>,
}
199
200impl<T: Evented> Watcher<T> {
201    /// Creates a new I/O handle.
202    ///
203    /// The provided I/O event source will be kept registered inside the reactor's poller for the
204    /// lifetime of the returned I/O handle.
205    pub fn new(source: T) -> Watcher<T> {
206        Watcher {
207            entry: REACTOR
208                .register(&source)
209                .expect("cannot register an I/O event source"),
210            source: Some(source),
211        }
212    }
213
214    /// Returns a reference to the inner I/O event source.
215    pub fn get_ref(&self) -> &T {
216        self.source.as_ref().unwrap()
217    }
218
219    /// Polls the inner I/O source for a non-blocking read operation.
220    ///
221    /// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
222    /// will be registered for wakeup when the I/O source becomes readable.
223    pub fn poll_read_with<'a, F, R>(&'a self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
224    where
225        F: FnMut(&'a T) -> io::Result<R>,
226    {
227        // If the operation isn't blocked, return its result.
228        match f(self.source.as_ref().unwrap()) {
229            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
230            res => return Poll::Ready(res),
231        }
232
233        // Lock the waker list.
234        let mut readers = self.entry.readers.lock().unwrap();
235
236        // Try running the operation again.
237        match f(self.source.as_ref().unwrap()) {
238            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
239            res => return Poll::Ready(res),
240        }
241
242        // Register the task if it isn't registered already.
243        if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
244            readers.wakers.push(cx.waker().clone());
245        }
246
247        readers.ready = false;
248
249        Poll::Pending
250    }
251
252    /// Polls the inner I/O source for a non-blocking write operation.
253    ///
254    /// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
255    /// will be registered for wakeup when the I/O source becomes writable.
256    pub fn poll_write_with<'a, F, R>(
257        &'a self,
258        cx: &mut Context<'_>,
259        mut f: F,
260    ) -> Poll<io::Result<R>>
261    where
262        F: FnMut(&'a T) -> io::Result<R>,
263    {
264        // If the operation isn't blocked, return its result.
265        match f(self.source.as_ref().unwrap()) {
266            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
267            res => return Poll::Ready(res),
268        }
269
270        // Lock the waker list.
271        let mut writers = self.entry.writers.lock().unwrap();
272
273        // Try running the operation again.
274        match f(self.source.as_ref().unwrap()) {
275            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
276            res => return Poll::Ready(res),
277        }
278
279        // Register the task if it isn't registered already.
280        if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
281            writers.wakers.push(cx.waker().clone());
282        }
283
284        writers.ready = false;
285
286        Poll::Pending
287    }
288
289    /// Polls the inner I/O source until a non-blocking read can be performed.
290    ///
291    /// If non-blocking reads are currently not possible, the `Waker`
292    /// will be saved and notified when it can read non-blocking
293    /// again.
294    #[allow(dead_code)]
295    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
296        // Lock the waker list.
297        let mut readers = self.entry.readers.lock().unwrap();
298        if readers.ready {
299            return Poll::Ready(())
300        }
301        // Register the task if it isn't registered already.
302        if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
303            readers.wakers.push(cx.waker().clone());
304        }
305        Poll::Pending
306    }
307
308    /// Polls the inner I/O source until a non-blocking write can be performed.
309    ///
310    /// If non-blocking writes are currently not possible, the `Waker`
311    /// will be saved and notified when it can write non-blocking
312    /// again.
313    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
314        // Lock the waker list.
315        let mut writers = self.entry.writers.lock().unwrap();
316        if writers.ready {
317            return Poll::Ready(())
318        }
319        // Register the task if it isn't registered already.
320        if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
321            writers.wakers.push(cx.waker().clone());
322        }
323        Poll::Pending
324    }
325
326    /// Deregisters and returns the inner I/O source.
327    ///
328    /// This method is typically used to convert `Watcher`s to raw file descriptors/handles.
329    #[allow(dead_code)]
330    pub fn into_inner(mut self) -> T {
331        let source = self.source.take().unwrap();
332        REACTOR
333            .deregister(&source, &self.entry)
334            .expect("cannot deregister I/O event source");
335        source
336    }
337}
338
339impl<T: Evented> Drop for Watcher<T> {
340    fn drop(&mut self) {
341        if let Some(ref source) = self.source {
342            REACTOR
343                .deregister(source, &self.entry)
344                .expect("cannot deregister I/O event source");
345        }
346    }
347}
348
349impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
350    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351        f.debug_struct("Watcher")
352            .field("entry", &self.entry)
353            .field("source", &self.source)
354            .finish()
355    }
356}
357
358/// Returns a mask containing flags that interest tasks reading from I/O handles.
359#[inline]
360fn reader_interests() -> mio::Ready {
361    mio::Ready::all() - mio::Ready::writable()
362}
363
364/// Returns a mask containing flags that interest tasks writing into I/O handles.
365#[inline]
366fn writer_interests() -> mio::Ready {
367    mio::Ready::writable() | hup()
368}
369
370/// Returns a flag containing the hangup status.
371#[inline]
372fn hup() -> mio::Ready {
373    #[cfg(unix)]
374    let ready = mio::unix::UnixReady::hup().into();
375
376    #[cfg(not(unix))]
377    let ready = mio::Ready::empty();
378
379    ready
380}