1#![allow(clippy::boxed_local, clippy::or_fun_call)]
25
26#[macro_use]
27extern crate serde_json;
28#[macro_use]
29extern crate serde_derive;
30extern crate crossbeam;
31extern crate serde;
32extern crate xi_trace;
33
34#[macro_use]
35extern crate log;
36
37mod error;
38mod parse;
39
40pub mod test_utils;
41
42use std::cmp;
43use std::collections::{BTreeMap, BinaryHeap, VecDeque};
44use std::io::{self, BufRead, Write};
45use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
46use std::sync::mpsc;
47use std::sync::{Arc, Condvar, Mutex};
48use std::thread;
49use std::time::{Duration, Instant};
50
51use serde::de::DeserializeOwned;
52use serde_json::Value;
53
54use xi_trace::{trace, trace_block, trace_block_payload, trace_payload};
55
56pub use crate::error::{Error, ReadError, RemoteError};
57use crate::parse::{Call, MessageReader, Response, RpcObject};
58
/// Upper bound on how long `next_read` blocks waiting for an incoming message
/// before it loops around to re-check timers and the idle queue.
const MAX_IDLE_WAIT: Duration = Duration::from_millis(5);
61
/// A handle to the RPC state shared between the runloop and its peers.
///
/// The state lives behind an `Arc`, so cloning a `RawPeer` yields another
/// handle to the same underlying state.
pub struct RawPeer<W: Write + 'static>(Arc<RpcState<W>>);
71
/// The interface used to talk to the other side of an RPC connection.
pub trait Peer: Send + 'static {
    /// Returns a boxed copy of this peer; used to implement `Clone` for
    /// `Box<dyn Peer>`.
    fn box_clone(&self) -> Box<dyn Peer>;
    /// Sends a notification (a message with no `id`, for which no response
    /// is expected).
    fn send_rpc_notification(&self, method: &str, params: &Value);
    /// Sends a request without waiting; `f` is invoked with the outcome.
    fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>);
    /// Sends a request and returns its outcome.
    ///
    /// The `RawPeer` implementation blocks the calling thread until a
    /// response (or an error) is delivered.
    fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>;
    /// Returns `true` if there are received messages that have not yet been
    /// handled (in `RawPeer`, if the receive queue is non-empty).
    fn request_is_pending(&self) -> bool;
    /// Queues `token` to be passed to `Handler::idle` when the runloop has
    /// no incoming message to process.
    fn schedule_idle(&self, token: usize);
    /// Queues `token` to be passed to `Handler::idle` once `after` has been
    /// reached.
    fn schedule_timer(&self, after: Instant, token: usize);
}
106
/// A boxed, type-erased `Peer`.
pub type RpcPeer = Box<dyn Peer>;
109
/// Context passed to every `Handler` callback; gives the handler access to
/// the peer.
pub struct RpcCtx {
    // The peer on the other side of the connection.
    peer: RpcPeer,
}
113
/// A method name paired with its JSON parameters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcCall {
    /// Name of the method being called.
    pub method: String,
    /// Parameters for the call, as raw JSON.
    pub params: Value,
}
123
/// Receives the messages dispatched by `RpcLoop::mainloop`.
pub trait Handler {
    /// Type that incoming notifications are deserialized into.
    type Notification: DeserializeOwned;
    /// Type that incoming requests are deserialized into.
    type Request: DeserializeOwned;
    /// Called for each incoming notification.
    fn handle_notification(&mut self, ctx: &RpcCtx, rpc: Self::Notification);
    /// Called for each incoming request; the returned value (or error) is
    /// sent back to the peer as the response.
    fn handle_request(&mut self, ctx: &RpcCtx, rpc: Self::Request) -> Result<Value, RemoteError>;
    /// Called with a token previously scheduled via `schedule_idle` or
    /// `schedule_timer`. The default implementation does nothing.
    #[allow(unused_variables)]
    fn idle(&mut self, ctx: &RpcCtx, token: usize) {}
}
137
/// A one-shot continuation that receives the outcome of a request.
pub trait Callback: Send {
    /// Consumes the callback, passing it the request's result.
    fn call(self: Box<Self>, result: Result<Value, Error>);
}
141
142impl<F: Send + FnOnce(Result<Value, Error>)> Callback for F {
143 fn call(self: Box<F>, result: Result<Value, Error>) {
144 (*self)(result)
145 }
146}
147
/// Guard that disconnects the wrapped peer if it is dropped while the
/// current thread is panicking.
struct PanicGuard<'a, W: Write + 'static>(&'a RawPeer<W>);
151
152impl<'a, W: Write + 'static> Drop for PanicGuard<'a, W> {
153 fn drop(&mut self) {
154 if thread::panicking() {
155 error!("panic guard hit, closing runloop");
156 self.0.disconnect();
157 }
158 }
159}
160
/// A one-shot task that is run with an idle token.
trait IdleProc: Send {
    /// Consumes the task, passing it `token`.
    fn call(self: Box<Self>, token: usize);
}

/// Any sendable one-shot closure taking a token is an `IdleProc`.
impl<F: Send + FnOnce(usize)> IdleProc for F {
    fn call(self: Box<F>, token: usize) {
        // Move the closure out of its box, then run it.
        let task = *self;
        task(token)
    }
}
170
/// How the result of an outgoing request is delivered once it is known.
enum ResponseHandler {
    /// Deliver the result over a channel (used by the blocking request path).
    Chan(mpsc::Sender<Result<Value, Error>>),
    /// Deliver the result by invoking a callback (used by the async path).
    Callback(Box<dyn Callback>),
}
175
176impl ResponseHandler {
177 fn invoke(self, result: Result<Value, Error>) {
178 match self {
179 ResponseHandler::Chan(tx) => {
180 let _ = tx.send(result);
181 }
182 ResponseHandler::Callback(f) => f.call(result),
183 }
184 }
185}
186
/// A scheduled timer: `token` becomes deliverable once `fire_after` is reached.
#[derive(Debug, PartialEq, Eq)]
struct Timer {
    // Earliest instant at which the timer may fire.
    fire_after: Instant,
    // Token handed to the handler when the timer fires.
    token: usize,
}
192
/// State shared by every `RawPeer` handle and the runloop threads.
struct RpcState<W: Write> {
    // Messages (or read errors) queued by the reader thread for the main loop.
    rx_queue: Mutex<VecDeque<Result<RpcObject, ReadError>>>,
    // Signalled whenever something is pushed onto `rx_queue`.
    rx_cvar: Condvar,
    // Sink that outgoing messages are written to.
    writer: Mutex<W>,
    // Counter used to allocate ids for outgoing requests.
    id: AtomicUsize,
    // Handlers for outgoing requests still awaiting a response, keyed by id.
    pending: Mutex<BTreeMap<usize, ResponseHandler>>,
    // Tokens scheduled through `schedule_idle`, in FIFO order.
    idle_queue: Mutex<VecDeque<usize>>,
    // Timers scheduled through `schedule_timer`.
    timers: Mutex<BinaryHeap<Timer>>,
    // Set by `disconnect`; polled by the reader thread to know when to stop.
    needs_exit: AtomicBool,
    // Set once a blocking request has been issued; checked by the reader
    // thread when a read fails.
    is_blocked: AtomicBool,
}
204
/// Owns the message reader and the peer state, and drives the RPC runloop.
pub struct RpcLoop<W: Write + 'static> {
    // Parses incoming messages from the input stream.
    reader: MessageReader,
    // Handle to the state shared with all peers.
    peer: RawPeer<W>,
}
210
211impl<W: Write + Send> RpcLoop<W> {
212 pub fn new(writer: W) -> Self {
215 let rpc_peer = RawPeer(Arc::new(RpcState {
216 rx_queue: Mutex::new(VecDeque::new()),
217 rx_cvar: Condvar::new(),
218 writer: Mutex::new(writer),
219 id: AtomicUsize::new(0),
220 pending: Mutex::new(BTreeMap::new()),
221 idle_queue: Mutex::new(VecDeque::new()),
222 timers: Mutex::new(BinaryHeap::new()),
223 needs_exit: AtomicBool::new(false),
224 is_blocked: AtomicBool::new(false),
225 }));
226 RpcLoop { reader: MessageReader::default(), peer: rpc_peer }
227 }
228
229 pub fn get_raw_peer(&self) -> RawPeer<W> {
231 self.peer.clone()
232 }
233
234 pub fn mainloop<R, RF, H>(&mut self, rf: RF, handler: &mut H) -> Result<(), ReadError>
252 where
253 R: BufRead,
254 RF: Send + FnOnce() -> R,
255 H: Handler,
256 {
257 let exit = crossbeam::scope(|scope| {
258 let peer = self.get_raw_peer();
259 peer.reset_needs_exit();
260
261 let ctx = RpcCtx { peer: Box::new(peer.clone()) };
262 scope.spawn(move |_| {
263 let mut stream = rf();
264 loop {
265 if self.peer.needs_exit() {
268 trace("read loop exit", &["rpc"]);
269 break;
270 }
271
272 let json = match self.reader.next(&mut stream) {
273 Ok(json) => json,
274 Err(err) => {
275 if self.peer.0.is_blocked.load(Ordering::Acquire) {
276 error!("failed to parse response json: {}", err);
277 self.peer.disconnect();
278 }
279 self.peer.put_rx(Err(err));
280 break;
281 }
282 };
283 if json.is_response() {
284 let id = json.get_id().unwrap();
285 let _resp =
286 trace_block_payload("read loop response", &["rpc"], format!("{}", id));
287 match json.into_response() {
288 Ok(resp) => {
289 let resp = resp.map_err(Error::from);
290 self.peer.handle_response(id, resp);
291 }
292 Err(msg) => {
293 error!("failed to parse response: {}", msg);
294 self.peer.handle_response(id, Err(Error::InvalidResponse));
295 }
296 }
297 } else {
298 self.peer.put_rx(Ok(json));
299 }
300 }
301 });
302
303 loop {
304 let _guard = PanicGuard(&peer);
305 let read_result = next_read(&peer, handler, &ctx);
306 let _trace = trace_block("main got msg", &["rpc"]);
307
308 let json = match read_result {
309 Ok(json) => json,
310 Err(err) => {
311 trace_payload("main loop err", &["rpc"], err.to_string());
312 if let Some(idle_token) = peer.try_get_idle() {
315 handler.idle(&ctx, idle_token);
316 }
317 peer.disconnect();
318 return err;
319 }
320 };
321
322 let method = json.get_method().map(String::from);
323 match json.into_rpc::<H::Notification, H::Request>() {
324 Ok(Call::Request(id, cmd)) => {
325 let _t = trace_block_payload("handle request", &["rpc"], method.unwrap());
326 let result = handler.handle_request(&ctx, cmd);
327 peer.respond(result, id);
328 }
329 Ok(Call::Notification(cmd)) => {
330 let _t = trace_block_payload("handle notif", &["rpc"], method.unwrap());
331 handler.handle_notification(&ctx, cmd);
332 }
333 Ok(Call::InvalidRequest(id, err)) => peer.respond(Err(err), id),
334 Err(err) => {
335 trace_payload("read loop exit", &["rpc"], err.to_string());
336 peer.disconnect();
337 return ReadError::UnknownRequest(err);
338 }
339 }
340 }
341 })
342 .unwrap();
343
344 if exit.is_disconnect() {
345 Ok(())
346 } else {
347 Err(exit)
348 }
349 }
350}
351
352fn next_read<W, H>(peer: &RawPeer<W>, handler: &mut H, ctx: &RpcCtx) -> Result<RpcObject, ReadError>
355where
356 W: Write + Send,
357 H: Handler,
358{
359 loop {
360 if let Some(result) = peer.try_get_rx() {
361 return result;
362 }
363 let time_to_next_timer = match peer.check_timers() {
365 Some(Ok(token)) => {
366 do_idle(handler, ctx, token);
367 continue;
368 }
369 Some(Err(duration)) => Some(duration),
370 None => None,
371 };
372
373 if let Some(idle_token) = peer.try_get_idle() {
374 do_idle(handler, ctx, idle_token);
375 continue;
376 }
377
378 let idle_timeout = time_to_next_timer.unwrap_or(MAX_IDLE_WAIT).min(MAX_IDLE_WAIT);
381
382 if let Some(result) = peer.get_rx_timeout(idle_timeout) {
383 return result;
384 }
385 }
386}
387
388fn do_idle<H: Handler>(handler: &mut H, ctx: &RpcCtx, token: usize) {
389 let _trace = trace_block_payload("do_idle", &["rpc"], format!("token: {}", token));
390 handler.idle(ctx, token);
391}
392
393impl RpcCtx {
394 pub fn get_peer(&self) -> &RpcPeer {
395 &self.peer
396 }
397
398 pub fn schedule_idle(&self, token: usize) {
400 self.peer.schedule_idle(token)
401 }
402}
403
404impl<W: Write + Send + 'static> Peer for RawPeer<W> {
405 fn box_clone(&self) -> Box<dyn Peer> {
406 Box::new((*self).clone())
407 }
408
409 fn send_rpc_notification(&self, method: &str, params: &Value) {
410 let _trace = trace_block_payload("send notif", &["rpc"], method.to_owned());
411 if let Err(e) = self.send(&json!({
412 "method": method,
413 "params": params,
414 })) {
415 error!("send error on send_rpc_notification method {}: {}", method, e);
416 }
417 }
418
419 fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>) {
420 let _trace = trace_block_payload("send req async", &["rpc"], method.to_owned());
421 self.send_rpc_request_common(method, params, ResponseHandler::Callback(f));
422 }
423
424 fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error> {
425 let _trace = trace_block_payload("send req sync", &["rpc"], method.to_owned());
426 self.0.is_blocked.store(true, Ordering::Release);
427 let (tx, rx) = mpsc::channel();
428 self.send_rpc_request_common(method, params, ResponseHandler::Chan(tx));
429 rx.recv().unwrap_or(Err(Error::PeerDisconnect))
430 }
431
432 fn request_is_pending(&self) -> bool {
433 let queue = self.0.rx_queue.lock().unwrap();
434 !queue.is_empty()
435 }
436
437 fn schedule_idle(&self, token: usize) {
438 self.0.idle_queue.lock().unwrap().push_back(token);
439 }
440
441 fn schedule_timer(&self, after: Instant, token: usize) {
442 self.0.timers.lock().unwrap().push(Timer { fire_after: after, token });
443 }
444}
445
446impl<W: Write> RawPeer<W> {
447 fn send(&self, v: &Value) -> Result<(), io::Error> {
448 let _trace = trace_block("send", &["rpc"]);
449 let mut s = serde_json::to_string(v).unwrap();
450 s.push('\n');
451 self.0.writer.lock().unwrap().write_all(s.as_bytes())
452 }
454
455 fn respond(&self, result: Response, id: u64) {
456 let mut response = json!({ "id": id });
457 match result {
458 Ok(result) => response["result"] = result,
459 Err(error) => response["error"] = json!(error),
460 };
461 if let Err(e) = self.send(&response) {
462 error!("error {} sending response to RPC {:?}", e, id);
463 }
464 }
465
466 fn send_rpc_request_common(&self, method: &str, params: &Value, rh: ResponseHandler) {
467 let id = self.0.id.fetch_add(1, Ordering::Relaxed);
468 {
469 let mut pending = self.0.pending.lock().unwrap();
470 pending.insert(id, rh);
471 }
472 if let Err(e) = self.send(&json!({
473 "id": id,
474 "method": method,
475 "params": params,
476 })) {
477 let mut pending = self.0.pending.lock().unwrap();
478 if let Some(rh) = pending.remove(&id) {
479 rh.invoke(Err(Error::Io(e)));
480 }
481 }
482 }
483
484 fn handle_response(&self, id: u64, resp: Result<Value, Error>) {
485 let id = id as usize;
486 let handler = {
487 let mut pending = self.0.pending.lock().unwrap();
488 pending.remove(&id)
489 };
490 match handler {
491 Some(responsehandler) => responsehandler.invoke(resp),
492 None => warn!("id {} not found in pending", id),
493 }
494 }
495
496 fn try_get_rx(&self) -> Option<Result<RpcObject, ReadError>> {
498 let mut queue = self.0.rx_queue.lock().unwrap();
499 queue.pop_front()
500 }
501
502 fn get_rx_timeout(&self, dur: Duration) -> Option<Result<RpcObject, ReadError>> {
505 let mut queue = self.0.rx_queue.lock().unwrap();
506 let result = self.0.rx_cvar.wait_timeout(queue, dur).unwrap();
507 queue = result.0;
508 queue.pop_front()
509 }
510
511 fn put_rx(&self, json: Result<RpcObject, ReadError>) {
514 let mut queue = self.0.rx_queue.lock().unwrap();
515 queue.push_back(json);
516 self.0.rx_cvar.notify_one();
517 }
518
519 fn try_get_idle(&self) -> Option<usize> {
520 self.0.idle_queue.lock().unwrap().pop_front()
521 }
522
523 fn check_timers(&self) -> Option<Result<usize, Duration>> {
529 let mut timers = self.0.timers.lock().unwrap();
530 match timers.peek() {
531 None => return None,
532 Some(t) => {
533 let now = Instant::now();
534 if t.fire_after > now {
535 return Some(Err(t.fire_after - now));
536 }
537 }
538 }
539 Some(Ok(timers.pop().unwrap().token))
540 }
541
542 fn disconnect(&self) {
544 let mut pending = self.0.pending.lock().unwrap();
545 let ids = pending.keys().cloned().collect::<Vec<_>>();
546 for id in &ids {
547 let callback = pending.remove(id).unwrap();
548 callback.invoke(Err(Error::PeerDisconnect));
549 }
550 self.0.needs_exit.store(true, Ordering::Relaxed);
551 }
552
553 fn needs_exit(&self) -> bool {
555 self.0.needs_exit.load(Ordering::Relaxed)
556 }
557
558 fn reset_needs_exit(&self) {
559 self.0.needs_exit.store(false, Ordering::SeqCst);
560 }
561}
562
563impl Clone for Box<dyn Peer> {
564 fn clone(&self) -> Box<dyn Peer> {
565 self.box_clone()
566 }
567}
568
569impl<W: Write> Clone for RawPeer<W> {
570 fn clone(&self) -> Self {
571 RawPeer(self.0.clone())
572 }
573}
574
575impl Ord for Timer {
578 fn cmp(&self, other: &Timer) -> cmp::Ordering {
579 other.fire_after.cmp(&self.fire_after)
580 }
581}
582
583impl PartialOrd for Timer {
584 fn partial_cmp(&self, other: &Timer) -> Option<cmp::Ordering> {
585 Some(self.cmp(other))
586 }
587}
588
#[cfg(test)]
mod tests {
    use super::*;

    /// A message with a method but no id parses as a notification.
    #[test]
    fn test_parse_notif() {
        let reader = MessageReader::default();
        let raw = r#"{"method": "hi", "params": {"words": "plz"}}"#;
        let parsed = reader.parse(raw).unwrap();
        assert!(!parsed.is_response());
        match parsed.into_rpc::<Value, Value>().unwrap() {
            Call::Notification(_) => (),
            _ => panic!("parse failed"),
        }
    }

    /// A message with both an id and a method parses as a request.
    #[test]
    fn test_parse_req() {
        let reader = MessageReader::default();
        let raw = r#"{"id": 5, "method": "hi", "params": {"words": "plz"}}"#;
        let parsed = reader.parse(raw).unwrap();
        assert!(!parsed.is_response());
        match parsed.into_rpc::<Value, Value>().unwrap() {
            Call::Request(..) => (),
            _ => panic!("parse failed"),
        }
    }

    /// Malformed input is reported with the matching `ReadError` variant.
    #[test]
    fn test_parse_bad_json() {
        let reader = MessageReader::default();

        // The `params` key is missing its quotes, so this is not valid JSON.
        let malformed = r#"{"id": 5, "method": "hi", params: {"words": "plz"}}"#;
        match reader.parse(malformed).err().unwrap() {
            ReadError::Json(..) => (),
            _ => panic!("parse failed"),
        }

        // Valid JSON, but the top-level value is an array, not an object.
        let not_object = r#"[5, "hi", {"arg": "val"}]"#;
        match reader.parse(not_object).err().unwrap() {
            ReadError::NotObject => (),
            _ => panic!("parse failed"),
        }
    }
}