use {JsonValue, Request, Method, JsonObject, Error, channel};
use std::fmt;
use hyper::server as http;
use hyper::Error as HyperError;
use hyper::header::{ContentLength, ContentType};
use hyper::mime;
use hyper::Chunk as HyperChunk;
use futures::{Poll, Stream, Async, IntoFuture};
use futures::future::{ok, FutureResult, BoxFuture, Future};
use futures::stream::BoxStream;
use futures::sync::mpsc;
use Sender;
type ChunkReceiver = BoxStream<HyperChunk, ()>;
#[derive(Debug)]
pub struct Reply {
data: ReplyData,
req: Request,
}
enum ReplyData {
Value(JsonObject),
Stream(ChunkReceiver),
}
impl fmt::Debug for ReplyData {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
&ReplyData::Value(ref val) => write!(f, "ReplyData::Value({:?})", val),
&ReplyData::Stream(_) => write!(f, "ReplyData::Stream(<stream>)"),
}
}
}
pub fn make_reply(req: Request, data: JsonObject) -> Reply {
Reply {
req: req,
data: ReplyData::Value(data),
}
}
pub fn make_streamed_reply(req: Request) -> (Sender, Reply) {
let (tx, rx) = mpsc::unbounded();
let rx = rx
.map(|val: (String, JsonObject)| -> HyperChunk {
format!("event:{}\ndata:{}\n\n", val.0, JsonValue::Object(val.1)).into()
})
.boxed();
let reply = Reply {
req: req,
data: ReplyData::Stream(rx)
};
let sender = channel::new_sender(tx);
(sender, reply)
}
impl Reply {
pub fn data(&self) -> Option<&JsonObject> {
match self.data {
ReplyData::Value(ref dat) => Some(dat),
_ => None,
}
}
pub fn data_mut(&mut self) -> Option<&mut JsonObject> {
match self.data {
ReplyData::Value(ref mut dat) => Some(dat),
_ => None,
}
}
pub fn to_http(self) -> http::Response<Body> {
let resp = http::Response::new();
match self.data {
ReplyData::Value(val) => {
let resp_str = JsonValue::Object(val).to_string();
resp
.with_header(ContentLength(resp_str.len() as u64))
.with_header(ContentType(mime::APPLICATION_JSON))
.with_body(Body::Once(Some(resp_str.into())))
},
ReplyData::Stream(stream) => {
resp
.with_header(ContentType(mime::TEXT_EVENT_STREAM))
.with_body(Body::Stream(stream))
},
}
}
pub fn method(&self) -> Method {
self.req.method()
}
pub fn resource(&self) -> &str {
&self.req.resource()
}
pub fn id(&self) -> &Option<String> {
&self.req.id()
}
pub fn params(&self) -> &JsonObject {
&self.req.params()
}
pub fn param(&self, key: &str) -> &JsonValue {
self.req.param(key)
}
pub fn boxed(self) -> BoxFuture<Reply, Error> {
ok(self).boxed()
}
pub fn request_data(&self) -> &JsonObject {
self.req.data()
}
}
impl IntoFuture for Reply {
type Item = Reply;
type Error = Error;
type Future = FutureResult<Reply, Error>;
fn into_future(self) -> Self::Future {
ok(self)
}
}
pub enum Body {
Once(Option<HyperChunk>),
Stream(ChunkReceiver),
}
impl Stream for Body {
type Item = HyperChunk;
type Error = HyperError;
fn poll(&mut self) -> Poll<Option<HyperChunk>, HyperError> {
match self {
&mut Body::Once(ref mut opt) => Ok(Async::Ready(opt.take())),
&mut Body::Stream(ref mut stream) => {
match stream.poll() {
Ok(u) => Ok(u),
Err(()) => Ok(Async::Ready(None)), }
}
}
}
}