psup_json_rpc/
lib.rs

1//! Adapter for the process supervisor (psup) to serve JSON-RPC over a split socket.
2#![deny(missing_docs)]
3use futures::stream::StreamExt;
4use json_rpc2::{futures::Server, Request, Response};
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
8use tokio_util::codec::{FramedRead, LinesCodec};
9
10/// Generic error type.
11type Error = Box<dyn std::error::Error + Send + Sync>;
12
13/// Result type.
14type Result<T> = std::result::Result<T, Error>;
15
16/// Encodes whether a packet is a request or
17/// a response so that we can do bi-directional
18/// communication over the same socket.
19#[derive(Debug, Serialize, Deserialize)]
20pub enum Message {
21    /// RPC request message.
22    #[serde(rename = "request")]
23    Request(Request),
24    /// RPC response message.
25    #[serde(rename = "response")]
26    Response(Response),
27}
28
29/// Worker identity payload.
30#[derive(Debug, Serialize, Deserialize)]
31pub struct Identity {
32    /// Worker identifier.
33    pub id: String,
34}
35
36/// Prepare a JSON RPC method call wrapped as a [Message](crate::Message).
37pub fn call(method: &str, params: Option<Value>) -> Message {
38    Message::Request(Request::new_reply(method, params))
39}
40
41/// Prepare a JSON RPC notification wrapped as a [Message](crate::Message).
42pub fn notify(method: &str, params: Option<Value>) -> Message {
43    Message::Request(Request::new_notification(method, params))
44}
45
46/// Write a message to the writer as a JSON encoded line.
47pub async fn write<W>(
48    writer: &mut W,
49    msg: &Message,
50) -> Result<()> 
51    where W: AsyncWrite + Unpin {
52    writer
53        .write(
54            serde_json::to_vec(msg)
55                .map_err(Box::new)?
56                .as_slice(),
57        )
58        .await?;
59    writer.write(b"\n").await?;
60    writer.flush().await?;
61    Ok(())
62}
63
64/// Read and write line-delimited JSON from a stream executing
65/// via a JSON RPC server.
66///
67/// Request and response functions are useful for logging service 
68/// calls; the answer function can be used to handle replies from 
69/// a remote method call.
70pub async fn serve<S, R, W, I, O, A>(
71    server: Server<'_, S>,
72    state: &S,
73    reader: ReadHalf<R>,
74    mut writer: WriteHalf<W>,
75    request: I,
76    response: O,
77    answer: A,
78) -> Result<()>
79where
80    R: AsyncRead,
81    W: AsyncWrite,
82    S: Send + Sync,
83    I: Fn(&Request),
84    O: Fn(&Response),
85    A: Fn(Response) -> Result<Option<Message>>,
86{
87    let mut lines = FramedRead::new(reader, LinesCodec::new());
88    while let Some(line) = lines.next().await {
89        let line = line.map_err(Box::new)?;
90        match serde_json::from_str::<Message>(&line).map_err(Box::new)? {
91            Message::Request(mut req) => {
92                (request)(&req);
93                let res = server.serve(&mut req, state).await;
94                if let Some(res) = res {
95                    (response)(&res);
96                    let msg = Message::Response(res);
97                    write(&mut writer, &msg).await?;
98                }
99            }
100            Message::Response(reply) => {
101                let msg = (answer)(reply)?;
102                if let Some(msg) = msg {
103                    write(&mut writer, &msg).await?; 
104                }
105            }
106        }
107    }
108    Ok(())
109}