balius_runtime/drivers/
jsonrpc.rs

1//! Driver to serve JSON-RPC requests.
2//!
3//! This driver implements an HTTP server that listens for JSON-RPC requests
4//! and funnels them into the Runtime. The path of the request is used as the
5//! key to identify the worker that should handle the request. The JSON-RPC
6//! method field is used as the key to identify the particular Balius request
7//! for the worker. JSON-RPC params are mapped directly into Balius request
8//! params.
9//!
10//! The JSON-RPC server is implemented as a Warp application and adheres to
11//! the JSON-RPC 2.0 spec.
12
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use std::net::SocketAddr;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, error};
18use warp::Filter as _;
19
20use crate::{wit, Error, Runtime};
21
22#[derive(Serialize, Deserialize, Clone, Debug)]
23pub struct Config {
24    pub listen_address: String,
25}
26
27#[derive(Deserialize)]
28struct Request {
29    pub id: Option<String>,
30    pub method: String,
31    pub params: serde_json::Value,
32}
33
34#[derive(Serialize)]
35struct ErrorResponse {
36    error: String,
37}
38
39fn parse_request(body: serde_json::Value) -> Result<Request, ErrorResponse> {
40    match serde_json::from_value(body) {
41        Ok(x) => Ok(x),
42        Err(x) => Err(ErrorResponse {
43            error: x.to_string(),
44        }),
45    }
46}
47
48pub async fn handle_request(
49    runtime: Runtime,
50    worker: String,
51    body: serde_json::Value,
52) -> warp::reply::Json {
53    let request = match parse_request(body) {
54        Ok(x) => x,
55        Err(err) => return warp::reply::json(&err),
56    };
57
58    debug!(
59        worker,
60        id = request.id,
61        method = request.method,
62        "handling request"
63    );
64
65    let params = serde_json::to_vec(&request.params).unwrap();
66
67    let reply = runtime
68        .handle_request(&worker, &request.method, params)
69        .await;
70
71    match reply {
72        Ok(x) => {
73            debug!(worker, id = request.id, "request successful");
74
75            let x = match x {
76                wit::Response::Acknowledge => json!({}),
77                wit::Response::Json(x) => serde_json::from_slice(&x).unwrap(),
78                wit::Response::Cbor(x) => json!({ "cbor": hex::encode(x) }),
79                wit::Response::PartialTx(x) => json!({ "tx": hex::encode(x) }),
80            };
81
82            warp::reply::json(&x)
83        }
84        Err(err) => {
85            error!(worker, id = request.id, "request failed");
86            warp::reply::json(&ErrorResponse {
87                error: err.to_string(),
88            })
89        }
90    }
91}
92
93pub async fn serve(
94    config: Config,
95    runtime: Runtime,
96    cancel: CancellationToken,
97) -> Result<(), Error> {
98    let filter = warp::any()
99        .map(move || runtime.clone())
100        .and(warp::path::param())
101        .and(warp::post())
102        .and(warp::body::json())
103        .then(handle_request);
104
105    let address: SocketAddr = config
106        .listen_address
107        .parse()
108        .map_err(|x: std::net::AddrParseError| Error::Config(x.to_string()))?;
109
110    let (addr, server) =
111        warp::serve(filter).bind_with_graceful_shutdown(address, cancel.cancelled_owned());
112
113    tracing::info!(%addr, "Json-RPC server listening");
114
115    server.await;
116
117    Ok(())
118}