1use ruc::*;
2
3use std::io;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use tokio::sync::RwLock;
8
9use crate::types::{
10 BlockInfo, CommitQcInfo, EpochInfo, HeaderInfo, RpcRequest, RpcResponse, StatusInfo, TxResult,
11 ValidatorInfoResponse,
12};
13use hotmint_consensus::application::Application;
14use hotmint_consensus::store::BlockStore;
15use hotmint_mempool::Mempool;
16use hotmint_network::service::PeerStatus;
17use hotmint_types::{BlockHash, Height};
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
19use tokio::net::TcpListener;
20use tokio::sync::{Semaphore, watch};
21use tokio::time::{Duration, timeout};
22use tracing::{info, warn};
23
24const MAX_RPC_CONNECTIONS: usize = 256;
25const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
26const MAX_LINE_BYTES: usize = 1_048_576;
28pub(crate) const TX_RATE_LIMIT_PER_SEC: u32 = 100;
30
31#[derive(Debug, Clone, Copy)]
33pub struct ConsensusStatus {
34 pub current_view: u64,
35 pub last_committed_height: u64,
36 pub epoch_number: u64,
37 pub validator_count: usize,
38 pub epoch_start_view: u64,
39}
40
41impl ConsensusStatus {
42 pub fn new(
43 current_view: u64,
44 last_committed_height: u64,
45 epoch_number: u64,
46 validator_count: usize,
47 epoch_start_view: u64,
48 ) -> Self {
49 Self {
50 current_view,
51 last_committed_height,
52 epoch_number,
53 validator_count,
54 epoch_start_view,
55 }
56 }
57}
58
59pub struct RpcState {
61 pub validator_id: u64,
62 pub mempool: Arc<Mempool>,
63 pub status_rx: watch::Receiver<ConsensusStatus>,
64 pub store: Arc<RwLock<Box<dyn BlockStore>>>,
66 pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
68 pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
70 pub app: Option<Arc<dyn Application>>,
72}
73
74pub struct RpcServer {
76 state: Arc<RpcState>,
77 listener: TcpListener,
78}
79
80impl RpcServer {
81 pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
82 let listener = TcpListener::bind(addr)
83 .await
84 .c(d!("failed to bind RPC server"))?;
85 info!(addr = addr, "RPC server listening");
86 Ok(Self {
87 state: Arc::new(state),
88 listener,
89 })
90 }
91
92 pub fn local_addr(&self) -> SocketAddr {
93 self.listener.local_addr().expect("listener has local addr")
94 }
95
96 pub async fn run(self) {
97 let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
98 loop {
99 match self.listener.accept().await {
100 Ok((stream, _addr)) => {
101 let permit = match semaphore.clone().try_acquire_owned() {
102 Ok(p) => p,
103 Err(_) => {
104 warn!("RPC connection limit reached, rejecting");
105 drop(stream);
106 continue;
107 }
108 };
109 let state = self.state.clone();
110 tokio::spawn(async move {
111 let _permit = permit;
112 let (reader, mut writer) = stream.into_split();
113 let mut reader = BufReader::with_capacity(65_536, reader);
114 let mut tx_limiter = TxRateLimiter::new(TX_RATE_LIMIT_PER_SEC);
115 loop {
116 let line = match timeout(
117 RPC_READ_TIMEOUT,
118 read_line_limited(&mut reader, MAX_LINE_BYTES),
119 )
120 .await
121 {
122 Ok(Ok(Some(line))) => line,
123 Ok(Err(e)) => {
124 warn!(error = %e, "RPC read error (line too long?)");
125 break;
126 }
127 _ => break, };
129 let response = handle_request(&state, &line, &mut tx_limiter).await;
130 let mut json = serde_json::to_string(&response).unwrap_or_default();
131 json.push('\n');
132 if writer.write_all(json.as_bytes()).await.is_err() {
133 break;
134 }
135 }
136 });
137 }
138 Err(e) => {
139 warn!(error = %e, "failed to accept connection");
140 }
141 }
142 }
143 }
144}
145
146pub struct TxRateLimiter {
148 tokens: u32,
149 max_tokens: u32,
150 last_refill: tokio::time::Instant,
151}
152
153impl TxRateLimiter {
154 pub(crate) fn new(rate_per_sec: u32) -> Self {
155 Self {
156 tokens: rate_per_sec,
157 max_tokens: rate_per_sec,
158 last_refill: tokio::time::Instant::now(),
159 }
160 }
161
162 pub(crate) fn allow(&mut self) -> bool {
163 let now = tokio::time::Instant::now();
164 let elapsed = now.duration_since(self.last_refill);
165 if elapsed >= Duration::from_secs(1) {
166 self.tokens = self.max_tokens;
167 self.last_refill = now;
168 }
169 if self.tokens > 0 {
170 self.tokens -= 1;
171 true
172 } else {
173 false
174 }
175 }
176}
177
178pub(crate) async fn handle_request(
179 state: &RpcState,
180 line: &str,
181 tx_limiter: &mut TxRateLimiter,
182) -> RpcResponse {
183 let req: RpcRequest = match serde_json::from_str(line) {
184 Ok(r) => r,
185 Err(e) => {
186 return RpcResponse::err(0, -32700, format!("parse error: {e}"));
187 }
188 };
189
190 match req.method.as_str() {
191 "status" => {
192 let s = *state.status_rx.borrow();
193 let info = StatusInfo {
194 validator_id: state.validator_id,
195 current_view: s.current_view,
196 last_committed_height: s.last_committed_height,
197 epoch: s.epoch_number,
198 validator_count: s.validator_count,
199 mempool_size: state.mempool.size().await,
200 };
201 json_ok(req.id, &info)
202 }
203
204 "submit_tx" => {
205 if !tx_limiter.allow() {
207 return RpcResponse::err(
208 req.id,
209 -32000,
210 "rate limit exceeded for submit_tx".to_string(),
211 );
212 }
213 let Some(tx_hex) = req.params.as_str() else {
214 return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
215 };
216 if tx_hex.is_empty() {
217 return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
218 }
219 let tx_bytes = match hex_decode(tx_hex) {
220 Some(b) if !b.is_empty() => b,
221 _ => {
222 return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
223 }
224 };
225 let (priority, gas_wanted) = if let Some(ref app) = state.app {
227 let result = app.validate_tx(&tx_bytes, None);
228 if !result.valid {
229 return RpcResponse::err(
230 req.id,
231 -32602,
232 "transaction validation failed".to_string(),
233 );
234 }
235 (result.priority, result.gas_wanted)
236 } else {
237 (0, 0)
238 };
239 let accepted = state
240 .mempool
241 .add_tx_with_gas(tx_bytes, priority, gas_wanted)
242 .await;
243 json_ok(req.id, &TxResult { accepted })
244 }
245
246 "get_block" => {
247 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
248 Some(h) => h,
249 None => {
250 return RpcResponse::err(
251 req.id,
252 -32602,
253 "missing or invalid 'height' parameter".to_string(),
254 );
255 }
256 };
257 let store = state.store.read().await;
258 match store.get_block_by_height(Height(height)) {
259 Some(block) => json_ok(req.id, &block_to_info(&block)),
260 None => RpcResponse::err(
261 req.id,
262 -32602,
263 format!("block at height {height} not found"),
264 ),
265 }
266 }
267
268 "get_block_by_hash" => {
269 let hash_hex = req.params.as_str().unwrap_or_default();
270 match hex_to_block_hash(hash_hex) {
271 Some(hash) => {
272 let store = state.store.read().await;
273 match store.get_block(&hash) {
274 Some(block) => json_ok(req.id, &block_to_info(&block)),
275 None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
276 }
277 }
278 None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
279 }
280 }
281
282 "get_validators" => {
283 let validators = state.validator_set_rx.borrow().clone();
284 json_ok(req.id, &validators)
285 }
286
287 "get_epoch" => {
288 let s = *state.status_rx.borrow();
289 let info = EpochInfo {
290 number: s.epoch_number,
291 start_view: s.epoch_start_view,
292 validator_count: s.validator_count,
293 };
294 json_ok(req.id, &info)
295 }
296
297 "get_peers" => {
298 let peers = state.peer_info_rx.borrow().clone();
299 json_ok(req.id, &peers)
300 }
301
302 "get_header" => {
303 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
304 Some(h) => h,
305 None => {
306 return RpcResponse::err(
307 req.id,
308 -32602,
309 "missing or invalid 'height' parameter".to_string(),
310 );
311 }
312 };
313 let store = state.store.read().await;
314 match store.get_block_by_height(Height(height)) {
315 Some(block) => {
316 let info = HeaderInfo {
317 height: block.height.as_u64(),
318 hash: hex_encode(&block.hash.0),
319 parent_hash: hex_encode(&block.parent_hash.0),
320 view: block.view.as_u64(),
321 proposer: block.proposer.0,
322 app_hash: hex_encode(&block.app_hash.0),
323 };
324 json_ok(req.id, &info)
325 }
326 None => RpcResponse::err(
327 req.id,
328 -32602,
329 format!("block at height {height} not found"),
330 ),
331 }
332 }
333
334 "get_commit_qc" => {
335 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
336 Some(h) => h,
337 None => {
338 return RpcResponse::err(
339 req.id,
340 -32602,
341 "missing or invalid 'height' parameter".to_string(),
342 );
343 }
344 };
345 let store = state.store.read().await;
346 match store.get_commit_qc(Height(height)) {
347 Some(qc) => {
348 let info = CommitQcInfo {
349 block_hash: hex_encode(&qc.block_hash.0),
350 view: qc.view.as_u64(),
351 signer_count: qc.aggregate_signature.count(),
352 epoch: qc.epoch.as_u64(),
353 };
354 json_ok(req.id, &info)
355 }
356 None => RpcResponse::err(
357 req.id,
358 -32602,
359 format!("commit QC at height {height} not found"),
360 ),
361 }
362 }
363
364 _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
365 }
366}
367
368fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
369 match serde_json::to_value(val) {
370 Ok(v) => RpcResponse::ok(id, v),
371 Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
372 }
373}
374
375fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
376 BlockInfo {
377 height: block.height.as_u64(),
378 hash: hex_encode(&block.hash.0),
379 parent_hash: hex_encode(&block.parent_hash.0),
380 view: block.view.as_u64(),
381 proposer: block.proposer.0,
382 payload_size: block.payload.len(),
383 }
384}
385
386fn hex_encode(bytes: &[u8]) -> String {
387 bytes.iter().map(|b| format!("{b:02x}")).collect()
388}
389
390fn hex_decode(s: &str) -> Option<Vec<u8>> {
391 if !s.len().is_multiple_of(2) {
392 return None;
393 }
394 (0..s.len())
395 .step_by(2)
396 .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
397 .collect()
398}
399
400fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
401 let bytes = hex_decode(s)?;
402 if bytes.len() != 32 {
403 return None;
404 }
405 let mut arr = [0u8; 32];
406 arr.copy_from_slice(&bytes);
407 Some(BlockHash(arr))
408}
409
410async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
416 reader: &mut R,
417 max_bytes: usize,
418) -> io::Result<Option<String>> {
419 let mut buf = Vec::new();
420 loop {
421 let available = reader.fill_buf().await?;
422 if available.is_empty() {
423 return if buf.is_empty() {
424 Ok(None)
425 } else {
426 Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
427 };
428 }
429 if let Some(pos) = available.iter().position(|&b| b == b'\n') {
430 buf.extend_from_slice(&available[..pos]);
431 reader.consume(pos + 1);
432 return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
433 }
434 let to_consume = available.len();
435 buf.extend_from_slice(available);
436 reader.consume(to_consume);
437 if buf.len() > max_bytes {
438 return Err(io::Error::new(io::ErrorKind::InvalidData, "line too long"));
439 }
440 }
441}