lapce_rpc/
stdio.rs

1use std::{
2    io::{self, BufRead, Write},
3    thread,
4};
5
6use anyhow::Result;
7use crossbeam_channel::{Receiver, Sender};
8use serde::{de::DeserializeOwned, Serialize};
9use serde_json::{json, Value};
10
11use crate::{RpcError, RpcMessage, RpcObject};
12
13pub fn stdio_transport<W, R, Req1, Notif1, Resp1, Req2, Notif2, Resp2>(
14    mut writer: W,
15    writer_receiver: Receiver<RpcMessage<Req2, Notif2, Resp2>>,
16    mut reader: R,
17    reader_sender: Sender<RpcMessage<Req1, Notif1, Resp1>>,
18) where
19    W: 'static + Write + Send,
20    R: 'static + BufRead + Send,
21    Req1: 'static + Serialize + DeserializeOwned + Send + Sync,
22    Notif1: 'static + Serialize + DeserializeOwned + Send + Sync,
23    Resp1: 'static + Serialize + DeserializeOwned + Send + Sync,
24    Req2: 'static + Serialize + DeserializeOwned + Send + Sync,
25    Notif2: 'static + Serialize + DeserializeOwned + Send + Sync,
26    Resp2: 'static + Serialize + DeserializeOwned + Send + Sync,
27{
28    thread::spawn(move || {
29        for value in writer_receiver {
30            if write_msg(&mut writer, value).is_err() {
31                return;
32            };
33        }
34    });
35    thread::spawn(move || -> Result<()> {
36        loop {
37            let msg = read_msg(&mut reader)?;
38            reader_sender.send(msg)?;
39        }
40    });
41}
42
43pub fn write_msg<W, Req, Notif, Resp>(
44    out: &mut W,
45    msg: RpcMessage<Req, Notif, Resp>,
46) -> io::Result<()>
47where
48    W: Write,
49    Req: Serialize,
50    Notif: Serialize,
51    Resp: Serialize,
52{
53    let value = match msg {
54        RpcMessage::Request(id, req) => {
55            let mut msg = serde_json::to_value(&req)?;
56            msg.as_object_mut()
57                .ok_or(io::ErrorKind::NotFound)?
58                .insert("id".into(), id.into());
59            msg
60        }
61        RpcMessage::Response(id, resp) => {
62            json!({
63                "id": id,
64                "result": resp,
65            })
66        }
67        RpcMessage::Notification(n) => serde_json::to_value(n)?,
68        RpcMessage::Error(id, err) => {
69            json!({
70                "id": id,
71                "error": err,
72            })
73        }
74    };
75    let msg = format!("{}\n", serde_json::to_string(&value)?);
76    out.write_all(msg.as_bytes())?;
77    out.flush()?;
78    Ok(())
79}
80
81pub fn read_msg<R, Req, Notif, Resp>(
82    inp: &mut R,
83) -> io::Result<RpcMessage<Req, Notif, Resp>>
84where
85    R: BufRead,
86    Req: DeserializeOwned,
87    Notif: DeserializeOwned,
88    Resp: DeserializeOwned,
89{
90    let mut buf = String::new();
91    let _s = inp.read_line(&mut buf)?;
92    let value: Value = serde_json::from_str(&buf)?;
93    let object = RpcObject(value);
94    let is_response = object.is_response();
95    let msg = if is_response {
96        let id = object.get_id().ok_or(io::ErrorKind::NotFound)?;
97        let resp = object
98            .into_response()
99            .map_err(|_| io::ErrorKind::NotFound)?;
100        match resp {
101            Ok(value) => {
102                let resp: Resp = serde_json::from_value(value)?;
103                RpcMessage::Response(id, resp)
104            }
105            Err(value) => {
106                let err: RpcError = serde_json::from_value(value)?;
107                RpcMessage::Error(id, err)
108            }
109        }
110    } else {
111        match object.get_id() {
112            Some(id) => {
113                let req: Req = serde_json::from_value(object.0)?;
114                RpcMessage::Request(id, req)
115            }
116            None => {
117                let notif: Notif = serde_json::from_value(object.0)?;
118                RpcMessage::Notification(notif)
119            }
120        }
121    };
122    Ok(msg)
123}