1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
extern crate dscfg_proto;
extern crate futures;
extern crate tokio_io;
extern crate serde;
extern crate serde_json;
pub use dscfg_proto::json;
use futures::{Stream, Sink, Future};
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};
use serde::{Serialize, Deserialize};
#[derive(Debug)]
pub enum ProtocolError<E> {
UnexpectedResponse,
UnexpectedEof,
Communication(E),
}
pub struct Client<C> {
connection: C,
}
impl<Val: Serialize + for<'a> Deserialize<'a>, E, C: Stream<Item=dscfg_proto::Response<Val>, Error=E> + Sink<SinkItem=dscfg_proto::Request<Val>, SinkError=E>> Client<C> {
pub fn custom(connection: C) -> Self {
Client {
connection,
}
}
pub fn set_value(self, key: String, value: Val) -> impl Future<Item=Self, Error=E> {
self.connection.send(dscfg_proto::Request::Set { key, value, })
.map(|connection| Client { connection, })
}
pub fn get_value<K: Into<String>>(self, key: K) -> impl Future<Item=(Val, Self), Error=ProtocolError<E>> {
self.connection.send(dscfg_proto::Request::Get { key: key.into() })
.and_then(|connection| connection.into_future().map_err(|(err, _)| err))
.map_err(ProtocolError::Communication)
.and_then(|(result, connection)| {
match result {
Some(dscfg_proto::Response::Value { key: _, value, }) => Ok((value, Client { connection, })),
None => Err(ProtocolError::UnexpectedEof),
_ => Err(ProtocolError::UnexpectedResponse),
}
})
}
pub fn listen_notifications<K: Into<String>>(self, key: K, notify_now: bool) -> impl Stream<Item=(String, Val), Error=E> {
self.connection
.send(dscfg_proto::Request::Subscribe { key: key.into(), notify_now, })
.and_then(|s| s.into_future().map_err(|(err, _)| err))
.map(|(_, s)| s)
.flatten_stream()
.filter_map(|msg| match msg {
dscfg_proto::Response::Value { key, value } => Some((key, value)),
_ => None,
})
}
}
pub fn new<Val: Serialize + for<'a> Deserialize<'a>, C: AsyncRead + AsyncWrite>(connection: C) -> Client<impl Stream<Item=dscfg_proto::Response<Val>, Error=io::Error> + Sink<SinkItem=dscfg_proto::Request<Val>, SinkError=io::Error>> {
#[allow(deprecated)]
let client = tokio_io::codec::length_delimited::Builder::new()
.native_endian()
.new_framed(connection)
.and_then(|message| serde_json::from_slice(&message).map_err(Into::into))
.with(|message| serde_json::to_vec(&message).map_err(io::Error::from));
Client::custom(client)
}