1use ruc::*;
2
3use std::collections::HashMap;
4use std::io;
5use std::net::{IpAddr, SocketAddr};
6use std::sync::Arc;
7
8use tokio::sync::{Mutex, mpsc};
9
10use crate::types::{
11 BlockInfo, BlockResultsInfo, CommitQcInfo, EpochInfo, EventAttributeInfo, EventInfo,
12 HeaderInfo, QueryResponseInfo, RpcRequest, RpcResponse, StatusInfo, TxInfo, TxResult,
13 ValidatorInfoResponse, VerifyHeaderResult,
14};
15use hotmint_consensus::application::Application;
16use hotmint_consensus::commit::decode_payload;
17use hotmint_consensus::store::BlockStore;
18use hotmint_mempool::Mempool;
19use hotmint_network::service::PeerStatus;
20use hotmint_types::{BlockHash, Height};
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::net::TcpListener;
23use tokio::sync::{Semaphore, watch};
24use tokio::time::{Duration, Instant, timeout};
25use tracing::{info, warn};
26
27const MAX_RPC_CONNECTIONS: usize = 256;
28const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
29const MAX_LINE_BYTES: usize = 1_048_576;
31pub(crate) const TX_RATE_LIMIT_PER_SEC: u32 = 100;
33const IP_LIMITER_PRUNE_INTERVAL: Duration = Duration::from_secs(60);
35
36#[derive(Debug, Clone, Copy)]
38pub struct ConsensusStatus {
39 pub current_view: u64,
40 pub last_committed_height: u64,
41 pub epoch_number: u64,
42 pub validator_count: usize,
43 pub epoch_start_view: u64,
44}
45
46impl ConsensusStatus {
47 pub fn new(
48 current_view: u64,
49 last_committed_height: u64,
50 epoch_number: u64,
51 validator_count: usize,
52 epoch_start_view: u64,
53 ) -> Self {
54 Self {
55 current_view,
56 last_committed_height,
57 epoch_number,
58 validator_count,
59 epoch_start_view,
60 }
61 }
62}
63
64pub struct RpcState {
66 pub validator_id: u64,
67 pub mempool: Arc<Mempool>,
68 pub status_rx: watch::Receiver<ConsensusStatus>,
69 pub store: Arc<parking_lot::RwLock<Box<dyn BlockStore>>>,
71 pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
73 pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
75 pub app: Option<Arc<dyn Application>>,
77 pub tx_gossip: Option<mpsc::Sender<Vec<u8>>>,
79 pub chain_id_hash: [u8; 32],
81}
82
83pub struct RpcServer {
85 state: Arc<RpcState>,
86 listener: TcpListener,
87}
88
89impl RpcServer {
90 pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
91 let listener = TcpListener::bind(addr)
92 .await
93 .c(d!("failed to bind RPC server"))?;
94 info!(addr = addr, "RPC server listening");
95 Ok(Self {
96 state: Arc::new(state),
97 listener,
98 })
99 }
100
101 pub fn local_addr(&self) -> SocketAddr {
102 self.listener.local_addr().expect("listener has local addr")
103 }
104
105 pub async fn run(self) {
106 let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
107 let ip_limiter = Arc::new(Mutex::new(PerIpRateLimiter::new()));
108 loop {
109 match self.listener.accept().await {
110 Ok((stream, addr)) => {
111 let permit = match semaphore.clone().try_acquire_owned() {
112 Ok(p) => p,
113 Err(_) => {
114 warn!("RPC connection limit reached, rejecting");
115 drop(stream);
116 continue;
117 }
118 };
119 let state = self.state.clone();
120 let ip_limiter = ip_limiter.clone();
121 let peer_ip = addr.ip();
122 tokio::spawn(async move {
123 let _permit = permit;
124 let (reader, mut writer) = stream.into_split();
125 let mut reader = BufReader::with_capacity(65_536, reader);
126 loop {
127 let line = match timeout(
128 RPC_READ_TIMEOUT,
129 read_line_limited(&mut reader, MAX_LINE_BYTES),
130 )
131 .await
132 {
133 Ok(Ok(Some(line))) => line,
134 Ok(Err(e)) => {
135 warn!(error = %e, "RPC read error (line too long?)");
136 break;
137 }
138 _ => break, };
140 let response =
141 handle_request(&state, &line, &ip_limiter, peer_ip).await;
142 let mut json = serde_json::to_string(&response).unwrap_or_default();
143 json.push('\n');
144 if writer.write_all(json.as_bytes()).await.is_err() {
145 break;
146 }
147 }
148 });
149 }
150 Err(e) => {
151 warn!(error = %e, "failed to accept connection");
152 }
153 }
154 }
155 }
156}
157
158pub struct TxRateLimiter {
160 tokens: u32,
161 max_tokens: u32,
162 last_refill: Instant,
163}
164
165impl TxRateLimiter {
166 pub(crate) fn new(rate_per_sec: u32) -> Self {
167 Self {
168 tokens: rate_per_sec,
169 max_tokens: rate_per_sec,
170 last_refill: Instant::now(),
171 }
172 }
173
174 pub(crate) fn allow(&mut self) -> bool {
175 let now = Instant::now();
176 let elapsed = now.duration_since(self.last_refill);
177 if elapsed >= Duration::from_secs(1) {
178 self.tokens = self.max_tokens;
179 self.last_refill = now;
180 }
181 if self.tokens > 0 {
182 self.tokens -= 1;
183 true
184 } else {
185 false
186 }
187 }
188}
189
190const MAX_IP_LIMITER_ENTRIES: usize = 100_000;
194
195pub struct PerIpRateLimiter {
198 buckets: HashMap<IpAddr, TxRateLimiter>,
199 last_prune: Instant,
200}
201
202impl Default for PerIpRateLimiter {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
208impl PerIpRateLimiter {
209 pub fn new() -> Self {
210 Self {
211 buckets: HashMap::new(),
212 last_prune: Instant::now(),
213 }
214 }
215
216 pub fn allow(&mut self, ip: IpAddr) -> bool {
218 self.maybe_prune();
219 if !self.buckets.contains_key(&ip) && self.buckets.len() >= MAX_IP_LIMITER_ENTRIES {
221 return false;
222 }
223 let bucket = self
224 .buckets
225 .entry(ip)
226 .or_insert_with(|| TxRateLimiter::new(TX_RATE_LIMIT_PER_SEC));
227 bucket.allow()
228 }
229
230 fn maybe_prune(&mut self) {
232 let now = Instant::now();
233 if now.duration_since(self.last_prune) < IP_LIMITER_PRUNE_INTERVAL {
234 return;
235 }
236 self.last_prune = now;
237 self.buckets
239 .retain(|_, v| now.duration_since(v.last_refill) < Duration::from_secs(30));
240 }
241}
242
243pub(crate) async fn handle_request(
244 state: &RpcState,
245 line: &str,
246 ip_limiter: &Mutex<PerIpRateLimiter>,
247 peer_ip: IpAddr,
248) -> RpcResponse {
249 let req: RpcRequest = match serde_json::from_str(line) {
250 Ok(r) => r,
251 Err(e) => {
252 return RpcResponse::err(0, -32700, format!("parse error: {e}"));
253 }
254 };
255
256 match req.method.as_str() {
257 "status" => {
258 let s = *state.status_rx.borrow();
259 let info = StatusInfo {
260 validator_id: state.validator_id,
261 current_view: s.current_view,
262 last_committed_height: s.last_committed_height,
263 epoch: s.epoch_number,
264 validator_count: s.validator_count,
265 mempool_size: state.mempool.size().await,
266 };
267 json_ok(req.id, &info)
268 }
269
270 "submit_tx" => {
271 {
273 let mut limiter = ip_limiter.lock().await;
274 if !limiter.allow(peer_ip) {
275 return RpcResponse::err(
276 req.id,
277 -32000,
278 "rate limit exceeded for submit_tx".to_string(),
279 );
280 }
281 }
282 let Some(tx_hex) = req.params.as_str() else {
283 return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
284 };
285 if tx_hex.is_empty() {
286 return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
287 }
288 let tx_bytes = match hex_decode(tx_hex) {
289 Some(b) if !b.is_empty() => b,
290 _ => {
291 return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
292 }
293 };
294 let (priority, gas_wanted) = if let Some(ref app) = state.app {
296 let result = app.validate_tx(&tx_bytes, None);
297 if !result.valid {
298 return RpcResponse::err(
299 req.id,
300 -32602,
301 "transaction validation failed".to_string(),
302 );
303 }
304 (result.priority, result.gas_wanted)
305 } else {
306 (0, 0)
307 };
308 let accepted = state
309 .mempool
310 .add_tx_with_gas(tx_bytes.clone(), priority, gas_wanted)
311 .await;
312 if accepted && let Some(ref gossip) = state.tx_gossip {
314 let _ = gossip.try_send(tx_bytes);
315 }
316 json_ok(req.id, &TxResult { accepted })
317 }
318
319 "get_block" => {
320 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
321 Some(h) => h,
322 None => {
323 return RpcResponse::err(
324 req.id,
325 -32602,
326 "missing or invalid 'height' parameter".to_string(),
327 );
328 }
329 };
330 let store = state.store.read();
331 match store.get_block_by_height(Height(height)) {
332 Some(block) => json_ok(req.id, &block_to_info(&block)),
333 None => RpcResponse::err(
334 req.id,
335 -32602,
336 format!("block at height {height} not found"),
337 ),
338 }
339 }
340
341 "get_block_by_hash" => {
342 let hash_hex = req.params.as_str().unwrap_or_default();
343 match hex_to_block_hash(hash_hex) {
344 Some(hash) => {
345 let store = state.store.read();
346 match store.get_block(&hash) {
347 Some(block) => json_ok(req.id, &block_to_info(&block)),
348 None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
349 }
350 }
351 None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
352 }
353 }
354
355 "get_validators" => {
356 let validators = state.validator_set_rx.borrow().clone();
357 json_ok(req.id, &validators)
358 }
359
360 "get_epoch" => {
361 let s = *state.status_rx.borrow();
362 let info = EpochInfo {
363 number: s.epoch_number,
364 start_view: s.epoch_start_view,
365 validator_count: s.validator_count,
366 };
367 json_ok(req.id, &info)
368 }
369
370 "get_peers" => {
371 let peers = state.peer_info_rx.borrow().clone();
372 json_ok(req.id, &peers)
373 }
374
375 "get_header" => {
376 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
377 Some(h) => h,
378 None => {
379 return RpcResponse::err(
380 req.id,
381 -32602,
382 "missing or invalid 'height' parameter".to_string(),
383 );
384 }
385 };
386 let store = state.store.read();
387 match store.get_block_by_height(Height(height)) {
388 Some(block) => {
389 let info = HeaderInfo {
390 height: block.height.as_u64(),
391 hash: hex_encode(&block.hash.0),
392 parent_hash: hex_encode(&block.parent_hash.0),
393 view: block.view.as_u64(),
394 proposer: block.proposer.0,
395 app_hash: hex_encode(&block.app_hash.0),
396 };
397 json_ok(req.id, &info)
398 }
399 None => RpcResponse::err(
400 req.id,
401 -32602,
402 format!("block at height {height} not found"),
403 ),
404 }
405 }
406
407 "get_commit_qc" => {
408 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
409 Some(h) => h,
410 None => {
411 return RpcResponse::err(
412 req.id,
413 -32602,
414 "missing or invalid 'height' parameter".to_string(),
415 );
416 }
417 };
418 let store = state.store.read();
419 match store.get_commit_qc(Height(height)) {
420 Some(qc) => {
421 let info = CommitQcInfo {
422 block_hash: hex_encode(&qc.block_hash.0),
423 view: qc.view.as_u64(),
424 signer_count: qc.aggregate_signature.count(),
425 epoch: qc.epoch.as_u64(),
426 };
427 json_ok(req.id, &info)
428 }
429 None => RpcResponse::err(
430 req.id,
431 -32602,
432 format!("commit QC at height {height} not found"),
433 ),
434 }
435 }
436
437 "get_tx" => {
438 let hash_hex = match req.params.as_str() {
439 Some(h) if !h.is_empty() => h,
440 _ => {
441 return RpcResponse::err(
442 req.id,
443 -32602,
444 "params must be a hex-encoded tx hash".to_string(),
445 );
446 }
447 };
448 let hash_bytes = match hex_decode(hash_hex) {
449 Some(b) if b.len() == 32 => {
450 let mut arr = [0u8; 32];
451 arr.copy_from_slice(&b);
452 arr
453 }
454 _ => {
455 return RpcResponse::err(
456 req.id,
457 -32602,
458 "invalid tx hash (expected 32-byte hex)".to_string(),
459 );
460 }
461 };
462 let store = state.store.read();
463 match store.get_tx_location(&hash_bytes) {
464 Some((height, index)) => match store.get_block_by_height(height) {
465 Some(block) => {
466 let txs = decode_payload(&block.payload);
467 match txs.get(index as usize) {
468 Some(tx_bytes) => {
469 let info = TxInfo {
470 tx_hash: hash_hex.to_string(),
471 height: height.as_u64(),
472 index,
473 data: hex_encode(tx_bytes),
474 };
475 json_ok(req.id, &info)
476 }
477 None => RpcResponse::err(
478 req.id,
479 -32602,
480 "tx index out of range in block".to_string(),
481 ),
482 }
483 }
484 None => RpcResponse::err(
485 req.id,
486 -32602,
487 format!("block at height {} not found", height.as_u64()),
488 ),
489 },
490 None => RpcResponse::err(req.id, -32602, "transaction not found".to_string()),
491 }
492 }
493
494 "get_block_results" => {
495 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
496 Some(h) => h,
497 None => {
498 return RpcResponse::err(
499 req.id,
500 -32602,
501 "missing or invalid 'height' parameter".to_string(),
502 );
503 }
504 };
505 let store = state.store.read();
506 match store.get_block_results(Height(height)) {
507 Some(results) => {
508 let tx_hashes = if let Some(block) = store.get_block_by_height(Height(height)) {
510 decode_payload(&block.payload)
511 .iter()
512 .map(|tx| hex_encode(blake3::hash(tx).as_bytes()))
513 .collect()
514 } else {
515 vec![]
516 };
517 let info = BlockResultsInfo {
518 height,
519 tx_hashes,
520 events: results
521 .events
522 .iter()
523 .map(|e| EventInfo {
524 r#type: e.r#type.clone(),
525 attributes: e
526 .attributes
527 .iter()
528 .map(|a| EventAttributeInfo {
529 key: a.key.clone(),
530 value: a.value.clone(),
531 })
532 .collect(),
533 })
534 .collect(),
535 app_hash: hex_encode(&results.app_hash.0),
536 };
537 json_ok(req.id, &info)
538 }
539 None => RpcResponse::err(
540 req.id,
541 -32602,
542 format!("block results at height {height} not found"),
543 ),
544 }
545 }
546
547 "query" => {
548 let path = match req.params.get("path").and_then(|v| v.as_str()) {
549 Some(p) => p,
550 None => {
551 return RpcResponse::err(
552 req.id,
553 -32602,
554 "missing 'path' parameter".to_string(),
555 );
556 }
557 };
558 let data_hex = req
559 .params
560 .get("data")
561 .and_then(|v| v.as_str())
562 .unwrap_or("");
563 let data = hex_decode(data_hex).unwrap_or_default();
564 match &state.app {
565 Some(app) => match app.query(path, &data) {
566 Ok(resp) => {
567 let info = QueryResponseInfo {
568 data: hex_encode(&resp.data),
569 proof: resp.proof.as_ref().map(|p| hex_encode(p)),
570 height: resp.height,
571 };
572 json_ok(req.id, &info)
573 }
574 Err(e) => RpcResponse::err(req.id, -32602, format!("query failed: {e}")),
575 },
576 None => RpcResponse::err(
577 req.id,
578 -32602,
579 "no application available for queries".to_string(),
580 ),
581 }
582 }
583
584 "verify_header" => {
585 let header_val = match req.params.get("header") {
588 Some(v) => v,
589 None => {
590 return RpcResponse::err(
591 req.id,
592 -32602,
593 "missing 'header' parameter".to_string(),
594 );
595 }
596 };
597 let qc_val = match req.params.get("qc") {
598 Some(v) => v,
599 None => {
600 return RpcResponse::err(req.id, -32602, "missing 'qc' parameter".to_string());
601 }
602 };
603 let header: hotmint_light::BlockHeader =
604 match serde_json::from_value(header_val.clone()) {
605 Ok(h) => h,
606 Err(e) => {
607 return RpcResponse::err(req.id, -32602, format!("invalid header: {e}"));
608 }
609 };
610 let qc: hotmint_types::QuorumCertificate = match serde_json::from_value(qc_val.clone())
611 {
612 Ok(q) => q,
613 Err(e) => {
614 return RpcResponse::err(req.id, -32602, format!("invalid qc: {e}"));
615 }
616 };
617 let validators_info = state.validator_set_rx.borrow().clone();
619 let validators: Vec<hotmint_types::ValidatorInfo> = validators_info
620 .iter()
621 .map(|v| hotmint_types::ValidatorInfo {
622 id: hotmint_types::ValidatorId(v.id),
623 public_key: {
624 let bytes = hex_decode(&v.public_key).unwrap_or_default();
625 let mut arr = [0u8; 32];
626 if bytes.len() == 32 {
627 arr.copy_from_slice(&bytes);
628 }
629 hotmint_types::PublicKey(arr.to_vec())
630 },
631 power: v.power,
632 })
633 .collect();
634 let vs = hotmint_types::ValidatorSet::new(validators);
635 let status = *state.status_rx.borrow();
636 let lc = hotmint_light::LightClient::new(
637 vs.clone(),
638 hotmint_types::Height(status.last_committed_height),
639 state.chain_id_hash,
640 );
641 let verifier = hotmint_crypto::Ed25519Verifier;
642 match lc.verify_header(&header, &qc, &verifier) {
643 Ok(()) => json_ok(
644 req.id,
645 &VerifyHeaderResult {
646 valid: true,
647 error: None,
648 },
649 ),
650 Err(e) => json_ok(
651 req.id,
652 &VerifyHeaderResult {
653 valid: false,
654 error: Some(e.to_string()),
655 },
656 ),
657 }
658 }
659
660 _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
661 }
662}
663
664fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
665 match serde_json::to_value(val) {
666 Ok(v) => RpcResponse::ok(id, v),
667 Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
668 }
669}
670
671fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
672 BlockInfo {
673 height: block.height.as_u64(),
674 hash: hex_encode(&block.hash.0),
675 parent_hash: hex_encode(&block.parent_hash.0),
676 view: block.view.as_u64(),
677 proposer: block.proposer.0,
678 payload_size: block.payload.len(),
679 }
680}
681
682fn hex_encode(bytes: &[u8]) -> String {
683 bytes.iter().map(|b| format!("{b:02x}")).collect()
684}
685
686fn hex_decode(s: &str) -> Option<Vec<u8>> {
687 if !s.len().is_multiple_of(2) {
688 return None;
689 }
690 (0..s.len())
691 .step_by(2)
692 .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
693 .collect()
694}
695
696fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
697 let bytes = hex_decode(s)?;
698 if bytes.len() != 32 {
699 return None;
700 }
701 let mut arr = [0u8; 32];
702 arr.copy_from_slice(&bytes);
703 Some(BlockHash(arr))
704}
705
706async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
712 reader: &mut R,
713 max_bytes: usize,
714) -> io::Result<Option<String>> {
715 let mut buf = Vec::new();
716 loop {
717 let available = reader.fill_buf().await?;
718 if available.is_empty() {
719 return if buf.is_empty() {
720 Ok(None)
721 } else {
722 Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
723 };
724 }
725 if let Some(pos) = available.iter().position(|&b| b == b'\n') {
726 buf.extend_from_slice(&available[..pos]);
727 reader.consume(pos + 1);
728 return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
729 }
730 let to_consume = available.len();
731 buf.extend_from_slice(available);
732 reader.consume(to_consume);
733 if buf.len() > max_bytes {
734 return Err(io::Error::new(io::ErrorKind::InvalidData, "line too long"));
735 }
736 }
737}