extern crate tokio_core;
use std::thread;
use std::sync::{mpsc, Arc};
use futures::{self, Future};
pub use self::tokio_core::reactor::Remote;
use {MetaIoHandler, Metadata};
pub struct RpcEventLoop {
remote: Remote,
handle: RpcEventLoopHandle,
}
impl RpcEventLoop {
pub fn spawn() -> Self {
let (stop, stopped) = futures::oneshot();
let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || {
let mut el = tokio_core::reactor::Core::new().expect("Creating an event loop should not fail.");
tx.send(el.remote()).expect("Rx is blocking upper thread.");
let _ = el.run(futures::empty().select(stopped));
});
let remote = rx.recv().expect("tx is transfered to a newly spawned thread.");
RpcEventLoop {
remote: remote,
handle: RpcEventLoopHandle {
close: Some(stop),
handle: Some(handle),
},
}
}
pub fn handler<M: Metadata>(&self, handler: Arc<MetaIoHandler<M>>) -> RpcHandler<M> {
RpcHandler::new(handler, self.remote.clone())
}
pub fn remote(&self) -> Remote {
self.remote.clone()
}
}
pub struct RpcEventLoopHandle {
close: Option<futures::Complete<()>>,
handle: Option<thread::JoinHandle<()>>
}
impl From<RpcEventLoop> for RpcEventLoopHandle {
fn from(el: RpcEventLoop) -> Self {
el.handle
}
}
impl Drop for RpcEventLoopHandle {
fn drop(&mut self) {
self.close.take().map(|v| v.complete(()));
}
}
impl RpcEventLoopHandle {
pub fn wait(mut self) -> thread::Result<()> {
self.handle.take().unwrap().join()
}
pub fn close(mut self) {
self.close.take().unwrap().complete(())
}
}
#[derive(Clone)]
pub struct RpcHandler<M: Metadata = ()> {
handler: Arc<MetaIoHandler<M>>,
remote: Remote,
}
impl<M: Metadata> RpcHandler<M> {
pub fn new(handler: Arc<MetaIoHandler<M>>, remote: Remote) -> Self {
RpcHandler {
handler: handler,
remote: remote,
}
}
pub fn handle_request<F>(&self, request: &str, metadata: M, on_response: F) where
F: Fn(Option<String>) + Send + 'static
{
let future = self.handler.handle_request(request, metadata);
self.remote.spawn(|_| future.map(on_response))
}
pub fn handle_request_sync(&self, request: &str, metadata: M) -> Option<String> {
let (tx, rx) = mpsc::channel();
self.handle_request(request, metadata, move |res| {
tx.send(res).unwrap();
});
rx.recv().unwrap()
}
}