#![doc(html_root_url = "https://docs.rs/tokio-reactor/0.1.0")]
#![deny(missing_docs, warnings, missing_debug_implementations)]
#[macro_use]
extern crate futures;
#[macro_use]
extern crate log;
extern crate mio;
extern crate slab;
extern crate tokio_executor;
extern crate tokio_io;
pub(crate) mod background;
mod atomic_task;
mod poll_evented;
mod registration;
pub use self::background::Background;
pub use self::registration::Registration;
pub use self::poll_evented::PollEvented;
use atomic_task::AtomicTask;
use tokio_executor::Enter;
use tokio_executor::park::{Park, Unpark};
use std::{fmt, usize};
use std::io::{self, ErrorKind};
use std::mem;
use std::cell::RefCell;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::{Arc, Weak, RwLock};
use std::time::{Duration, Instant};
use log::Level;
use mio::event::Evented;
use slab::Slab;
use futures::task::Task;
/// The reactor itself: it owns the event buffer filled on every poll and the
/// state that is shared with all `Handle`s.
///
/// A reactor is created with `Reactor::new` and driven by calling
/// `Reactor::turn` (directly, through the `Park` implementation, or on a
/// helper thread via `Reactor::background`).
pub struct Reactor {
/// Buffer handed to `mio::Poll::poll`; `new` allocates room for 1024 events.
events: mio::Events,
/// State shared with handles, which only hold `Weak` references to it.
inner: Arc<Inner>,
/// Registration half of the wakeup pair, registered under `TOKEN_WAKEUP`.
/// Never read after construction; it is stored only to keep it alive.
_wakeup_registration: mio::Registration,
}
/// A cloneable reference to a `Reactor`.
///
/// The handle stores only a `Weak` pointer, so holding one does not keep the
/// reactor's shared state alive; operations that need the reactor upgrade the
/// pointer first and do nothing if that fails.
#[derive(Clone)]
pub struct Handle {
/// Weak reference to the reactor's shared state.
inner: Weak<Inner>,
}
/// Value returned by a successful call to `Reactor::turn`.
///
/// It currently carries no information.
#[derive(Debug)]
pub struct Turn {
/// Private unit field so the struct cannot be constructed outside this crate.
_priv: (),
}
/// Error returned by `Reactor::set_fallback` when the global fallback handle
/// slot was already occupied and this reactor could not be installed.
#[derive(Clone, Debug)]
pub struct SetFallbackError(());
/// Deprecated name for `SetFallbackError`, kept as an alias so existing code
/// that refers to the old name keeps compiling.
#[deprecated(since = "0.1.2", note = "use SetFallbackError instead")]
#[doc(hidden)]
pub type SetDefaultError = SetFallbackError;
// State shared between a `Reactor` (strong `Arc`) and its `Handle`s (`Weak`).
struct Inner {
// The mio poll instance every I/O source is registered with.
io: mio::Poll,
// Per-source state. A source's slab key plus `TOKEN_START` is its mio token.
io_dispatch: RwLock<Slab<ScheduledIo>>,
// Readiness setter of the wakeup pair: handles set it readable to wake the
// reactor, and the reactor clears it when it sees `TOKEN_WAKEUP`.
wakeup: mio::SetReadiness
}
// Bookkeeping kept for each registered I/O source.
struct ScheduledIo {
// Readiness bits OR-ed in by `Reactor::dispatch` as events arrive and
// consulted by `Inner::register`.
readiness: AtomicUsize,
// Task notified when an event carries any readiness other than writable.
reader: AtomicTask,
// Task notified when an event is writable or, per `platform::is_hup`, a
// hang-up.
writer: AtomicTask,
}
// Which side of an I/O source a task is interested in. It selects between the
// `reader` and `writer` task slots of a `ScheduledIo`.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(crate) enum Direction {
// Interest in every readiness kind except writable.
Read,
// Interest in writable readiness.
Write,
}
// Process-wide fallback handle, stored as a `Handle` reinterpreted as a
// `usize` (see `Handle::into_usize`). Zero means no fallback has been set.
static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT;
// Per-thread default reactor handle, set for the duration of `with_default`.
thread_local!(static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None));
// Token under which the reactor's own wakeup registration is registered.
const TOKEN_WAKEUP: mio::Token = mio::Token(0);
// Offset added to a slab key to form its mio token, so that no I/O source
// ever shares token 0 with `TOKEN_WAKEUP`.
const TOKEN_START: usize = 1;
// Number of slab entries at which `Inner::add_source` starts refusing new
// sources.
const MAX_SOURCES: usize = usize::MAX >> 4;
// Compile-time check only: this function fails to type-check unless `Handle`
// is both `Send` and `Sync`. It is never meant to be called.
fn _assert_kinds() {
    fn _is_send_and_sync<T>()
    where
        T: Send + Sync,
    {
    }

    _is_send_and_sync::<Handle>();
}
/// Runs `f` with `handle` installed as the default reactor of the calling
/// thread, and removes it again when `f` returns or unwinds.
///
/// While `f` runs, `Handle::current` on this thread returns a clone of
/// `handle`. The `enter` guard is passed through to `f` unchanged.
///
/// # Panics
///
/// Panics if a default reactor is already set for the calling thread, for
/// example when `with_default` is nested. In that case the reactor that was
/// already installed is left untouched.
pub fn with_default<F, R>(handle: &Handle, enter: &mut Enter, f: F) -> R
where F: FnOnce(&mut Enter) -> R
{
    // Clears the thread-local slot when dropped, which also covers the case
    // where `f` panics.
    struct Reset;

    impl Drop for Reset {
        fn drop(&mut self) {
            CURRENT_REACTOR.with(|current| {
                let mut current = current.borrow_mut();
                *current = None;
            });
        }
    }

    CURRENT_REACTOR.with(|current| {
        {
            let mut current = current.borrow_mut();

            assert!(current.is_none(),
                    "default Tokio reactor already set for execution context");

            *current = Some(handle.clone());
        }

        // Arm the guard only now that this call owns the slot. If the
        // assertion above fires, the slot still belongs to an outer scope
        // and must not be cleared while unwinding.
        let _reset = Reset;

        f(enter)
    })
}
impl Reactor {
    /// Creates a new reactor.
    ///
    /// A fresh `mio::Poll` is created and the reactor's wakeup registration
    /// is registered with it under `TOKEN_WAKEUP` (readable, level-triggered).
    ///
    /// # Errors
    ///
    /// Returns the error from creating the poll instance or from registering
    /// the wakeup source.
    pub fn new() -> io::Result<Reactor> {
        let poll = mio::Poll::new()?;
        let (registration, set_readiness) = mio::Registration::new2();

        poll.register(
            &registration,
            TOKEN_WAKEUP,
            mio::Ready::readable(),
            mio::PollOpt::level(),
        )?;

        let shared = Inner {
            io: poll,
            io_dispatch: RwLock::new(Slab::with_capacity(1)),
            wakeup: set_readiness,
        };

        Ok(Reactor {
            events: mio::Events::with_capacity(1024),
            inner: Arc::new(shared),
            _wakeup_registration: registration,
        })
    }

    /// Returns a handle to this reactor.
    ///
    /// The handle holds only a weak reference and does not keep the reactor
    /// alive.
    pub fn handle(&self) -> Handle {
        let weak = Arc::downgrade(&self.inner);
        Handle { inner: weak }
    }

    /// Installs a handle to this reactor as the process-wide fallback.
    ///
    /// # Errors
    ///
    /// Returns `SetFallbackError` if a fallback has already been set.
    pub fn set_fallback(&self) -> Result<(), SetFallbackError> {
        let handle = self.handle();
        set_fallback(handle)
    }

    /// Polls for I/O events once and dispatches whatever arrived.
    ///
    /// `max_wait` is forwarded unchanged to `mio::Poll::poll` as the timeout.
    /// A poll interrupted with `ErrorKind::Interrupted` counts as a
    /// successful turn that handled nothing.
    ///
    /// # Errors
    ///
    /// Returns any other error reported by the poll.
    pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
        self.poll(max_wait).map(|_| Turn { _priv: () })
    }

    /// Returns `true` when no I/O source is currently registered with this
    /// reactor.
    pub fn is_idle(&self) -> bool {
        let sources = self.inner.io_dispatch.read().unwrap();
        sources.is_empty()
    }

    /// Consumes the reactor and hands it to `Background::new`.
    pub fn background(self) -> io::Result<Background> {
        Background::new(self)
    }

    // Polls mio once, then walks the received events: the wakeup token has
    // its readiness cleared, every other token is dispatched to its source.
    fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
        if let Err(e) = self.inner.io.poll(&mut self.events, max_wait) {
            // An interrupted poll is not a failure; there is just nothing to
            // process this turn.
            return if e.kind() == ErrorKind::Interrupted {
                Ok(())
            } else {
                Err(e)
            };
        }

        // Only pay for the clock read when the timing will be logged.
        let started = if log_enabled!(Level::Debug) {
            Some(Instant::now())
        } else {
            None
        };

        let mut handled = 0;
        for event in self.events.iter() {
            handled += 1;
            let token = event.token();
            trace!("event {:?} {:?}", event.readiness(), event.token());

            if token != TOKEN_WAKEUP {
                self.dispatch(token, event.readiness());
                continue;
            }

            // The wakeup source is level-triggered: reset it so the next
            // poll does not return immediately.
            self.inner.wakeup.set_readiness(mio::Ready::empty()).unwrap();
        }

        if let Some(started) = started {
            let elapsed = started.elapsed();
            debug!("loop process - {} events, {}.{:03}s",
                   handled,
                   elapsed.as_secs(),
                   elapsed.subsec_nanos() / 1_000_000);
        }

        Ok(())
    }

    // Records `ready` on the source behind `token` and notifies the tasks
    // waiting on the affected directions. Unknown tokens are ignored.
    fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
        let key = usize::from(token) - TOKEN_START;
        let sources = self.inner.io_dispatch.read().unwrap();

        let sched = match sources.get(key) {
            Some(sched) => sched,
            None => return,
        };

        sched.readiness.fetch_or(ready.as_usize(), Relaxed);

        let wake_writer = ready.is_writable() || platform::is_hup(&ready);
        if wake_writer {
            sched.writer.notify();
        }

        // Everything other than "writable" is delivered to the reader.
        let non_writable = ready & !mio::Ready::writable();
        if !non_writable.is_empty() {
            sched.reader.notify();
        }
    }
}
/// Lets an executor use the reactor as its parking mechanism: parking turns
/// the reactor, unparking wakes it through a `Handle`.
impl Park for Reactor {
    type Unpark = Handle;
    type Error = io::Error;

    /// Returns a handle that can wake this reactor.
    fn unpark(&self) -> Self::Unpark {
        self.handle()
    }

    /// Turns the reactor once with no timeout.
    fn park(&mut self) -> io::Result<()> {
        self.turn(None).map(|_| ())
    }

    /// Turns the reactor once, waiting at most `duration`.
    fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
        self.turn(Some(duration)).map(|_| ())
    }
}
/// Opaque debug representation: prints only the type name.
impl fmt::Debug for Reactor {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Reactor")
    }
}
impl Handle {
pub fn current() -> Handle {
Handle::try_current()
.unwrap_or(Handle { inner: Weak::new() })
}
pub(crate) fn try_current() -> io::Result<Handle> {
CURRENT_REACTOR.with(|current| {
match *current.borrow() {
Some(ref handle) => Ok(handle.clone()),
None => Handle::fallback(),
}
})
}
fn fallback() -> io::Result<Handle> {
let mut fallback = HANDLE_FALLBACK.load(SeqCst);
if fallback == 0 {
let reactor = match Reactor::new() {
Ok(reactor) => reactor,
Err(_) => return Err(io::Error::new(io::ErrorKind::Other,
"failed to create reactor")),
};
if set_fallback(reactor.handle().clone()).is_ok() {
let ret = reactor.handle().clone();
match reactor.background() {
Ok(bg) => bg.forget(),
Err(_) => {}
}
return Ok(ret);
}
fallback = HANDLE_FALLBACK.load(SeqCst);
}
assert!(fallback != 0);
let ret = unsafe {
let handle = Handle::from_usize(fallback);
let ret = handle.clone();
drop(handle.into_usize());
ret
};
Ok(ret)
}
fn wakeup(&self) {
if let Some(inner) = self.inner() {
inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
}
}
fn into_usize(self) -> usize {
unsafe {
mem::transmute::<Weak<Inner>, usize>(self.inner)
}
}
unsafe fn from_usize(val: usize) -> Handle {
let inner = mem::transmute::<usize, Weak<Inner>>(val);;
Handle { inner }
}
fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
}
impl Unpark for Handle {
fn unpark(&self) {
self.wakeup();
}
}
impl Default for Handle {
fn default() -> Handle {
Handle::current()
}
}
/// Opaque debug representation: prints only the type name.
impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Handle")
    }
}
// Tries to publish `handle` as the process-wide fallback.
//
// The handle is converted to its integer form and stored in
// `HANDLE_FALLBACK` only if the slot is still zero. On success the global
// takes over the handle's weak reference; on failure the handle is rebuilt
// and dropped so that reference is released.
fn set_fallback(handle: Handle) -> Result<(), SetFallbackError> {
    let raw = handle.into_usize();

    if HANDLE_FALLBACK.compare_exchange(0, raw, SeqCst, SeqCst).is_ok() {
        return Ok(());
    }

    // SAFETY: `raw` was produced by `into_usize` just above and was not
    // stored anywhere, so this is the only owner of that weak reference.
    drop(unsafe { Handle::from_usize(raw) });

    Err(SetFallbackError(()))
}
impl Inner {
fn add_source(&self, source: &Evented)
-> io::Result<usize>
{
let mut io_dispatch = self.io_dispatch.write().unwrap();
if io_dispatch.len() == MAX_SOURCES {
return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \
registered I/O resources"));
}
let key = io_dispatch.insert(ScheduledIo {
readiness: AtomicUsize::new(0),
reader: AtomicTask::new(),
writer: AtomicTask::new(),
});
try!(self.io.register(source,
mio::Token(TOKEN_START + key),
mio::Ready::all(),
mio::PollOpt::edge()));
Ok(key)
}
fn deregister_source(&self, source: &Evented) -> io::Result<()> {
self.io.deregister(source)
}
fn drop_source(&self, token: usize) {
debug!("dropping I/O source: {}", token);
self.io_dispatch.write().unwrap().remove(token);
}
fn register(&self, token: usize, dir: Direction, t: Task) {
debug!("scheduling direction for: {}", token);
let io_dispatch = self.io_dispatch.read().unwrap();
let sched = io_dispatch.get(token).unwrap();
let (task, ready) = match dir {
Direction::Read => (&sched.reader, !mio::Ready::writable()),
Direction::Write => (&sched.writer, mio::Ready::writable()),
};
task.register_task(t);
if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
task.notify();
}
}
}
// When the shared state goes away, notify the writer and reader task of
// every source that is still registered.
impl Drop for Inner {
    fn drop(&mut self) {
        let sources = self.io_dispatch.read().unwrap();

        for (_, sched) in sources.iter() {
            sched.writer.notify();
            sched.reader.notify();
        }
    }
}
impl Direction {
    // Readiness bits belonging to this direction: `Write` covers writable
    // plus the platform's hang-up bit, `Read` covers everything except
    // writable.
    fn mask(&self) -> mio::Ready {
        let writable = mio::Ready::writable();

        match *self {
            Direction::Write => writable | platform::hup(),
            Direction::Read => mio::Ready::all() - writable,
        }
    }
}
// Hang-up helpers for Unix targets (Fuchsia excluded), backed by mio's
// `UnixReady`.
#[cfg(all(unix, not(target_os = "fuchsia")))]
mod platform {
    use mio::Ready;
    use mio::unix::UnixReady;

    // Readiness value with only the Unix hang-up bit set.
    pub fn hup() -> Ready {
        Ready::from(UnixReady::hup())
    }

    // Whether `ready` has the Unix hang-up bit set.
    pub fn is_hup(ready: &Ready) -> bool {
        let unix_ready = UnixReady::from(*ready);
        unix_ready.is_hup()
    }
}
// Hang-up helpers for Windows and Fuchsia, where this module reports no
// hang-up readiness at all.
#[cfg(any(windows, target_os = "fuchsia"))]
mod platform {
    use mio::Ready;

    // There is no hang-up bit here, so the mask is empty.
    pub fn hup() -> Ready {
        Ready::empty()
    }

    // Never reports a hang-up.
    pub fn is_hup(_ready: &Ready) -> bool {
        false
    }
}