1use std::sync::Arc;
2use std::collections::HashMap;
3use std::ops::{Deref, DerefMut};
4
5use serde_json;
6use futures::{self, future, Future};
7
8use calls::{RemoteProcedure, Metadata, RpcMethodSimple, RpcMethod, RpcNotificationSimple, RpcNotification};
9use middleware::{self, Middleware};
10use types::{Params, Error, ErrorCode, Version};
11use types::{Request, Response, Call, Output};
12
13pub type FutureResponse = Box<Future<Item=Option<Response>, Error=()> + Send>;
15
16pub type FutureResult<F> = future::Map<
18 future::Either<future::FutureResult<Option<Response>, ()>, F>,
19 fn(Option<Response>) -> Option<String>,
20>;
21
22pub type FutureOutput = future::Either<
24 Box<Future<Item=Option<Output>, Error=()> + Send>,
25 future::FutureResult<Option<Output>, ()>,
26>;
27
/// Which protocol versions a handler accepts.
///
/// Derives `PartialEq`/`Eq` so callers can compare modes directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compatibility {
    /// Accept only calls that carry no version (`None`).
    V1,
    /// Accept only calls that declare `Version::V2`.
    V2,
    /// Accept calls with or without a declared version.
    Both,
}
38
39impl Default for Compatibility {
40 fn default() -> Self {
41 Compatibility::V2
42 }
43}
44
45impl Compatibility {
46 fn is_version_valid(&self, version: Option<Version>) -> bool {
47 match (*self, version) {
48 (Compatibility::V1, None) |
49 (Compatibility::V2, Some(Version::V2)) |
50 (Compatibility::Both, _) => true,
51 _ => false,
52 }
53 }
54
55 fn default_version(&self) -> Option<Version> {
56 match *self {
57 Compatibility::V1 => None,
58 Compatibility::V2 | Compatibility::Both => Some(Version::V2),
59 }
60 }
61}
62
63#[derive(Debug)]
67pub struct MetaIoHandler<T: Metadata, S: Middleware<T> = middleware::Noop> {
68 middleware: S,
69 compatibility: Compatibility,
70 methods: HashMap<String, RemoteProcedure<T>>,
71}
72
73impl<T: Metadata> Default for MetaIoHandler<T> {
74 fn default() -> Self {
75 MetaIoHandler::with_compatibility(Default::default())
76 }
77}
78
79impl<T: Metadata> MetaIoHandler<T> {
80 pub fn with_compatibility(compatibility: Compatibility) -> Self {
82 MetaIoHandler {
83 compatibility: compatibility,
84 middleware: Default::default(),
85 methods: Default::default(),
86 }
87 }
88}
89
90
91impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
92 pub fn new(compatibility: Compatibility, middleware: S) -> Self {
94 MetaIoHandler {
95 compatibility: compatibility,
96 middleware: middleware,
97 methods: Default::default(),
98 }
99 }
100
101 pub fn with_middleware(middleware: S) -> Self {
103 MetaIoHandler {
104 compatibility: Default::default(),
105 middleware: middleware,
106 methods: Default::default(),
107 }
108 }
109
110 pub fn add_alias(&mut self, alias: &str, other: &str) {
112 self.methods.insert(
113 alias.into(),
114 RemoteProcedure::Alias(other.into()),
115 );
116 }
117
118 pub fn add_method<F>(&mut self, name: &str, method: F) where
120 F: RpcMethodSimple,
121 {
122 self.add_method_with_meta(name, move |params, _meta| {
123 method.call(params)
124 })
125 }
126
127 pub fn add_notification<F>(&mut self, name: &str, notification: F) where
129 F: RpcNotificationSimple,
130 {
131 self.add_notification_with_meta(name, move |params, _meta| notification.execute(params))
132 }
133
134 pub fn add_method_with_meta<F>(&mut self, name: &str, method: F) where
136 F: RpcMethod<T>,
137 {
138 self.methods.insert(
139 name.into(),
140 RemoteProcedure::Method(Arc::new(method)),
141 );
142 }
143
144 pub fn add_notification_with_meta<F>(&mut self, name: &str, notification: F) where
146 F: RpcNotification<T>,
147 {
148 self.methods.insert(
149 name.into(),
150 RemoteProcedure::Notification(Arc::new(notification)),
151 );
152 }
153
154 pub fn extend_with<F>(&mut self, methods: F) where
156 F: Into<HashMap<String, RemoteProcedure<T>>>
157 {
158 self.methods.extend(methods.into())
159 }
160
161 pub fn handle_request_sync(&self, request: &str, meta: T) -> Option<String> {
165 self.handle_request(request, meta).wait().expect("Handler calls can never fail.")
166 }
167
168 pub fn handle_request(&self, request: &str, meta: T) -> FutureResult<S::Future> {
170 use self::future::Either::{A, B};
171 fn as_string(response: Option<Response>) -> Option<String> {
172 let res = response.map(write_response);
173 debug!(target: "rpc", "Response: {}.", match res {
174 Some(ref res) => res,
175 None => "None",
176 });
177 res
178 }
179
180 trace!(target: "rpc", "Request: {}.", request);
181 let request = read_request(request);
182 let result = match request {
183 Err(error) => A(futures::finished(Some(Response::from(error, self.compatibility.default_version())))),
184 Ok(request) => B(self.handle_rpc_request(request, meta)),
185 };
186
187 result.map(as_string)
188 }
189
190 pub fn handle_rpc_request(&self, request: Request, meta: T) -> S::Future {
192 use self::future::Either::{A, B};
193
194 self.middleware.on_request(request, meta, |request, meta| match request {
195 Request::Single(call) => {
196 A(self.handle_call(call, meta).map(|output| output.map(Response::Single)))
197 },
198 Request::Batch(calls) => {
199 let futures: Vec<_> = calls.into_iter().map(move |call| self.handle_call(call, meta.clone())).collect();
200 B(futures::future::join_all(futures).map(|outs| {
201 let outs: Vec<_> = outs.into_iter().filter_map(|v| v).collect();
202 if outs.is_empty() {
203 None
204 } else {
205 Some(Response::Batch(outs))
206 }
207 }))
208 },
209 })
210 }
211
212 pub fn handle_call(&self, call: Call, meta: T) -> FutureOutput {
214 use self::future::Either::{A, B};
215
216 match call {
217 Call::MethodCall(method) => {
218 let params = method.params.unwrap_or(Params::None);
219 let id = method.id;
220 let jsonrpc = method.jsonrpc;
221 let valid_version = self.compatibility.is_version_valid(jsonrpc);
222
223 let call_method = |method: &Arc<RpcMethod<T>>| {
224 let method = method.clone();
225 futures::lazy(move || method.call(params, meta))
226 };
227
228 let result = match (valid_version, self.methods.get(&method.method)) {
229 (false, _) => Err(Error::invalid_version()),
230 (true, Some(&RemoteProcedure::Method(ref method))) => Ok(call_method(method)),
231 (true, Some(&RemoteProcedure::Alias(ref alias))) => match self.methods.get(alias) {
232 Some(&RemoteProcedure::Method(ref method)) => Ok(call_method(method)),
233 _ => Err(Error::method_not_found()),
234 },
235 (true, _) => Err(Error::method_not_found()),
236 };
237
238 match result {
239 Ok(result) => A(Box::new(
240 result.then(move |result| futures::finished(Some(Output::from(result, id, jsonrpc))))
241 )),
242 Err(err) => B(futures::finished(Some(Output::from(Err(err), id, jsonrpc)))),
243 }
244 },
245 Call::Notification(notification) => {
246 let params = notification.params.unwrap_or(Params::None);
247 let jsonrpc = notification.jsonrpc;
248 if !self.compatibility.is_version_valid(jsonrpc) {
249 return B(futures::finished(None));
250 }
251
252 match self.methods.get(¬ification.method) {
253 Some(&RemoteProcedure::Notification(ref notification)) => {
254 notification.execute(params, meta);
255 },
256 Some(&RemoteProcedure::Alias(ref alias)) => {
257 if let Some(&RemoteProcedure::Notification(ref notification)) = self.methods.get(alias) {
258 notification.execute(params, meta);
259 }
260 },
261 _ => {},
262 }
263
264 B(futures::finished(None))
265 },
266 Call::Invalid(id) => {
267 B(futures::finished(Some(Output::invalid_request(id, self.compatibility.default_version()))))
268 },
269 }
270 }
271}
272
273#[derive(Debug, Default)]
275pub struct IoHandler<M: Metadata = ()>(MetaIoHandler<M>);
276
277impl IoHandler {
279 pub fn new() -> Self {
281 IoHandler::default()
282 }
283
284 pub fn with_compatibility(compatibility: Compatibility) -> Self {
286 IoHandler(MetaIoHandler::with_compatibility(compatibility))
287 }
288}
289
290impl<M: Metadata> IoHandler<M> {
291 pub fn handle_request(&self, request: &str) -> FutureResult<FutureResponse> {
293 self.0.handle_request(request, M::default())
294 }
295
296 pub fn handle_rpc_request(&self, request: Request) -> FutureResponse {
298 self.0.handle_rpc_request(request, M::default())
299 }
300
301 pub fn handle_call(&self, call: Call) -> FutureOutput {
303 self.0.handle_call(call, M::default())
304 }
305
306 pub fn handle_request_sync(&self, request: &str) -> Option<String> {
310 self.0.handle_request_sync(request, M::default())
311 }
312}
313
314impl<M: Metadata> Deref for IoHandler<M> {
315 type Target = MetaIoHandler<M>;
316
317 fn deref(&self) -> &Self::Target {
318 &self.0
319 }
320}
321
322impl<M: Metadata> DerefMut for IoHandler<M> {
323 fn deref_mut(&mut self) -> &mut Self::Target {
324 &mut self.0
325 }
326}
327
328impl From<IoHandler> for MetaIoHandler<()> {
329 fn from(io: IoHandler) -> Self {
330 io.0
331 }
332}
333
334fn read_request(request_str: &str) -> Result<Request, Error> {
335 serde_json::from_str(request_str).map_err(|_| Error::new(ErrorCode::ParseError))
336}
337
338fn write_response(response: Response) -> String {
339 serde_json::to_string(&response).unwrap()
341}
342
#[cfg(test)]
mod tests {
    use futures;
    use types::Value;
    use super::{Compatibility, IoHandler};

    /// A registered method answers a version 2 call with its result.
    #[test]
    fn test_io_handler() {
        let mut handler = IoHandler::new();
        handler.add_method("say_hello", |_| Ok(Value::String("hello".to_owned())));

        let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
        let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;

        let actual = handler.handle_request_sync(request);
        assert_eq!(actual, Some(expected.to_owned()));
    }

    /// With `Compatibility::Both`, a call without a version gets a response without one.
    #[test]
    fn test_io_handler_1dot0() {
        let mut handler = IoHandler::with_compatibility(Compatibility::Both);
        handler.add_method("say_hello", |_| Ok(Value::String("hello".to_owned())));

        let request = r#"{"method": "say_hello", "params": [42, 23], "id": 1}"#;
        let expected = r#"{"result":"hello","id":1}"#;

        let actual = handler.handle_request_sync(request);
        assert_eq!(actual, Some(expected.to_owned()));
    }

    /// A method may return a future instead of a plain `Result`.
    #[test]
    fn test_async_io_handler() {
        let mut handler = IoHandler::new();
        handler.add_method("say_hello", |_| futures::finished(Value::String("hello".to_owned())));

        let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
        let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;

        let actual = handler.handle_request_sync(request);
        assert_eq!(actual, Some(expected.to_owned()));
    }

    /// A notification runs its handler and produces no response.
    #[test]
    fn test_notification() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let mut handler = IoHandler::new();

        let called = Arc::new(AtomicBool::new(false));
        let flag = called.clone();
        handler.add_notification("say_hello", move |_| {
            flag.store(true, Ordering::SeqCst);
        });

        let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23]}"#;

        assert_eq!(handler.handle_request_sync(request), None);
        assert!(called.load(Ordering::SeqCst));
    }

    /// Calling an unregistered method yields error code -32601.
    #[test]
    fn test_method_not_found() {
        let handler = IoHandler::new();

        let request = r#"{"jsonrpc": "2.0", "method": "say_hello", "params": [42, 23], "id": 1}"#;
        let expected = r#"{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":1}"#;

        let actual = handler.handle_request_sync(request);
        assert_eq!(actual, Some(expected.to_owned()));
    }

    /// A call to an alias is dispatched to the aliased method.
    #[test]
    fn test_method_alias() {
        let mut handler = IoHandler::new();
        handler.add_method("say_hello", |_| Ok(Value::String("hello".to_owned())));
        handler.add_alias("say_hello_alias", "say_hello");

        let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23], "id": 1}"#;
        let expected = r#"{"jsonrpc":"2.0","result":"hello","id":1}"#;

        let actual = handler.handle_request_sync(request);
        assert_eq!(actual, Some(expected.to_owned()));
    }

    /// A notification sent to an alias runs the aliased notification.
    #[test]
    fn test_notification_alias() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let mut handler = IoHandler::new();

        let called = Arc::new(AtomicBool::new(false));
        let flag = called.clone();
        handler.add_notification("say_hello", move |_| {
            flag.store(true, Ordering::SeqCst);
        });
        handler.add_alias("say_hello_alias", "say_hello");

        let request = r#"{"jsonrpc": "2.0", "method": "say_hello_alias", "params": [42, 23]}"#;

        assert_eq!(handler.handle_request_sync(request), None);
        assert!(called.load(Ordering::SeqCst));
    }

    /// Compile-time check that the handler is `Send + Sync`.
    #[test]
    fn test_send_sync() {
        fn is_send_sync<T: Send + Sync>(_obj: T) -> bool {
            true
        }

        let handler = IoHandler::new();

        assert!(is_send_sync(handler))
    }
}