1use futures;
2use hyper::{self, Body, Chunk, Request, Response, StatusCode};
3use hyper::rt::{Future, Stream};
4use request::{self, RequestType};
5use serde::Deserialize;
6use serde_json;
7use sled;
8use sled_search;
9use std::error::Error as StdError;
10use std::fmt;
11use std::mem;
12use std::sync::Arc;
13
14pub trait IntoResponse {
16 fn into_response(self, Arc<sled::Tree>) -> Response<Body>;
18}
19
/// A boxed, `Send` future that resolves to a `Response<Body>` or fails with a
/// `hyper::Error`; this is the type `response` hands back for a known request.
pub type ResponseFuture = Box<Future<Item = Response<Body>, Error = hyper::Error> + Send>;
22
/// Error returned when a request's method/path pair matches no known request type.
///
/// The type carries no data, so it is cheap to copy and can be compared
/// directly (handy when asserting on `Result<_, UnknownRequest>` values).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UnknownRequest;
26
27struct Iter {
34 _tree: Arc<sled::Tree>,
35 iter: sled::Iter<'static>,
36}
37
38impl IntoResponse for request::Get {
39 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
40 tree.get(&self.key)
41 .map(|value| {
42 let bytes = serde_json::to_vec(&value)
43 .expect("failed to serialize value to JSON");
44 Response::new(bytes.into())
45 })
46 .unwrap_or_else(|err| db_err_response(&err))
47 }
48}
49
50impl IntoResponse for request::Del {
51 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
52 tree.del(&self.key)
53 .map(|value| {
54 let bytes = serde_json::to_vec(&value)
55 .expect("failed to serialize value to JSON");
56 Response::new(bytes.into())
57 })
58 .unwrap_or_else(|err| db_err_response(&err))
59 }
60}
61
62impl IntoResponse for request::Set {
63 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
64 let request::Set { key, value } = self;
65 tree.set(key, value)
66 .map(|value| {
67 let bytes = serde_json::to_vec(&value)
68 .expect("failed to serialize value to JSON");
69 Response::builder()
70 .status(StatusCode::CREATED)
71 .body(bytes.into())
72 .expect("failed to construct `Set` response")
73 })
74 .unwrap_or_else(|err| db_err_response(&err))
75 }
76}
77
78impl IntoResponse for request::Cas {
79 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
80 let request::Cas { key, old, new } = self;
81 match tree.cas(key, old, new) {
82 Ok(()) => {
83 let res: Result<(), Option<Vec<u8>>> = Ok(());
84 let bytes = serde_json::to_vec(&res)
85 .expect("failed to serialize result to JSON");
86 Response::new(bytes.into())
87 }
88 Err(sled::Error::CasFailed(opt_bytes)) => {
89 let res: Result<(), Option<Vec<u8>>> = Err(opt_bytes);
90 let bytes = serde_json::to_vec(&res)
91 .expect("failed to serialize result to JSON");
92 Response::new(bytes.into())
93 }
94 Err(err) => db_err_response(&err),
95 }
96 }
97}
98
99impl IntoResponse for request::Merge {
100 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
101 let request::Merge { key, value } = self;
102 tree.merge(key, value)
103 .map(|value| {
104 let bytes = serde_json::to_vec(&value)
105 .expect("failed to serialize value to JSON");
106 Response::builder()
107 .status(StatusCode::CREATED)
108 .body(bytes.into())
109 .expect("failed to construct `Set` response")
110 })
111 .unwrap_or_else(|err| db_err_response(&err))
112 }
113}
114
115impl IntoResponse for request::Flush {
116 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
117 tree.flush()
118 .map(|value| {
119 let bytes = serde_json::to_vec(&value)
120 .expect("failed to serialize value to JSON");
121 Response::new(bytes.into())
122 })
123 .unwrap_or_else(|err| db_err_response(&err))
124 }
125}
126
127impl IntoResponse for request::Iter {
128 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
129 let iter = tree_iter(tree)
130 .map(|res| {
131 let kv = res.map_err(|err| Box::new(err))?;
132 let bytes = serde_json::to_vec(&kv).map_err(|err| Box::new(err))?;
133 Ok(Chunk::from(bytes))
134 });
135 let stream = Box::new(futures::stream::iter_result(iter)) as Box<_>;
136 Response::builder()
137 .body(Body::from(stream))
138 .expect("failed to construct `Iter` response")
139 }
140}
141
142impl IntoResponse for request::Scan {
143 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
144 let scan = tree_scan(tree, &self.key)
145 .map(|res| {
146 let kv = res.map_err(|err| Box::new(err))?;
147 let bytes = serde_json::to_vec(&kv).map_err(|err| Box::new(err))?;
148 Ok(Chunk::from(bytes))
149 });
150 let stream = Box::new(futures::stream::iter_result(scan)) as Box<_>;
151 Response::builder()
152 .body(Body::from(stream))
153 .expect("failed to construct `Iter` response")
154 }
155}
156
157impl IntoResponse for request::ScanRange {
158 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
159 let request::ScanRange { start, end } = self;
160 let scan = tree_scan(tree, &start)
161 .filter_map(move |res| {
162 let (k, v) = match res {
163 Err(err) => return Some(Err(Box::new(err) as Box<StdError + Send + Sync>)),
164 Ok(kv) => kv,
165 };
166 if k >= end {
167 return None;
168 }
169 let bytes = match serde_json::to_vec(&(k, v)) {
170 Err(err) => return Some(Err(Box::new(err))),
171 Ok(bytes) => bytes,
172 };
173 Some(Ok(Chunk::from(bytes)))
174 });
175 let stream = Box::new(futures::stream::iter_result(scan)) as Box<_>;
176 Response::builder()
177 .body(Body::from(stream))
178 .expect("failed to construct `Iter` response")
179 }
180}
181
182impl IntoResponse for request::Max {
183 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
184 sled_search::max(&tree)
185 .map(|entry| {
186 let bytes = serde_json::to_vec(&entry)
187 .expect("failed to serialize entry to JSON");
188 Response::builder()
189 .body(bytes.into())
190 .expect("failed to construct `Max` response")
191 })
192 .unwrap_or_else(|err| db_err_response(&err))
193 }
194}
195
196impl IntoResponse for request::Pred {
197 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
198 sled_search::pred(&tree, &self.key)
199 .map(|entry| {
200 let bytes = serde_json::to_vec(&entry)
201 .expect("failed to serialize entry to JSON");
202 Response::new(bytes.into())
203 })
204 .unwrap_or_else(|err| db_err_response(&err))
205 }
206}
207
208impl IntoResponse for request::PredIncl {
209 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
210 sled_search::pred_incl(&tree, &self.key)
211 .map(|entry| {
212 let bytes = serde_json::to_vec(&entry)
213 .expect("failed to serialize entry to JSON");
214 Response::new(bytes.into())
215 })
216 .unwrap_or_else(|err| db_err_response(&err))
217 }
218}
219
220impl IntoResponse for request::Succ {
221 fn into_response(mut self, tree: Arc<sled::Tree>) -> Response<Body> {
222 self.key.push(0);
223 let entry = match tree.scan(&self.key).next() {
224 Some(Err(err)) => return db_err_response(&err),
225 Some(Ok(entry)) => Some(entry),
226 None => None,
227 };
228 let bytes = serde_json::to_vec(&entry)
229 .expect("failed to serialize entry to JSON");
230 Response::new(bytes.into())
231 }
232}
233
234impl IntoResponse for request::SuccIncl {
235 fn into_response(self, tree: Arc<sled::Tree>) -> Response<Body> {
236 let entry = match tree.scan(&self.key).next() {
237 Some(Err(err)) => return db_err_response(&err),
238 Some(Ok(entry)) => Some(entry),
239 None => None,
240 };
241 let bytes = serde_json::to_vec(&entry)
242 .expect("failed to serialize entry to JSON");
243 Response::new(bytes.into())
244 }
245}
246
247impl Iterator for Iter {
248 type Item = sled::Result<(Vec<u8>, Vec<u8>), ()>;
249 fn next(&mut self) -> Option<Self::Item> {
250 self.iter.next()
251 }
252}
253
impl StdError for UnknownRequest {
    /// Fixed, human-readable explanation of the error. The `Display`
    /// implementation for this type prints exactly this text.
    fn description(&self) -> &str {
        "no known valid response for the given request"
    }
}
259
260impl fmt::Display for UnknownRequest {
261 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
262 write!(f, "{}", self.description())
263 }
264}
265
266fn tree_iter(tree: Arc<sled::Tree>) -> Iter {
268 let _tree = tree.clone();
269 let iter: sled::Iter = tree.iter();
270 let iter: sled::Iter<'static> = unsafe { mem::transmute(iter) };
271 Iter { _tree, iter }
272}
273
274fn tree_scan(tree: Arc<sled::Tree>, key: &[u8]) -> Iter {
276 let _tree = tree.clone();
277 let iter: sled::Iter = tree.scan(key);
278 let iter: sled::Iter<'static> = unsafe { mem::transmute(iter) };
279 Iter { _tree, iter }
280}
281
282fn deserialize_and_respond<T>(bytes: &[u8], tree: Arc<sled::Tree>) -> Response<Body>
284where
285 T: IntoResponse + for<'de> Deserialize<'de>,
286{
287 serde_json::from_slice(bytes)
288 .map(|req: T| req.into_response(tree))
289 .unwrap_or_else(|err| deserialization_err_response(&err))
290}
291
292fn concat_and_respond<T>(
294 request: Request<Body>,
295 tree: Arc<sled::Tree>,
296) -> impl Future<Item = Response<Body>, Error = hyper::Error> + Send
297where
298 T: IntoResponse + for<'de> Deserialize<'de>,
299{
300 request
301 .into_body()
302 .concat2()
303 .map(move |chunk| deserialize_and_respond::<T>(&chunk, tree))
304}
305
306fn err_to_json_bytes(err: &StdError) -> Vec<u8> {
308 let string = format!("{}", err);
309 serde_json::to_vec(&string)
310 .expect("failed to serialize error string")
311}
312
313fn db_err_response(err: &StdError) -> Response<Body> {
318 Response::builder()
319 .status(StatusCode::INTERNAL_SERVER_ERROR)
320 .body(err_to_json_bytes(err).into())
321 .expect("failed to construct INTERNAL_SERVER_ERROR response")
322}
323
324fn deserialization_err_response(err: &StdError) -> Response<Body> {
329 Response::builder()
330 .status(StatusCode::BAD_REQUEST)
331 .body(err_to_json_bytes(err).into())
332 .expect("failed to construct BAD_REQUEST response")
333}
334
335pub fn response(
376 request: Request<Body>,
377 tree: Arc<sled::Tree>,
378) -> Result<ResponseFuture, UnknownRequest> {
379 match (request.method(), request.uri().path()) {
380 (&request::Get::METHOD, request::Get::PATH_AND_QUERY) => {
381 Ok(Box::new(concat_and_respond::<request::Get>(request, tree)))
382 }
383 (&request::Del::METHOD, request::Del::PATH_AND_QUERY) => {
384 Ok(Box::new(concat_and_respond::<request::Del>(request, tree)))
385 }
386 (&request::Set::METHOD, request::Set::PATH_AND_QUERY) => {
387 Ok(Box::new(concat_and_respond::<request::Set>(request, tree)))
388 }
389 (&request::Cas::METHOD, request::Cas::PATH_AND_QUERY) => {
390 Ok(Box::new(concat_and_respond::<request::Cas>(request, tree)))
391 }
392 (&request::Merge::METHOD, request::Merge::PATH_AND_QUERY) => {
393 Ok(Box::new(concat_and_respond::<request::Merge>(request, tree)))
394 }
395 (&request::Flush::METHOD, request::Flush::PATH_AND_QUERY) => {
396 Ok(Box::new(concat_and_respond::<request::Flush>(request, tree)))
397 }
398 (&request::Iter::METHOD, request::Iter::PATH_AND_QUERY) => {
399 Ok(Box::new(concat_and_respond::<request::Iter>(request, tree)))
400 }
401 (&request::Scan::METHOD, request::Scan::PATH_AND_QUERY) => {
402 Ok(Box::new(concat_and_respond::<request::Scan>(request, tree)))
403 }
404 (&request::ScanRange::METHOD, request::ScanRange::PATH_AND_QUERY) => {
405 Ok(Box::new(concat_and_respond::<request::ScanRange>(request, tree)))
406 }
407 (&request::Max::METHOD, request::Max::PATH_AND_QUERY) => {
408 Ok(Box::new(concat_and_respond::<request::Max>(request, tree)))
409 }
410 (&request::Pred::METHOD, request::Pred::PATH_AND_QUERY) => {
411 Ok(Box::new(concat_and_respond::<request::Pred>(request, tree)))
412 }
413 (&request::PredIncl::METHOD, request::PredIncl::PATH_AND_QUERY) => {
414 Ok(Box::new(concat_and_respond::<request::PredIncl>(request, tree)))
415 }
416 (&request::Succ::METHOD, request::Succ::PATH_AND_QUERY) => {
417 Ok(Box::new(concat_and_respond::<request::Succ>(request, tree)))
418 }
419 (&request::SuccIncl::METHOD, request::SuccIncl::PATH_AND_QUERY) => {
420 Ok(Box::new(concat_and_respond::<request::SuccIncl>(request, tree)))
421 }
422 _ => Err(UnknownRequest)
423 }
424}
425
426pub fn or_404(
429 result: Result<ResponseFuture, UnknownRequest>,
430) -> impl Future<Item = Response<Body>, Error = hyper::Error> + Send {
431 result
432 .unwrap_or_else(|UnknownRequest| {
433 let response = Response::builder()
434 .status(StatusCode::NOT_FOUND)
435 .body(vec![].into())
436 .expect("failed to construct NOT_FOUND response");
437 Box::new(futures::future::ok(response)) as _
438 })
439}