dscfg_client/
lib.rs

1extern crate dscfg_proto;
2extern crate futures;
3extern crate tokio_io;
4extern crate serde;
5extern crate serde_json;
6
7pub use dscfg_proto::json;
8
9//use tokio_io::{AsyncRead, AsyncWrite};
10use futures::{Stream, Sink, Future};
11use std::io;
12use tokio_io::{AsyncRead, AsyncWrite};
13use serde::{Serialize, Deserialize};
14
15/// Error returned when DSCFG protocol fails.
16#[derive(Debug)]
17pub enum ProtocolError<E> {
18    /// The response to the message wasn't expected
19    UnexpectedResponse,
20    /// The stream has ended before a message could be fully decoded.
21    UnexpectedEof,
22    /// Underlying communication error - e.g. I/O error.
23    Communication(E),
24}
25
26/// DSCFG client
27///
28/// This represents a connection to the DSCFG server and allows
29/// manipulating shared configurationn as well as receiving notifictions
30/// about changes.
31///
32/// You should usually create it by calling `new()` function of this crate,
33/// but you may use custom stream if you need finer control.
34pub struct Client<C> {
35    connection: C,
36}
37
38impl<Val: Serialize + for<'a> Deserialize<'a>, E, C: Stream<Item=dscfg_proto::Response<Val>, Error=E> + Sink<SinkItem=dscfg_proto::Request<Val>, SinkError=E>> Client<C> {
39    /// Intantiates Client using provided custom `Stream + Sink`.
40    pub fn custom(connection: C) -> Self {
41        Client {
42            connection,
43        }
44    }
45
46    /// Sends request to set the `key` to given `value`.
47    ///
48    /// Returns future which resolves to `Client`, if the request succeeded.
49    pub fn set_value(self, key: String, value: Val) -> impl Future<Item=Self, Error=E> {
50        self.connection.send(dscfg_proto::Request::Set { key, value, })
51            .map(|connection| Client { connection, })
52    }
53
54    /// Sends request for getting value of given key and waits for the answer.
55    ///
56    /// Returns future which resolves to `(Val, Self)` if successful.
57    pub fn get_value<K: Into<String>>(self, key: K) -> impl Future<Item=(Val, Self), Error=ProtocolError<E>> {
58        self.connection.send(dscfg_proto::Request::Get { key: key.into() })
59            .and_then(|connection| connection.into_future().map_err(|(err, _)| err))
60            .map_err(ProtocolError::Communication)
61            .and_then(|(result, connection)| {
62                match result {
63                    Some(dscfg_proto::Response::Value { key: _, value, }) => Ok((value, Client { connection, })),
64                    None => Err(ProtocolError::UnexpectedEof),
65                    _ => Err(ProtocolError::UnexpectedResponse),
66                }
67            })
68    }
69
70    /// Subscribes for notifications of changes of value of specified `key`
71    pub fn listen_notifications<K: Into<String>>(self, key: K, notify_now: bool) -> impl Stream<Item=(String, Val), Error=E> {
72        self.connection
73            .send(dscfg_proto::Request::Subscribe { key: key.into(), notify_now, })
74            .and_then(|s| s.into_future().map_err(|(err, _)| err))
75            .map(|(_, s)| s)
76            .flatten_stream()
77            .filter_map(|msg| match msg {
78                dscfg_proto::Response::Value { key, value } => Some((key, value)),
79                _ => None,
80            })
81    }
82}
83
84/// Creates a dscfg client that encodes communication as length-delimited Json messages.
85pub fn new<Val: Serialize + for<'a> Deserialize<'a>, C: AsyncRead + AsyncWrite>(connection: C) -> Client<impl Stream<Item=dscfg_proto::Response<Val>, Error=io::Error> + Sink<SinkItem=dscfg_proto::Request<Val>, SinkError=io::Error>> {
86    // The deprecation message suggests to use `tokio-codec` instead,
87    // which doesn't actually implement it, and depending on `tokio::codec`
88    // just pulls in too many dependencies.
89    #[allow(deprecated)]
90    let client = tokio_io::codec::length_delimited::Builder::new()
91        .native_endian()
92        .new_framed(connection)
93        .and_then(|message| serde_json::from_slice(&message).map_err(Into::into))
94        .with(|message| serde_json::to_vec(&message).map_err(io::Error::from));
95
96    Client::custom(client)
97}