1use std::sync::Arc;
2use std::collections::HashMap;
3use std::ops::{Deref, DerefMut};
4
5use serde_json;
6use futures::{self, future, Future};
7
8use calls::{RemoteProcedure, Metadata, RpcMethodSimple, RpcMethod, RpcNotificationSimple, RpcNotification};
9use middleware::{self, Middleware};
10use types::{Error, ErrorCode, Version};
11use types::{Request, Response, Call, Output};
12
13pub type FutureResponse = Box<Future<Item=Option<Response>, Error=()> + Send>;
15
16pub type FutureOutput = Box<Future<Item=Option<Output>, Error=()> + Send>;
18
19pub type FutureResult<F, G> = future::Map<
21 future::Either<future::FutureResult<Option<Response>, ()>, FutureRpcResult<F, G>>,
22 fn(Option<Response>) -> Option<String>,
23>;
24
25pub type FutureRpcOutput<F> = future::Either<
27 F,
28 future::Either<
29 FutureOutput,
30 future::FutureResult<Option<Output>, ()>,
31 >,
32>;
33
34pub type FutureRpcResult<F, G> = future::Either<
36 F,
37 future::Either<
38 future::Map<
39 FutureRpcOutput<G>,
40 fn(Option<Output>) -> Option<Response>,
41 >,
42 future::Map<
43 future::JoinAll<Vec<FutureRpcOutput<G>>>,
44 fn(Vec<Option<Output>>) -> Option<Response>,
45 >,
46 >,
47>;
48
/// Protocol versions a handler is willing to accept from incoming calls.
///
/// The mode is compared against the optional `jsonrpc` version carried by
/// each call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compatibility {
    /// Accept only calls that carry no version field.
    V1,
    /// Accept only calls that declare version 2.0.
    V2,
    /// Accept calls with or without a version field.
    Both,
}
59
60impl Default for Compatibility {
61 fn default() -> Self {
62 Compatibility::V2
63 }
64}
65
66impl Compatibility {
67 fn is_version_valid(&self, version: Option<Version>) -> bool {
68 match (*self, version) {
69 (Compatibility::V1, None) |
70 (Compatibility::V2, Some(Version::V2)) |
71 (Compatibility::Both, _) => true,
72 _ => false,
73 }
74 }
75
76 fn default_version(&self) -> Option<Version> {
77 match *self {
78 Compatibility::V1 => None,
79 Compatibility::V2 | Compatibility::Both => Some(Version::V2),
80 }
81 }
82}
83
84#[derive(Debug)]
88pub struct MetaIoHandler<T: Metadata, S: Middleware<T> = middleware::Noop> {
89 middleware: S,
90 compatibility: Compatibility,
91 methods: HashMap<String, RemoteProcedure<T>>,
92}
93
94impl<T: Metadata> Default for MetaIoHandler<T> {
95 fn default() -> Self {
96 MetaIoHandler::with_compatibility(Default::default())
97 }
98}
99
100impl<T: Metadata> MetaIoHandler<T> {
101 pub fn with_compatibility(compatibility: Compatibility) -> Self {
103 MetaIoHandler {
104 compatibility: compatibility,
105 middleware: Default::default(),
106 methods: Default::default(),
107 }
108 }
109}
110
111
112impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
113 pub fn new(compatibility: Compatibility, middleware: S) -> Self {
115 MetaIoHandler {
116 compatibility: compatibility,
117 middleware: middleware,
118 methods: Default::default(),
119 }
120 }
121
122 pub fn with_middleware(middleware: S) -> Self {
124 MetaIoHandler {
125 compatibility: Default::default(),
126 middleware: middleware,
127 methods: Default::default(),
128 }
129 }
130
131 pub fn add_alias(&mut self, alias: &str, other: &str) {
133 self.methods.insert(
134 alias.into(),
135 RemoteProcedure::Alias(other.into()),
136 );
137 }
138
139 pub fn add_method<F>(&mut self, name: &str, method: F) where
141 F: RpcMethodSimple,
142 {
143 self.add_method_with_meta(name, move |params, _meta| {
144 method.call(params)
145 })
146 }
147
148 pub fn add_notification<F>(&mut self, name: &str, notification: F) where
150 F: RpcNotificationSimple,
151 {
152 self.add_notification_with_meta(name, move |params, _meta| notification.execute(params))
153 }
154
155 pub fn add_method_with_meta<F>(&mut self, name: &str, method: F) where
157 F: RpcMethod<T>,
158 {
159 self.methods.insert(
160 name.into(),
161 RemoteProcedure::Method(Arc::new(method)),
162 );
163 }
164
165 pub fn add_notification_with_meta<F>(&mut self, name: &str, notification: F) where
167 F: RpcNotification<T>,
168 {
169 self.methods.insert(
170 name.into(),
171 RemoteProcedure::Notification(Arc::new(notification)),
172 );
173 }
174
175 pub fn extend_with<F>(&mut self, methods: F) where
177 F: Into<HashMap<String, RemoteProcedure<T>>>
178 {
179 self.methods.extend(methods.into())
180 }
181
182 pub fn handle_request_sync(&self, request: &str, meta: T) -> Option<String> {
186 self.handle_request(request, meta).wait().expect("Handler calls can never fail.")
187 }
188
189 pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future, S::CallFuture> {
191 use self::future::Either::{A, B};
192 fn as_string(response: Option<Response>) -> Option<String> {
193 let res = response.map(write_response);
194 debug!(target: "rpc", "Response: {}.", match res {
195 Some(ref res) => res,
196 None => "None",
197 });
198 res
199 }
200
201 trace!(target: "rpc", "Request: {}.", request);
202 let request = read_request(request);
203 let result = match request {
204 Err(error) => A(futures::finished(Some(Response::from(error, self.compatibility.default_version())))),
205 Ok(request) => B(self.handle_rpc_request(request, meta)),
206 };
207
208 result.map(as_string)
209 }
210
211 pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult<S::Future, S::CallFuture> {
213 use self::future::Either::{A, B};
214
215 fn output_as_response(output: Option<Output>) -> Option<Response> {
216 output.map(Response::Single)
217 }
218
219 fn outputs_as_batch(outs: Vec<Option<Output>>) -> Option<Response> {
220 let outs: Vec<_> = outs.into_iter().filter_map(|v| v).collect();
221 if outs.is_empty() {
222 None
223 } else {
224 Some(Response::Batch(outs))
225 }
226 }
227
228 self.middleware.on_request(request, meta, |request, meta| match request {
229 Request::Single(call) => {
230 A(self.handle_call(call, meta).map(output_as_response as fn(Option<Output>) ->
231 Option<Response>))
232 },
233 Request::Batch(calls) => {
234 let futures: Vec<_> = calls.into_iter().map(move |call| self.handle_call(call, meta.clone())).collect();
235 B(futures::future::join_all(futures).map(outputs_as_batch as fn(Vec<Option<Output>>) ->
236 Option<Response>))
237 },
238 })
239 }
240
241 pub fn handle_call(&self, call: Call, meta: T) -> FutureRpcOutput<S::CallFuture> {
243 use self::future::Either::{A, B};
244
245 self.middleware.on_call(call, meta, |call, meta| match call {
246 Call::MethodCall(method) => {
247 let params = method.params;
248 let id = method.id;
249 let jsonrpc = method.jsonrpc;
250 let valid_version = self.compatibility.is_version_valid(jsonrpc);
251
252 let call_method = |method: &Arc<RpcMethod<T>>| {
253 let method = method.clone();
254 futures::lazy(move || method.call(params, meta))
255 };
256
257 let result = match (valid_version, self.methods.get(&method.method)) {
258 (false, _) => Err(Error::invalid_version()),
259 (true, Some(&RemoteProcedure::Method(ref method))) => Ok(call_method(method)),
260 (true, Some(&RemoteProcedure::Alias(ref alias))) => match self.methods.get(alias) {
261 Some(&RemoteProcedure::Method(ref method)) => Ok(call_method(method)),
262 _ => Err(Error::method_not_found()),
263 },
264 (true, _) => Err(Error::method_not_found()),
265 };
266
267 match result {
268 Ok(result) => A(Box::new(
269 result.then(move |result| futures::finished(Some(Output::from(result, id, jsonrpc))))
270 ) as _),
271 Err(err) => B(futures::finished(Some(Output::from(Err(err), id, jsonrpc)))),
272 }
273 },
274 Call::Notification(notification) => {
275 let params = notification.params;
276 let jsonrpc = notification.jsonrpc;
277 if !self.compatibility.is_version_valid(jsonrpc) {
278 return B(futures::finished(None));
279 }
280
281 match self.methods.get(¬ification.method) {
282 Some(&RemoteProcedure::Notification(ref notification)) => {
283 notification.execute(params, meta);
284 },
285 Some(&RemoteProcedure::Alias(ref alias)) => {
286 if let Some(&RemoteProcedure::Notification(ref notification)) = self.methods.get(alias) {
287 notification.execute(params, meta);
288 }
289 },
290 _ => {},
291 }
292
293 B(futures::finished(None))
294 },
295 Call::Invalid { id } => {
296 B(futures::finished(Some(Output::invalid_request(id, self.compatibility.default_version()))))
297 },
298 })
299 }
300}
301
302#[derive(Debug, Default)]
304pub struct IoHandler<M: Metadata = ()>(MetaIoHandler<M>);
305
306impl IoHandler {
308 pub fn new() -> Self {
310 IoHandler::default()
311 }
312
313 pub fn with_compatibility(compatibility: Compatibility) -> Self {
315 IoHandler(MetaIoHandler::with_compatibility(compatibility))
316 }
317}
318
319impl<M: Metadata + Default> IoHandler<M> {
320 pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse, FutureOutput> {
322 self.0.handle_request(request, M::default())
323 }
324
325 pub fn handle_rpc_request(&self, request: Request) -> FutureRpcResult<FutureResponse, FutureOutput> {
327 self.0.handle_rpc_request(request, M::default())
328 }
329
330 pub fn handle_call(&self, call: Call) -> FutureRpcOutput<FutureOutput> {
332 self.0.handle_call(call, M::default())
333 }
334
335 pub fn handle_request_sync(&self, request: &str) -> Option<String> {
339 self.0.handle_request_sync(request, M::default())
340 }
341}
342
343impl<M: Metadata> Deref for IoHandler<M> {
344 type Target = MetaIoHandler<M>;
345
346 fn deref(&self) -> &Self::Target {
347 &self.0
348 }
349}
350
351impl<M: Metadata> DerefMut for IoHandler<M> {
352 fn deref_mut(&mut self) -> &mut Self::Target {
353 &mut self.0
354 }
355}
356
357impl From<IoHandler> for MetaIoHandler<()> {
358 fn from(io: IoHandler) -> Self {
359 io.0
360 }
361}
362
363fn read_request(request_str: &str) -> Result<Request, Error> {
364 serde_json::from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError))
365}
366
367fn write_response(response: Response) -> String {
368 serde_json::to_string(&response).unwrap()
370}
371
#[cfg(test)]
mod tests {
    use futures;
    use types::Value;
    use super::{Compatibility, IoHandler};

    /// A 2.0 call to a registered method is answered with the method's result.
    #[test]
    fn test_io_handler() {
        let mut handler = IoHandler::new();
        handler.add_method("say_hello", |_params| Ok(Value::String("hello".to_string())));

        let reply = handler.handle_request_sync(
            r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#
        );

        let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
        assert_eq!(reply, Some(expected.to_string()));
    }

    /// In `Both` mode a version-less call is accepted and answered without a version.
    #[test]
    fn test_io_handler_1dot0() {
        let mut handler = IoHandler::with_compatibility(Compatibility::Both);
        handler.add_method("say_hello", |_params| Ok(Value::String("hello".to_string())));

        let reply = handler.handle_request_sync(
            r#"{"method": "say_hello", "params": [42, 23], "id": 1}"#
        );

        let expected = r#"{"result":"hello","id":1}"#;
        assert_eq!(reply, Some(expected.to_string()));
    }

    /// A method may return a future instead of a plain `Result`.
    #[test]
    fn test_async_io_handler() {
        let mut handler = IoHandler::new();
        handler.add_method("say_hello", |_params| {
            futures::finished(Value::String("hello".to_string()))
        });

        let reply = handler.handle_request_sync(
            r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#
        );

        let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
        assert_eq!(reply, Some(expected.to_string()));
    }

    /// A notification runs its handler and produces no response.
    #[test]
    fn test_notification() {
        use std::sync::Arc;
        use std::sync::atomic;

        let was_called = Arc::new(atomic::AtomicBool::new(false));
        let flag = was_called.clone();

        let mut handler = IoHandler::new();
        handler.add_notification("say_hello", move |_params| {
            flag.store(true, atomic::Ordering::SeqCst)
        });

        let reply = handler.handle_request_sync(
            r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}"#
        );

        assert_eq!(reply, None);
        assert!(was_called.load(atomic::Ordering::SeqCst));
    }

    /// Calling an unregistered method yields a "Method not found" error.
    #[test]
    fn test_method_not_found() {
        let handler = IoHandler::new();

        let reply = handler.handle_request_sync(
            r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#
        );

        let expected = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#;
        assert_eq!(reply, Some(expected.to_string()));
    }

    /// A method can be called through an alias.
    #[test]
    fn test_method_alias() {
        let mut handler = IoHandler::new();
        handler.add_method("say_hello", |_params| Ok(Value::String("hello".to_string())));
        handler.add_alias("say_hello_alias", "say_hello");

        let reply = handler.handle_request_sync(
            r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23], "id": 1}"#
        );

        let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
        assert_eq!(reply, Some(expected.to_string()));
    }

    /// A notification can be triggered through an alias.
    #[test]
    fn test_notification_alias() {
        use std::sync::Arc;
        use std::sync::atomic;

        let was_called = Arc::new(atomic::AtomicBool::new(false));
        let flag = was_called.clone();

        let mut handler = IoHandler::new();
        handler.add_notification("say_hello", move |_params| {
            flag.store(true, atomic::Ordering::SeqCst)
        });
        handler.add_alias("say_hello_alias", "say_hello");

        let reply = handler.handle_request_sync(
            r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23]}"#
        );

        assert_eq!(reply, None);
        assert!(was_called.load(atomic::Ordering::SeqCst));
    }

    /// `IoHandler` must be usable across threads; this is a compile-time check.
    #[test]
    fn test_send_sync() {
        fn is_send_sync<V: Send + Sync>(_value: V) -> bool {
            true
        }

        let handler = IoHandler::new();

        assert!(is_send_sync(handler))
    }
}