dscfg_server/
lib.rs

1extern crate futures;
2extern crate tokio_io;
3extern crate dscfg_proto;
4extern crate void;
5extern crate same;
6#[macro_use]
7extern crate slog;
8extern crate serde_json;
9
10pub use dscfg_proto::json;
11
12use futures::sync::mpsc::{self, UnboundedSender};
13use futures::{Future, Stream, Sink};
14use futures::future;
15use void::Void;
16use std::collections::{HashMap, HashSet};
17use std::sync::{Arc, Mutex};
18use same::RefCmp;
19use std::sync::RwLock;
20use std::io;
21
22#[derive(Clone)]
23struct Subscriptions(Arc<RwLock<HashMap<String, HashSet<RefCmp<Arc<mpsc::UnboundedSender<dscfg_proto::Response>>>>>>>);
24
25impl Subscriptions {
26    fn new() -> Self {
27        Subscriptions(Default::default())
28    }
29
30    fn subscribe(&self, client: &Arc<mpsc::UnboundedSender<dscfg_proto::Response>>, key: String) -> bool {
31        let mut subscriptions = self.0.write().unwrap();
32        let client = RefCmp(Arc::clone(client));
33
34        subscriptions.entry(key).or_insert_with(HashSet::new).insert(client)
35    }
36
37    fn unsubscribe(&self, client: &Arc<mpsc::UnboundedSender<dscfg_proto::Response>>, key: &str) -> bool {
38        let mut subscriptions = self.0.write().unwrap();
39        let client = RefCmp(Arc::clone(client));
40
41        if let Some(subscriptions) = subscriptions.get_mut(key) {
42            subscriptions.remove(&client)
43        } else {
44            false
45        }
46    }
47
48    fn unsubscribe_all(&self, client: &Arc<mpsc::UnboundedSender<dscfg_proto::Response>>) {
49        let mut subscriptions = self.0.write().unwrap();
50        let client = RefCmp(Arc::clone(client));
51
52        for (_, subscriptions) in &mut *subscriptions {
53            subscriptions.remove(&client);
54        }
55    }
56
57    fn broadcast(&self, key: String, value: json::Value) {
58        use dscfg_proto::Response;
59
60        let subscriptions = self.0.read().unwrap();
61
62        if let Some(subscriptions) = subscriptions.get(&key) {
63            for subscription in subscriptions {
64                subscription
65                    .unbounded_send(Response::Value { key: key.clone(), value: value.clone() })
66                    // This should never happen as the client unregisters itself.
67                    .unwrap()
68            }
69        }
70    }
71}
72
73/// A trait for errors to tell whether they are fatal.
74///
75/// This is used for determining whether the server should continue runnin or stop.
76pub trait IsFatalError {
77    /// Returns `true` if all future operations will (very likely) fail.
78    fn is_fatal(&self) -> bool;
79}
80
81impl IsFatalError for Void {
82    fn is_fatal(&self) -> bool {
83        match *self {}
84    }
85}
86
87/// Specification of interface for accessing configuration.
88///
89/// The configuration can be stored using many different methods. In order to implement a new way
90/// of storing configuration data, you must implement this trait for your type.
91pub trait Storage {
92    /// Error which may occur when attempting to write to the storage.
93    type SetError: IsFatalError;
94    /// Error which may occur when attempting to read from the storage.
95    type GetError: IsFatalError;
96
97    /// When this function is called, the implementor must store the given value for key in the
98    /// storage or return error in case of failure.
99    fn set(&mut self, key: String, value: json::Value) -> Result<(), Self::SetError>;
100
101    /// The implementor must return the value at given key (if exists, `None` if not) or error if getting fails.
102    fn get(&mut self, key: &str) -> Result<Option<json::Value>, Self::GetError>;
103}
104
105impl<T: Storage + ?Sized> Storage for Box<T> {
106    type SetError = T::SetError;
107    type GetError = T::GetError;
108
109    fn set(&mut self, key: String, value: json::Value) -> Result<(), Self::SetError> {
110        (**self).set(key, value)
111    }
112
113    fn get(&mut self, key: &str) -> Result<Option<json::Value>, Self::GetError> {
114        (**self).get(key)
115    }
116}
117
118/// Error that might occur when accessing `Store` synchronized with mutex.
119pub enum SyncOpResult<T> {
120    /// The mutex was poisoned
121    Poisoned,
122    /// The underlying implementation failed.
123    Other(T),
124}
125
126impl<T: IsFatalError> IsFatalError for SyncOpResult<T> {
127    fn is_fatal(&self) -> bool {
128        match self {
129            SyncOpResult::Poisoned => true,
130            SyncOpResult::Other(err) => err.is_fatal(),
131        }
132    }
133}
134
135impl<T> Storage for Arc<Mutex<T>> where T: Storage + ?Sized {
136    type SetError = SyncOpResult<T::SetError>;
137    type GetError = SyncOpResult<T::GetError>;
138
139    fn set(&mut self, key: String, value: json::Value) -> Result<(), Self::SetError> {
140        self.lock()
141            .map_err(|_| SyncOpResult::Poisoned)?
142            .set(key, value)
143            .map_err(SyncOpResult::Other)
144    }
145
146    fn get(&mut self, key: &str) -> Result<Option<json::Value>, Self::GetError> {
147        self.lock()
148            .map_err(|_| SyncOpResult::Poisoned)?
149            .get(key)
150            .map_err(SyncOpResult::Other)
151    }
152}
153
154/// Parameters the server needs to run
155///
156/// Since there are several parameters the server needs, it's better
157/// to pass them as struct containing them.
158pub struct ServerParams<Incoming, Store, Executor, Logger> where 
159    Incoming: Stream,
160    Store: Storage + Clone + Send,
161    Executor: future::Executor<Box<'static + Future<Item=(), Error=()> + Send>>,
162    Logger: Into<slog::Logger> {
163
164    /// Clients that are accepted.
165    pub incoming_clients: Incoming,
166    /// The implementation of configuration storage.
167    pub storage: Store,
168    /// Futures executor used for handling the clients. 
169    pub executor: Executor,
170    /// `slog` Logger used for logging.
171    pub logger: Logger,
172}
173
174/// This struct can be used in place of logger to discard all logs.
175pub struct DiscardLogs;
176
177impl From<DiscardLogs> for slog::Logger {
178    fn from(_: DiscardLogs) -> Self {
179        slog::Logger::root(slog::Discard, o!())
180    }
181}
182
183/// Error that might occur when attempting to accept a connection.
184#[derive(Debug, Clone, Eq, PartialEq, Hash)]
185pub enum HandlingError<E> {
186    /// Accepting failed.
187    AcceptError(E),
188    /// The server is stopping.
189    Shutdown,
190}
191
192fn handle_client<Client, Store, Error>(client: Client, subscriptions: Subscriptions, mut storage: Store, canceler: UnboundedSender<()>) -> Box<'static + Future<Item=(), Error=()> + Send> where
193    Client: 'static + Stream<Item=dscfg_proto::Request, Error=Error> + Sink<SinkItem=dscfg_proto::Response, SinkError=Error> + Send,
194    Store: 'static + Storage + Send,
195    Error: 'static {
196
197    use dscfg_proto::{Request, Response};
198
199    let (sender, receiver) = mpsc::unbounded();
200    let sender = Arc::new(sender);
201    let unsubscriber = subscriptions.clone();
202    let sender_unsubscribe = sender.clone();
203
204    let (sink, stream) = client.split();
205
206    let stream = stream
207        .map(move |request| {
208            match request {
209                Request::Set { key, value } => {
210                    match storage.set(key.clone(), value.clone()) {
211                        Ok(_) => {
212                            subscriptions.broadcast(key, value);
213                            Response::OperationOk
214                        },
215                        Err(err) => {
216                            if err.is_fatal() {
217                                let _ = canceler.unbounded_send(());
218                            }
219                            Response::OperationFailed
220                        },
221                    }
222                },
223                Request::Get { key } => {
224                    match storage.get(&key) {
225                        Ok(Some(value)) => {
226                            Response::Value { key, value }
227                        },
228                        Ok(None) => Response::Value { key, value: json::Value::Null },
229                        Err(err) => {
230                            if err.is_fatal() {
231                                let _ = canceler.unbounded_send(());
232                            }
233                            Response::OperationFailed
234                        },
235                    }
236                },
237                Request::Subscribe { key, notify_now } => {
238                    if notify_now {
239                        match storage.get(&key) {
240                            Ok(Some(value)) => {
241                                let notification = Response::Value {
242                                    key: key.clone(),
243                                    value: value,
244                                };
245                                sender.unbounded_send(notification).unwrap();
246                            },
247                            Ok(None) => {
248                                let notification = Response::Value {
249                                    key: key.clone(),
250                                    value: json::Value::Null,
251                                };
252                                sender.unbounded_send(notification).unwrap();
253                            },
254                            Err(_) => return Response::OperationFailed,
255                        }
256                    }
257
258                    if subscriptions.subscribe(&sender, key) {
259                        Response::OperationOk
260                    } else {
261                        Response::Ignored
262                    }
263                },
264                Request::Unsubscribe { key } => {
265                    if subscriptions.unsubscribe(&sender, &key) {
266                        Response::OperationOk
267                    } else {
268                        Response::Ignored
269                    }
270                }
271            }
272        })
273        .map_err(std::mem::drop);
274
275    let receiver = receiver.map_err(|_| panic!("sender terminated"));
276
277    let sink = sink.sink_map_err(std::mem::drop);
278
279    Box::new(stream
280        .select(receiver)
281        .forward(sink)
282        .map(std::mem::drop)
283        .map_err(std::mem::drop)
284        .then(move |result| { unsubscriber.unsubscribe_all(&sender_unsubscribe); result })
285    )
286}
287
288/// Creates a server with custom client stream.
289///
290/// This may be used if one wants control over how the messages are serialized.
291/// If you want to use the default serialization (length-delimited json encoding),
292/// use `serve()` function.
293pub fn custom<Incoming, Store, Executor, Logger, CommError>(server_params: ServerParams<Incoming, Store, Executor, Logger>) -> impl Future<Item=(), Error=HandlingError<Incoming::Error>> where
294    Incoming: Stream,
295    Incoming::Item: 'static + Stream<Item=dscfg_proto::Request, Error=CommError> + Sink<SinkItem=dscfg_proto::Response, SinkError=CommError> + Send,
296    Store: 'static + Storage + Clone + Send,
297    Executor: future::Executor<Box<'static + Future<Item=(), Error=()> + Send>>,
298    Logger: Into<slog::Logger>,
299    CommError: 'static {
300
301    let logger = server_params.logger.into();
302    let executor = server_params.executor;
303    let storage = server_params.storage;
304
305    let subscriptions = Subscriptions::new();
306    let (canceler, cancelable) = mpsc::unbounded();
307
308    let cancelable = cancelable
309        .into_future()
310        .then(|_: Result<(Option<()>, futures::sync::mpsc::UnboundedReceiver<()>), _>| -> Result<(), HandlingError<Incoming::Error>> { Ok(()) });
311
312    let server = server_params.incoming_clients
313        .map_err(HandlingError::AcceptError)
314        .for_each(move |client| {
315            let client = handle_client(client, subscriptions.clone(), storage.clone(), canceler.clone());
316
317            match executor.execute(client) {
318                Ok(_) => Ok(()),
319                Err(ref err) if err.kind() == future::ExecuteErrorKind::NoCapacity => {
320                    error!(logger, "failed to handle the client"; "cause" => "no capacity");
321                    // We'll just ignore this client - others might work
322                    Ok(())
323                },
324                Err(_) => {
325                    info!(logger, "shutting down");
326                    Err(HandlingError::Shutdown)
327                },
328            }
329        })
330        .select(cancelable)
331        .map(std::mem::drop)
332        .map_err(|(e, _)| e);
333    server
334}
335
336/// Creates default dscfg server.
337///
338/// This server uses length-delimited Json messages to transfer the data. Use `custom()` if you
339/// want to control encoding.
340pub fn serve<Incoming, Store, Executor, Logger>(server_params: ServerParams<Incoming, Store, Executor, Logger>) -> impl Future<Item=(), Error=HandlingError<Incoming::Error>> where
341    Incoming: Stream,
342    Incoming::Item: 'static + tokio_io::AsyncRead + tokio_io::AsyncWrite + Send,
343    Store: 'static + Storage + Clone + Send,
344    Executor: future::Executor<Box<'static + Future<Item=(), Error=()> + Send>>,
345    Logger: Into<slog::Logger> {
346
347    let incoming_clients = server_params.incoming_clients.map(move |stream| {
348        // Workaround for unsuitable deprecation message - see
349        // https://github.com/tokio-rs/tokio/issues/680
350        #[allow(deprecated)]
351        tokio_io::codec::length_delimited::Builder::new()
352            .native_endian()
353            .new_framed(stream)
354            .and_then(|message| serde_json::from_slice(&message).map_err(Into::into))
355            .with(|message| serde_json::to_vec(&message).map_err(io::Error::from))
356    });
357
358    let params = ServerParams {
359        incoming_clients,
360        storage: server_params.storage,
361        executor: server_params.executor,
362        logger: server_params.logger
363    };
364    custom(params)
365}