1#![allow(non_upper_case_globals)]
23#![allow(non_snake_case)]
24#![allow(non_camel_case_types)]
25
26#[macro_use] extern crate log;
27extern crate serde_json;
28extern crate serde;
29
30extern crate rustdt_util as util;
31pub extern crate futures;
32
33pub mod json_util;
34pub mod jsonrpc_common;
35pub mod jsonrpc_message;
36pub mod jsonrpc_request;
37pub mod jsonrpc_response;
38pub mod method_types;
39pub mod service_util;
40pub mod output_agent;
41
42use util::core::*;
45
46use std::collections::HashMap;
47use std::result::Result;
48
49use std::sync::Arc;
50use std::sync::Mutex;
51
52use futures::Future;
53use futures::BoxFuture;
54use futures::Complete;
55
56use service_util::MessageReader;
57use service_util::MessageWriter;
58use jsonrpc_common::*;
59use jsonrpc_message::*;
60use jsonrpc_request::*;
61use jsonrpc_response::*;
62use method_types::*;
63
64use output_agent::OutputAgent;
67use output_agent::OutputAgentTask;
68
69
/// Handle to one side of a JSON-RPC connection.
///
/// Every field sits behind an `Arc<Mutex<..>>`, so the derived `Clone`
/// produces another handle to the same id counter, pending-request table
/// and output agent rather than an independent copy.
#[derive(Clone)]
pub struct Endpoint {
    /// Last id handed out by `next_id`; initialised to 0 by `start_with`.
    id_counter : Arc<Mutex<u64>>,
    /// Completion handles of requests sent through `send_request` that have
    /// not yet been matched with an incoming response, keyed by request id.
    pending_requests : Arc<Mutex<HashMap<Id, Complete<ResponseResult>>>>,
    /// Agent that outgoing messages are submitted to as write tasks.
    output_agent : Arc<Mutex<OutputAgent>>,
}
84
85impl Endpoint {
86
87 pub fn start_with(output_agent: OutputAgent)
88 -> Endpoint
89 {
90 Endpoint {
91 id_counter : newArcMutex(0),
92 pending_requests : newArcMutex(HashMap::new()),
93 output_agent : newArcMutex(output_agent)
94 }
95 }
96
97 pub fn is_shutdown(& self) -> bool {
98 self.output_agent.lock().unwrap().is_shutdown()
99 }
100
101 pub fn request_shutdown(&self) {
102 self.output_agent.lock().unwrap().request_shutdown();
103 }
104
105 pub fn shutdown_and_join(&self) {
106 self.output_agent.lock().unwrap().shutdown_and_join();
107 }
108
109 pub fn next_id(&self) -> Id {
110 let id_num : &mut u64 = &mut *self.id_counter.lock().unwrap();
111 *id_num += 1;
112 Id::Number(*id_num)
113 }
114}
115
/// Pairs an `Endpoint` with the `RequestHandler` that services the requests
/// arriving on it.
pub struct EndpointHandler {
    /// Endpoint used to write responses and to match incoming responses
    /// against pending requests.
    pub endpoint : Endpoint,
    /// Handler that incoming requests are dispatched to.
    pub request_handler : Box<RequestHandler>,
}
124
125impl EndpointHandler {
126
127 pub fn create_with_writer<WRITER>(msg_writer: WRITER, request_handler: Box<RequestHandler>)
128 -> EndpointHandler
129 where
130 WRITER : MessageWriter + 'static + Send,
131 {
132 let output_agent = OutputAgent::start_with_provider(|| msg_writer);
133 Self::create_with_output_agent(output_agent, request_handler)
134 }
135
136 pub fn create_with_output_agent(output_agent: OutputAgent, request_handler: Box<RequestHandler>)
137 -> EndpointHandler
138 {
139 let output = Endpoint::start_with(output_agent);
140 Self::create(output, request_handler)
141 }
142
143 pub fn create(endpoint: Endpoint, request_handler: Box<RequestHandler>)
144 -> EndpointHandler
145 {
146 EndpointHandler { endpoint : endpoint, request_handler: request_handler }
147 }
148
149 pub fn run_message_read_loop<MSG_READER : ?Sized>(mut self, input: &mut MSG_READER)
152 -> GResult<()>
153 where
154 MSG_READER : MessageReader
155 {
156 loop {
157 let message = match input.read_next() {
158 Ok(ok) => { ok }
159 Err(error) => {
160 self.endpoint.request_shutdown();
161 return Err(error);
162 }
163 };
164
165 self.handle_incoming_message(&message);
166
167 if self.endpoint.is_shutdown() {
168 return Ok(())
169 }
170 }
171 }
172
173 pub fn handle_incoming_message(&mut self, message_json: &str) {
175
176 let message = serde_json::from_str::<Message>(message_json);
177
178 match message {
179 Ok(message) => {
180 match message {
181 Message::Request(request) => self.handle_incoming_request(request),
182 Message::Response(response) => self.endpoint.handle_incoming_response(response),
183 }
184 }
185 Err(error) => {
186 let error = error_JSON_RPC_InvalidRequest(error);
187 submit_error_write_task(&self.endpoint.output_agent, error);
188 }
189 }
190 }
191
192 pub fn handle_incoming_request(&mut self, request: Request) {
194 let output_agent = self.endpoint.output_agent.clone();
195
196 let on_response = new(move |response: Option<Response>| {
197 if let Some(response) = response {
198 submit_message_write_task(&output_agent, response.into());
199 } else {
200 let method_name = ""; info!("JSON-RPC notification complete. {:?}", method_name);
202 }
203 });
204 let completable = ResponseCompletable::new(request.id, on_response);
205
206 self.request_handler.handle_request(&request.method, request.params, completable);
207 }
208
209}
210
/// Receiver of incoming JSON-RPC requests.
///
/// `EndpointHandler::handle_incoming_request` calls `handle_request` for each
/// incoming request message.
pub trait RequestHandler {
    /// Handles the request named `method_name` with parameters `request_params`.
    ///
    /// The outcome is reported through `completable`.
    /// NOTE(review): presumably every `completable` has to be completed
    /// (it carries a `FinishedFlag`) — confirm against `util::core`.
    fn handle_request(
        &mut self, method_name: &str, request_params: RequestParams, completable: ResponseCompletable
    );
}
218
219pub struct NullRequestHandler;
220
221impl RequestHandler for NullRequestHandler {
222 fn handle_request(
223 &mut self, _request_method: &str, _request_params: RequestParams, completable: ResponseCompletable
224 ) {
225 completable.complete_with_error(error_JSON_RPC_MethodNotFound());
226 }
227}
228
/// One-shot handle used to report the outcome of an incoming request.
///
/// Completing it (see `complete`) consumes the handle and invokes the
/// `on_response` callback.
pub struct ResponseCompletable {
    /// Created as `FinishedFlag(false)` and marked with `finish()` in `complete`.
    /// NOTE(review): `FinishedFlag` is defined in `util::core`; presumably it
    /// checks that completion happened — confirm there.
    completion_flag: FinishedFlag,
    /// Id of the request being answered; `None` when the request carried no id.
    id: Option<Id>,
    /// Callback invoked by `complete` with the response to send, or with `None`
    /// when there is nothing to send.
    on_response: Box<FnMut(Option<Response>) + Send>,
}
241
242impl ResponseCompletable {
243
244 pub fn new(id: Option<Id>, on_response: Box<FnMut(Option<Response>) + Send>) -> ResponseCompletable {
245 ResponseCompletable {
246 completion_flag : FinishedFlag(false), id : id, on_response: on_response
247 }
248 }
249
250 pub fn complete(mut self, response_result: Option<ResponseResult>) {
251 self.completion_flag.finish();
252
253 if let Some(response_result) = response_result {
255
256 let response =
257 if let Some(id) = self.id {
258 Response{ id : id, result_or_error : response_result }
259 } else {
260 Response::new_error(Id::Null,
261 error_JSON_RPC_InvalidRequest("Property `id` not provided for request."))
262 };
263
264 (self.on_response)(Some(response));
265 } else {
266 (self.on_response)(None)
267 }
268 }
269
270 pub fn complete_with_error(self, error: RequestError) {
271 self.complete(Some(ResponseResult::Error(error)));
272 }
273
274 pub fn handle_request_with<PARAMS, RET, RET_ERROR, METHOD>(
275 self, params: RequestParams, method_handler: METHOD
276 )
277 where
278 PARAMS : serde::Deserialize,
279 RET : serde::Serialize,
280 RET_ERROR : serde::Serialize,
281 METHOD : FnOnce(PARAMS, MethodCompletable<RET, RET_ERROR>),
282 {
283 let mc = MethodCompletable::<RET, RET_ERROR>::new(self);
284 mc.parse_params_and_complete_with(params, method_handler);
285 }
286
287 pub fn sync_handle_request<PARAMS, RET, RET_ERROR, METHOD>(
288 self, params: RequestParams, sync_method_handler: METHOD
289 )
290 where
291 PARAMS : serde::Deserialize,
292 RET : serde::Serialize,
293 RET_ERROR : serde::Serialize ,
294 METHOD : FnOnce(PARAMS) -> MethodResult<RET, RET_ERROR>,
295 {
296 self.handle_request_with(params, |params, completable| {
297 let result = sync_method_handler(params);
298 completable.complete(result);
299 })
300 }
301
302 pub fn handle_notification_with<PARAMS, METHOD>(
303 self, params: RequestParams, method_handler: METHOD
304 )
305 where
306 PARAMS : serde::Deserialize,
307 METHOD : FnOnce(PARAMS),
308 {
309 let mc = MethodCompletable::<(), ()>::new(self);
310 mc.parse_params_and_complete_with(params, |params, completable| {
311 completable.completable.complete(None);
313 method_handler(params)
314 });
315 }
316
317 pub fn sync_handle_notification<PARAMS, METHOD>(
318 self, params: RequestParams, sync_method_handler: METHOD
319 )
320 where
321 PARAMS : serde::Deserialize,
322 METHOD : FnOnce(PARAMS),
323 {
324 self.handle_notification_with(params, |params| {
325 sync_method_handler(params);
326 })
327 }
328
329}
330
331use std::marker::PhantomData;
332
/// Typed wrapper around a `ResponseCompletable`.
///
/// `RET` and `RET_ERROR` are the result and error-data types accepted by
/// `complete`; both are converted into a `ResponseResult` there.
pub struct MethodCompletable
<
    RET : serde::Serialize,
    RET_ERROR : serde::Serialize,
>
{
    /// The untyped handle that completion is forwarded to.
    completable: ResponseCompletable,
    /// Marker tying `RET` to the type; no value of `RET` is stored.
    p1: PhantomData<RET>,
    /// Marker tying `RET_ERROR` to the type; no value of `RET_ERROR` is stored.
    p2: PhantomData<RET_ERROR>,
}
345
346impl<
347 RET : serde::Serialize,
348 RET_ERROR : serde::Serialize,
349>
350 MethodCompletable<RET, RET_ERROR>
351{
352 pub fn new(completable: ResponseCompletable) -> MethodCompletable<RET, RET_ERROR> {
353 MethodCompletable { completable : completable, p1 : PhantomData, p2 : PhantomData}
354 }
355
356 pub fn parse_params_and_complete_with<PARAMS, METHOD>(
357 self,
358 params: RequestParams,
359 method_fn: METHOD
360 )
361 where
362 PARAMS : serde::Deserialize,
363 RET : serde::Serialize,
364 RET_ERROR : serde::Serialize,
365 METHOD : FnOnce(PARAMS, Self),
366 {
367 let params_value = params.into_value();
368
369 let params_result : Result<PARAMS, _> = serde_json::from_value(params_value);
370
371 match params_result {
372 Ok(params) => {
373 method_fn(params, self);
374 }
375 Err(error) => {
376 self.completable.complete_with_error(error_JSON_RPC_InvalidParams(error));
377 }
378 }
379 }
380
381 pub fn complete(self, result: MethodResult<RET, RET_ERROR>) {
382 self.completable.complete(Some(ResponseResult::from(result)));
383 }
384}
385
386pub fn submit_message_write_task(output_agent: &Arc<Mutex<OutputAgent>>, jsonrpc_message: Message) {
387
388 let write_task : OutputAgentTask = Box::new(move |mut response_handler| {
389 info!("JSON-RPC message: {:?}", jsonrpc_message);
390
391 let response_str = serde_json::to_string(&jsonrpc_message).unwrap_or_else(|error| -> String {
392 panic!("Failed to serialize to JSON object: {}", error);
393 });
394
395 let write_res = response_handler.write_message(&response_str);
396 if let Err(error) = write_res {
397 error!("Error writing JSON-RPC message: {}", error);
399 };
400 });
401
402 let res = {
403 output_agent.lock().unwrap().try_submit_task(write_task)
404 };
405 res.expect("Output agent is shutdown or thread panicked!");
407}
408
409pub fn submit_error_write_task(output_agent: &Arc<Mutex<OutputAgent>>, error: RequestError) {
410 let id = Id::Null;
411 let response = Response::new_error(id, error);
412 submit_message_write_task(output_agent, response.into());
413}
414
/// Boxed future returned by `Endpoint::send_request`.
///
/// Its item is the `RequestResult` converted from the response that answers
/// the request; its error type is `futures::Canceled`.
pub type RequestFuture<RET, RET_ERROR> = BoxFuture<RequestResult<RET, RET_ERROR>, futures::Canceled>;
418
419
420impl Endpoint {
421
422 pub fn send_request<
424 PARAMS : serde::Serialize,
425 RET: serde::Deserialize,
426 RET_ERROR : serde::Deserialize,
427 >(&mut self, method_name: &str, params: PARAMS)
428 -> GResult<RequestFuture<RET, RET_ERROR>>
429 {
430 let (completable, future) = futures::oneshot::<ResponseResult>();
431 let future : futures::Oneshot<ResponseResult> = future;
432
433 let id = self.next_id();
434
435 self.pending_requests.lock().unwrap().insert(id.clone(), completable);
436
437 self.write_request(Some(id), method_name, params)?;
438
439 let future = future.map(|response_result : ResponseResult| {
440 RequestResult::<RET, RET_ERROR>::from(response_result)
441 });
442
443 Ok(new(future))
444 }
445
446
447 pub fn send_notification<
449 PARAMS : serde::Serialize,
450 >(&self, method_name: &str, params: PARAMS)
451 -> GResult<()>
452 {
453 let id = None;
454 self.write_request::<_>(id, method_name, params)
455 }
456
457 pub fn write_request<
458 PARAMS : serde::Serialize,
459 >(&self, id: Option<Id>, method_name: &str, params: PARAMS)
460 -> GResult<()>
461 {
462 let params_value = serde_json::to_value(¶ms);
463 let params = jsonrpc_request::to_jsonrpc_params(params_value)?;
464
465 let rpc_request = Request { id: id.clone(), method : method_name.into(), params : params };
466
467 submit_message_write_task(&self.output_agent, Message::Request(rpc_request));
468 Ok(())
469 }
470
471
472 pub fn handle_incoming_response(&mut self, response: Response) {
474 let id = response.id;
475 let result_or_error = response.result_or_error;
476
477 let entry = self.pending_requests.lock().unwrap().remove(&id);
478
479 match entry {
480 Some(entry) => {
481 entry.complete(result_or_error)
482 }
483 None => {
484 let id = Id::Null;
485 let error = error_JSON_RPC_InvalidResponse(format!("id `{}` not found", id));
486 submit_error_write_task(&self.output_agent, error);
487 }
488 }
489 }
490
491}
492
493pub mod map_request_handler;
494
495
496mod tests_sample_types;
499
// Tests for request dispatch through a `MapRequestHandler` and for the
// request/response round trip of an `EndpointHandler`.
#[cfg(test)]
mod tests_ {

    use super::*;
    use util::core::*;
    use util::tests::*;
    use tests_sample_types::*;
    use map_request_handler::MapRequestHandler;

    use std::thread;

    use serde_json::Value;
    use serde_json;

    use jsonrpc_common::*;
    use jsonrpc_response::*;
    use jsonrpc_request::*;
    use jsonrpc_request::request_tests::check_error;
    use method_types::*;

    use json_util::JsonObject;
    use json_util::test_util::to_json;
    use service_util::WriteLineMessageWriter;

    use futures::task::Unpark;
    use futures::Async;
    use std::sync::Arc;


    /// Sample method: returns the decimal text of `x` followed by that of `y`.
    pub fn sample_fn(params: Point) -> MethodResult<String, ()> {
        let x_str : String = params.x.to_string();
        let y_str : String = params.y.to_string();
        Ok(x_str + &y_str)
    }
    /// Sample method that takes no parameters and always returns "okay".
    pub fn no_params_method(_params: ()) -> Result<String, MethodError<()>> {
        Ok("okay".into())
    }

    /// Asserts that `result` matches `expected`.
    ///
    /// When both are errors they are compared with `check_error`; in every
    /// other case they are compared with `assert_equal`.
    pub fn check_request(result: ResponseResult, expected: ResponseResult) {
        if let ResponseResult::Error(ref error) = result {

            if let ResponseResult::Error(expected_error) = expected {
                check_error(error.clone(), expected_error.clone());
                return;
            }

        }

        assert_equal(&result, &expected);
    }

    /// Handler that services the request with `sample_fn` on a spawned thread.
    pub fn async_method(request_params: RequestParams, completable: ResponseCompletable) {
        thread::spawn(move || {
            completable.sync_handle_request(request_params, sample_fn);
        });
    }

    /// Invokes `method_name` on `req_handler` with a completable whose id is
    /// 123, and passes the `result_or_error` of the produced response (or
    /// `None`) to `and_then`.
    fn invoke_method<FN>(
        req_handler: &mut RequestHandler,
        method_name: &str,
        request_params: RequestParams,
        mut and_then: FN
    )
    where
        FN : FnMut(Option<ResponseResult>) + 'static + Send
    {
        let on_response : Box<FnMut(Option<Response>) + Send> = new(move |response: Option<Response>| {
            and_then(response.and_then(|e| Some(e.result_or_error)));
        });

        let completable = ResponseCompletable::new(Some(Id::Number(123)), on_response);
        req_handler.handle_request(method_name, request_params, completable);
    }

    #[test]
    fn test_Endpoint() {

        // A handler with no registered methods answers "method not found".
        {
            let mut request_handler = MapRequestHandler::new();

            let request = Request::new(1, "unknown_method".to_string(), JsonObject::new());
            invoke_method(&mut request_handler, &request.method, request.params,
                |result|
                check_request(result.unwrap(), ResponseResult::Error(error_JSON_RPC_MethodNotFound()))
            );
        }

        let mut request_handler = MapRequestHandler::new();
        request_handler.add_request("sample_fn", Box::new(sample_fn));
        request_handler.add_rpc_handler("async_method", Box::new(async_method));

        // An empty params object cannot be deserialized into `Point`.
        let request = Request::new(1, "sample_fn".to_string(), JsonObject::new());
        invoke_method(&mut request_handler, &request.method, request.params,
            |result|
            check_request(result.unwrap(), ResponseResult::Error(error_JSON_RPC_InvalidParams(r#"missing field "x""#)))
        );

        // Valid params: `sample_fn` concatenates the two numbers as text.
        let params_value = match serde_json::to_value(&new_sample_params(10, 20)) {
            Value::Object(object) => object,
            _ => panic!("Not serialized into Object")
        };
        let request = Request::new(1, "sample_fn".to_string(), params_value);
        invoke_method(&mut request_handler, &request.method, request.params.clone(),
            |result|
            assert_equal(result.unwrap(), ResponseResult::Result(
                Value::String("1020".to_string())
            ))
        );


        request_handler.add_request("no_params_method", Box::new(no_params_method));

        // A method taking `()` accepts a request without params.
        let id1 = Some(Id::Number(1));
        let request = Request { id : id1, method : "no_params_method".into(), params : RequestParams::None, };
        invoke_method(&mut request_handler, &request.method, request.params.clone(),
            |result|
            assert_equal(result.unwrap(), ResponseResult::Result(
                Value::String("okay".to_string())
            ))
        );

        let output = vec![];
        let mut eh = EndpointHandler::create_with_writer(WriteLineMessageWriter(output), new(request_handler));

        // Completing a completable that has no id, without a result...
        let completable = ResponseCompletable::new(None, new(|_| {}));
        completable.complete(None);

        // ...and with a result.
        let completable = ResponseCompletable::new(None, new(|_| {}));
        completable.complete(Some(ResponseResult::Result(Value::String("1020".to_string()))));

        // An incoming request without an id, dispatched through the handler.
        let request = Request {
            id : None,
            method : "sample_fn".into(),
            params : request.params.clone(),
        };
        eh.handle_incoming_request(request);

        // Notifications must not consume request ids.
        let params = new_sample_params(123, 66);
        eh.endpoint.send_notification("sample_fn", params.clone()).unwrap();

        eh.endpoint.send_notification("async_method", params.clone()).unwrap();

        assert_eq!(*eh.endpoint.id_counter.lock().unwrap(), 0);

        // Sending a request takes the first id.
        let my_method = "sample_fn".to_string();
        let future : RequestFuture<String, ()> = eh.endpoint.send_request(&my_method, params.clone()).unwrap();

        assert_eq!(*eh.endpoint.id_counter.lock().unwrap(), 1);

        // No response has arrived yet, so the future is not ready.
        let mut spawn = futures::task::spawn(future);
        let poll = spawn.poll_future(noop_unpark());
        assert_eq!(poll, Ok(Async::NotReady));

        // Feed in a response with the matching id; the future then resolves.
        let expected_result = "sample_fn result".to_string();
        let id = Id::Number(1);
        let response = Response::new_result(id, Value::String(expected_result.clone()));
        eh.handle_incoming_message(&to_json(&response));

        let result : Result<RequestResult<String, ()>, _> = spawn.wait_future();
        assert_eq!(result.unwrap(), RequestResult::MethodResult(Ok(expected_result)));

        eh.endpoint.request_shutdown();
    }

    /// Returns an `Unpark` whose `unpark` does nothing, for polling a future
    /// directly from the test.
    pub fn noop_unpark() -> Arc<Unpark> {
        struct Foo;

        impl Unpark for Foo {
            fn unpark(&self) {}
        }

        Arc::new(Foo)
    }

}
689
690