xi_rpc/
lib.rs

1// Copyright 2016 The xi-editor Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Generic RPC handling (used for both front end and plugin communication).
16//!
17//! The RPC protocol is based on [JSON-RPC](http://www.jsonrpc.org/specification),
18//! but with some modifications. Unlike JSON-RPC 2.0, requests and notifications
19//! are allowed in both directions, rather than imposing client and server roles.
20//! Further, the batch form is not supported.
21//!
22//! Because these changes make the protocol not fully compliant with the spec,
23//! the `"jsonrpc"` member is omitted from request and response objects.
24#![allow(clippy::boxed_local, clippy::or_fun_call)]
25
26#[macro_use]
27extern crate serde_json;
28#[macro_use]
29extern crate serde_derive;
30extern crate crossbeam;
31extern crate serde;
32extern crate xi_trace;
33
34#[macro_use]
35extern crate log;
36
37mod error;
38mod parse;
39
40pub mod test_utils;
41
42use std::cmp;
43use std::collections::{BTreeMap, BinaryHeap, VecDeque};
44use std::io::{self, BufRead, Write};
45use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
46use std::sync::mpsc;
47use std::sync::{Arc, Condvar, Mutex};
48use std::thread;
49use std::time::{Duration, Instant};
50
51use serde::de::DeserializeOwned;
52use serde_json::Value;
53
54use xi_trace::{trace, trace_block, trace_block_payload, trace_payload};
55
56pub use crate::error::{Error, ReadError, RemoteError};
57use crate::parse::{Call, MessageReader, Response, RpcObject};
58
/// The maximum duration we will block waiting on the receive queue before
/// checking again for a task (an expired timer or a scheduled idle token).
const MAX_IDLE_WAIT: Duration = Duration::from_millis(5);
61
/// An interface to access the other side of the RPC channel. The main purpose
/// is to send RPC requests and notifications to the peer.
///
/// A single shared `RawPeer` exists for each `RpcLoop`; a handle to it can
/// be obtained with `RpcLoop::get_raw_peer()`. Handles are cheap to clone:
/// every clone points at the same reference-counted `RpcState`.
///
/// In general, `RawPeer` shouldn't be used directly, but behind a pointer as
/// the `Peer` trait object.
pub struct RawPeer<W: Write + 'static>(Arc<RpcState<W>>);
71
/// The `Peer` trait represents the interface for the other side of the RPC
/// channel. It is intended to be used behind a pointer, as a trait object.
pub trait Peer: Send + 'static {
    /// Used to implement `clone` in an object-safe way.
    /// For an explanation on this approach, see
    /// [this thread](https://users.rust-lang.org/t/solved-is-it-possible-to-clone-a-boxed-trait-object/1714/6).
    fn box_clone(&self) -> Box<dyn Peer>;
    /// Sends a notification (asynchronous RPC) to the peer.
    fn send_rpc_notification(&self, method: &str, params: &Value);
    /// Sends a request asynchronously, and the supplied callback will
    /// be called when the response arrives.
    ///
    /// `Callback` is implemented for every `Send + FnOnce(Result<Value, Error>)`;
    /// it must be boxed because trait objects cannot use generic parameters.
    fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>);
    /// Sends a request (synchronous RPC) to the peer, and waits for the result.
    fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error>;
    /// Determines whether an incoming request (or notification) is
    /// pending. This is intended to reduce latency for bulk operations
    /// done in the background.
    fn request_is_pending(&self) -> bool;
    /// Adds a token to the idle queue. When the runloop is idle and the
    /// queue is not empty, the handler's `idle` fn will be called
    /// with the earliest added token.
    fn schedule_idle(&self, token: usize);
    /// Like `schedule_idle`, with the guarantee that the handler's `idle`
    /// fn will not be called _before_ the provided `Instant`.
    ///
    /// # Note
    ///
    /// This is not intended as a high-fidelity timer. Regular RPC messages
    /// will always take priority over an idle task.
    fn schedule_timer(&self, after: Instant, token: usize);
}
106
/// The boxed `Peer` trait object; this is the form in which a peer is
/// handed to handlers (see `RpcCtx::get_peer`).
pub type RpcPeer = Box<dyn Peer>;
109
/// The context passed to every `Handler` method.
///
/// It wraps the boxed peer so that a handler can talk back to the other
/// side of the channel or schedule idle work.
pub struct RpcCtx {
    // The peer this context gives access to.
    peer: RpcPeer,
}
113
#[derive(Debug, Clone, Serialize, Deserialize)]
/// An RPC command.
///
/// This type is used as a placeholder in various places, and can be
/// used by clients as a catchall type for the `Notification` and `Request`
/// associated types when implementing `Handler`.
pub struct RpcCall {
    /// The name of the method being invoked.
    pub method: String,
    /// The method's parameters, kept as untyped JSON.
    pub params: Value,
}
123
/// A trait for types which can handle RPCs.
///
/// Incoming messages are deserialized into the associated `Notification`
/// and `Request` types, which is why both must implement
/// `serde::de::DeserializeOwned`.
///
/// NOTE(review): earlier documentation here referred to `MethodHandler` and
/// `Parser`; neither name is defined in this file, so that text looked stale.
pub trait Handler {
    /// The type incoming notifications are deserialized into.
    type Notification: DeserializeOwned;
    /// The type incoming requests are deserialized into.
    type Request: DeserializeOwned;
    /// Called by the main loop for each incoming notification.
    fn handle_notification(&mut self, ctx: &RpcCtx, rpc: Self::Notification);
    /// Called by the main loop for each incoming request; the returned
    /// result is sent back to the peer as the response to that request.
    fn handle_request(&mut self, ctx: &RpcCtx, rpc: Self::Request) -> Result<Value, RemoteError>;
    /// Called by the main loop with a token that was previously scheduled
    /// (via `schedule_idle` or `schedule_timer`) when no incoming message is
    /// waiting. The default implementation does nothing.
    #[allow(unused_variables)]
    fn idle(&mut self, ctx: &RpcCtx, token: usize) {}
}
137
/// A one-shot, boxed callback that receives the result of an asynchronous
/// request. Taking `self: Box<Self>` lets the callback be consumed through a
/// `Box<dyn Callback>`.
pub trait Callback: Send {
    /// Consumes the callback, passing it the response (or the error).
    fn call(self: Box<Self>, result: Result<Value, Error>);
}
141
142impl<F: Send + FnOnce(Result<Value, Error>)> Callback for F {
143    fn call(self: Box<F>, result: Result<Value, Error>) {
144        (*self)(result)
145    }
146}
147
/// A helper type which shuts down the runloop if a panic occurs while
/// handling an RPC.
///
/// It borrows the peer; the shutdown itself happens in the `Drop` impl,
/// which only acts when the current thread is unwinding from a panic.
struct PanicGuard<'a, W: Write + 'static>(&'a RawPeer<W>);
151
152impl<'a, W: Write + 'static> Drop for PanicGuard<'a, W> {
153    fn drop(&mut self) {
154        if thread::panicking() {
155            error!("panic guard hit, closing runloop");
156            self.0.disconnect();
157        }
158    }
159}
160
/// A one-shot, boxed procedure that is called with an idle token.
///
/// NOTE(review): nothing else in this file appears to use this trait —
/// confirm whether it is still needed.
trait IdleProc: Send {
    /// Consumes the procedure, passing it `token`.
    fn call(self: Box<Self>, token: usize);
}
164
165impl<F: Send + FnOnce(usize)> IdleProc for F {
166    fn call(self: Box<F>, token: usize) {
167        (*self)(token)
168    }
169}
170
/// How the response to an outgoing request is delivered once it arrives.
enum ResponseHandler {
    /// Deliver through a channel; used by the synchronous
    /// `send_rpc_request`, which blocks on the receiving end.
    Chan(mpsc::Sender<Result<Value, Error>>),
    /// Deliver by invoking a boxed callback; used by
    /// `send_rpc_request_async`.
    Callback(Box<dyn Callback>),
}
175
176impl ResponseHandler {
177    fn invoke(self, result: Result<Value, Error>) {
178        match self {
179            ResponseHandler::Chan(tx) => {
180                let _ = tx.send(result);
181            }
182            ResponseHandler::Callback(f) => f.call(result),
183        }
184    }
185}
186
/// A pending timer: `token` becomes eligible to be handed to the handler's
/// `idle` fn once `fire_after` has passed.
#[derive(Debug, PartialEq, Eq)]
struct Timer {
    // The earliest instant at which the timer may fire.
    fire_after: Instant,
    // The caller-supplied token to pass to `idle`.
    token: usize,
}
192
/// The state shared (through an `Arc`) by every `RawPeer` handle of one
/// `RpcLoop`.
struct RpcState<W: Write> {
    // Incoming messages (or a read error) queued by the reader thread for
    // the main loop.
    rx_queue: Mutex<VecDeque<Result<RpcObject, ReadError>>>,
    // Notified whenever something is pushed onto `rx_queue`.
    rx_cvar: Condvar,
    // The output stream; locked for each message written.
    writer: Mutex<W>,
    // Counter used to assign ids to outgoing requests.
    id: AtomicUsize,
    // Handlers for outgoing requests still awaiting a response, keyed by id.
    pending: Mutex<BTreeMap<usize, ResponseHandler>>,
    // Tokens scheduled with `schedule_idle`, in FIFO order.
    idle_queue: Mutex<VecDeque<usize>>,
    // Timers scheduled with `schedule_timer`; `Timer`'s reversed ordering
    // puts the earliest one at the top of the heap.
    timers: Mutex<BinaryHeap<Timer>>,
    // Set by `disconnect`; polled by the reader thread to know when to stop.
    needs_exit: AtomicBool,
    // Set when a synchronous request is sent; consulted by the reader
    // thread when a read fails.
    is_blocked: AtomicBool,
}
204
/// A structure holding the state of a main loop for handling RPCs.
pub struct RpcLoop<W: Write + 'static> {
    // Reads and parses incoming messages; used by the reader thread.
    reader: MessageReader,
    // The shared peer handle; cloned for the main loop and for callers.
    peer: RawPeer<W>,
}
210
impl<W: Write + Send> RpcLoop<W> {
    /// Creates a new `RpcLoop` with the given output stream (which is used for
    /// sending requests and notifications, as well as responses).
    pub fn new(writer: W) -> Self {
        let rpc_peer = RawPeer(Arc::new(RpcState {
            rx_queue: Mutex::new(VecDeque::new()),
            rx_cvar: Condvar::new(),
            writer: Mutex::new(writer),
            id: AtomicUsize::new(0),
            pending: Mutex::new(BTreeMap::new()),
            idle_queue: Mutex::new(VecDeque::new()),
            timers: Mutex::new(BinaryHeap::new()),
            needs_exit: AtomicBool::new(false),
            is_blocked: AtomicBool::new(false),
        }));
        RpcLoop { reader: MessageReader::default(), peer: rpc_peer }
    }

    /// Returns a new handle to the peer. The handle is a clone that shares
    /// its state with the one owned by this loop.
    pub fn get_raw_peer(&self) -> RawPeer<W> {
        self.peer.clone()
    }

    /// Starts the event loop, reading lines from the reader until EOF,
    /// or an error occurs.
    ///
    /// Returns `Ok(())` when the loop ends with a `ReadError` for which
    /// `is_disconnect()` is true, otherwise returns the underlying
    /// `ReadError`.
    ///
    /// # Note:
    /// The reader is supplied via a closure, as basically a workaround
    /// so that the reader doesn't have to be `Send`. Internally, the
    /// main loop starts a separate thread for I/O, and at startup that
    /// thread calls the given closure.
    ///
    /// Calls to the handler happen on the caller's thread.
    ///
    /// Calls to the handler are guaranteed to preserve the order as
    /// they appear on the channel. At the moment, there is no way
    /// for there to be more than one incoming request to be outstanding.
    ///
    /// # Panics
    ///
    /// Unwraps the result of `crossbeam::scope`, the id of a message
    /// classified as a response, and the method name of a request or
    /// notification.
    pub fn mainloop<R, RF, H>(&mut self, rf: RF, handler: &mut H) -> Result<(), ReadError>
    where
        R: BufRead,
        RF: Send + FnOnce() -> R,
        H: Handler,
    {
        let exit = crossbeam::scope(|scope| {
            let peer = self.get_raw_peer();
            // Clear any exit request left over from a previous run.
            peer.reset_needs_exit();

            let ctx = RpcCtx { peer: Box::new(peer.clone()) };
            // Reader thread: parses incoming messages. Responses are
            // dispatched to their pending handler right here; everything
            // else is queued for the main loop below.
            scope.spawn(move |_| {
                let mut stream = rf();
                loop {
                    // The main thread cannot return while this thread is active;
                    // when the main thread wants to exit it sets this flag.
                    if self.peer.needs_exit() {
                        trace("read loop exit", &["rpc"]);
                        break;
                    }

                    let json = match self.reader.next(&mut stream) {
                        Ok(json) => json,
                        Err(err) => {
                            // If a synchronous request has been issued, fail
                            // all pending requests from this thread.
                            // NOTE(review): `is_blocked` is set by
                            // `send_rpc_request` and is not cleared anywhere
                            // visible here — confirm that is intended.
                            if self.peer.0.is_blocked.load(Ordering::Acquire) {
                                error!("failed to parse response json: {}", err);
                                self.peer.disconnect();
                            }
                            // Forward the error so the main loop can exit.
                            self.peer.put_rx(Err(err));
                            break;
                        }
                    };
                    if json.is_response() {
                        let id = json.get_id().unwrap();
                        let _resp =
                            trace_block_payload("read loop response", &["rpc"], format!("{}", id));
                        match json.into_response() {
                            Ok(resp) => {
                                let resp = resp.map_err(Error::from);
                                self.peer.handle_response(id, resp);
                            }
                            Err(msg) => {
                                error!("failed to parse response: {}", msg);
                                self.peer.handle_response(id, Err(Error::InvalidResponse));
                            }
                        }
                    } else {
                        self.peer.put_rx(Ok(json));
                    }
                }
            });

            // Main loop: runs on the caller's thread and is the only place
            // the handler is invoked.
            loop {
                // Disconnects the peer if the handler panics in this iteration.
                let _guard = PanicGuard(&peer);
                let read_result = next_read(&peer, handler, &ctx);
                let _trace = trace_block("main got msg", &["rpc"]);

                let json = match read_result {
                    Ok(json) => json,
                    Err(err) => {
                        trace_payload("main loop err", &["rpc"], err.to_string());
                        // finish idle work before disconnecting;
                        // this is mostly useful for integration tests.
                        // (At most one queued idle token is run here.)
                        if let Some(idle_token) = peer.try_get_idle() {
                            handler.idle(&ctx, idle_token);
                        }
                        peer.disconnect();
                        return err;
                    }
                };

                // Captured before `json` is consumed; used only for tracing.
                let method = json.get_method().map(String::from);
                match json.into_rpc::<H::Notification, H::Request>() {
                    Ok(Call::Request(id, cmd)) => {
                        let _t = trace_block_payload("handle request", &["rpc"], method.unwrap());
                        let result = handler.handle_request(&ctx, cmd);
                        peer.respond(result, id);
                    }
                    Ok(Call::Notification(cmd)) => {
                        let _t = trace_block_payload("handle notif", &["rpc"], method.unwrap());
                        handler.handle_notification(&ctx, cmd);
                    }
                    // The request could not be understood; report the error
                    // to the peer and keep running.
                    Ok(Call::InvalidRequest(id, err)) => peer.respond(Err(err), id),
                    Err(err) => {
                        trace_payload("read loop exit", &["rpc"], err.to_string());
                        peer.disconnect();
                        return ReadError::UnknownRequest(err);
                    }
                }
            }
        })
        .unwrap();

        // A disconnect is the normal way for the loop to end.
        if exit.is_disconnect() {
            Ok(())
        } else {
            Err(exit)
        }
    }
}
351
352/// Returns the next read result, checking for idle work when no
353/// result is available.
354fn next_read<W, H>(peer: &RawPeer<W>, handler: &mut H, ctx: &RpcCtx) -> Result<RpcObject, ReadError>
355where
356    W: Write + Send,
357    H: Handler,
358{
359    loop {
360        if let Some(result) = peer.try_get_rx() {
361            return result;
362        }
363        // handle timers before general idle work
364        let time_to_next_timer = match peer.check_timers() {
365            Some(Ok(token)) => {
366                do_idle(handler, ctx, token);
367                continue;
368            }
369            Some(Err(duration)) => Some(duration),
370            None => None,
371        };
372
373        if let Some(idle_token) = peer.try_get_idle() {
374            do_idle(handler, ctx, idle_token);
375            continue;
376        }
377
378        // we don't want to block indefinitely if there's no current idle work,
379        // because idle work could be scheduled from another thread.
380        let idle_timeout = time_to_next_timer.unwrap_or(MAX_IDLE_WAIT).min(MAX_IDLE_WAIT);
381
382        if let Some(result) = peer.get_rx_timeout(idle_timeout) {
383            return result;
384        }
385    }
386}
387
388fn do_idle<H: Handler>(handler: &mut H, ctx: &RpcCtx, token: usize) {
389    let _trace = trace_block_payload("do_idle", &["rpc"], format!("token: {}", token));
390    handler.idle(ctx, token);
391}
392
393impl RpcCtx {
394    pub fn get_peer(&self) -> &RpcPeer {
395        &self.peer
396    }
397
398    /// Schedule the idle handler to be run when there are no requests pending.
399    pub fn schedule_idle(&self, token: usize) {
400        self.peer.schedule_idle(token)
401    }
402}
403
404impl<W: Write + Send + 'static> Peer for RawPeer<W> {
405    fn box_clone(&self) -> Box<dyn Peer> {
406        Box::new((*self).clone())
407    }
408
409    fn send_rpc_notification(&self, method: &str, params: &Value) {
410        let _trace = trace_block_payload("send notif", &["rpc"], method.to_owned());
411        if let Err(e) = self.send(&json!({
412            "method": method,
413            "params": params,
414        })) {
415            error!("send error on send_rpc_notification method {}: {}", method, e);
416        }
417    }
418
419    fn send_rpc_request_async(&self, method: &str, params: &Value, f: Box<dyn Callback>) {
420        let _trace = trace_block_payload("send req async", &["rpc"], method.to_owned());
421        self.send_rpc_request_common(method, params, ResponseHandler::Callback(f));
422    }
423
424    fn send_rpc_request(&self, method: &str, params: &Value) -> Result<Value, Error> {
425        let _trace = trace_block_payload("send req sync", &["rpc"], method.to_owned());
426        self.0.is_blocked.store(true, Ordering::Release);
427        let (tx, rx) = mpsc::channel();
428        self.send_rpc_request_common(method, params, ResponseHandler::Chan(tx));
429        rx.recv().unwrap_or(Err(Error::PeerDisconnect))
430    }
431
432    fn request_is_pending(&self) -> bool {
433        let queue = self.0.rx_queue.lock().unwrap();
434        !queue.is_empty()
435    }
436
437    fn schedule_idle(&self, token: usize) {
438        self.0.idle_queue.lock().unwrap().push_back(token);
439    }
440
441    fn schedule_timer(&self, after: Instant, token: usize) {
442        self.0.timers.lock().unwrap().push(Timer { fire_after: after, token });
443    }
444}
445
446impl<W: Write> RawPeer<W> {
447    fn send(&self, v: &Value) -> Result<(), io::Error> {
448        let _trace = trace_block("send", &["rpc"]);
449        let mut s = serde_json::to_string(v).unwrap();
450        s.push('\n');
451        self.0.writer.lock().unwrap().write_all(s.as_bytes())
452        // Technically, maybe we should flush here, but doesn't seem to be required.
453    }
454
455    fn respond(&self, result: Response, id: u64) {
456        let mut response = json!({ "id": id });
457        match result {
458            Ok(result) => response["result"] = result,
459            Err(error) => response["error"] = json!(error),
460        };
461        if let Err(e) = self.send(&response) {
462            error!("error {} sending response to RPC {:?}", e, id);
463        }
464    }
465
466    fn send_rpc_request_common(&self, method: &str, params: &Value, rh: ResponseHandler) {
467        let id = self.0.id.fetch_add(1, Ordering::Relaxed);
468        {
469            let mut pending = self.0.pending.lock().unwrap();
470            pending.insert(id, rh);
471        }
472        if let Err(e) = self.send(&json!({
473            "id": id,
474            "method": method,
475            "params": params,
476        })) {
477            let mut pending = self.0.pending.lock().unwrap();
478            if let Some(rh) = pending.remove(&id) {
479                rh.invoke(Err(Error::Io(e)));
480            }
481        }
482    }
483
484    fn handle_response(&self, id: u64, resp: Result<Value, Error>) {
485        let id = id as usize;
486        let handler = {
487            let mut pending = self.0.pending.lock().unwrap();
488            pending.remove(&id)
489        };
490        match handler {
491            Some(responsehandler) => responsehandler.invoke(resp),
492            None => warn!("id {} not found in pending", id),
493        }
494    }
495
496    /// Get a message from the receive queue if available.
497    fn try_get_rx(&self) -> Option<Result<RpcObject, ReadError>> {
498        let mut queue = self.0.rx_queue.lock().unwrap();
499        queue.pop_front()
500    }
501
502    /// Get a message from the receive queue, waiting for at most `Duration`
503    /// and returning `None` if no message is available.
504    fn get_rx_timeout(&self, dur: Duration) -> Option<Result<RpcObject, ReadError>> {
505        let mut queue = self.0.rx_queue.lock().unwrap();
506        let result = self.0.rx_cvar.wait_timeout(queue, dur).unwrap();
507        queue = result.0;
508        queue.pop_front()
509    }
510
511    /// Adds a message to the receive queue. The message should only
512    /// be `None` if the read thread is exiting.
513    fn put_rx(&self, json: Result<RpcObject, ReadError>) {
514        let mut queue = self.0.rx_queue.lock().unwrap();
515        queue.push_back(json);
516        self.0.rx_cvar.notify_one();
517    }
518
519    fn try_get_idle(&self) -> Option<usize> {
520        self.0.idle_queue.lock().unwrap().pop_front()
521    }
522
523    /// Checks status of the most imminent timer. If that timer has expired,
524    /// returns `Some(Ok(_))`, with the corresponding token.
525    /// If a timer exists but has not expired, returns `Some(Err(_))`,
526    /// with the error value being the `Duration` until the timer is ready.
527    /// Returns `None` if no timers are registered.
528    fn check_timers(&self) -> Option<Result<usize, Duration>> {
529        let mut timers = self.0.timers.lock().unwrap();
530        match timers.peek() {
531            None => return None,
532            Some(t) => {
533                let now = Instant::now();
534                if t.fire_after > now {
535                    return Some(Err(t.fire_after - now));
536                }
537            }
538        }
539        Some(Ok(timers.pop().unwrap().token))
540    }
541
542    /// send disconnect error to pending requests.
543    fn disconnect(&self) {
544        let mut pending = self.0.pending.lock().unwrap();
545        let ids = pending.keys().cloned().collect::<Vec<_>>();
546        for id in &ids {
547            let callback = pending.remove(id).unwrap();
548            callback.invoke(Err(Error::PeerDisconnect));
549        }
550        self.0.needs_exit.store(true, Ordering::Relaxed);
551    }
552
553    /// Returns `true` if an error has occured in the main thread.
554    fn needs_exit(&self) -> bool {
555        self.0.needs_exit.load(Ordering::Relaxed)
556    }
557
558    fn reset_needs_exit(&self) {
559        self.0.needs_exit.store(false, Ordering::SeqCst);
560    }
561}
562
563impl Clone for Box<dyn Peer> {
564    fn clone(&self) -> Box<dyn Peer> {
565        self.box_clone()
566    }
567}
568
569impl<W: Write> Clone for RawPeer<W> {
570    fn clone(&self) -> Self {
571        RawPeer(self.0.clone())
572    }
573}
574
575//NOTE: for our timers to work with Rust's BinaryHeap we want to reverse
576//the default comparison; smaller `Instant`'s are considered 'greater'.
577impl Ord for Timer {
578    fn cmp(&self, other: &Timer) -> cmp::Ordering {
579        other.fire_after.cmp(&self.fire_after)
580    }
581}
582
583impl PartialOrd for Timer {
584    fn partial_cmp(&self, other: &Timer) -> Option<cmp::Ordering> {
585        Some(self.cmp(other))
586    }
587}
588
// Unit tests for classifying parsed messages as notifications or requests,
// and for the errors reported on malformed input.
#[cfg(test)]
mod tests {
    use super::*;

    // A message with a method but no id is a notification.
    #[test]
    fn test_parse_notif() {
        let reader = MessageReader::default();
        let json = reader.parse(r#"{"method": "hi", "params": {"words": "plz"}}"#).unwrap();
        assert!(!json.is_response());
        let rpc = json.into_rpc::<Value, Value>().unwrap();
        match rpc {
            Call::Notification(_) => (),
            _ => panic!("parse failed"),
        }
    }

    // A message with both an id and a method is a request.
    #[test]
    fn test_parse_req() {
        let reader = MessageReader::default();
        let json =
            reader.parse(r#"{"id": 5, "method": "hi", "params": {"words": "plz"}}"#).unwrap();
        assert!(!json.is_response());
        let rpc = json.into_rpc::<Value, Value>().unwrap();
        match rpc {
            Call::Request(..) => (),
            _ => panic!("parse failed"),
        }
    }

    // Malformed JSON and non-object JSON are reported as distinct errors.
    #[test]
    fn test_parse_bad_json() {
        // missing "" around params
        let reader = MessageReader::default();
        let json =
            reader.parse(r#"{"id": 5, "method": "hi", params: {"words": "plz"}}"#).err().unwrap();

        match json {
            ReadError::Json(..) => (),
            _ => panic!("parse failed"),
        }
        // not an object
        let json = reader.parse(r#"[5, "hi", {"arg": "val"}]"#).err().unwrap();

        match json {
            ReadError::NotObject => (),
            _ => panic!("parse failed"),
        }
    }
}