use log::debug;
use std::pin::Pin;
use futures::future::{Future, FutureObj};
use futures::task::ArcWake;
use futures::task::{Spawn, SpawnError, Waker};
use futures::task::Poll;
use libc::{fd_set, select, timeval, FD_ISSET, FD_SET, FD_ZERO};
use std::os::unix::io::RawFd;
use std::task::Context;
use std::cell::{Cell, RefCell};
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use std::sync::Arc;
mod async_tcp_listener;
mod async_tcp_stream;
pub use crate::async_tcp_listener::AsyncTcpListener;
pub use crate::async_tcp_stream::AsyncTcpStream;
thread_local! {
static REACTOR: Rc<EventLoop> = Rc::new(EventLoop::new());
}
type TaskId = usize;
pub fn run<F: Future<Output = ()> + Send + 'static>(f: F) {
REACTOR.with(|reactor| reactor.run(f))
}
pub fn spawn<F: Future<Output = ()> + Send + 'static>(f: F) {
REACTOR.with(|reactor| reactor.do_spawn(f))
}
#[derive(Debug)]
struct Token(usize);
impl ArcWake for Token {
fn wake_by_ref(arc_self: &Arc<Self>) {
debug!("waking {:?}", arc_self);
let Token(idx) = **arc_self;
REACTOR.with(|reactor| {
let wakeup = Wakeup {
index: idx,
waker: futures::task::waker(arc_self.clone()),
};
reactor.wake(wakeup);
});
}
}
struct Wakeup {
index: usize,
waker: Waker,
}
struct Task {
future: FutureObj<'static, ()>,
}
impl Task {
fn poll(&mut self, waker: Waker) -> Poll<()> {
let future = Pin::new(&mut self.future);
let mut ctx = Context::from_waker(&waker);
match future.poll(&mut ctx) {
Poll::Ready(_) => {
debug!("future done");
Poll::Ready(())
}
Poll::Pending => {
debug!("future not yet ready");
Poll::Pending
}
}
}
}
struct EventLoop {
read: RefCell<BTreeMap<RawFd, Waker>>,
write: RefCell<BTreeMap<RawFd, Waker>>,
counter: Cell<usize>,
wait_queue: RefCell<BTreeMap<TaskId, Task>>,
run_queue: RefCell<VecDeque<Wakeup>>,
}
impl EventLoop {
fn new() -> Self {
EventLoop {
read: RefCell::new(BTreeMap::new()),
write: RefCell::new(BTreeMap::new()),
counter: Cell::new(0),
wait_queue: RefCell::new(BTreeMap::new()),
run_queue: RefCell::new(VecDeque::new()),
}
}
fn add_read_interest(&self, fd: RawFd, waker: Waker) {
debug!("adding read interest for {}", fd);
if !self.read.borrow().contains_key(&fd) {
self.read.borrow_mut().insert(fd, waker);
}
}
fn remove_read_interest(&self, fd: RawFd) {
debug!("removing read interest for {}", fd);
self.read.borrow_mut().remove(&fd);
}
fn remove_write_interest(&self, fd: RawFd) {
debug!("removing write interest for {}", fd);
self.write.borrow_mut().remove(&fd);
}
fn add_write_interest(&self, fd: RawFd, waker: Waker) {
debug!("adding write interest for {}", fd);
if !self.write.borrow().contains_key(&fd) {
self.write.borrow_mut().insert(fd, waker);
}
}
fn wake(&self, wakeup: Wakeup) {
self.run_queue.borrow_mut().push_back(wakeup);
}
fn next_task(&self) -> (TaskId, Waker) {
let counter = self.counter.get();
let w = Arc::new(Token(counter));
self.counter.set(counter + 1);
(counter, futures::task::waker(w))
}
fn do_spawn<F: Future<Output = ()> + Send + 'static>(&self, f: F) {
let (id, waker) = self.next_task();
let f = Box::new(f);
let mut task = Task {
future: FutureObj::new(f),
};
if let Poll::Ready(_) = task.poll(waker) {
return;
}
self.wait_queue.borrow_mut().insert(id, task);
}
pub fn run<F: Future<Output = ()> + Send + 'static>(&self, f: F) {
self.do_spawn(f);
loop {
debug!("select loop start");
let mut tv: timeval = timeval {
tv_sec: 1,
tv_usec: 0,
};
let mut read_fds: fd_set = unsafe { std::mem::zeroed() };
let mut write_fds: fd_set = unsafe { std::mem::zeroed() };
unsafe { FD_ZERO(&mut read_fds) };
unsafe { FD_ZERO(&mut write_fds) };
let mut nfds = 0;
for fd in self.read.borrow().keys() {
debug!("added fd {} for read", fd);
unsafe { FD_SET(*fd, &mut read_fds as *mut fd_set) };
nfds = std::cmp::max(nfds, fd + 1);
}
for fd in self.write.borrow().keys() {
debug!("added fd {} for write", fd);
unsafe { FD_SET(*fd, &mut write_fds as *mut fd_set) };
nfds = std::cmp::max(nfds, fd + 1);
}
let rv = unsafe {
select(
nfds,
&mut read_fds,
&mut write_fds,
std::ptr::null_mut(),
&mut tv,
)
};
if rv == -1 {
panic!("select()");
} else if rv == 0 {
debug!("timeout");
} else {
debug!("data available on {} fds", rv);
}
for (fd, waker) in self.read.borrow().iter() {
let is_set = unsafe { FD_ISSET(*fd, &mut read_fds as *mut fd_set) };
debug!("fd#{} set (read)", fd);
if is_set {
waker.wake_by_ref();
}
}
for (fd, waker) in self.write.borrow().iter() {
let is_set = unsafe { FD_ISSET(*fd, &mut write_fds as *mut fd_set) };
debug!("fd#{} set (write)", fd);
if is_set {
waker.wake_by_ref();
}
}
loop {
let w = self.run_queue.borrow_mut().pop_front();
match w {
Some(w) => {
debug!("polling task#{}", w.index);
let task = self.wait_queue.borrow_mut().remove(&w.index);
if let Some(mut task) = task {
if let Poll::Pending = task.poll(w.waker) {
self.wait_queue.borrow_mut().insert(w.index, task);
}
}
}
None => break,
}
}
if self.wait_queue.borrow().is_empty() {
return;
}
}
}
}
pub struct Handle(Rc<EventLoop>);
impl Spawn for Handle {
fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
debug!("spawning from handle");
self.0.do_spawn(f);
Ok(())
}
}
impl Spawn for EventLoop {
fn spawn_obj(&self, f: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.do_spawn(f);
Ok(())
}
}