backtalk/
reply.rs

1use {JsonValue, Request, Method, JsonObject, Error, channel};
2use std::fmt;
3use hyper::server as http;
4use hyper::Error as HyperError;
5use hyper::header::{ContentLength, ContentType};
6use hyper::mime;
7use hyper::Chunk as HyperChunk;
8use futures::{Poll, Stream, Async, IntoFuture};
9use futures::future::{ok, FutureResult, BoxFuture, Future};
10use futures::stream::BoxStream;
11use futures::sync::mpsc;
12use Sender;
13
/// Boxed stream of `HyperChunk`s whose error type is `()`.
type ChunkReceiver = BoxStream<HyperChunk, ()>;
15
16/**
17A successful response with JSON data to be sent back to the client.
18
19There are two kinds of replies. Static replies represent JSON data that is ready. Most requests
20return static replies. Streaming replies represent a stream of JSON data that will stream from
21a `Channel` directly to the client. You can't access the data of a streaming reply through the
22`Reply` struct, since it's not ready yet. If you want to transform or edit the reply data for a
23stream, you'll need to implement a custom `Channel` instead.
24
25These are several ways to create a Reply:
26
27- pass a Request to an Adapter to get a static response from a database
28- pass a Request to a Channel to get a streaming response
29- in your custom Resource, call `request.into_reply(data)` to create a Reply object.
30
31Reply implements `IntoFuture`, so you can return it directly from a `and_then` block.
32*/
33#[derive(Debug)]
34pub struct Reply {
35  data: ReplyData,
36  req: Request,
37}
38
39enum ReplyData {
40  Value(JsonObject),
41  Stream(ChunkReceiver),
42}
43
44impl fmt::Debug for ReplyData {
45  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
46    match self {
47      &ReplyData::Value(ref val) => write!(f, "ReplyData::Value({:?})", val),
48      &ReplyData::Stream(_) => write!(f, "ReplyData::Stream(<stream>)"),
49    }
50  }
51}
52
53// only used internally
54pub fn make_reply(req: Request, data: JsonObject) -> Reply {
55  Reply {
56    req: req,
57    data: ReplyData::Value(data),
58  }
59}
60
61// only used internally
62pub fn make_streamed_reply(req: Request) -> (Sender, Reply) {
63  let (tx, rx) = mpsc::unbounded();
64  let rx = rx
65    .map(|val: (String, JsonObject)| -> HyperChunk {
66      format!("event:{}\ndata:{}\n\n", val.0, JsonValue::Object(val.1)).into()
67    })
68    .boxed();
69  let reply = Reply {
70    req: req,
71    data: ReplyData::Stream(rx)
72  };
73  let sender = channel::new_sender(tx);
74  (sender, reply)
75}
76
77impl Reply {
78  pub fn data(&self) -> Option<&JsonObject> {
79    match self.data {
80      ReplyData::Value(ref dat) => Some(dat),
81      _ => None,
82    }
83  }
84
85  pub fn data_mut(&mut self) -> Option<&mut JsonObject> {
86    match self.data {
87      ReplyData::Value(ref mut dat) => Some(dat),
88      _ => None,
89    }
90  }
91
92  // TODO data_then accepts a function that returns a future<JsonObject, Error>
93
94  pub fn to_http(self) -> http::Response<Body> {
95    let resp = http::Response::new();
96
97    match self.data {
98      ReplyData::Value(val) => {
99        let resp_str = JsonValue::Object(val).to_string();
100        resp
101          .with_header(ContentLength(resp_str.len() as u64))
102          .with_header(ContentType(mime::APPLICATION_JSON))
103          .with_body(Body::Once(Some(resp_str.into())))
104      },
105      ReplyData::Stream(stream) => {
106        resp
107          .with_header(ContentType(mime::TEXT_EVENT_STREAM))
108          .with_body(Body::Stream(stream))
109      },
110    }
111  }
112
113  pub fn method(&self) -> Method {
114    self.req.method()
115  }
116
117  pub fn resource(&self) -> &str {
118    &self.req.resource()
119  }
120
121  pub fn id(&self) -> &Option<String> {
122    &self.req.id()
123  }
124
125  pub fn params(&self) -> &JsonObject {
126    &self.req.params()
127  }
128
129  pub fn param(&self, key: &str) -> &JsonValue {
130    self.req.param(key)
131  }
132
133  pub fn boxed(self) -> BoxFuture<Reply, Error> {
134    ok(self).boxed()
135  }
136
137  pub fn request_data(&self) -> &JsonObject {
138    self.req.data()
139  }
140}
141
142impl IntoFuture for Reply {
143  type Item = Reply;
144  type Error = Error;
145  type Future = FutureResult<Reply, Error>;
146  fn into_future(self) -> Self::Future {
147    ok(self)
148  }
149}
150
151/// A `Stream` for `HyperChunk`s used in requests and responses.
152pub enum Body {
153  Once(Option<HyperChunk>),
154  Stream(ChunkReceiver),
155}
156
157impl Stream for Body {
158  type Item = HyperChunk;
159  type Error = HyperError;
160
161  fn poll(&mut self) -> Poll<Option<HyperChunk>, HyperError> {
162    match self {
163      &mut Body::Once(ref mut opt) => Ok(Async::Ready(opt.take())),
164      &mut Body::Stream(ref mut stream) => {
165        match stream.poll() {
166          Ok(u) => Ok(u),
167          Err(()) => Ok(Async::Ready(None)), // this probably can never happen?
168        }
169      }
170    }
171  }
172}