pub(crate) mod background;
mod poll_evented;
pub(crate) mod registration;
mod sharded_rwlock;
pub mod sys;

pub use self::poll_evented::PollEvented;

use futures_util::task::AtomicWaker;
use log::{debug, log_enabled, trace, Level};
use slab::Slab;
use std::cell::RefCell;
use std::io;
use std::mem;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, Weak};
use std::task::Context;
use std::time::{Duration, Instant};
use std::{fmt, usize};

use self::background::Background;
use self::sharded_rwlock::RwLock;
use self::sys::event::Evented;

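/// The core reactor, or event loop.
///
/// The reactor polls the OS for I/O readiness events and wakes the tasks
/// registered against each I/O resource.
///
/// A minimal usage sketch (not compiled, since these are crate-internal
/// APIs):
///
/// ```ignore
/// let mut reactor = Reactor::new()?;
/// let handle = reactor.handle();
/// // Drive one iteration, blocking until an event arrives.
/// reactor.turn(None)?;
/// ```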
struct Reactor {
    /// Reused buffer of OS events, refilled on each call to `poll`.
    events: sys::event::Events,

    /// State shared between the reactor and its handles.
    inner: Arc<Inner>,

    /// Keeps the wakeup registration alive for the reactor's lifetime.
    _wakeup_registration: sys::Registration,
}

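/// A reference to a reactor.
///
/// A `Handle` may be empty (`inner: None`), in which case it lazily binds to
/// the default reactor for the execution context.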
#[derive(Clone)]
struct Handle {
    inner: Option<HandlePriv>,
}

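/// Like `Handle`, but never empty.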
#[derive(Clone)]
struct HandlePriv {
    inner: Weak<Inner>,
}

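/// Return value from the `turn` method on `Reactor`.
///
/// Currently this value carries no information, but it may in the future
/// describe what happened during the turn.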
#[derive(Debug)]
struct Turn {
    _priv: (),
}

#[test]
fn test_handle_size() {
    assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>());
}

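/// Reactor state shared between the event loop and all of its handles.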
struct Inner {
    /// The underlying OS event queue.
    io: sys::Poll,

    /// Counter used to generate the ABA guard embedded in each token.
    next_aba_guard: AtomicUsize,

    /// Registered I/O resources, keyed by the low bits of their token.
    io_dispatch: RwLock<Slab<ScheduledIo>>,

    /// Used to wake the reactor out of a blocked call to `poll`.
    wakeup: sys::SetReadiness,
}

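/// Per-resource state: the current readiness value plus the reader and writer
/// tasks waiting on it.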
struct ScheduledIo {
    aba_guard: usize,
    readiness: AtomicUsize,
    reader: AtomicWaker,
    writer: AtomicWaker,
}

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(crate) enum Direction {
    Read,
    Write,
}
static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0);

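/// The reactor associated with the current execution context, if any.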
thread_local!(static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None));

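/// Tokens are split into two fields: the low `TOKEN_SHIFT` bits index into
/// the `io_dispatch` slab, and the remaining high bits carry an ABA guard
/// that detects stale events for reused slots. For example, a token of
/// `(1 << 22) | 5` carries ABA guard `1 << 22` and slab key `5`.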
const TOKEN_SHIFT: usize = 22;

const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1;
const TOKEN_WAKEUP: sys::Token = sys::Token(MAX_SOURCES);

fn _assert_kinds() {
    fn _assert<T: Send + Sync>() {}

    _assert::<Handle>();
}

impl Reactor {
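    /// Creates a new event loop, returning any error that happened during
    /// creation.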
    fn new() -> io::Result<Reactor> {
        let io = sys::Poll::new()?;
        let wakeup_pair = sys::Registration::new2();

        io.register(
            &wakeup_pair.0,
            TOKEN_WAKEUP,
            sys::event::Ready::readable(),
            sys::event::PollOpt::level(),
        )?;

        Ok(Reactor {
            events: sys::event::Events::with_capacity(1024),
            _wakeup_registration: wakeup_pair.0,
            inner: Arc::new(Inner {
                io,
                next_aba_guard: AtomicUsize::new(0),
                io_dispatch: RwLock::new(Slab::with_capacity(1)),
                wakeup: wakeup_pair.1,
            }),
        })
    }

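    /// Returns a handle to this event loop, which can be sent across threads
    /// and used to bind I/O objects to the reactor.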
    fn handle(&self) -> Handle {
        Handle {
            inner: Some(HandlePriv {
                inner: Arc::downgrade(&self.inner),
            }),
        }
    }

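    /// Performs one iteration of the event loop, blocking on waiting for
    /// events for at most `max_wait` (forever if `None`).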
    fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
        self.poll(max_wait)?;
        Ok(Turn { _priv: () })
    }

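    /// Returns true if the reactor is currently idle: no I/O resources are
    /// registered with it.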
    fn is_idle(&self) -> bool {
        self.inner.io_dispatch.read().is_empty()
    }

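    /// Moves the reactor to a background thread, returning a `Background`
    /// value through which it is driven.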
    fn background(self) -> io::Result<Background> {
        Background::new(self)
    }

    fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
        // Block until an event arrives or `max_wait` elapses.
        self.inner.io.poll(&mut self.events, max_wait)?;

        let start = if log_enabled!(Level::Debug) {
            Some(Instant::now())
        } else {
            None
        };

        // Process the events that came in, dispatching appropriately.
        let mut events = 0;
        for event in self.events.iter() {
            events += 1;
            let token = event.token();
            trace!("event {:?} {:?}", event.readiness(), event.token());

            if token == TOKEN_WAKEUP {
                self.inner
                    .wakeup
                    .set_readiness(sys::event::Ready::empty())
                    .unwrap();
            } else {
                self.dispatch(token, event.readiness());
            }
        }

        if let Some(start) = start {
            let dur = start.elapsed();
            trace!(
                "loop process - {} events, {}.{:03}s",
                events,
                dur.as_secs(),
                dur.subsec_nanos() / 1_000_000
            );
        }

        Ok(())
    }

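    /// Routes a readiness event to the `ScheduledIo` identified by `token`,
    /// waking the reader and/or writer task as appropriate.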
    fn dispatch(&self, token: sys::Token, ready: sys::event::Ready) {
        let aba_guard = token.0 & !MAX_SOURCES;
        let token = token.0 & MAX_SOURCES;

        let mut rd = None;
        let mut wr = None;

        // Scope the read lock so it is released before waking any tasks.
        {
            let io_dispatch = self.inner.io_dispatch.read();

            let io = match io_dispatch.get(token) {
                Some(io) => io,
                None => return,
            };

            // A stale event for a slot that has since been reused; ignore it.
            if aba_guard != io.aba_guard {
                return;
            }

            io.readiness.fetch_or(ready.as_usize(), Relaxed);

            if ready.is_writable() || platform::is_hup(&ready) {
                wr = io.writer.take();
            }

            if !(ready & (!sys::event::Ready::writable())).is_empty() {
                rd = io.reader.take();
            }
        }

        if let Some(task) = rd {
            task.wake();
        }

        if let Some(task) = wr {
            task.wake();
        }
    }
}

impl fmt::Debug for Reactor {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Reactor")
    }
}

impl Handle {
    fn as_priv(&self) -> Option<&HandlePriv> {
        self.inner.as_ref()
    }

    fn into_priv(self) -> Option<HandlePriv> {
        self.inner
    }

    fn wakeup(&self) {
        if let Some(handle) = self.as_priv() {
            handle.wakeup();
        }
    }
}

impl Default for Handle {
    /// Returns a "default" handle, i.e., a handle that lazily binds to a
    /// reactor.
    fn default() -> Handle {
        Handle { inner: None }
    }
}

impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Handle")
    }
}

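/// Attempts to install `handle` as the process-wide fallback reactor handle.
///
/// Fails if another handle was installed first.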
fn set_fallback(handle: HandlePriv) -> Result<(), ()> {
    unsafe {
        let val = handle.into_usize();
        match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
            Ok(_) => Ok(()),
            Err(_) => {
                // Lost the race: decode and drop the `Weak` so it is not
                // leaked.
                drop(HandlePriv::from_usize(val));
                Err(())
            }
        }
    }
}

impl HandlePriv {
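    /// Tries to get a handle to the current reactor: the one stored in the
    /// thread-local execution context, or else the process-wide fallback.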
    pub(crate) fn try_current() -> io::Result<HandlePriv> {
        CURRENT_REACTOR.with(|current| match *current.borrow() {
            Some(ref handle) => Ok(handle.clone()),
            None => HandlePriv::fallback(),
        })
    }

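    /// Returns a handle to the process-wide fallback reactor, creating the
    /// reactor on first use.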
    fn fallback() -> io::Result<HandlePriv> {
        let mut fallback = HANDLE_FALLBACK.load(SeqCst);

        // If the fallback hasn't been initialized yet, spin up a new reactor
        // and try to install its handle as the fallback.
        if fallback == 0 {
            let reactor = match Reactor::new() {
                Ok(reactor) => reactor,
                Err(_) => {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "failed to create reactor",
                    ))
                }
            };

            if set_fallback(reactor.handle().into_priv().unwrap()).is_ok() {
                let ret = reactor.handle().into_priv().unwrap();

                match reactor.background() {
                    Ok(bg) => bg.forget(),
                    // The reactor could not be moved to a background thread;
                    // the fallback handle will be defunct and I/O bound to it
                    // will return errors.
                    Err(_) => {}
                }

                return Ok(ret);
            }

            // Another thread won the race to install the fallback; reload it.
            fallback = HANDLE_FALLBACK.load(SeqCst);
        }

        // The fallback is now guaranteed to be initialized. Reify it into a
        // handle, clone the handle, then forget the reified value again since
        // we do not own a reference to it.
        assert!(fallback != 0);

        let ret = unsafe {
            let handle = HandlePriv::from_usize(fallback);
            let ret = handle.clone();

            // Transmuting back to a `usize` prevents `handle` from being
            // dropped and decrementing the reference count.
            drop(handle.into_usize());

            ret
        };

        Ok(ret)
    }

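    /// Wakes the reactor out of a blocked call to `poll` by setting the
    /// wakeup registration readable.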
    fn wakeup(&self) {
        if let Some(inner) = self.inner() {
            inner
                .wakeup
                .set_readiness(sys::event::Ready::readable())
                .unwrap();
        }
    }

    fn into_usize(self) -> usize {
        unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) }
    }

    unsafe fn from_usize(val: usize) -> HandlePriv {
        let inner = mem::transmute::<usize, Weak<Inner>>(val);
        HandlePriv { inner }
    }

    fn inner(&self) -> Option<Arc<Inner>> {
        self.inner.upgrade()
    }
}

impl fmt::Debug for HandlePriv {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "HandlePriv")
    }
}

impl Inner {
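    /// Registers an I/O resource with the reactor.
    ///
    /// The registration token is returned.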
    fn add_source(&self, source: &dyn Evented) -> io::Result<usize> {
        // Reserve a new ABA guard value; the counter is pre-shifted into the
        // token's high bits.
        let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed);

        let mut io_dispatch = self.io_dispatch.write();

        if io_dispatch.len() == MAX_SOURCES {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "reactor at max registered I/O resources",
            ));
        }

        // The slab key becomes the low bits of the token.
        let key = io_dispatch.insert(ScheduledIo {
            aba_guard,
            readiness: AtomicUsize::new(0),
            reader: AtomicWaker::new(),
            writer: AtomicWaker::new(),
        });

        self.io.register(
            source,
            sys::Token(aba_guard | key),
            sys::event::Ready::all(),
            sys::event::PollOpt::edge(),
        )?;

        Ok(key)
    }

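    /// Deregisters an I/O resource from the reactor.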
    fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
        self.io.deregister(source)
    }

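    /// Removes the I/O source associated with `token` from the dispatch
    /// table, dropping any stored wakers.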
    fn drop_source(&self, token: usize) {
        debug!("dropping I/O source: {}", token);
        self.io_dispatch.write().remove(token);
    }

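    /// Registers interest in the I/O resource associated with `token`,
    /// storing the waker from `cx` for the given direction.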
    fn register(&self, cx: &mut Context<'_>, token: usize, dir: Direction) {
        debug!("scheduling {:?} for: {}", dir, token);
        let io_dispatch = self.io_dispatch.read();
        let sched = io_dispatch.get(token).unwrap();

        // Every readiness bit other than writable counts as a read event.
        let (atomic_waker, ready) = match dir {
            Direction::Read => (&sched.reader, !sys::event::Ready::writable()),
            Direction::Write => (&sched.writer, sys::event::Ready::writable()),
        };

        atomic_waker.register(cx.waker());

        // If the resource is already ready, wake the task immediately.
        if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
            atomic_waker.wake();
        }
    }
}

impl Drop for Inner {
    fn drop(&mut self) {
        // When the reactor is dropped, wake all blocked tasks: they will
        // never receive a readiness notification otherwise.
        let io = self.io_dispatch.read();
        for (_, io) in io.iter() {
            io.writer.wake();
            io.reader.wake();
        }
    }
}

impl Direction {
    fn mask(&self) -> sys::event::Ready {
        match *self {
            Direction::Read => {
                // Everything except writable is signaled through read.
                sys::event::Ready::all() - sys::event::Ready::writable()
            }
            Direction::Write => sys::event::Ready::writable() | platform::hup(),
        }
    }
}

pub(crate) mod platform {
    use super::sys::event::Ready;
    use super::sys::UnixReady;

    pub fn hup() -> Ready {
        UnixReady::hup().into()
    }

    pub fn is_hup(ready: &Ready) -> bool {
        UnixReady::from(*ready).is_hup()
    }
}