use std::{fmt, mem, thread};
use std::sync::{self, atomic, Arc};
use futures::{self, Future};
use futures::sync::oneshot;
use transports::Result;
use transports::tokio_core::reactor;
use {Error, ErrorKind, RequestId};
pub struct EventLoopHandle {
thread: Option<thread::JoinHandle<()>>,
remote: reactor::Remote,
done: Arc<atomic::AtomicBool>,
}
impl EventLoopHandle {
pub fn spawn<T, F>(func: F) -> Result<(Self, T)>
where
F: FnOnce(&reactor::Handle) -> Result<T>,
F: Send + 'static,
T: Send + 'static,
{
let done = Arc::new(atomic::AtomicBool::new(false));
let (tx, rx) = sync::mpsc::sync_channel(1);
let done2 = done.clone();
let eloop = thread::spawn(move || {
let run = move || {
let event_loop = reactor::Core::new()?;
let http = func(&event_loop.handle())?;
Ok((http, event_loop))
};
let send = move |result| {
tx.send(result).expect("Receiving end is always waiting.");
};
let res = run();
match res {
Err(e) => send(Err(e)),
Ok((http, mut event_loop)) => {
send(Ok((http, event_loop.remote())));
while !done2.load(atomic::Ordering::Relaxed) {
event_loop.turn(None);
}
}
}
});
rx.recv()
.expect("Thread is always spawned.")
.map(|(http, remote)| {
(
EventLoopHandle {
thread: Some(eloop),
remote,
done,
},
http,
)
})
}
pub fn remote(&self) -> &reactor::Remote {
&self.remote
}
}
impl Drop for EventLoopHandle {
fn drop(&mut self) {
self.done.store(true, atomic::Ordering::Relaxed);
self.thread
.take()
.expect("We never touch thread except for drop; drop happens only once; qed")
.join()
.expect("Thread should shut down cleanly.");
}
}
type PendingResult<O> = oneshot::Receiver<Result<O>>;
enum RequestState<O> {
Sending(Option<Result<()>>, PendingResult<O>),
WaitingForResponse(PendingResult<O>),
Done,
}
pub struct Response<T, O> {
id: RequestId,
state: RequestState<O>,
extract: T,
}
impl<T, O> Response<T, O> {
pub fn new(id: RequestId, result: Result<()>, rx: PendingResult<O>, extract: T) -> Self {
Response {
id,
extract,
state: RequestState::Sending(Some(result), rx),
}
}
}
impl<T, O, Out> Future for Response<T, O>
where
T: Fn(O) -> Result<Out>,
Out: fmt::Debug,
{
type Item = Out;
type Error = Error;
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
loop {
let extract = &self.extract;
match self.state {
RequestState::Sending(ref mut result, _) => {
trace!("[{}] Request pending.", self.id);
if let Some(Err(e)) = result.take() {
return Err(e);
}
}
RequestState::WaitingForResponse(ref mut rx) => {
trace!("[{}] Checking response.", self.id);
let result = try_ready!(
rx.poll()
.map_err(|_| Error::from(ErrorKind::Io(::std::io::ErrorKind::TimedOut.into())))
);
trace!("[{}] Extracting result.", self.id);
return result.and_then(|x| extract(x)).map(futures::Async::Ready);
}
RequestState::Done => {
return Err(ErrorKind::Unreachable.into());
}
}
let state = mem::replace(&mut self.state, RequestState::Done);
self.state = if let RequestState::Sending(_, rx) = state {
RequestState::WaitingForResponse(rx)
} else {
state
}
}
}
}