1use std::{
2 io::{self, BufRead, Write},
3 thread,
4};
5
6use anyhow::Result;
7use crossbeam_channel::{Receiver, Sender};
8use serde::{de::DeserializeOwned, Serialize};
9use serde_json::{json, Value};
10
11use crate::{RpcError, RpcMessage, RpcObject};
12
13pub fn stdio_transport<W, R, Req1, Notif1, Resp1, Req2, Notif2, Resp2>(
14 mut writer: W,
15 writer_receiver: Receiver<RpcMessage<Req2, Notif2, Resp2>>,
16 mut reader: R,
17 reader_sender: Sender<RpcMessage<Req1, Notif1, Resp1>>,
18) where
19 W: 'static + Write + Send,
20 R: 'static + BufRead + Send,
21 Req1: 'static + Serialize + DeserializeOwned + Send + Sync,
22 Notif1: 'static + Serialize + DeserializeOwned + Send + Sync,
23 Resp1: 'static + Serialize + DeserializeOwned + Send + Sync,
24 Req2: 'static + Serialize + DeserializeOwned + Send + Sync,
25 Notif2: 'static + Serialize + DeserializeOwned + Send + Sync,
26 Resp2: 'static + Serialize + DeserializeOwned + Send + Sync,
27{
28 thread::spawn(move || {
29 for value in writer_receiver {
30 if write_msg(&mut writer, value).is_err() {
31 return;
32 };
33 }
34 });
35 thread::spawn(move || -> Result<()> {
36 loop {
37 let msg = read_msg(&mut reader)?;
38 reader_sender.send(msg)?;
39 }
40 });
41}
42
43pub fn write_msg<W, Req, Notif, Resp>(
44 out: &mut W,
45 msg: RpcMessage<Req, Notif, Resp>,
46) -> io::Result<()>
47where
48 W: Write,
49 Req: Serialize,
50 Notif: Serialize,
51 Resp: Serialize,
52{
53 let value = match msg {
54 RpcMessage::Request(id, req) => {
55 let mut msg = serde_json::to_value(&req)?;
56 msg.as_object_mut()
57 .ok_or(io::ErrorKind::NotFound)?
58 .insert("id".into(), id.into());
59 msg
60 }
61 RpcMessage::Response(id, resp) => {
62 json!({
63 "id": id,
64 "result": resp,
65 })
66 }
67 RpcMessage::Notification(n) => serde_json::to_value(n)?,
68 RpcMessage::Error(id, err) => {
69 json!({
70 "id": id,
71 "error": err,
72 })
73 }
74 };
75 let msg = format!("{}\n", serde_json::to_string(&value)?);
76 out.write_all(msg.as_bytes())?;
77 out.flush()?;
78 Ok(())
79}
80
81pub fn read_msg<R, Req, Notif, Resp>(
82 inp: &mut R,
83) -> io::Result<RpcMessage<Req, Notif, Resp>>
84where
85 R: BufRead,
86 Req: DeserializeOwned,
87 Notif: DeserializeOwned,
88 Resp: DeserializeOwned,
89{
90 let mut buf = String::new();
91 let _s = inp.read_line(&mut buf)?;
92 let value: Value = serde_json::from_str(&buf)?;
93 let object = RpcObject(value);
94 let is_response = object.is_response();
95 let msg = if is_response {
96 let id = object.get_id().ok_or(io::ErrorKind::NotFound)?;
97 let resp = object
98 .into_response()
99 .map_err(|_| io::ErrorKind::NotFound)?;
100 match resp {
101 Ok(value) => {
102 let resp: Resp = serde_json::from_value(value)?;
103 RpcMessage::Response(id, resp)
104 }
105 Err(value) => {
106 let err: RpcError = serde_json::from_value(value)?;
107 RpcMessage::Error(id, err)
108 }
109 }
110 } else {
111 match object.get_id() {
112 Some(id) => {
113 let req: Req = serde_json::from_value(object.0)?;
114 RpcMessage::Request(id, req)
115 }
116 None => {
117 let notif: Notif = serde_json::from_value(object.0)?;
118 RpcMessage::Notification(notif)
119 }
120 }
121 };
122 Ok(msg)
123}