sled_web/
client.rs

1use futures::{Async, Poll};
2use hyper::{self, Body, Request, Response, StatusCode, Uri};
3use hyper::client::HttpConnector;
4use hyper::rt::{Future, Stream};
5use request;
6use serde::Deserialize;
7use serde_json;
8use std::error::Error as StdError;
9use std::fmt;
10
11/// A hyper `Client` wrapper that simplifies communication with the sled `Tree` server.
12#[derive(Clone, Debug)]
13pub struct Client {
14    uri: Uri,
15    client: hyper::Client<HttpConnector, Body>,
16}
17
18/// The possible errors that may be produced by the `Client` request methods.
19#[derive(Debug)]
20pub enum Error {
21    Hyper(hyper::Error),
22    SerdeJson(serde_json::Error),
23    Server(String),
24}
25
/// The raw bytes of a key.
pub type Key = Vec<u8>;
/// The raw bytes of a value.
pub type Value = Vec<u8>;
/// A key paired with its value.
pub type Entry = (Key, Value);
29
30/// A stream that converts a hyper `Body` into a stream yielding JSON `Value`s.
31///
32/// Assumes that the `Body` will never yield parts of two separate JSON objects within the same
33/// chunk, but may split individual JSON objects across multiple chunks.
34#[derive(Debug)]
35pub struct BodyToJsonChunks {
36    body: Body,
37    buffer: Vec<u8>,
38}
39
40impl Client {
41    /// Create a new `Client` pointing towards the given `Uri`.
42    ///
43    /// The `Uri` should contain the `Scheme` and `Authority` parts of the URI but not the
44    /// following path. This following path will be created as necessary within each of the request
45    /// calls.
46    pub fn new(uri: Uri) -> Self {
47        let client = hyper::Client::builder().build_http();
48        Client { uri, client }
49    }
50
51    /// A method for performing the `Get` request.
52    ///
53    /// Given the key for an entry in the `sled::Tree`, produce a `Future` with the value.
54    pub fn get(&self, key: Key) -> impl Future<Item = Option<Value>, Error = Error> {
55        let request = request::get(self.uri.clone(), key);
56        request_concat_and_deserialize(self, request)
57    }
58
59    /// A method for performing the `Del` request.
60    ///
61    /// Given the key for an entry in the `sled::Tree`, delete the entry and return a `Future` with
62    /// the removed value.
63    pub fn del(&self, key: Key) -> impl Future<Item = Option<Value>, Error = Error> {
64        let request = request::del(self.uri.clone(), key);
65        request_concat_and_deserialize(self, request)
66    }
67
68    /// A method for performing the `Set` request.
69    ///
70    /// Send the given key and value to the database for insertion into the `sled::Tree`.
71    pub fn set(&self, key: Key, value: Value) -> impl Future<Item = (), Error = Error> {
72        let request = request::set(self.uri.clone(), key, value);
73        request_concat_and_deserialize(self, request)
74    }
75
76    /// A method for performing the `Cas` request.
77    ///
78    /// Compare and swap. Capable of unique creation, conditional modification, or deletion.
79    ///
80    /// If old is None, this will only set the value if it doesn't exist yet. If new is None, will
81    /// delete the value if old is correct. If both old and new are Some, will modify the value if
82    /// old is correct.
83    ///
84    /// If Tree is read-only, will do nothing.
85    pub fn cas(
86        &self,
87        key: Key,
88        old: Option<Value>,
89        new: Option<Value>,
90    ) -> impl Future<Item = Result<(), Option<Value>>, Error = Error> {
91        let request = request::cas(self.uri.clone(), key, old, new);
92        request_concat_and_deserialize(self, request)
93    }
94
95    /// A method for performing the `Merge` request.
96    ///
97    /// Merge a new value into the total state for a key.
98    pub fn merge(&self, key: Key, value: Value) -> impl Future<Item = (), Error = Error> {
99        let request = request::merge(self.uri.clone(), key, value);
100        request_concat_and_deserialize(self, request)
101    }
102
103    /// A method for performing the `Flush` request.
104    ///
105    /// Flushes any pending IO buffers to disk to ensure durability.
106    pub fn flush(&self) -> impl Future<Item = (), Error = Error> {
107        let request = request::flush(self.uri.clone());
108        request_concat_and_deserialize(self, request)
109    }
110
111    /// A method for performing the `Iter` request.
112    ///
113    /// The result is a `Stream` of ordered key value pairs.
114    pub fn iter(&self) -> impl Stream<Item = Entry, Error = Error> {
115        let request = request::iter(self.uri.clone());
116        request_stream_and_deserialize(self, request)
117    }
118
119    /// A method for performing the `Scan` request.
120    ///
121    /// The result is a `Stream` of ordered key value pairs, starting from the given key.
122    pub fn scan(&self, key: Key) -> impl Stream<Item = Entry, Error = Error> {
123        let request = request::scan(self.uri.clone(), key);
124        request_stream_and_deserialize(self, request)
125    }
126
127    /// A method for performing the `Scan` request.
128    ///
129    /// The result is a `Stream` of all ordered key value pairs within the given key range.
130    pub fn scan_range(&self, start: Key, end: Key) -> impl Stream<Item = Entry, Error = Error> {
131        let request = request::scan_range(self.uri.clone(), start, end);
132        request_stream_and_deserialize(self, request)
133    }
134
135    /// A method for perfomring the `Max` request.
136    ///
137    /// The result is a `Future` yielding the greatest entry in the `sled::Tree`.
138    ///
139    /// Returns `None` if there are no entries within the tree.
140    pub fn max(&self) -> impl Future<Item = Option<Entry>, Error = Error> {
141        let request = request::max(self.uri.clone());
142        request_concat_and_deserialize(self, request)
143    }
144
145    /// A method for performing the `Pred` request.
146    ///
147    /// Given the key for an entry in the `sled::Tree`, produce a `Future` with the preceding
148    /// entry.
149    pub fn pred(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
150        let request = request::pred(self.uri.clone(), key);
151        request_concat_and_deserialize(self, request)
152    }
153
154    /// A method for performing the `PredIncl` request.
155    ///
156    /// Given the key for an entry in the `sled::Tree`, produce a `Future` with the preceding
157    /// entry or the entry associated with the key if there is one.
158    pub fn pred_incl(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
159        let request = request::pred_incl(self.uri.clone(), key);
160        request_concat_and_deserialize(self, request)
161    }
162
163    /// A method for performing the `Succ` request.
164    ///
165    /// Given the key for an entry in the `sled::Tree`, produce a `Future` with the following
166    /// entry.
167    pub fn succ(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
168        let request = request::succ(self.uri.clone(), key);
169        request_concat_and_deserialize(self, request)
170    }
171
172    /// A method for performing the `SuccIncl` request.
173    ///
174    /// Given the key for an entry in the `sled::Tree`, produce a `Future` with the following
175    /// entry or the entry associated with the key if there is one.
176    pub fn succ_incl(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
177        let request = request::succ_incl(self.uri.clone(), key);
178        request_concat_and_deserialize(self, request)
179    }
180}
181
182impl Stream for BodyToJsonChunks {
183    type Item = serde_json::Value;
184    type Error = Error;
185    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
186        loop {
187            match self.body.poll() {
188                Err(err) => return Err(err.into()),
189                Ok(Async::NotReady) => return Ok(Async::NotReady),
190                Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
191                Ok(Async::Ready(Some(chunk))) => self.buffer.extend(chunk),
192            }
193            let v = match serde_json::from_slice::<serde_json::Value>(&self.buffer) {
194                Err(_err) => continue,
195                Ok(v) => v,
196            };
197            self.buffer.clear();
198            return Ok(Async::Ready(Some(v)));
199        }
200    }
201}
202
203impl StdError for Error {
204    fn description(&self) -> &str {
205        match *self {
206            Error::Hyper(ref err) => err.description(),
207            Error::SerdeJson(ref err) => err.description(),
208            Error::Server(ref s) => s,
209        }
210    }
211    fn cause(&self) -> Option<&StdError> {
212        match *self {
213            Error::Hyper(ref err) => Some(err),
214            Error::SerdeJson(ref err) => Some(err),
215            Error::Server(_) => None,
216        }
217    }
218}
219
220impl fmt::Display for Error {
221    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
222        write!(f, "{}", self.description())
223    }
224}
225
226impl From<hyper::Error> for Error {
227    fn from(e: hyper::Error) -> Self {
228        Error::Hyper(e)
229    }
230}
231
232impl From<serde_json::Error> for Error {
233    fn from(e: serde_json::Error) -> Self {
234        Error::SerdeJson(e)
235    }
236}
237
238impl From<Body> for BodyToJsonChunks {
239    fn from(body: Body) -> Self {
240        let buffer = vec![];
241        BodyToJsonChunks { body, buffer }
242    }
243}
244
245/// Concatenate and deserialize a single-chunk reponse.
246fn concat_and_deserialize<T>(response: Response<Body>) -> impl Future<Item = T, Error = Error>
247where
248    T: for<'de> Deserialize<'de>,
249{
250    let status = response.status();
251    BodyToJsonChunks::from(response.into_body())
252        .and_then(move |value| {
253            if status == StatusCode::INTERNAL_SERVER_ERROR {
254                let s = serde_json::from_value(value).map_err(Error::SerdeJson)?;
255                return Err(Error::Server(s));
256            }
257            serde_json::from_value::<T>(value).map_err(Error::SerdeJson)
258        })
259        .into_future()
260        .map_err(|(err, _)| err)
261        .and_then(|(opt, _stream)| opt.ok_or_else(|| unreachable!()))
262}
263
264/// Convert the given response body chunks into a stream of deserialized items.
265fn stream_and_deserialize<T>(response: Response<Body>) -> impl Stream<Item = T, Error = Error>
266where
267    T: for<'de> Deserialize<'de>,
268{
269    BodyToJsonChunks::from(response.into_body())
270        .and_then(|json| serde_json::from_value(json).map_err(Error::SerdeJson))
271}
272
273/// Submit the given request, then concatenate and deserialize a single-chunk response.
274fn request_concat_and_deserialize<T>(
275    client: &Client,
276    request: Request<Body>,
277) -> impl Future<Item = T, Error = Error>
278where
279    T: for<'de> Deserialize<'de>,
280{
281    client
282        .client
283        .request(request)
284        .map_err(Error::Hyper)
285        .and_then(concat_and_deserialize)
286}
287
288/// Submit the given request, then convert the response body chunks into a stream of deserialized
289/// items.
290fn request_stream_and_deserialize<T>(
291    client: &Client,
292    request: Request<Body>,
293) -> impl Stream<Item = T, Error = Error>
294where
295    T: for<'de> Deserialize<'de>,
296{
297    client
298        .client
299        .request(request)
300        .map_err(Error::Hyper)
301        .map(stream_and_deserialize)
302        .flatten_stream()
303}