1extern crate futures;
2extern crate tokio_io;
3extern crate dscfg_proto;
4extern crate void;
5extern crate same;
6#[macro_use]
7extern crate slog;
8extern crate serde_json;
9
10pub use dscfg_proto::json;
11
12use futures::sync::mpsc::{self, UnboundedSender};
13use futures::{Future, Stream, Sink};
14use futures::future;
15use void::Void;
16use std::collections::{HashMap, HashSet};
17use std::sync::{Arc, Mutex};
18use same::RefCmp;
19use std::sync::RwLock;
20use std::io;
21
22#[derive(Clone)]
23struct Subscriptions(Arc<RwLock<HashMap<String, HashSet<RefCmp<Arc<mpsc::UnboundedSender<dscfg_proto::Response>>>>>>>);
24
25impl Subscriptions {
26 fn new() -> Self {
27 Subscriptions(Default::default())
28 }
29
30 fn subscribe(&self, client: &Arc<mpsc::UnboundedSender<dscfg_proto::Response>>, key: String) -> bool {
31 let mut subscriptions = self.0.write().unwrap();
32 let client = RefCmp(Arc::clone(client));
33
34 subscriptions.entry(key).or_insert_with(HashSet::new).insert(client)
35 }
36
37 fn unsubscribe(&self, client: &Arc<mpsc::UnboundedSender<dscfg_proto::Response>>, key: &str) -> bool {
38 let mut subscriptions = self.0.write().unwrap();
39 let client = RefCmp(Arc::clone(client));
40
41 if let Some(subscriptions) = subscriptions.get_mut(key) {
42 subscriptions.remove(&client)
43 } else {
44 false
45 }
46 }
47
48 fn unsubscribe_all(&self, client: &Arc<mpsc::UnboundedSender<dscfg_proto::Response>>) {
49 let mut subscriptions = self.0.write().unwrap();
50 let client = RefCmp(Arc::clone(client));
51
52 for (_, subscriptions) in &mut *subscriptions {
53 subscriptions.remove(&client);
54 }
55 }
56
57 fn broadcast(&self, key: String, value: json::Value) {
58 use dscfg_proto::Response;
59
60 let subscriptions = self.0.read().unwrap();
61
62 if let Some(subscriptions) = subscriptions.get(&key) {
63 for subscription in subscriptions {
64 subscription
65 .unbounded_send(Response::Value { key: key.clone(), value: value.clone() })
66 .unwrap()
68 }
69 }
70 }
71}
72
73pub trait IsFatalError {
77 fn is_fatal(&self) -> bool;
79}
80
81impl IsFatalError for Void {
82 fn is_fatal(&self) -> bool {
83 match *self {}
84 }
85}
86
87pub trait Storage {
92 type SetError: IsFatalError;
94 type GetError: IsFatalError;
96
97 fn set(&mut self, key: String, value: json::Value) -> Result<(), Self::SetError>;
100
101 fn get(&mut self, key: &str) -> Result<Option<json::Value>, Self::GetError>;
103}
104
105impl<T: Storage + ?Sized> Storage for Box<T> {
106 type SetError = T::SetError;
107 type GetError = T::GetError;
108
109 fn set(&mut self, key: String, value: json::Value) -> Result<(), Self::SetError> {
110 (**self).set(key, value)
111 }
112
113 fn get(&mut self, key: &str) -> Result<Option<json::Value>, Self::GetError> {
114 (**self).get(key)
115 }
116}
117
/// Error of an operation on a storage shared behind a `Mutex`.
///
/// Derives the same set of traits as `HandlingError` so the error can be
/// logged, compared and copied around like the other error type of this file.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum SyncOpResult<T> {
    /// The mutex guarding the storage was poisoned.
    Poisoned,
    /// The wrapped storage itself failed with the contained error.
    Other(T),
}
125
126impl<T: IsFatalError> IsFatalError for SyncOpResult<T> {
127 fn is_fatal(&self) -> bool {
128 match self {
129 SyncOpResult::Poisoned => true,
130 SyncOpResult::Other(err) => err.is_fatal(),
131 }
132 }
133}
134
135impl<T> Storage for Arc<Mutex<T>> where T: Storage + ?Sized {
136 type SetError = SyncOpResult<T::SetError>;
137 type GetError = SyncOpResult<T::GetError>;
138
139 fn set(&mut self, key: String, value: json::Value) -> Result<(), Self::SetError> {
140 self.lock()
141 .map_err(|_| SyncOpResult::Poisoned)?
142 .set(key, value)
143 .map_err(SyncOpResult::Other)
144 }
145
146 fn get(&mut self, key: &str) -> Result<Option<json::Value>, Self::GetError> {
147 self.lock()
148 .map_err(|_| SyncOpResult::Poisoned)?
149 .get(key)
150 .map_err(SyncOpResult::Other)
151 }
152}
153
154pub struct ServerParams<Incoming, Store, Executor, Logger> where
159 Incoming: Stream,
160 Store: Storage + Clone + Send,
161 Executor: future::Executor<Box<'static + Future<Item=(), Error=()> + Send>>,
162 Logger: Into<slog::Logger> {
163
164 pub incoming_clients: Incoming,
166 pub storage: Store,
168 pub executor: Executor,
170 pub logger: Logger,
172}
173
/// Marker to pass as `ServerParams::logger` when no logging is wanted; it
/// converts into a `slog::Logger` that discards every record.
#[derive(Debug, Clone, Copy, Default)]
pub struct DiscardLogs;
176
177impl From<DiscardLogs> for slog::Logger {
178 fn from(_: DiscardLogs) -> Self {
179 slog::Logger::root(slog::Discard, o!())
180 }
181}
182
/// Error with which the server future returned by `custom` / `serve` fails.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum HandlingError<E> {
    /// The stream of incoming clients failed with the contained error.
    AcceptError(E),
    /// The executor refused a client future for a reason other than lack of
    /// capacity.
    Shutdown,
}
191
192fn handle_client<Client, Store, Error>(client: Client, subscriptions: Subscriptions, mut storage: Store, canceler: UnboundedSender<()>) -> Box<'static + Future<Item=(), Error=()> + Send> where
193 Client: 'static + Stream<Item=dscfg_proto::Request, Error=Error> + Sink<SinkItem=dscfg_proto::Response, SinkError=Error> + Send,
194 Store: 'static + Storage + Send,
195 Error: 'static {
196
197 use dscfg_proto::{Request, Response};
198
199 let (sender, receiver) = mpsc::unbounded();
200 let sender = Arc::new(sender);
201 let unsubscriber = subscriptions.clone();
202 let sender_unsubscribe = sender.clone();
203
204 let (sink, stream) = client.split();
205
206 let stream = stream
207 .map(move |request| {
208 match request {
209 Request::Set { key, value } => {
210 match storage.set(key.clone(), value.clone()) {
211 Ok(_) => {
212 subscriptions.broadcast(key, value);
213 Response::OperationOk
214 },
215 Err(err) => {
216 if err.is_fatal() {
217 let _ = canceler.unbounded_send(());
218 }
219 Response::OperationFailed
220 },
221 }
222 },
223 Request::Get { key } => {
224 match storage.get(&key) {
225 Ok(Some(value)) => {
226 Response::Value { key, value }
227 },
228 Ok(None) => Response::Value { key, value: json::Value::Null },
229 Err(err) => {
230 if err.is_fatal() {
231 let _ = canceler.unbounded_send(());
232 }
233 Response::OperationFailed
234 },
235 }
236 },
237 Request::Subscribe { key, notify_now } => {
238 if notify_now {
239 match storage.get(&key) {
240 Ok(Some(value)) => {
241 let notification = Response::Value {
242 key: key.clone(),
243 value: value,
244 };
245 sender.unbounded_send(notification).unwrap();
246 },
247 Ok(None) => {
248 let notification = Response::Value {
249 key: key.clone(),
250 value: json::Value::Null,
251 };
252 sender.unbounded_send(notification).unwrap();
253 },
254 Err(_) => return Response::OperationFailed,
255 }
256 }
257
258 if subscriptions.subscribe(&sender, key) {
259 Response::OperationOk
260 } else {
261 Response::Ignored
262 }
263 },
264 Request::Unsubscribe { key } => {
265 if subscriptions.unsubscribe(&sender, &key) {
266 Response::OperationOk
267 } else {
268 Response::Ignored
269 }
270 }
271 }
272 })
273 .map_err(std::mem::drop);
274
275 let receiver = receiver.map_err(|_| panic!("sender terminated"));
276
277 let sink = sink.sink_map_err(std::mem::drop);
278
279 Box::new(stream
280 .select(receiver)
281 .forward(sink)
282 .map(std::mem::drop)
283 .map_err(std::mem::drop)
284 .then(move |result| { unsubscriber.unsubscribe_all(&sender_unsubscribe); result })
285 )
286}
287
288pub fn custom<Incoming, Store, Executor, Logger, CommError>(server_params: ServerParams<Incoming, Store, Executor, Logger>) -> impl Future<Item=(), Error=HandlingError<Incoming::Error>> where
294 Incoming: Stream,
295 Incoming::Item: 'static + Stream<Item=dscfg_proto::Request, Error=CommError> + Sink<SinkItem=dscfg_proto::Response, SinkError=CommError> + Send,
296 Store: 'static + Storage + Clone + Send,
297 Executor: future::Executor<Box<'static + Future<Item=(), Error=()> + Send>>,
298 Logger: Into<slog::Logger>,
299 CommError: 'static {
300
301 let logger = server_params.logger.into();
302 let executor = server_params.executor;
303 let storage = server_params.storage;
304
305 let subscriptions = Subscriptions::new();
306 let (canceler, cancelable) = mpsc::unbounded();
307
308 let cancelable = cancelable
309 .into_future()
310 .then(|_: Result<(Option<()>, futures::sync::mpsc::UnboundedReceiver<()>), _>| -> Result<(), HandlingError<Incoming::Error>> { Ok(()) });
311
312 let server = server_params.incoming_clients
313 .map_err(HandlingError::AcceptError)
314 .for_each(move |client| {
315 let client = handle_client(client, subscriptions.clone(), storage.clone(), canceler.clone());
316
317 match executor.execute(client) {
318 Ok(_) => Ok(()),
319 Err(ref err) if err.kind() == future::ExecuteErrorKind::NoCapacity => {
320 error!(logger, "failed to handle the client"; "cause" => "no capacity");
321 Ok(())
323 },
324 Err(_) => {
325 info!(logger, "shutting down");
326 Err(HandlingError::Shutdown)
327 },
328 }
329 })
330 .select(cancelable)
331 .map(std::mem::drop)
332 .map_err(|(e, _)| e);
333 server
334}
335
336pub fn serve<Incoming, Store, Executor, Logger>(server_params: ServerParams<Incoming, Store, Executor, Logger>) -> impl Future<Item=(), Error=HandlingError<Incoming::Error>> where
341 Incoming: Stream,
342 Incoming::Item: 'static + tokio_io::AsyncRead + tokio_io::AsyncWrite + Send,
343 Store: 'static + Storage + Clone + Send,
344 Executor: future::Executor<Box<'static + Future<Item=(), Error=()> + Send>>,
345 Logger: Into<slog::Logger> {
346
347 let incoming_clients = server_params.incoming_clients.map(move |stream| {
348 #[allow(deprecated)]
351 tokio_io::codec::length_delimited::Builder::new()
352 .native_endian()
353 .new_framed(stream)
354 .and_then(|message| serde_json::from_slice(&message).map_err(Into::into))
355 .with(|message| serde_json::to_vec(&message).map_err(io::Error::from))
356 });
357
358 let params = ServerParams {
359 incoming_clients,
360 storage: server_params.storage,
361 executor: server_params.executor,
362 logger: server_params.logger
363 };
364 custom(params)
365}