balius_runtime/drivers/
jsonrpc.rs1use serde::{Deserialize, Serialize};
14use serde_json::json;
15use std::net::SocketAddr;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, error};
18use warp::Filter as _;
19
20use crate::{wit, Error, Runtime};
21
22#[derive(Serialize, Deserialize, Clone, Debug)]
23pub struct Config {
24 pub listen_address: String,
25}
26
27#[derive(Deserialize)]
28struct Request {
29 pub id: Option<String>,
30 pub method: String,
31 pub params: serde_json::Value,
32}
33
34#[derive(Serialize)]
35struct ErrorResponse {
36 error: String,
37}
38
39fn parse_request(body: serde_json::Value) -> Result<Request, ErrorResponse> {
40 match serde_json::from_value(body) {
41 Ok(x) => Ok(x),
42 Err(x) => Err(ErrorResponse {
43 error: x.to_string(),
44 }),
45 }
46}
47
48pub async fn handle_request(
49 runtime: Runtime,
50 worker: String,
51 body: serde_json::Value,
52) -> warp::reply::Json {
53 let request = match parse_request(body) {
54 Ok(x) => x,
55 Err(err) => return warp::reply::json(&err),
56 };
57
58 debug!(
59 worker,
60 id = request.id,
61 method = request.method,
62 "handling request"
63 );
64
65 let params = serde_json::to_vec(&request.params).unwrap();
66
67 let reply = runtime
68 .handle_request(&worker, &request.method, params)
69 .await;
70
71 match reply {
72 Ok(x) => {
73 debug!(worker, id = request.id, "request successful");
74
75 let x = match x {
76 wit::Response::Acknowledge => json!({}),
77 wit::Response::Json(x) => serde_json::from_slice(&x).unwrap(),
78 wit::Response::Cbor(x) => json!({ "cbor": hex::encode(x) }),
79 wit::Response::PartialTx(x) => json!({ "tx": hex::encode(x) }),
80 };
81
82 warp::reply::json(&x)
83 }
84 Err(err) => {
85 error!(worker, id = request.id, "request failed");
86 warp::reply::json(&ErrorResponse {
87 error: err.to_string(),
88 })
89 }
90 }
91}
92
93pub async fn serve(
94 config: Config,
95 runtime: Runtime,
96 cancel: CancellationToken,
97) -> Result<(), Error> {
98 let filter = warp::any()
99 .map(move || runtime.clone())
100 .and(warp::path::param())
101 .and(warp::post())
102 .and(warp::body::json())
103 .then(handle_request);
104
105 let address: SocketAddr = config
106 .listen_address
107 .parse()
108 .map_err(|x: std::net::AddrParseError| Error::Config(x.to_string()))?;
109
110 let (addr, server) =
111 warp::serve(filter).bind_with_graceful_shutdown(address, cancel.cancelled_owned());
112
113 tracing::info!(%addr, "Json-RPC server listening");
114
115 server.await;
116
117 Ok(())
118}