1extern crate dscfg_proto;
2extern crate futures;
3extern crate tokio_io;
4extern crate serde;
5extern crate serde_json;
6
7pub use dscfg_proto::json;
8
9use futures::{Stream, Sink, Future};
11use std::io;
12use tokio_io::{AsyncRead, AsyncWrite};
13use serde::{Serialize, Deserialize};
14
/// Error returned by client operations that wait for a response.
///
/// `E` is the error type of the underlying connection.
#[derive(Debug)]
pub enum ProtocolError<E> {
    /// A response arrived, but it was not the kind the request calls for.
    UnexpectedResponse,
    /// The response stream ended before a response arrived.
    UnexpectedEof,
    /// The underlying connection reported an error.
    Communication(E),
}

impl<E: std::fmt::Display> std::fmt::Display for ProtocolError<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            ProtocolError::UnexpectedResponse => f.write_str("unexpected response"),
            ProtocolError::UnexpectedEof => f.write_str("unexpected end of stream"),
            ProtocolError::Communication(err) => write!(f, "communication error: {}", err),
        }
    }
}

impl<E: std::error::Error + 'static> std::error::Error for ProtocolError<E> {
    /// Exposes the connection error, if any, so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ProtocolError::Communication(err) => Some(err),
            ProtocolError::UnexpectedResponse | ProtocolError::UnexpectedEof => None,
        }
    }
}
25
/// Client handle that owns the connection `C` used to exchange requests and
/// responses.
///
/// `Debug` is derived so the client can appear in diagnostics; the impl is
/// only available when `C` itself is `Debug`.
#[derive(Debug)]
pub struct Client<C> {
    /// The underlying transport.
    connection: C,
}
37
38impl<Val: Serialize + for<'a> Deserialize<'a>, E, C: Stream<Item=dscfg_proto::Response<Val>, Error=E> + Sink<SinkItem=dscfg_proto::Request<Val>, SinkError=E>> Client<C> {
39 pub fn custom(connection: C) -> Self {
41 Client {
42 connection,
43 }
44 }
45
46 pub fn set_value(self, key: String, value: Val) -> impl Future<Item=Self, Error=E> {
50 self.connection.send(dscfg_proto::Request::Set { key, value, })
51 .map(|connection| Client { connection, })
52 }
53
54 pub fn get_value<K: Into<String>>(self, key: K) -> impl Future<Item=(Val, Self), Error=ProtocolError<E>> {
58 self.connection.send(dscfg_proto::Request::Get { key: key.into() })
59 .and_then(|connection| connection.into_future().map_err(|(err, _)| err))
60 .map_err(ProtocolError::Communication)
61 .and_then(|(result, connection)| {
62 match result {
63 Some(dscfg_proto::Response::Value { key: _, value, }) => Ok((value, Client { connection, })),
64 None => Err(ProtocolError::UnexpectedEof),
65 _ => Err(ProtocolError::UnexpectedResponse),
66 }
67 })
68 }
69
70 pub fn listen_notifications<K: Into<String>>(self, key: K, notify_now: bool) -> impl Stream<Item=(String, Val), Error=E> {
72 self.connection
73 .send(dscfg_proto::Request::Subscribe { key: key.into(), notify_now, })
74 .and_then(|s| s.into_future().map_err(|(err, _)| err))
75 .map(|(_, s)| s)
76 .flatten_stream()
77 .filter_map(|msg| match msg {
78 dscfg_proto::Response::Value { key, value } => Some((key, value)),
79 _ => None,
80 })
81 }
82}
83
84pub fn new<Val: Serialize + for<'a> Deserialize<'a>, C: AsyncRead + AsyncWrite>(connection: C) -> Client<impl Stream<Item=dscfg_proto::Response<Val>, Error=io::Error> + Sink<SinkItem=dscfg_proto::Request<Val>, SinkError=io::Error>> {
86 #[allow(deprecated)]
90 let client = tokio_io::codec::length_delimited::Builder::new()
91 .native_endian()
92 .new_framed(connection)
93 .and_then(|message| serde_json::from_slice(&message).map_err(Into::into))
94 .with(|message| serde_json::to_vec(&message).map_err(io::Error::from));
95
96 Client::custom(client)
97}