jsonrpc/
jsonrpc.rs

1// Copyright 2016 Bruno Medeiros
2//
3// Licensed under the Apache License, Version 2.0 
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0>. 
5// This file may not be copied, modified, or distributed
6// except according to those terms.
7
8
9
10/*!
11
12### JSON-RPC library.
13
14# Examples:
15
16See full server/client example here:
17https://github.com/RustDT/rustdt-json_rpc/blob/master/tests/example.rs
18
19*/
20
21
22#![allow(non_upper_case_globals)]
23#![allow(non_snake_case)]
24#![allow(non_camel_case_types)]
25
26#[macro_use] extern crate log;
27extern crate serde_json;
28extern crate serde;
29
30extern crate rustdt_util as util;
31pub extern crate futures;
32
33pub mod json_util;
34pub mod jsonrpc_common;
35pub mod jsonrpc_message;
36pub mod jsonrpc_request;
37pub mod jsonrpc_response;
38pub mod method_types;
39pub mod service_util;
40pub mod output_agent;
41
42/* -----------------  ----------------- */
43
44use util::core::*;
45
46use std::collections::HashMap;
47use std::result::Result;
48
49use std::sync::Arc;
50use std::sync::Mutex;
51 
52use futures::Future;
53use futures::BoxFuture;
54use futures::Complete;
55
56use service_util::MessageReader;
57use service_util::MessageWriter;
58use jsonrpc_common::*;
59use jsonrpc_message::*;
60use jsonrpc_request::*;
61use jsonrpc_response::*;
62use method_types::*;
63
64/* -----------------  Endpoint  ----------------- */
65
66use output_agent::OutputAgent;
67use output_agent::OutputAgentTask;
68
69
70/// A JSON-RPC endpoint that can send requests (Client role), 
71/// and send responses to requests (Server role).
72/// 
73/// This type has (mostly) handle semantics: it can be copied freely, used in multiple threads.
74///
75/// However, someone must be responsible for requesting an explicit shutdown of the Endpoint.
76/// If this is not done, the OutputAgent will panic once the last reference is dropped.
77///
78#[derive(Clone)]
79pub struct Endpoint {
80    id_counter : Arc<Mutex<u64>>,
81    pending_requests : Arc<Mutex<HashMap<Id, Complete<ResponseResult>>>>,
82    output_agent : Arc<Mutex<OutputAgent>>,
83}
84
85impl Endpoint {
86    
87    pub fn start_with(output_agent: OutputAgent) 
88        -> Endpoint
89    {
90        Endpoint {
91            id_counter : newArcMutex(0),
92            pending_requests : newArcMutex(HashMap::new()),
93            output_agent : newArcMutex(output_agent) 
94        }
95    }
96    
97    pub fn is_shutdown(& self) -> bool {
98        self.output_agent.lock().unwrap().is_shutdown()
99    }
100    
101    pub fn request_shutdown(&self) {
102        self.output_agent.lock().unwrap().request_shutdown();
103    }
104    
105    pub fn shutdown_and_join(&self) {
106        self.output_agent.lock().unwrap().shutdown_and_join();
107    }
108    
109    pub fn next_id(&self) -> Id {
110           let id_num : &mut u64 = &mut *self.id_counter.lock().unwrap();
111        *id_num += 1;
112        Id::Number(*id_num)
113    }
114}
115
116/// Combine an Endpoint with a request handler, 
117/// to create a complete Endpoint Handler, capable of handling incoming requests from a message reader.
118///
119/// See also: Endpoint
120pub struct EndpointHandler {
121    pub endpoint : Endpoint,
122    pub request_handler : Box<RequestHandler>,
123}
124
125impl EndpointHandler {
126    
127    pub fn create_with_writer<WRITER>(msg_writer: WRITER, request_handler: Box<RequestHandler>) 
128        -> EndpointHandler
129    where 
130        WRITER : MessageWriter + 'static + Send, 
131    {
132        let output_agent = OutputAgent::start_with_provider(|| msg_writer);
133        Self::create_with_output_agent(output_agent, request_handler)
134    }
135    
136    pub fn create_with_output_agent(output_agent: OutputAgent, request_handler: Box<RequestHandler>) 
137        -> EndpointHandler
138    {
139        let output = Endpoint::start_with(output_agent);
140        Self::create(output, request_handler)
141    }
142    
143    pub fn create(endpoint: Endpoint, request_handler: Box<RequestHandler>) 
144        -> EndpointHandler
145    {
146        EndpointHandler { endpoint : endpoint, request_handler: request_handler }
147    }
148    
149    /// Run a message read loop with given message reader.
150    /// Loop will be terminated only when there is an error reading a message.
151    pub fn run_message_read_loop<MSG_READER : ?Sized>(mut self, input: &mut MSG_READER) 
152        -> GResult<()>
153    where
154        MSG_READER : MessageReader
155    {
156        loop {
157            let message = match input.read_next() {
158                Ok(ok) => { ok } 
159                Err(error) => { 
160                    self.endpoint.request_shutdown();
161                    return Err(error);
162                }
163            };
164            
165            self.handle_incoming_message(&message);
166            
167            if self.endpoint.is_shutdown() {
168                return Ok(())
169            }
170        }
171    }
172    
173    /// Handle an incoming message
174    pub fn handle_incoming_message(&mut self, message_json: &str) {
175        
176        let message = serde_json::from_str::<Message>(message_json);
177         
178        match message {
179            Ok(message) => {
180                match message {
181                	Message::Request(request) => self.handle_incoming_request(request),  
182                	Message::Response(response) => self.endpoint.handle_incoming_response(response),
183                }
184            } 
185            Err(error) => {
186                let error = error_JSON_RPC_InvalidRequest(error);
187                submit_error_write_task(&self.endpoint.output_agent, error); 
188            }
189        }
190    }
191
192    /// Handle a well-formed incoming JsonRpc request object
193    pub fn handle_incoming_request(&mut self, request: Request) {
194        let output_agent = self.endpoint.output_agent.clone();
195        
196        let on_response = new(move |response: Option<Response>| {
197            if let Some(response) = response {
198                submit_message_write_task(&output_agent, response.into()); 
199            } else {
200                let method_name = ""; // TODO
201                info!("JSON-RPC notification complete. {:?}", method_name);
202            } 
203        });
204        let completable = ResponseCompletable::new(request.id, on_response);
205        
206        self.request_handler.handle_request(&request.method, request.params, completable); 
207    }
208
209}
210
211/* ----------------- Response handling ----------------- */
212
213pub trait RequestHandler {
214    fn handle_request(
215        &mut self, method_name: &str, request_params: RequestParams, completable: ResponseCompletable
216    );
217}
218
219pub struct NullRequestHandler;
220
221impl RequestHandler for NullRequestHandler {
222    fn handle_request(
223        &mut self, _request_method: &str, _request_params: RequestParams, completable: ResponseCompletable
224    ) {
225        completable.complete_with_error(error_JSON_RPC_MethodNotFound());
226    }
227}
228
229/// A completable for a JSON-RPC request. This is an object that must be "completed", 
230/// that is, a result must be provided. (this is the inverse of a future)
231/// 
232/// Must be completed once and only once, otherwise a panic is generated upon drop.
233/// 
234/// On completion, the on_response callback is invoked. 
235/// Typically: this will write an appropriate JSON-RPC response to the endpoint output.
236pub struct ResponseCompletable {
237    completion_flag: FinishedFlag,
238    id: Option<Id>,
239    on_response: Box<FnMut(Option<Response>) + Send>,
240}
241
242impl ResponseCompletable {
243    
244    pub fn new(id: Option<Id>, on_response: Box<FnMut(Option<Response>) + Send>) -> ResponseCompletable {
245        ResponseCompletable { 
246            completion_flag : FinishedFlag(false), id : id, on_response: on_response
247        }
248    }
249    
250    pub fn complete(mut self, response_result: Option<ResponseResult>) {
251        self.completion_flag.finish();
252        
253        // From the spec: `A Notification is a Request object without an "id" member.`
254        if let Some(response_result) = response_result {
255            
256            let response =
257            if let Some(id) = self.id {
258                Response{ id : id, result_or_error : response_result }
259            } else {
260                Response::new_error(Id::Null, 
261                    error_JSON_RPC_InvalidRequest("Property `id` not provided for request."))
262            };
263            
264            (self.on_response)(Some(response));
265        } else {
266            (self.on_response)(None)
267        }
268    }
269    
270    pub fn complete_with_error(self, error: RequestError) {
271        self.complete(Some(ResponseResult::Error(error)));
272    }
273    
274    pub fn handle_request_with<PARAMS, RET, RET_ERROR, METHOD>(
275        self, params: RequestParams, method_handler: METHOD
276    ) 
277    where 
278        PARAMS : serde::Deserialize, 
279        RET : serde::Serialize, 
280        RET_ERROR : serde::Serialize,
281        METHOD : FnOnce(PARAMS, MethodCompletable<RET, RET_ERROR>),
282    {
283        let mc = MethodCompletable::<RET, RET_ERROR>::new(self);
284        mc.parse_params_and_complete_with(params, method_handler);
285    }
286    
287    pub fn sync_handle_request<PARAMS, RET, RET_ERROR, METHOD>(
288        self, params: RequestParams, sync_method_handler: METHOD
289    ) 
290    where 
291        PARAMS : serde::Deserialize, 
292        RET : serde::Serialize, 
293        RET_ERROR : serde::Serialize ,
294        METHOD : FnOnce(PARAMS) -> MethodResult<RET, RET_ERROR>,
295    {
296        self.handle_request_with(params, |params, completable| {
297            let result = sync_method_handler(params);
298            completable.complete(result);
299        })
300    }
301    
302    pub fn handle_notification_with<PARAMS, METHOD>(
303        self, params: RequestParams, method_handler: METHOD
304    ) 
305    where 
306        PARAMS : serde::Deserialize, 
307        METHOD : FnOnce(PARAMS),
308    {
309        let mc = MethodCompletable::<(), ()>::new(self);
310        mc.parse_params_and_complete_with(params, |params, completable| {
311            // early completion for notification
312            completable.completable.complete(None);
313            method_handler(params)
314        });
315    }
316    
317    pub fn sync_handle_notification<PARAMS, METHOD>(
318        self, params: RequestParams, sync_method_handler: METHOD
319    ) 
320    where 
321        PARAMS : serde::Deserialize, 
322        METHOD : FnOnce(PARAMS),
323    {
324        self.handle_notification_with(params, |params| {
325            sync_method_handler(params);
326        })
327    }
328    
329}
330
331use std::marker::PhantomData;
332
333/// Helper type that wraps a ResponseCompletable, 
334/// and binds the possible completion to a result `MethodResult<RET, RET_ERROR>` 
335pub struct MethodCompletable
336<
337    RET : serde::Serialize, 
338    RET_ERROR : serde::Serialize,
339>
340{
341    completable: ResponseCompletable,
342    p1: PhantomData<RET>,
343    p2: PhantomData<RET_ERROR>,
344}
345
346impl<
347    RET : serde::Serialize, 
348    RET_ERROR : serde::Serialize,
349> 
350    MethodCompletable<RET, RET_ERROR>
351{
352    pub fn new(completable: ResponseCompletable) -> MethodCompletable<RET, RET_ERROR> {
353        MethodCompletable { completable : completable, p1 : PhantomData, p2 : PhantomData}
354    }
355    
356    pub fn parse_params_and_complete_with<PARAMS, METHOD>(
357        self,
358        params: RequestParams,
359        method_fn: METHOD
360    )
361    where 
362        PARAMS : serde::Deserialize, 
363        RET : serde::Serialize, 
364        RET_ERROR : serde::Serialize,
365        METHOD : FnOnce(PARAMS, Self),
366    {
367        let params_value = params.into_value();
368        
369        let params_result : Result<PARAMS, _> = serde_json::from_value(params_value);
370        
371        match params_result {
372            Ok(params) => { 
373                method_fn(params, self);
374            }
375            Err(error) => {
376                self.completable.complete_with_error(error_JSON_RPC_InvalidParams(error));
377            }
378        }
379    }
380    
381    pub fn complete(self, result: MethodResult<RET, RET_ERROR>) {
382        self.completable.complete(Some(ResponseResult::from(result)));
383    }
384}
385
386pub fn submit_message_write_task(output_agent: &Arc<Mutex<OutputAgent>>, jsonrpc_message: Message) {
387    
388    let write_task : OutputAgentTask = Box::new(move |mut response_handler| {
389        info!("JSON-RPC message: {:?}", jsonrpc_message);
390        
391        let response_str = serde_json::to_string(&jsonrpc_message).unwrap_or_else(|error| -> String { 
392            panic!("Failed to serialize to JSON object: {}", error);
393        });
394        
395        let write_res = response_handler.write_message(&response_str);
396        if let Err(error) = write_res {
397            // FIXME handle output stream write error by shutting down
398            error!("Error writing JSON-RPC message: {}", error);
399        };
400    });
401    
402    let res = {
403        output_agent.lock().unwrap().try_submit_task(write_task)
404    }; 
405    // If res is error, panic here, outside of thread lock
406    res.expect("Output agent is shutdown or thread panicked!");
407}
408
409pub fn submit_error_write_task(output_agent: &Arc<Mutex<OutputAgent>>, error: RequestError) {
410    let id = Id::Null;
411    let response = Response::new_error(id, error);
412    submit_message_write_task(output_agent, response.into()); 
413}
414
415/* -----------------  Request sending  ----------------- */
416
417pub type RequestFuture<RET, RET_ERROR> = BoxFuture<RequestResult<RET, RET_ERROR>, futures::Canceled>;
418
419
420impl Endpoint {
421    
422    /// Send a (non-notification) request
423    pub fn send_request<
424        PARAMS : serde::Serialize, 
425        RET: serde::Deserialize, 
426        RET_ERROR : serde::Deserialize, 
427    >(&mut self, method_name: &str, params: PARAMS) 
428        -> GResult<RequestFuture<RET, RET_ERROR>> 
429    {
430        let (completable, future) = futures::oneshot::<ResponseResult>();
431        let future : futures::Oneshot<ResponseResult> = future;
432        
433        let id = self.next_id();
434        
435        self.pending_requests.lock().unwrap().insert(id.clone(), completable);
436        
437        self.write_request(Some(id), method_name, params)?;
438        
439        let future = future.map(|response_result : ResponseResult| {
440            RequestResult::<RET, RET_ERROR>::from(response_result)
441        });
442        
443        Ok(new(future))
444    }
445    
446    
447    /// Send a notification
448    pub fn send_notification<
449        PARAMS : serde::Serialize, 
450    >(&self, method_name: &str, params: PARAMS) 
451        -> GResult<()> 
452    {
453        let id = None;
454        self.write_request::<_>(id, method_name, params)
455    }
456    
457    pub fn write_request<
458        PARAMS : serde::Serialize, 
459    >(&self, id: Option<Id>, method_name: &str, params: PARAMS) 
460        -> GResult<()> 
461    {
462        let params_value = serde_json::to_value(&params);
463        let params = jsonrpc_request::to_jsonrpc_params(params_value)?;
464        
465        let rpc_request = Request { id: id.clone(), method : method_name.into(), params : params };
466        
467        submit_message_write_task(&self.output_agent, Message::Request(rpc_request));
468        Ok(())
469    }
470    
471    
472    /// Handle a well-formed incoming JsonRpc request object
473    pub fn handle_incoming_response(&mut self, response: Response) {
474        let id = response.id;
475        let result_or_error = response.result_or_error;
476        
477        let entry = self.pending_requests.lock().unwrap().remove(&id);
478        
479        match entry {
480        	Some(entry) => { 
481        	    entry.complete(result_or_error) 
482        	} 
483        	None => { 
484                let id = Id::Null;
485                let error = error_JSON_RPC_InvalidResponse(format!("id `{}` not found", id));
486                submit_error_write_task(&self.output_agent, error); 
487        	}
488        }
489    }
490    
491}
492
493pub mod map_request_handler;
494
495
496/* ----------------- Tests ----------------- */
497
498mod tests_sample_types;
499
500#[cfg(test)]
501mod tests_ {
502    
503    use super::*;
504    use util::core::*;
505    use util::tests::*;
506    use tests_sample_types::*;
507    use map_request_handler::MapRequestHandler;
508    
509    use std::thread;
510    
511    use serde_json::Value;
512    use serde_json;
513    
514    use jsonrpc_common::*;
515    use jsonrpc_response::*;
516    use jsonrpc_request::*;
517    use jsonrpc_request::request_tests::check_error;
518    use method_types::*;
519    
520    use json_util::JsonObject;
521    use json_util::test_util::to_json;
522    use service_util::WriteLineMessageWriter;
523    
524    use futures::task::Unpark;
525    use futures::Async;
526    use std::sync::Arc;
527    
528    
529    pub fn sample_fn(params: Point) -> MethodResult<String, ()> {
530        let x_str : String = params.x.to_string();
531        let y_str : String = params.y.to_string();
532        Ok(x_str + &y_str)
533    }
534    pub fn no_params_method(_params: ()) -> Result<String, MethodError<()>> {
535        Ok("okay".into())
536    }
537    
538    pub fn check_request(result: ResponseResult, expected: ResponseResult) {
539        if let ResponseResult::Error(ref error) = result {
540            
541            if let ResponseResult::Error(expected_error) = expected {
542                check_error(error.clone(), expected_error.clone());
543                return;
544            }
545            
546        }
547        
548        assert_equal(&result, &expected);
549    }
550    
551    pub fn async_method(request_params: RequestParams, completable: ResponseCompletable) {
552        thread::spawn(move || {
553            completable.sync_handle_request(request_params, sample_fn);
554        });
555    }
556        
557    fn invoke_method<FN>(
558        req_handler: &mut RequestHandler, 
559        method_name: &str, 
560        request_params: RequestParams, 
561        mut and_then: FN
562    ) 
563    where 
564        FN : FnMut(Option<ResponseResult>) + 'static + Send
565    {
566        let on_response : Box<FnMut(Option<Response>) + Send> = new(move |response: Option<Response>| {
567            and_then(response.and_then(|e| Some(e.result_or_error)));
568        });
569        
570        let completable = ResponseCompletable::new(Some(Id::Number(123)), on_response);
571        req_handler.handle_request(method_name, request_params, completable);
572    }
573    
574    #[test]
575    fn test_Endpoint() {
576        
577        {
578            // Test handle unknown method
579            let mut request_handler = MapRequestHandler::new();
580            
581            let request = Request::new(1, "unknown_method".to_string(), JsonObject::new());
582            invoke_method(&mut request_handler, &request.method, request.params,
583                |result| 
584                check_request(result.unwrap(), ResponseResult::Error(error_JSON_RPC_MethodNotFound())) 
585            );
586        }
587        
588        let mut request_handler = MapRequestHandler::new();
589        request_handler.add_request("sample_fn", Box::new(sample_fn));
590        request_handler.add_rpc_handler("async_method", Box::new(async_method));
591        
592        // test with invalid params = "{}" 
593        let request = Request::new(1, "sample_fn".to_string(), JsonObject::new());
594        invoke_method(&mut request_handler, &request.method, request.params, 
595            |result| 
596            check_request(result.unwrap(), ResponseResult::Error(error_JSON_RPC_InvalidParams(r#"missing field "x""#)))
597        );
598        
599        // test with valid params
600        let params_value = match serde_json::to_value(&new_sample_params(10, 20)) {
601            Value::Object(object) => object, 
602            _ => panic!("Not serialized into Object") 
603        };
604        let request = Request::new(1, "sample_fn".to_string(), params_value);
605        invoke_method(&mut request_handler, &request.method, request.params.clone(),
606            |result| 
607            assert_equal(result.unwrap(), ResponseResult::Result(
608                Value::String("1020".to_string())
609            ))
610        );
611        
612        
613        // Test valid request with params = "null"
614        request_handler.add_request("no_params_method", Box::new(no_params_method));
615        
616        let id1 = Some(Id::Number(1));
617        let request = Request { id : id1, method : "no_params_method".into(), params : RequestParams::None, };
618        invoke_method(&mut request_handler, &request.method, request.params.clone(), 
619            |result| 
620            assert_equal(result.unwrap(), ResponseResult::Result(
621                Value::String("okay".to_string())
622            ))
623        );
624        
625        // --- Endpoint:
626        let output = vec![];
627        let mut eh = EndpointHandler::create_with_writer(WriteLineMessageWriter(output), new(request_handler));
628        
629        // Test ResponseCompletable - missing id for notification method
630        let completable = ResponseCompletable::new(None, new(|_| {}));
631        completable.complete(None);
632        
633        // Test ResponseCompletable - missing id for regular method
634        let completable = ResponseCompletable::new(None, new(|_| {}));
635        completable.complete(Some(ResponseResult::Result(Value::String("1020".to_string()))));
636        
637        // test again using handle_request
638        // TODO review this code
639        let request = Request {     
640            id : None,
641            method : "sample_fn".into(),
642            params : request.params.clone(),
643        }; 
644        eh.handle_incoming_request(request);
645        
646        // Test send_request
647        
648        let params = new_sample_params(123, 66);
649        eh.endpoint.send_notification("sample_fn", params.clone()).unwrap();
650        
651        eh.endpoint.send_notification("async_method", params.clone()).unwrap();
652        
653        assert_eq!(*eh.endpoint.id_counter.lock().unwrap(), 0);
654        
655        let my_method = "sample_fn".to_string();
656        let future : RequestFuture<String, ()> = eh.endpoint.send_request(&my_method, params.clone()).unwrap();
657        
658        assert_eq!(*eh.endpoint.id_counter.lock().unwrap(), 1);
659        
660        // Test future is not completed
661        let mut spawn = futures::task::spawn(future);
662        let poll = spawn.poll_future(noop_unpark());
663        assert_eq!(poll, Ok(Async::NotReady));
664        
665        // Complete the request
666        let expected_result = "sample_fn result".to_string();
667        let id = Id::Number(1);
668        let response = Response::new_result(id, Value::String(expected_result.clone())); 
669        eh.handle_incoming_message(&to_json(&response));
670
671        // ...check future was completed.
672        let result : Result<RequestResult<String, ()>, _> = spawn.wait_future();
673        assert_eq!(result.unwrap(), RequestResult::MethodResult(Ok(expected_result)));
674        
675        eh.endpoint.request_shutdown();
676    }
677    
678    pub fn noop_unpark() -> Arc<Unpark> {
679        struct Foo;
680        
681        impl Unpark for Foo {
682            fn unpark(&self) {}
683        }
684        
685        Arc::new(Foo)
686    }
687    
688}
689
690