1use {JsonValue, Request, Method, JsonObject, Error, channel};
2use std::fmt;
3use hyper::server as http;
4use hyper::Error as HyperError;
5use hyper::header::{ContentLength, ContentType};
6use hyper::mime;
7use hyper::Chunk as HyperChunk;
8use futures::{Poll, Stream, Async, IntoFuture};
9use futures::future::{ok, FutureResult, BoxFuture, Future};
10use futures::stream::BoxStream;
11use futures::sync::mpsc;
12use Sender;
13
/// Boxed stream that yields `HyperChunk` items and uses `()` as its error type.
type ChunkReceiver = BoxStream<HyperChunk, ()>;
15
/// A reply to a `Request`: the payload to send back together with the request
/// it was constructed from.
#[derive(Debug)]
pub struct Reply {
    /// Payload of the reply: either a JSON object or a stream of chunks.
    data: ReplyData,
    /// The request supplied when the reply was constructed.
    req: Request,
}
38
/// Payload carried by a `Reply`.
enum ReplyData {
    /// A complete JSON object.
    Value(JsonObject),
    /// A stream of body chunks.
    Stream(ChunkReceiver),
}
43
44impl fmt::Debug for ReplyData {
45 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
46 match self {
47 &ReplyData::Value(ref val) => write!(f, "ReplyData::Value({:?})", val),
48 &ReplyData::Stream(_) => write!(f, "ReplyData::Stream(<stream>)"),
49 }
50 }
51}
52
53pub fn make_reply(req: Request, data: JsonObject) -> Reply {
55 Reply {
56 req: req,
57 data: ReplyData::Value(data),
58 }
59}
60
61pub fn make_streamed_reply(req: Request) -> (Sender, Reply) {
63 let (tx, rx) = mpsc::unbounded();
64 let rx = rx
65 .map(|val: (String, JsonObject)| -> HyperChunk {
66 format!("event:{}\ndata:{}\n\n", val.0, JsonValue::Object(val.1)).into()
67 })
68 .boxed();
69 let reply = Reply {
70 req: req,
71 data: ReplyData::Stream(rx)
72 };
73 let sender = channel::new_sender(tx);
74 (sender, reply)
75}
76
77impl Reply {
78 pub fn data(&self) -> Option<&JsonObject> {
79 match self.data {
80 ReplyData::Value(ref dat) => Some(dat),
81 _ => None,
82 }
83 }
84
85 pub fn data_mut(&mut self) -> Option<&mut JsonObject> {
86 match self.data {
87 ReplyData::Value(ref mut dat) => Some(dat),
88 _ => None,
89 }
90 }
91
92 pub fn to_http(self) -> http::Response<Body> {
95 let resp = http::Response::new();
96
97 match self.data {
98 ReplyData::Value(val) => {
99 let resp_str = JsonValue::Object(val).to_string();
100 resp
101 .with_header(ContentLength(resp_str.len() as u64))
102 .with_header(ContentType(mime::APPLICATION_JSON))
103 .with_body(Body::Once(Some(resp_str.into())))
104 },
105 ReplyData::Stream(stream) => {
106 resp
107 .with_header(ContentType(mime::TEXT_EVENT_STREAM))
108 .with_body(Body::Stream(stream))
109 },
110 }
111 }
112
113 pub fn method(&self) -> Method {
114 self.req.method()
115 }
116
117 pub fn resource(&self) -> &str {
118 &self.req.resource()
119 }
120
121 pub fn id(&self) -> &Option<String> {
122 &self.req.id()
123 }
124
125 pub fn params(&self) -> &JsonObject {
126 &self.req.params()
127 }
128
129 pub fn param(&self, key: &str) -> &JsonValue {
130 self.req.param(key)
131 }
132
133 pub fn boxed(self) -> BoxFuture<Reply, Error> {
134 ok(self).boxed()
135 }
136
137 pub fn request_data(&self) -> &JsonObject {
138 self.req.data()
139 }
140}
141
142impl IntoFuture for Reply {
143 type Item = Reply;
144 type Error = Error;
145 type Future = FutureResult<Reply, Error>;
146 fn into_future(self) -> Self::Future {
147 ok(self)
148 }
149}
150
/// Response body type used by `Reply::to_http`.
pub enum Body {
    /// A single chunk; polling takes it out of the `Option`, leaving `None`.
    Once(Option<HyperChunk>),
    /// Chunks pulled from a `ChunkReceiver`.
    Stream(ChunkReceiver),
}
156
157impl Stream for Body {
158 type Item = HyperChunk;
159 type Error = HyperError;
160
161 fn poll(&mut self) -> Poll<Option<HyperChunk>, HyperError> {
162 match self {
163 &mut Body::Once(ref mut opt) => Ok(Async::Ready(opt.take())),
164 &mut Body::Stream(ref mut stream) => {
165 match stream.poll() {
166 Ok(u) => Ok(u),
167 Err(()) => Ok(Async::Ready(None)), }
169 }
170 }
171 }
172}