1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::{convert::Infallible, time::Duration};
use futures::future;
use http::{Response, StatusCode};
use hyper::{
server::{conn::AddrIncoming, Builder},
Body,
};
use serde::Serialize;
use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;
use tracing::{info, trace};
use warp::{Filter, Rejection};
use super::{
rpcs::{self, RpcWithOptionalParamsExt, RpcWithParamsExt, RpcWithoutParamsExt, RPC_API_PATH},
ReactorEventT,
};
use crate::effect::EffectBuilder;
/// Builds a JSON-RPC 2.0 error response carrying `error` and a `null` request ID.
///
/// The HTTP status is always `200 OK` with a `Content-Type` of `application/json`; the failure is
/// conveyed only through the JSON body.
///
/// # Panics
///
/// Panics if the envelope cannot be serialized to JSON or if the HTTP response cannot be built.
fn new_error_response(error: warp_json_rpc::Error) -> Response<Body> {
    /// Wire shape of an error reply that is not associated with any request ID.
    #[derive(Serialize)]
    struct ErrorEnvelope {
        /// Protocol version marker.
        jsonrpc: &'static str,
        /// Always `None`, which serializes as JSON `null`.
        id: Option<()>,
        /// The error being reported.
        error: warp_json_rpc::Error,
    }

    let envelope = ErrorEnvelope {
        jsonrpc: "2.0",
        id: None,
        error,
    };
    let payload = serde_json::to_vec(&envelope).unwrap();

    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json")
        .body(Body::from(payload))
        .unwrap()
}
pub(super) async fn run<REv: ReactorEventT>(
builder: Builder<AddrIncoming>,
effect_builder: EffectBuilder<REv>,
qps_limit: u64,
) {
let rpc_put_deploy = rpcs::account::PutDeploy::create_filter(effect_builder);
let rpc_get_block = rpcs::chain::GetBlock::create_filter(effect_builder);
let rpc_get_block_transfers = rpcs::chain::GetBlockTransfers::create_filter(effect_builder);
let rpc_get_state_root_hash = rpcs::chain::GetStateRootHash::create_filter(effect_builder);
let rpc_get_item = rpcs::state::GetItem::create_filter(effect_builder);
let rpc_get_balance = rpcs::state::GetBalance::create_filter(effect_builder);
let rpc_get_deploy = rpcs::info::GetDeploy::create_filter(effect_builder);
let rpc_get_peers = rpcs::info::GetPeers::create_filter(effect_builder);
let rpc_get_status = rpcs::info::GetStatus::create_filter(effect_builder);
let rpc_get_era_info = rpcs::chain::GetEraInfoBySwitchBlock::create_filter(effect_builder);
let rpc_get_auction_info = rpcs::state::GetAuctionInfo::create_filter(effect_builder);
let rpc_get_rpcs = rpcs::docs::ListRpcs::create_filter(effect_builder);
let unknown_method = warp::path(RPC_API_PATH)
.and(warp_json_rpc::filters::json_rpc())
.and_then(move |response_builder: warp_json_rpc::Builder| async move {
response_builder
.error(warp_json_rpc::Error::METHOD_NOT_FOUND)
.map_err(|_| warp::reject())
});
let parse_failure = warp::path(RPC_API_PATH).and_then(move || async move {
let error_response = new_error_response(warp_json_rpc::Error::PARSE_ERROR);
Ok::<_, Rejection>(error_response)
});
let service = warp_json_rpc::service(
rpc_put_deploy
.or(rpc_get_block)
.or(rpc_get_block_transfers)
.or(rpc_get_state_root_hash)
.or(rpc_get_item)
.or(rpc_get_balance)
.or(rpc_get_deploy)
.or(rpc_get_peers)
.or(rpc_get_status)
.or(rpc_get_era_info)
.or(rpc_get_auction_info)
.or(rpc_get_rpcs)
.or(unknown_method)
.or(parse_failure),
);
let make_svc =
hyper::service::make_service_fn(move |_| future::ok::<_, Infallible>(service.clone()));
let make_svc = ServiceBuilder::new()
.rate_limit(qps_limit, Duration::from_secs(1))
.service(make_svc);
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let server = builder.serve(make_svc);
info!(address = %server.local_addr(), "started JSON-RPC server");
let server_with_shutdown = server.with_graceful_shutdown(async {
shutdown_receiver.await.ok();
});
let server_joiner = tokio::spawn(server_with_shutdown);
let _ = server_joiner.await;
let _ = shutdown_sender.send(());
trace!("JSON-RPC server stopped");
}