use std::io;

use futures::sync::mpsc;
use futures::{Async, AsyncSink, Future, IntoFuture, Poll, Sink, Stream};
use serde_json::Value;
use tokio::io::{AsyncRead, AsyncWrite};

use super::client::Client;
use super::message::Response as ResponseMessage;
use super::message::{Message, Notification, Request};
use super::transport::Transport;
/// Handler for the requests and notifications received from the remote peer.
///
/// Both handlers return something convertible into a future (see
/// [`IntoStaticFuture`]) rather than a plain value, so the actual work can be
/// completed asynchronously.
pub trait Service: Send {
    /// Value produced when a request succeeds. It must be convertible into a
    /// [`Value`] so it can be placed in the response message.
    type T: Into<Value> + Send + 'static;

    /// Value produced when a request fails. It must be convertible into a
    /// [`Value`] so it can be placed in the response message.
    type E: Into<Value> + Send + 'static;

    /// Future returned by [`Service::handle_request`]; resolves to `Self::T` on
    /// success and `Self::E` on failure.
    type RequestFuture: IntoStaticFuture<Item = Self::T, Error = Self::E>;

    /// Future returned by [`Service::handle_notification`]; it carries no value
    /// and no error payload.
    type NotificationFuture: IntoStaticFuture<Item = (), Error = ()>;

    /// Handles a request for `method` called with `params`.
    fn handle_request(&mut self, method: &str, params: Value) -> Self::RequestFuture;

    /// Handles a notification for `method` called with `params`.
    fn handle_notification(&mut self, method: &str, params: Value) -> Self::NotificationFuture;
}
pub struct Server<S: Service + Send> {
service: S,
pending_responses: mpsc::UnboundedReceiver<(u64, Result<S::T, S::E>)>,
response_sender: mpsc::UnboundedSender<(u64, Result<S::T, S::E>)>,
}
unsafe impl<T: Service> Send for Server<T> {}
impl<S: Service> Server<S> {
pub fn new(service: S) -> Self {
let (tx, rx) = mpsc::unbounded();
Server {
service,
pending_responses: rx,
response_sender: tx,
}
}
pub fn send_responses<T: AsyncRead + AsyncWrite>(
&mut self,
sink: &mut Transport<T>,
) -> Poll<(), io::Error> {
trace!("Server: flushing responses");
while let Ok(poll) = self.pending_responses.poll() {
if let Async::Ready(Some((id, result))) = poll {
let msg = Message::Response(ResponseMessage {
id,
result: result.map(Into::into).map_err(Into::into),
});
sink.start_send(msg).unwrap();
} else {
if let Async::Ready(None) = poll {
panic!("we store the sender, it can't be dropped");
}
return sink.poll_complete();
}
}
panic!("an UnboundedReceiver should never give an error");
}
pub fn process_request(&mut self, request: Request) {
let Request { method, params, id } = request;
let response_sender = self.response_sender.clone();
let future = self
.service
.handle_request(method.as_str(), params)
.into_static_future()
.then(move |response| {
response_sender
.unbounded_send((id, response))
.map_err(|_| ())
});
let _ = tokio::spawn(future);
}
pub fn process_notification(&mut self, notification: Notification) {
let Notification { method, params } = notification;
let future = self.service.handle_notification(method.as_str(), params);
let _ = tokio::spawn(future.into_static_future());
}
}
/// Factory for a [`Service`] that needs a [`Client`] at construction time.
pub trait ServiceBuilder {
    /// The concrete service type this builder produces.
    type Service: Service;

    /// Consumes the builder and returns the service, handing it `client`.
    ///
    /// NOTE(review): `client` presumably lets the service send messages back to
    /// the remote peer — confirm against `Client`.
    fn build(self, client: Client) -> Self::Service;
}
pub trait IntoStaticFuture {
type Future: Future<Item = Self::Item, Error = Self::Error> + 'static + Send;
type Item;
type Error;
fn into_static_future(self) -> Self::Future;
}
impl<F: IntoFuture> IntoStaticFuture for F
where
<F as IntoFuture>::Future: 'static + Send,
{
type Future = <F as IntoFuture>::Future;
type Item = <F as IntoFuture>::Item;
type Error = <F as IntoFuture>::Error;
fn into_static_future(self) -> Self::Future {
self.into_future()
}
}