extern crate libc;
extern crate mio;
extern crate slab;
use std::cell::RefCell;
use std::io;
use std::sync::Arc;
use std::time::Duration;
use curl::{self, Error};
use curl::easy::Easy;
use curl::multi::{Multi, EasyHandle, Socket, SocketEvents, Events};
use futures::{self, Future, Poll, Oneshot, Complete, Async};
use futures::task::{self, EventSet, UnparkEvent};
use futures::sync::mpsc::{unbounded, UnboundedSender, UnboundedReceiver};
use futures::stream::{Stream, Fuse};
use tokio_core::reactor::{Timeout, Handle, PollEvented};
use self::mio::unix::EventedFd;
use self::slab::Slab;
use stack::Stack;
/// Handle used to submit curl requests to a driver task running on an event
/// loop.
///
/// `Session::new` spawns the driver (`Data`) onto the provided `Handle`, and
/// every call to `Session::perform` forwards the request to that driver over
/// the channel stored here.
#[derive(Clone)]
pub struct Session {
// TODO: in next major version remove this `RefCell`.
// Sending half of the request channel; the receiving half is owned by the
// driver task.
tx: RefCell<UnboundedSender<Message>>,
}
/// Requests sent from a `Session` to the driver task.
enum Message {
// Run the given `Easy` handle; the outcome is reported through the paired
// `Complete` half of a oneshot.
Execute(Easy, Complete<io::Result<(Easy, Option<Error>)>>),
}
/// The driver future spawned by `Session::new`.
///
/// Its `poll` implementation hands new requests to libcurl and then reacts to
/// queued socket/cancellation events, timeouts and finished transfers.
struct Data {
// The libcurl multi handle every request is added to.
multi: Multi,
// Mutable bookkeeping. Kept in a `RefCell` because the libcurl callbacks
// reach this struct through `DATA.with`, which only yields `&Data`.
state: RefCell<State>,
// Event loop handle used to create timeouts and register sockets.
handle: Handle,
// Incoming requests sent by `Session::perform`.
rx: Fuse<UnboundedReceiver<Message>>,
// Queue of event ids to process on the next poll. An even id `2 * i` asks
// for a cancellation check of handle `i`; an odd id `2 * i + 1` signals
// activity on socket `i` (see `check`).
stack: Arc<Stack<usize>>,
}
/// Mutable driver state, stored behind the `RefCell` in `Data`.
struct State {
// Active HTTP requests, storing each `EasyHandle` as well as the `Complete`
// half of the HTTP future.
handles: Slab<HandleEntry>,
// Sockets we've been requested to track by libcurl. Stores the I/O object
// we associate with the event loop as well as other state about what
// libcurl needs from the socket.
sockets: Slab<SocketEntry>,
// Last timeout requested by libcurl that we schedule.
timeout: TimeoutState,
}
/// One in-flight request tracked in `State::handles`.
struct HandleEntry {
// Sending half used to deliver the final result to the `Perform` future;
// it is also polled for cancellation in `check_cancel`.
complete: Complete<io::Result<(Easy, Option<Error>)>>,
// The handle returned by `Multi::add` for this request.
handle: EasyHandle,
}
/// One libcurl socket tracked in `State::sockets`.
struct SocketEntry {
// Events libcurl most recently asked for on this socket; `None` until
// `schedule_socket` records a request.
want: Option<SocketEvents>,
// Set to `true` whenever `schedule_socket` updates `want`, and cleared by
// `check_socket` right before it calls `Multi::action`, so that it can
// tell afterwards whether libcurl re-registered the socket during the call.
changed: bool,
// The socket as registered with the event loop.
stream: PollEvented<MioSocket>,
}
/// State of the single timeout libcurl may have requested.
enum TimeoutState {
// A timer is registered with the event loop and has not fired yet.
Waiting(Timeout),
// The timeout has already elapsed (zero duration, or the timer resolved
// immediately) and is waiting to be delivered by `check_timeout`.
Ready,
// No timeout is pending.
None,
}
// Scoped thread-local pointing at the `Data` currently being driven. It is
// only set for the duration of the `DATA.set(..)` calls below, which lets the
// libcurl timer and socket callbacks reach the driver through `DATA.with`.
scoped_thread_local!(static DATA: Data);
/// Future returned by `Session::perform`.
///
/// Resolves to the `Easy` handle that was submitted together with the
/// transfer error, if the transfer itself failed. The future's own error is
/// an `io::Error` reported by the driver.
pub struct Perform {
// Receiving half of the oneshot completed by the driver task.
inner: Oneshot<io::Result<(Easy, Option<Error>)>>,
}
impl Session {
    /// Creates a new session whose requests are driven on the event loop
    /// behind `handle`.
    ///
    /// A `Multi` handle is configured so that its timer and socket callbacks
    /// forward to the driver (`Data`), and the driver is then spawned onto
    /// the event loop. The driver panics if it ever resolves with an error.
    pub fn new(handle: Handle) -> Session {
        let (sender, receiver) = unbounded();

        // The callbacks find the driver through the `DATA` scoped
        // thread-local, which the driver sets around its calls into libcurl.
        let mut multi = Multi::new();
        multi.timer_function(move |dur| DATA.with(|data| data.schedule_timeout(dur)))
             .unwrap();
        multi.socket_function(move |socket, events, token| {
            DATA.with(|data| data.schedule_socket(socket, events, token))
        }).unwrap();

        let driver = Data {
            multi: multi,
            state: RefCell::new(State {
                handles: Slab::with_capacity(128),
                sockets: Slab::with_capacity(128),
                timeout: TimeoutState::None,
            }),
            handle: handle.clone(),
            rx: receiver.fuse(),
            stack: Arc::new(Stack::new()),
        };
        handle.spawn(driver.map_err(|e| {
            panic!("error while processing http requests: {}", e)
        }));

        Session { tx: RefCell::new(sender) }
    }

    /// Submits `handle` to the driver and returns a future for its outcome.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be sent because the driver task is gone.
    pub fn perform(&self, handle: Easy) -> Perform {
        let (complete, oneshot) = futures::oneshot();
        let request = Message::Execute(handle, complete);
        self.tx
            .borrow_mut()
            .send(request)
            .expect("driver task has gone away");
        Perform { inner: oneshot }
    }
}
impl Future for Perform {
    type Item = (Easy, Option<Error>);
    type Error = io::Error;

    /// Polls the underlying oneshot and unwraps the nested result sent by the
    /// driver.
    ///
    /// # Panics
    ///
    /// Panics if the oneshot was canceled, i.e. the driver dropped the
    /// sending half without completing it.
    fn poll(&mut self) -> Poll<Self::Item, io::Error> {
        let outcome = match self.inner.poll().expect("complete canceled") {
            Async::NotReady => return Ok(Async::NotReady),
            Async::Ready(outcome) => outcome,
        };
        // `Ok(item)` becomes `Ok(Async::Ready(item))`; errors pass through.
        outcome.map(Async::Ready)
    }
}
impl Future for Data {
    type Item = ();
    type Error = io::Error;

    /// Drives all outstanding transfers one step.
    ///
    /// Resolves once the request channel is finished and no transfers remain;
    /// otherwise returns `NotReady`.
    fn poll(&mut self) -> Poll<(), io::Error> {
        debug!("-------------------------- driver poll start");

        // Start any requests that were submitted since the last poll.
        try!(self.check_messages());

        DATA.set(self, || {
            // Only the handles/sockets whose ids were pushed onto `stack` are
            // examined; we **do not poll everything**. Each check runs inside
            // `with_unpark_event` so that any wakeup registered during the
            // check pushes the same id back onto the stack, telling us why
            // we were notified.
            for idx in self.stack.drain() {
                let event = UnparkEvent::new(self.stack.clone(), idx);
                task::with_unpark_event(event, || self.check(idx));
            }

            // Deliver a timeout to libcurl, if one occurred.
            self.check_timeout();

            // Finally, complete any transfers that have finished.
            self.check_completions();
        });

        // We are done only when no more requests can arrive and nothing is
        // in flight.
        let finished = self.rx.is_done() &&
                       self.state.borrow().handles.is_empty();
        if !finished {
            return Ok(Async::NotReady)
        }
        assert!(self.state.borrow().sockets.len() == 0);
        Ok(Async::Ready(()))
    }
}
impl Data {
/// Callback invoked when libcurl requests a new timeout.
///
/// `None` means the current timeout can simply be cleared. `Some(dur)`
/// replaces it with a timeout that fires after `dur`; a zero duration is
/// recorded as already elapsed. Always returns `true`.
fn schedule_timeout(&self, dur: Option<Duration>) -> bool {
    // Whatever was scheduled before is superseded.
    self.state.borrow_mut().timeout = TimeoutState::None;

    let dur = match dur {
        Some(dur) => dur,
        None => return true,
    };
    debug!("scheduling a new timeout in {:?}", dur);

    let next = if dur == Duration::new(0, 0) {
        TimeoutState::Ready
    } else {
        // Poll the fresh timer once, without holding a borrow of `state`,
        // and keep it around only if it has not fired yet.
        let mut timer = Timeout::new(dur, &self.handle).unwrap();
        match timer.poll().unwrap() {
            Async::NotReady => TimeoutState::Waiting(timer),
            Async::Ready(()) => TimeoutState::Ready,
        }
    };
    self.state.borrow_mut().timeout = next;
    true
}
/// Callback invoked when libcurl changes what it wants to hear about on a
/// socket.
///
/// `socket` is the raw descriptor and `events` describes the interest
/// libcurl now has in it (or asks for it to be forgotten). `token` is the
/// value previously passed to `Multi::assign` for this socket: `0` for a
/// socket not seen before, otherwise the socket's slab index plus one.
fn schedule_socket(&self,
                   socket: Socket,
                   events: SocketEvents,
                   token: usize) {
    let mut state = self.state.borrow_mut();

    // libcurl is done with this socket: drop our entry and take it off the
    // event loop.
    if events.remove() {
        assert!(token > 0);
        let slot = token - 1;
        debug!("remove socket: {} / {}", socket, slot);
        let removed = state.sockets.remove(slot).unwrap();
        removed.stream
               .deregister(&self.handle)
               .expect("failed to deregister");
        return
    }

    let index = match token {
        // First sighting of this socket: register it with the event loop
        // through `PollEvented` and remember it in the slab, growing the
        // slab first if it is full.
        0 => {
            let stream = PollEvented::new(MioSocket { inner: socket },
                                          &self.handle).unwrap();
            if !state.sockets.has_available() {
                let additional = state.sockets.len();
                state.sockets.reserve_exact(additional);
            }
            let vacant = state.sockets.vacant_entry().unwrap();
            let index = vacant.index();
            vacant.insert(SocketEntry {
                want: None,
                changed: false,
                stream: stream,
            });
            // Tokens are offset by one so that zero keeps meaning "new".
            self.multi.assign(socket, index + 1).expect("failed to assign");
            debug!("schedule new socket {} / {}", socket, index);
            index
        }
        _ => {
            debug!("activity old socket {} / {}", socket, token - 1);
            token - 1
        }
    };

    // Record the new interest set and flag the entry as changed.
    let event = UnparkEvent::new(self.stack.clone(), 2 * index + 1);
    let entry = &mut state.sockets[index];
    entry.want = Some(events);
    entry.changed = true;

    // Refresh the registration with the event loop, since whether we want
    // read/write may have changed. Wakeups scheduled here are tagged with
    // this socket's event id.
    //
    // TODO: this pushes a duplicate unpark event if we're already inside of
    // another unpark event.
    task::with_unpark_event(event, || entry.update_needs());
}
/// Drains the request channel, adding every received `Easy` handle to the
/// multi handle.
///
/// A request that libcurl refuses to add is completed with the error right
/// away; the remaining requests are still processed.
fn check_messages(&mut self) -> io::Result<()> {
    // Stop as soon as the channel has nothing ready (or has ended).
    while let Async::Ready(Some(msg)) = self.rx.poll().expect("cannot fail") {
        let Message::Execute(easy, tx) = msg;

        // Adding the handle starts the request. libcurl may call back into
        // the timer/socket functions while doing so, hence the `DATA.set`.
        debug!("executing a new request");
        let added = DATA.set(self, || self.multi.add(easy));
        let mut handle = match added {
            Ok(handle) => handle,
            Err(e) => {
                tx.complete(Err(e.into()));
                continue
            }
        };

        // Store the handle in the `handles` slab (growing it when full) and
        // use the slab index as the handle's token.
        let mut state = self.state.borrow_mut();
        if !state.handles.has_available() {
            let additional = state.handles.len();
            state.handles.reserve_exact(additional);
        }
        let vacant = state.handles.vacant_entry().unwrap();
        let index = vacant.index();
        handle.set_token(index).unwrap();
        vacant.insert(HandleEntry {
            complete: tx,
            handle: handle,
        });

        // Queue a cancellation check for this handle so that the `complete`
        // half gets polled and we are notified when it goes away.
        self.stack.push(2 * index);
    }
    Ok(())
}
/// Dispatches one queued event id.
///
/// Even ids are cancellation checks for handle `idx / 2`; odd ids are
/// activity notifications for socket `idx / 2`.
fn check(&self, idx: usize) {
    let slot = idx / 2;
    match idx % 2 {
        0 => self.check_cancel(slot),
        _ => self.check_socket(slot),
    }
}
/// Checks whether the request in handle slot `idx` has been canceled and, if
/// so, removes it from both our bookkeeping and the multi handle.
///
/// Does nothing if the slot is empty.
fn check_cancel(&self, idx: usize) {
    debug!("\tevent cancel {}", idx);

    let entry = {
        let mut state = self.state.borrow_mut();
        let canceled = match state.handles.get_mut(idx) {
            Some(entry) => {
                match entry.complete.poll_cancel() {
                    Ok(Async::Ready(())) => true,
                    _ => false,
                }
            }
            None => return,
        };
        if !canceled {
            return
        }
        let removed = state.handles.remove(idx).unwrap();
        removed
    };

    // The borrow of `state` is released by now, so libcurl is free to call
    // back into us while the handle is removed. The result is ignored.
    drop(self.multi.remove(entry.handle));
}
/// Handles an activity notification for the socket in slot `idx`.
///
/// Collects which of read/write are ready, reports them to libcurl through
/// `Multi::action`, and then refreshes the socket's registration unless
/// libcurl already changed it during that call.
fn check_socket(&self, idx: usize) {
    debug!("\tevent socket {}", idx);
    let mut events = Events::new();

    let socket = {
        let mut state = self.state.borrow_mut();
        let entry = match state.sockets.get_mut(idx) {
            Some(entry) => entry,
            // If this socket has gone away ignore this notification
            None => {
                debug!("socket is gone");
                return
            }
        };

        let readable = entry.stream.poll_read().is_ready();
        if readable {
            debug!("\treadable");
            events.input(true);
        }
        let writable = entry.stream.poll_write().is_ready();
        if writable {
            debug!("\twritable");
            events.output(true);
        }
        if !(readable || writable) {
            return
        }

        // Reset the flag so we can tell below whether `schedule_socket` ran
        // for this entry while libcurl was processing the action.
        entry.changed = false;
        let fd = entry.stream.get_ref().inner;
        fd
    };

    // `state` is no longer borrowed here, as libcurl may invoke our
    // callbacks from within `action`.
    debug!("\tactivity on {}", socket);
    self.multi.action(socket, &events).expect("action error");

    let mut state = self.state.borrow_mut();
    let entry = match state.sockets.get_mut(idx) {
        Some(entry) => entry,
        None => return,
    };
    // If libcurl did not change what it needs, check the socket again and
    // either block for a read/write or unpark ourselves because we are still
    // able to make progress.
    if !entry.changed {
        entry.update_needs();
    }
}
/// Delivers the pending timeout to libcurl if it has elapsed.
///
/// Telling libcurl about a timeout can make it request another one
/// immediately, so this keeps going until no elapsed timeout is left.
fn check_timeout(&self) {
    loop {
        let fired = match self.state.borrow_mut().timeout {
            TimeoutState::None => return,
            TimeoutState::Ready => true,
            TimeoutState::Waiting(ref mut timer) => {
                match timer.poll() {
                    Ok(Async::Ready(())) => true,
                    _ => false,
                }
            }
        };
        if !fired {
            debug!("timeout not ready");
            return
        }

        debug!("timeout fired");
        // Clear our record before calling into libcurl, which may schedule
        // a fresh timeout through `schedule_timeout`.
        self.state.borrow_mut().timeout = TimeoutState::None;
        self.multi.timeout().expect("timeout error");
    }
}
fn check_completions(&self) {
self.multi.messages(|m| {
let mut state = self.state.borrow_mut();
let transfer_err = m.result().unwrap();
let idx = m.token().unwrap();
let entry = state.handles.remove(idx).unwrap();
debug!("request is now finished: {}", idx);
drop(state);
assert!(m.is_for(&entry.handle));
// If `remove_err` fails then that's super fatal, so that'll end
// up in the `Error` of the `Perform` future. If, however, the
// transfer just failed, then that's communicated through
// `transfer_err`, so we just put that next to the handle if we
// get it out successfully.
let remove_err = self.multi.remove(entry.handle);
let res = remove_err.map(|e| (e, transfer_err.err()))
.map_err(|e| e.into());
entry.complete.complete(res);
});
}
}
impl SocketEntry {
/// Depending on `self.want`, the events that this socket is interested in,
/// update the registration with the event loop to schedule a wakeup for
/// ourselves at an appropriate time.
///
/// Currently libcurl expects us to notify it with "level" semantics. That
/// is, so long as the socket is readable/writable we need to be calling
/// `Multi::action`. Currently tokio-core, however, only notifies us with
/// "edge" semantics, meaning that we only get a notification when a socket
/// *changes* state to readable/writable.
///
/// The purpose of this function is to emulate level semantics with edge
/// semantics that we have. This function will call `poll` in a nonblocking
/// fashion to learn whether a socket is readable/writable under the hood.
/// This way we know that if libcurl wants a socket to be readable and the
/// socket is actually still readable, we'll schedule a notification for us
/// to call `Multi::action` "soon".
///
/// Unfortunately this is probably not the most efficient as we'll be
/// calling `poll` a lot, but hopefully it's not too onerous to check such
/// information and in the grand scheme of things hopefully doesn't slow
/// down the http transfer too much.
fn update_needs(&mut self) {
let want = match self.want {
Some(ref want) => want,
None => return,
};
let mut fd = libc::pollfd {
fd: self.stream.get_ref().inner,
events: 0,
revents: 0,
};
if want.input() {
fd.events |= libc::POLLIN;
}
if want.output() {
fd.events |= libc::POLLOUT;
}
unsafe {
libc::poll(&mut fd, 1, 0);
}
// In these two blocks below, we test what libcurl expects (`want`)
// with what the socket actually looks like (`fd.revents`).
//
// If, for example, we want input (readable) and the socket is not
// readable then we inform the event loop of such. If we want input and
// we're still readable, then we use a "yield" operation to arrange for
// ourselves to get polled in the near future with `park().unpark()`.
// This should allow lots of transfers to make progress without
// starving anything unnecessarily.
if want.input() {
if (fd.revents & libc::POLLIN) == 0 {
self.stream.need_read();
} else {
task::park().unpark();
}
}
if want.output() {
if (fd.revents & libc::POLLOUT) == 0 {
self.stream.need_write();
} else {
// TODO: don't need to `unpark` here a second time if we
// already did so above.
task::park().unpark();
}
}
}
}
/// Wrapper around a raw libcurl socket that implements `mio::Evented` (by
/// delegating to `EventedFd`) so it can be placed in a `PollEvented`.
struct MioSocket {
// The raw socket as handed to us by libcurl.
inner: curl::multi::Socket,
}
impl mio::Evented for MioSocket {
    /// Registers the wrapped descriptor with `poll` via `EventedFd`.
    fn register(&self,
                poll: &mio::Poll,
                token: mio::Token,
                interest: mio::Ready,
                opts: mio::PollOpt) -> io::Result<()> {
        let fd = EventedFd(&self.inner);
        fd.register(poll, token, interest, opts)
    }

    /// Updates the registration of the wrapped descriptor via `EventedFd`.
    fn reregister(&self,
                  poll: &mio::Poll,
                  token: mio::Token,
                  interest: mio::Ready,
                  opts: mio::PollOpt) -> io::Result<()> {
        let fd = EventedFd(&self.inner);
        fd.reregister(poll, token, interest, opts)
    }

    /// Removes the wrapped descriptor from `poll` via `EventedFd`.
    fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
        let fd = EventedFd(&self.inner);
        fd.deregister(poll)
    }
}
impl EventSet for Stack<usize> {
fn insert(&self, id: usize) {
self.push(id);
}
}