async_std/net/driver/
mod.rs1use std::fmt;
2use std::sync::{Arc, Mutex};
3
4use mio::{self, Evented};
5use once_cell::sync::Lazy;
6use slab::Slab;
7
8use crate::io;
9use crate::task::{Context, Poll, Waker};
10use crate::utils::abort_on_panic;
11
12#[derive(Debug)]
14struct Entry {
15 token: mio::Token,
17
18 readers: Mutex<Readers>,
20
21 writers: Mutex<Writers>,
23}
24
/// Read-side state of an [`Entry`].
#[derive(Debug)]
struct Readers {
    /// Set by the driver loop when a read-side event arrives for the source;
    /// cleared when a read operation reports `WouldBlock`.
    ready: bool,

    /// Wakers of the tasks to notify on the next read-side event.
    wakers: Vec<Waker>,
}
34
/// Write-side state of an [`Entry`].
#[derive(Debug)]
struct Writers {
    /// Set by the driver loop when a write-side event arrives for the source;
    /// cleared when a write operation reports `WouldBlock`.
    ready: bool,

    /// Wakers of the tasks to notify on the next write-side event.
    wakers: Vec<Waker>,
}
44
45struct Reactor {
47 poller: mio::Poll,
49
50 entries: Mutex<Slab<Arc<Entry>>>,
52
53 notify_reg: (mio::Registration, mio::SetReadiness),
55
56 notify_token: mio::Token,
58}
59
60impl Reactor {
61 fn new() -> io::Result<Reactor> {
63 let poller = mio::Poll::new()?;
64 let notify_reg = mio::Registration::new2();
65
66 let mut reactor = Reactor {
67 poller,
68 entries: Mutex::new(Slab::new()),
69 notify_reg,
70 notify_token: mio::Token(0),
71 };
72
73 let entry = reactor.register(&reactor.notify_reg.0)?;
75 reactor.notify_token = entry.token;
76
77 Ok(reactor)
78 }
79
80 fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
82 let mut entries = self.entries.lock().unwrap();
83
84 let vacant = entries.vacant_entry();
86 let token = mio::Token(vacant.key());
87
88 let entry = Arc::new(Entry {
90 token,
91 readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }),
92 writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }),
93 });
94 vacant.insert(entry.clone());
95
96 let interest = mio::Ready::all();
98 let opts = mio::PollOpt::edge();
99 self.poller.register(source, token, interest, opts)?;
100
101 Ok(entry)
102 }
103
104 fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
106 self.poller.deregister(source)?;
108
109 self.entries.lock().unwrap().remove(entry.token.0);
111
112 Ok(())
113 }
114
115 }
122
123static REACTOR: Lazy<Reactor> = Lazy::new(|| {
125 std::thread::Builder::new()
128 .name("async-std/net".to_string())
129 .spawn(move || {
130 abort_on_panic(|| {
133 main_loop().expect("async networking thread has panicked");
134 })
135 })
136 .expect("cannot start a thread driving blocking tasks");
137
138 Reactor::new().expect("cannot initialize reactor")
139});
140
141fn main_loop() -> io::Result<()> {
143 let reactor = &REACTOR;
144 let mut events = mio::Events::with_capacity(1000);
145
146 loop {
147 reactor.poller.poll(&mut events, None)?;
149
150 let entries = reactor.entries.lock().unwrap();
152
153 for event in events.iter() {
154 let token = event.token();
155
156 if token == reactor.notify_token {
157 reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
159 } else {
160 if let Some(entry) = entries.get(token.0) {
162 let readiness = event.readiness();
164
165 if !(readiness & reader_interests()).is_empty() {
167 let mut readers = entry.readers.lock().unwrap();
168 readers.ready = true;
169 for w in readers.wakers.drain(..) {
170 w.wake();
171 }
172 }
173
174 if !(readiness & writer_interests()).is_empty() {
176 let mut writers = entry.writers.lock().unwrap();
177 writers.ready = true;
178 for w in writers.wakers.drain(..) {
179 w.wake();
180 }
181 }
182 }
183 }
184 }
185 }
186}
187
188pub struct Watcher<T: Evented> {
193 entry: Arc<Entry>,
195
196 source: Option<T>,
198}
199
200impl<T: Evented> Watcher<T> {
201 pub fn new(source: T) -> Watcher<T> {
206 Watcher {
207 entry: REACTOR
208 .register(&source)
209 .expect("cannot register an I/O event source"),
210 source: Some(source),
211 }
212 }
213
214 pub fn get_ref(&self) -> &T {
216 self.source.as_ref().unwrap()
217 }
218
219 pub fn poll_read_with<'a, F, R>(&'a self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
224 where
225 F: FnMut(&'a T) -> io::Result<R>,
226 {
227 match f(self.source.as_ref().unwrap()) {
229 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
230 res => return Poll::Ready(res),
231 }
232
233 let mut readers = self.entry.readers.lock().unwrap();
235
236 match f(self.source.as_ref().unwrap()) {
238 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
239 res => return Poll::Ready(res),
240 }
241
242 if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
244 readers.wakers.push(cx.waker().clone());
245 }
246
247 readers.ready = false;
248
249 Poll::Pending
250 }
251
252 pub fn poll_write_with<'a, F, R>(
257 &'a self,
258 cx: &mut Context<'_>,
259 mut f: F,
260 ) -> Poll<io::Result<R>>
261 where
262 F: FnMut(&'a T) -> io::Result<R>,
263 {
264 match f(self.source.as_ref().unwrap()) {
266 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
267 res => return Poll::Ready(res),
268 }
269
270 let mut writers = self.entry.writers.lock().unwrap();
272
273 match f(self.source.as_ref().unwrap()) {
275 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
276 res => return Poll::Ready(res),
277 }
278
279 if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
281 writers.wakers.push(cx.waker().clone());
282 }
283
284 writers.ready = false;
285
286 Poll::Pending
287 }
288
289 #[allow(dead_code)]
295 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
296 let mut readers = self.entry.readers.lock().unwrap();
298 if readers.ready {
299 return Poll::Ready(())
300 }
301 if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
303 readers.wakers.push(cx.waker().clone());
304 }
305 Poll::Pending
306 }
307
308 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
314 let mut writers = self.entry.writers.lock().unwrap();
316 if writers.ready {
317 return Poll::Ready(())
318 }
319 if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
321 writers.wakers.push(cx.waker().clone());
322 }
323 Poll::Pending
324 }
325
326 #[allow(dead_code)]
330 pub fn into_inner(mut self) -> T {
331 let source = self.source.take().unwrap();
332 REACTOR
333 .deregister(&source, &self.entry)
334 .expect("cannot deregister I/O event source");
335 source
336 }
337}
338
339impl<T: Evented> Drop for Watcher<T> {
340 fn drop(&mut self) {
341 if let Some(ref source) = self.source {
342 REACTOR
343 .deregister(source, &self.entry)
344 .expect("cannot deregister I/O event source");
345 }
346 }
347}
348
349impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
350 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351 f.debug_struct("Watcher")
352 .field("entry", &self.entry)
353 .field("source", &self.source)
354 .finish()
355 }
356}
357
358#[inline]
360fn reader_interests() -> mio::Ready {
361 mio::Ready::all() - mio::Ready::writable()
362}
363
364#[inline]
366fn writer_interests() -> mio::Ready {
367 mio::Ready::writable() | hup()
368}
369
370#[inline]
372fn hup() -> mio::Ready {
373 #[cfg(unix)]
374 let ready = mio::unix::UnixReady::hup().into();
375
376 #[cfg(not(unix))]
377 let ready = mio::Ready::empty();
378
379 ready
380}