1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::types::{
6 BlockInfo, EpochInfo, RpcRequest, RpcResponse, StatusInfo, TxResult, ValidatorInfoResponse,
7};
8use hotmint_consensus::application::Application;
9use hotmint_consensus::store::BlockStore;
10use hotmint_mempool::Mempool;
11use hotmint_network::service::PeerStatus;
12use hotmint_types::{BlockHash, Height};
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::net::TcpListener;
15use tokio::sync::{Semaphore, watch};
16use tokio::time::{Duration, timeout};
17use tracing::{info, warn};
18
/// Number of permits in the connection semaphore created by `RpcServer::run`;
/// a connection accepted while no permit is free is dropped immediately.
const MAX_RPC_CONNECTIONS: usize = 256;
/// Maximum time to wait for one complete request line from a client before
/// the connection's read loop gives up.
const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Size limit, in bytes, passed to `read_line_limited` for each request line.
const MAX_LINE_BYTES: usize = 1_048_576;
23
/// Consensus status snapshot published over a `watch` channel.
///
/// As destructured by `handle_request`, the fields are, in order:
/// `(current_view, last_committed_height, epoch, validator_count, epoch_start_view)`.
pub type StatusTuple = (u64, u64, u64, usize, u64);
26
/// State shared by every RPC connection handler.
pub struct RpcState {
    /// Identifier reported as `validator_id` in the `status` response.
    pub validator_id: u64,
    /// Mempool queried for its size (`status`) and fed by `submit_tx`.
    pub mempool: Arc<Mempool>,
    /// Latest [`StatusTuple`], read by the `status` and `get_epoch` methods.
    pub status_rx: watch::Receiver<StatusTuple>,
    /// Block store read by `get_block` and `get_block_by_hash`.
    pub store: Arc<RwLock<Box<dyn BlockStore>>>,
    /// Latest peer list, returned by `get_peers`.
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Latest validator set, returned by `get_validators`.
    pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
    /// Optional application; when present, `submit_tx` calls its
    /// `validate_tx` before handing the transaction to the mempool.
    pub app: Option<Arc<dyn Application>>,
}
41
/// Line-delimited JSON RPC server bound to a TCP listener.
pub struct RpcServer {
    /// State shared (via `Arc`) with every spawned connection task.
    state: Arc<RpcState>,
    /// Listener from which `run` accepts client connections.
    listener: TcpListener,
}
47
48impl RpcServer {
49 pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
50 let listener = TcpListener::bind(addr)
51 .await
52 .c(d!("failed to bind RPC server"))?;
53 info!(addr = addr, "RPC server listening");
54 Ok(Self {
55 state: Arc::new(state),
56 listener,
57 })
58 }
59
60 pub fn local_addr(&self) -> std::net::SocketAddr {
61 self.listener.local_addr().expect("listener has local addr")
62 }
63
64 pub async fn run(self) {
65 let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
66 loop {
67 match self.listener.accept().await {
68 Ok((stream, _addr)) => {
69 let permit = match semaphore.clone().try_acquire_owned() {
70 Ok(p) => p,
71 Err(_) => {
72 warn!("RPC connection limit reached, rejecting");
73 drop(stream);
74 continue;
75 }
76 };
77 let state = self.state.clone();
78 tokio::spawn(async move {
79 let _permit = permit;
80 let (reader, mut writer) = stream.into_split();
81 let mut reader = BufReader::with_capacity(65_536, reader);
82 loop {
83 let line = match timeout(
84 RPC_READ_TIMEOUT,
85 read_line_limited(&mut reader, MAX_LINE_BYTES),
86 )
87 .await
88 {
89 Ok(Ok(Some(line))) => line,
90 Ok(Err(e)) => {
91 warn!(error = %e, "RPC read error (line too long?)");
92 break;
93 }
94 _ => break, };
96 let response = handle_request(&state, &line).await;
97 let mut json = serde_json::to_string(&response).unwrap_or_default();
98 json.push('\n');
99 if writer.write_all(json.as_bytes()).await.is_err() {
100 break;
101 }
102 }
103 });
104 }
105 Err(e) => {
106 warn!(error = %e, "failed to accept connection");
107 }
108 }
109 }
110 }
111}
112
113async fn handle_request(state: &RpcState, line: &str) -> RpcResponse {
114 let req: RpcRequest = match serde_json::from_str(line) {
115 Ok(r) => r,
116 Err(e) => {
117 return RpcResponse::err(0, -32700, format!("parse error: {e}"));
118 }
119 };
120
121 match req.method.as_str() {
122 "status" => {
123 let (view, height, epoch, validator_count, _) = *state.status_rx.borrow();
124 let info = StatusInfo {
125 validator_id: state.validator_id,
126 current_view: view,
127 last_committed_height: height,
128 epoch,
129 validator_count,
130 mempool_size: state.mempool.size().await,
131 };
132 json_ok(req.id, &info)
133 }
134
135 "submit_tx" => {
136 let Some(tx_hex) = req.params.as_str() else {
137 return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
138 };
139 if tx_hex.is_empty() {
140 return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
141 }
142 let tx_bytes = match hex_decode(tx_hex) {
143 Some(b) if !b.is_empty() => b,
144 _ => {
145 return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
146 }
147 };
148 if let Some(ref app) = state.app
150 && !app.validate_tx(&tx_bytes, None)
151 {
152 return RpcResponse::err(
153 req.id,
154 -32602,
155 "transaction validation failed".to_string(),
156 );
157 }
158 let accepted = state.mempool.add_tx(tx_bytes).await;
159 json_ok(req.id, &TxResult { accepted })
160 }
161
162 "get_block" => {
163 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
164 Some(h) => h,
165 None => {
166 return RpcResponse::err(
167 req.id,
168 -32602,
169 "missing or invalid 'height' parameter".to_string(),
170 );
171 }
172 };
173 let store = state.store.read().unwrap();
174 match store.get_block_by_height(Height(height)) {
175 Some(block) => json_ok(req.id, &block_to_info(&block)),
176 None => RpcResponse::err(
177 req.id,
178 -32602,
179 format!("block at height {height} not found"),
180 ),
181 }
182 }
183
184 "get_block_by_hash" => {
185 let hash_hex = req.params.as_str().unwrap_or_default();
186 match hex_to_block_hash(hash_hex) {
187 Some(hash) => {
188 let store = state.store.read().unwrap();
189 match store.get_block(&hash) {
190 Some(block) => json_ok(req.id, &block_to_info(&block)),
191 None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
192 }
193 }
194 None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
195 }
196 }
197
198 "get_validators" => {
199 let validators = state.validator_set_rx.borrow().clone();
200 json_ok(req.id, &validators)
201 }
202
203 "get_epoch" => {
204 let (_, _, epoch, validator_count, start_view) = *state.status_rx.borrow();
205 let info = EpochInfo {
206 number: epoch,
207 start_view,
208 validator_count,
209 };
210 json_ok(req.id, &info)
211 }
212
213 "get_peers" => {
214 let peers = state.peer_info_rx.borrow().clone();
215 json_ok(req.id, &peers)
216 }
217
218 _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
219 }
220}
221
222fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
223 match serde_json::to_value(val) {
224 Ok(v) => RpcResponse::ok(id, v),
225 Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
226 }
227}
228
229fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
230 BlockInfo {
231 height: block.height.as_u64(),
232 hash: hex_encode(&block.hash.0),
233 parent_hash: hex_encode(&block.parent_hash.0),
234 view: block.view.as_u64(),
235 proposer: block.proposer.0,
236 payload_size: block.payload.len(),
237 }
238}
239
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(DIGITS[usize::from(byte >> 4)] as char);
        out.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    out
}
243
/// Decodes a hexadecimal string (upper- or lowercase, no prefix) into bytes.
///
/// Returns `None` if the input has an odd length or contains any character
/// that is not an ASCII hex digit. An empty input decodes to an empty vector.
///
/// The input is processed as raw bytes rather than by slicing the `&str`, so
/// non-ASCII input is rejected instead of panicking on a char boundary, and
/// sign characters such as `+` are not accepted as part of a digit pair.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(c: u8) -> Option<u8> {
        match c {
            b'0'..=b'9' => Some(c - b'0'),
            b'a'..=b'f' => Some(c - b'a' + 10),
            b'A'..=b'F' => Some(c - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    // A leftover byte means the input length was odd.
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
253
254fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
255 let bytes = hex_decode(s)?;
256 if bytes.len() != 32 {
257 return None;
258 }
259 let mut arr = [0u8; 32];
260 arr.copy_from_slice(&bytes);
261 Some(BlockHash(arr))
262}
263
264async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
270 reader: &mut R,
271 max_bytes: usize,
272) -> std::io::Result<Option<String>> {
273 let mut buf = Vec::new();
274 loop {
275 let available = reader.fill_buf().await?;
276 if available.is_empty() {
277 return if buf.is_empty() {
278 Ok(None)
279 } else {
280 Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
281 };
282 }
283 if let Some(pos) = available.iter().position(|&b| b == b'\n') {
284 buf.extend_from_slice(&available[..pos]);
285 reader.consume(pos + 1);
286 return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
287 }
288 let to_consume = available.len();
289 buf.extend_from_slice(available);
290 reader.consume(to_consume);
291 if buf.len() > max_bytes {
292 return Err(std::io::Error::new(
293 std::io::ErrorKind::InvalidData,
294 "line too long",
295 ));
296 }
297 }
298}