1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::types::{
6 BlockInfo, EpochInfo, RpcRequest, RpcResponse, StatusInfo, TxResult, ValidatorInfoResponse,
7};
8use hotmint_consensus::store::BlockStore;
9use hotmint_mempool::Mempool;
10use hotmint_network::service::PeerStatus;
11use hotmint_types::{BlockHash, Height};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
13use tokio::net::TcpListener;
14use tokio::sync::watch;
15use tracing::{info, warn};
16
17pub struct RpcState {
19 pub validator_id: u64,
20 pub mempool: Arc<Mempool>,
21 pub status_rx: watch::Receiver<(u64, u64, u64, usize)>,
23 pub store: Arc<RwLock<Box<dyn BlockStore>>>,
25 pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
27}
28
29pub struct RpcServer {
31 state: Arc<RpcState>,
32 listener: TcpListener,
33}
34
35impl RpcServer {
36 pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
37 let listener = TcpListener::bind(addr)
38 .await
39 .c(d!("failed to bind RPC server"))?;
40 info!(addr = addr, "RPC server listening");
41 Ok(Self {
42 state: Arc::new(state),
43 listener,
44 })
45 }
46
47 pub fn local_addr(&self) -> std::net::SocketAddr {
48 self.listener.local_addr().expect("listener has local addr")
49 }
50
51 pub async fn run(self) {
52 loop {
53 match self.listener.accept().await {
54 Ok((stream, _addr)) => {
55 let state = self.state.clone();
56 tokio::spawn(async move {
57 let (reader, mut writer) = stream.into_split();
58 let mut lines = BufReader::new(reader).lines();
59 while let Ok(Some(line)) = lines.next_line().await {
60 let response = handle_request(&state, &line).await;
61 let mut json = serde_json::to_string(&response).unwrap_or_default();
62 json.push('\n');
63 if writer.write_all(json.as_bytes()).await.is_err() {
64 break;
65 }
66 }
67 });
68 }
69 Err(e) => {
70 warn!(error = %e, "failed to accept connection");
71 }
72 }
73 }
74 }
75}
76
77async fn handle_request(state: &RpcState, line: &str) -> RpcResponse {
78 let req: RpcRequest = match serde_json::from_str(line) {
79 Ok(r) => r,
80 Err(e) => {
81 return RpcResponse::err(0, -32700, format!("parse error: {e}"));
82 }
83 };
84
85 match req.method.as_str() {
86 "status" => {
87 let (view, height, epoch, validator_count) = *state.status_rx.borrow();
88 let info = StatusInfo {
89 validator_id: state.validator_id,
90 current_view: view,
91 last_committed_height: height,
92 epoch,
93 validator_count,
94 mempool_size: state.mempool.size().await,
95 };
96 json_ok(req.id, &info)
97 }
98
99 "submit_tx" => {
100 let tx_hex = req.params.as_str().unwrap_or_default();
101 let tx_bytes = match hex_decode(tx_hex) {
102 Some(b) => b,
103 None => {
104 return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
105 }
106 };
107 let accepted = state.mempool.add_tx(tx_bytes).await;
108 json_ok(req.id, &TxResult { accepted })
109 }
110
111 "get_block" => {
112 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
113 Some(h) => h,
114 None => {
115 return RpcResponse::err(
116 req.id,
117 -32602,
118 "missing or invalid 'height' parameter".to_string(),
119 );
120 }
121 };
122 let store = state.store.read().unwrap();
123 match store.get_block_by_height(Height(height)) {
124 Some(block) => json_ok(req.id, &block_to_info(&block)),
125 None => RpcResponse::err(
126 req.id,
127 -32602,
128 format!("block at height {height} not found"),
129 ),
130 }
131 }
132
133 "get_block_by_hash" => {
134 let hash_hex = req.params.as_str().unwrap_or_default();
135 match hex_to_block_hash(hash_hex) {
136 Some(hash) => {
137 let store = state.store.read().unwrap();
138 match store.get_block(&hash) {
139 Some(block) => json_ok(req.id, &block_to_info(&block)),
140 None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
141 }
142 }
143 None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
144 }
145 }
146
147 "get_validators" => {
148 let (_, _, _, _) = *state.status_rx.borrow();
149 let store = state.store.read().unwrap();
152 let tip = store.tip_height();
154 drop(store);
155
156 let peers = state.peer_info_rx.borrow().clone();
160 let validators: Vec<ValidatorInfoResponse> = peers
161 .iter()
162 .map(|p| ValidatorInfoResponse {
163 id: p.validator_id.0,
164 power: 0, public_key: String::new(),
166 })
167 .collect();
168
169 let result = if validators.is_empty() {
171 vec![ValidatorInfoResponse {
172 id: state.validator_id,
173 power: 0,
174 public_key: String::new(),
175 }]
176 } else {
177 validators
178 };
179 let _ = tip; json_ok(req.id, &result)
181 }
182
183 "get_epoch" => {
184 let (_, _, epoch, validator_count) = *state.status_rx.borrow();
185 let info = EpochInfo {
186 number: epoch,
187 start_view: 0, validator_count,
189 };
190 json_ok(req.id, &info)
191 }
192
193 "get_peers" => {
194 let peers = state.peer_info_rx.borrow().clone();
195 json_ok(req.id, &peers)
196 }
197
198 _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
199 }
200}
201
202fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
203 match serde_json::to_value(val) {
204 Ok(v) => RpcResponse::ok(id, v),
205 Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
206 }
207}
208
209fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
210 BlockInfo {
211 height: block.height.as_u64(),
212 hash: hex_encode(&block.hash.0),
213 parent_hash: hex_encode(&block.parent_hash.0),
214 view: block.view.as_u64(),
215 proposer: block.proposer.0,
216 payload_size: block.payload.len(),
217 }
218}
219
220fn hex_encode(bytes: &[u8]) -> String {
221 bytes.iter().map(|b| format!("{b:02x}")).collect()
222}
223
224fn hex_decode(s: &str) -> Option<Vec<u8>> {
225 if !s.len().is_multiple_of(2) {
226 return None;
227 }
228 (0..s.len())
229 .step_by(2)
230 .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
231 .collect()
232}
233
234fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
235 let bytes = hex_decode(s)?;
236 if bytes.len() != 32 {
237 return None;
238 }
239 let mut arr = [0u8; 32];
240 arr.copy_from_slice(&bytes);
241 Some(BlockHash(arr))
242}