1use std::sync::Arc;
2use std::collections::HashMap;
3use std::ops::{Deref, DerefMut};
4
5use serde_json;
6use futures::{self, future, Future};
7
8use calls::{RemoteProcedure, Metadata, RpcMethodSimple, RpcMethod, RpcNotificationSimple, RpcNotification};
9use middleware::{self, Middleware};
10use types::{Error, ErrorCode, Version};
11use types::{Request, Response, Call, Output};
12
13pub type FutureResponse = Box<Future<Item=Option<Response>, Error=()> + Send>;
15
16pub type FutureResult<F> = future::Map<
18 future::Either<future::FutureResult<Option<Response>, ()>, FutureRpcResult<F>>,
19 fn(Option<Response>) -> Option<String>,
20>;
21
22pub type FutureOutput = future::Either<
24 Box<Future<Item=Option<Output>, Error=()> + Send>,
25 future::FutureResult<Option<Output>, ()>,
26>;
27
28pub type FutureRpcResult<F> = future::Either<
30 F,
31 future::Either<
32 future::Map<
33 FutureOutput,
34 fn(Option<Output>) -> Option<Response>,
35 >,
36 future::Map<
37 future::JoinAll<Vec<FutureOutput>>,
38 fn(Vec<Option<Output>>) -> Option<Response>,
39 >,
40 >,
41>;
42
43#[derive(Debug, Clone, Copy)]
45pub enum Compatibility {
46 V1,
48 V2,
50 Both,
52}
53
54impl Default for Compatibility {
55 fn default() -> Self {
56 Compatibility::V2
57 }
58}
59
60impl Compatibility {
61 fn is_version_valid(&self, version: Option<Version>) -> bool {
62 match (*self, version) {
63 (Compatibility::V1, None) |
64 (Compatibility::V2, Some(Version::V2)) |
65 (Compatibility::Both, _) => true,
66 _ => false,
67 }
68 }
69
70 fn default_version(&self) -> Option<Version> {
71 match *self {
72 Compatibility::V1 => None,
73 Compatibility::V2 | Compatibility::Both => Some(Version::V2),
74 }
75 }
76}
77
78#[derive(Debug)]
82pub struct MetaIoHandler<T: Metadata, S: Middleware<T> = middleware::Noop> {
83 middleware: S,
84 compatibility: Compatibility,
85 methods: HashMap<String, RemoteProcedure<T>>,
86}
87
88impl<T: Metadata> Default for MetaIoHandler<T> {
89 fn default() -> Self {
90 MetaIoHandler::with_compatibility(Default::default())
91 }
92}
93
94impl<T: Metadata> MetaIoHandler<T> {
95 pub fn with_compatibility(compatibility: Compatibility) -> Self {
97 MetaIoHandler {
98 compatibility: compatibility,
99 middleware: Default::default(),
100 methods: Default::default(),
101 }
102 }
103}
104
105
106impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
107 pub fn new(compatibility: Compatibility, middleware: S) -> Self {
109 MetaIoHandler {
110 compatibility: compatibility,
111 middleware: middleware,
112 methods: Default::default(),
113 }
114 }
115
116 pub fn with_middleware(middleware: S) -> Self {
118 MetaIoHandler {
119 compatibility: Default::default(),
120 middleware: middleware,
121 methods: Default::default(),
122 }
123 }
124
125 pub fn add_alias(&mut self, alias: &str, other: &str) {
127 self.methods.insert(
128 alias.into(),
129 RemoteProcedure::Alias(other.into()),
130 );
131 }
132
133 pub fn add_method<F>(&mut self, name: &str, method: F) where
135 F: RpcMethodSimple,
136 {
137 self.add_method_with_meta(name, move |params, _meta| {
138 method.call(params)
139 })
140 }
141
142 pub fn add_notification<F>(&mut self, name: &str, notification: F) where
144 F: RpcNotificationSimple,
145 {
146 self.add_notification_with_meta(name, move |params, _meta| notification.execute(params))
147 }
148
149 pub fn add_method_with_meta<F>(&mut self, name: &str, method: F) where
151 F: RpcMethod<T>,
152 {
153 self.methods.insert(
154 name.into(),
155 RemoteProcedure::Method(Arc::new(method)),
156 );
157 }
158
159 pub fn add_notification_with_meta<F>(&mut self, name: &str, notification: F) where
161 F: RpcNotification<T>,
162 {
163 self.methods.insert(
164 name.into(),
165 RemoteProcedure::Notification(Arc::new(notification)),
166 );
167 }
168
169 pub fn extend_with<F>(&mut self, methods: F) where
171 F: Into<HashMap<String, RemoteProcedure<T>>>
172 {
173 self.methods.extend(methods.into())
174 }
175
176 pub fn handle_request_sync(&self, request: &str, meta: T) -> Option<String> {
180 self.handle_request(request, meta).wait().expect("Handler calls can never fail.")
181 }
182
183 pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future> {
185 use self::future::Either::{A, B};
186 fn as_string(response: Option<Response>) -> Option<String> {
187 let res = response.map(write_response);
188 debug!(target: "rpc", "Response: {}.", match res {
189 Some(ref res) => res,
190 None => "None",
191 });
192 res
193 }
194
195 trace!(target: "rpc", "Request: {}.", request);
196 let request = read_request(request);
197 let result = match request {
198 Err(error) => A(futures::finished(Some(Response::from(error, self.compatibility.default_version())))),
199 Ok(request) => B(self.handle_rpc_request(request, meta)),
200 };
201
202 result.map(as_string)
203 }
204
205 pub fn handle_rpc_request(&self, request: Request, meta: T) -> FutureRpcResult<S::Future> {
207 use self::future::Either::{A, B};
208
209 fn output_as_response(output: Option<Output>) -> Option<Response> {
210 output.map(Response::Single)
211 }
212
213 fn outputs_as_batch(outs: Vec<Option<Output>>) -> Option<Response> {
214 let outs: Vec<_> = outs.into_iter().filter_map(|v| v).collect();
215 if outs.is_empty() {
216 None
217 } else {
218 Some(Response::Batch(outs))
219 }
220 }
221
222 self.middleware.on_request(request, meta, |request, meta| match request {
223 Request::Single(call) => {
224 A(self.handle_call(call, meta).map(output_as_response as fn(Option<Output>) ->
225 Option<Response>))
226 },
227 Request::Batch(calls) => {
228 let futures: Vec<_> = calls.into_iter().map(move |call| self.handle_call(call, meta.clone())).collect();
229 B(futures::future::join_all(futures).map(outputs_as_batch as fn(Vec<Option<Output>>) ->
230 Option<Response>))
231 },
232 })
233 }
234
235 pub fn handle_call(&self, call: Call, meta: T) -> FutureOutput {
237 use self::future::Either::{A, B};
238
239 match call {
240 Call::MethodCall(method) => {
241 let params = method.params;
242 let id = method.id;
243 let jsonrpc = method.jsonrpc;
244 let valid_version = self.compatibility.is_version_valid(jsonrpc);
245
246 let call_method = |method: &Arc<RpcMethod<T>>| {
247 let method = method.clone();
248 futures::lazy(move || method.call(params, meta))
249 };
250
251 let result = match (valid_version, self.methods.get(&method.method)) {
252 (false, _) => Err(Error::invalid_version()),
253 (true, Some(&RemoteProcedure::Method(ref method))) => Ok(call_method(method)),
254 (true, Some(&RemoteProcedure::Alias(ref alias))) => match self.methods.get(alias) {
255 Some(&RemoteProcedure::Method(ref method)) => Ok(call_method(method)),
256 _ => Err(Error::method_not_found()),
257 },
258 (true, _) => Err(Error::method_not_found()),
259 };
260
261 match result {
262 Ok(result) => A(Box::new(
263 result.then(move |result| futures::finished(Some(Output::from(result, id, jsonrpc))))
264 )),
265 Err(err) => B(futures::finished(Some(Output::from(Err(err), id, jsonrpc)))),
266 }
267 },
268 Call::Notification(notification) => {
269 let params = notification.params;
270 let jsonrpc = notification.jsonrpc;
271 if !self.compatibility.is_version_valid(jsonrpc) {
272 return B(futures::finished(None));
273 }
274
275 match self.methods.get(¬ification.method) {
276 Some(&RemoteProcedure::Notification(ref notification)) => {
277 notification.execute(params, meta);
278 },
279 Some(&RemoteProcedure::Alias(ref alias)) => {
280 if let Some(&RemoteProcedure::Notification(ref notification)) = self.methods.get(alias) {
281 notification.execute(params, meta);
282 }
283 },
284 _ => {},
285 }
286
287 B(futures::finished(None))
288 },
289 Call::Invalid { id } => {
290 B(futures::finished(Some(Output::invalid_request(id, self.compatibility.default_version()))))
291 },
292 }
293 }
294}
295
296#[derive(Debug, Default)]
298pub struct IoHandler<M: Metadata = ()>(MetaIoHandler<M>);
299
300impl IoHandler {
302 pub fn new() -> Self {
304 IoHandler::default()
305 }
306
307 pub fn with_compatibility(compatibility: Compatibility) -> Self {
309 IoHandler(MetaIoHandler::with_compatibility(compatibility))
310 }
311}
312
313impl<M: Metadata + Default> IoHandler<M> {
314 pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse> {
316 self.0.handle_request(request, M::default())
317 }
318
319 pub fn handle_rpc_request(&self, request: Request) -> FutureRpcResult<FutureResponse> {
321 self.0.handle_rpc_request(request, M::default())
322 }
323
324 pub fn handle_call(&self, call: Call) -> FutureOutput {
326 self.0.handle_call(call, M::default())
327 }
328
329 pub fn handle_request_sync(&self, request: &str) -> Option<String> {
333 self.0.handle_request_sync(request, M::default())
334 }
335}
336
337impl<M: Metadata> Deref for IoHandler<M> {
338 type Target = MetaIoHandler<M>;
339
340 fn deref(&self) -> &Self::Target {
341 &self.0
342 }
343}
344
345impl<M: Metadata> DerefMut for IoHandler<M> {
346 fn deref_mut(&mut self) -> &mut Self::Target {
347 &mut self.0
348 }
349}
350
351impl From<IoHandler> for MetaIoHandler<()> {
352 fn from(io: IoHandler) -> Self {
353 io.0
354 }
355}
356
357fn read_request(request_str: &str) -> Result<Request, Error> {
358 serde_json::from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError))
359}
360
361fn write_response(response: Response) -> String {
362 serde_json::to_string(&response).unwrap()
364}
365
366#[cfg(test)]
367mod tests {
368 use futures;
369 use types::{Value};
370 use super::{IoHandler, Compatibility};
371
372 #[test]
373 fn test_io_handler() {
374 let mut io = IoHandler::new();
375
376 io.add_method("say_hello", |_| {
377 Ok(Value::String("hello".to_string()))
378 });
379
380 let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
381 let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
382
383 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
384 }
385
386 #[test]
387 fn test_io_handler_1dot0() {
388 let mut io = IoHandler::with_compatibility(Compatibility::Both);
389
390 io.add_method("say_hello", |_| {
391 Ok(Value::String("hello".to_string()))
392 });
393
394 let request = r#"{"method": "say_hello", "params": [42, 23], "id": 1}"#;
395 let response = r#"{"result":"hello","id":1}"#;
396
397 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
398 }
399
400 #[test]
401 fn test_async_io_handler() {
402 let mut io = IoHandler::new();
403
404 io.add_method("say_hello", |_| {
405 futures::finished(Value::String("hello".to_string()))
406 });
407
408 let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
409 let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
410
411 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
412 }
413
414 #[test]
415 fn test_notification() {
416 use std::sync::Arc;
417 use std::sync::atomic;
418
419 let mut io = IoHandler::new();
420
421 let called = Arc::new(atomic::AtomicBool::new(false));
422 let c = called.clone();
423 io.add_notification("say_hello", move |_| {
424 c.store(true, atomic::Ordering::SeqCst);
425 });
426 let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}"#;
427
428 assert_eq!(io.handle_request_sync(request), None);
429 assert_eq!(called.load(atomic::Ordering::SeqCst), true);
430 }
431
432 #[test]
433 fn test_method_not_found() {
434 let io = IoHandler::new();
435
436 let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
437 let response = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#;
438
439 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
440 }
441
442 #[test]
443 fn test_method_alias() {
444 let mut io = IoHandler::new();
445 io.add_method("say_hello", |_| {
446 Ok(Value::String("hello".to_string()))
447 });
448 io.add_alias("say_hello_alias", "say_hello");
449
450
451 let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23], "id": 1}"#;
452 let response = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;
453
454 assert_eq!(io.handle_request_sync(request), Some(response.to_string()));
455 }
456
457 #[test]
458 fn test_notification_alias() {
459 use std::sync::Arc;
460 use std::sync::atomic;
461
462 let mut io = IoHandler::new();
463
464 let called = Arc::new(atomic::AtomicBool::new(false));
465 let c = called.clone();
466 io.add_notification("say_hello", move |_| {
467 c.store(true, atomic::Ordering::SeqCst);
468 });
469 io.add_alias("say_hello_alias", "say_hello");
470
471 let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23]}"#;
472 assert_eq!(io.handle_request_sync(request), None);
473 assert_eq!(called.load(atomic::Ordering::SeqCst), true);
474 }
475
476 #[test]
477 fn test_send_sync() {
478 fn is_send_sync<T>(_obj: T) -> bool where
479 T: Send + Sync
480 {
481 true
482 }
483
484 let io = IoHandler::new();
485
486 assert!(is_send_sync(io))
487 }
488}