1use ruc::*;
2
3use std::collections::HashMap;
4use std::io;
5use std::net::{IpAddr, SocketAddr};
6use std::sync::Arc;
7
8use tokio::sync::{Mutex, mpsc};
9
10use crate::types::{
11 BlockInfo, BlockResultsInfo, CommitQcInfo, EpochInfo, EventAttributeInfo, EventInfo,
12 HeaderInfo, QueryResponseInfo, RpcRequest, RpcResponse, StatusInfo, TxInfo, TxResult,
13 ValidatorInfoResponse, VerifyHeaderResult,
14};
15use hotmint_consensus::application::Application;
16use hotmint_consensus::commit::decode_payload;
17use hotmint_consensus::store::BlockStore;
18use hotmint_mempool::Mempool;
19use hotmint_network::service::PeerStatus;
20use hotmint_types::{BlockHash, Height};
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::net::TcpListener;
23use tokio::sync::{Semaphore, watch};
24use tokio::time::{Duration, Instant, timeout};
25use tracing::{info, warn};
26
/// Upper bound on concurrently served RPC connections. `RpcServer::run` sizes
/// its semaphore with this value and drops connections accepted beyond it.
const MAX_RPC_CONNECTIONS: usize = 256;
/// Longest time `RpcServer::run` waits for one complete request line before
/// it closes the connection.
const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Per-line size limit (1 MiB) handed to `read_line_limited`.
const MAX_LINE_BYTES: usize = 1_048_576;
/// Token budget given to each per-IP `TxRateLimiter` bucket for `submit_tx`.
pub(crate) const TX_RATE_LIMIT_PER_SEC: u32 = 100;
/// Minimum spacing between two stale-bucket sweeps of `PerIpRateLimiter`.
const IP_LIMITER_PRUNE_INTERVAL: Duration = Duration::from_secs(60);
35
/// Copyable snapshot of consensus progress that the RPC layer reads through a
/// `watch` channel (see `RpcState::status_rx`).
#[derive(Debug, Clone, Copy)]
pub struct ConsensusStatus {
    /// View number reported by the `status` method.
    pub current_view: u64,
    /// Last committed height; reported by `status` and handed to the light
    /// client built in `verify_header`.
    pub last_committed_height: u64,
    /// Epoch number reported by `status` and `get_epoch`.
    pub epoch_number: u64,
    /// Validator count reported by `status` and `get_epoch`.
    pub validator_count: usize,
    /// Start view of the epoch, reported by `get_epoch`.
    pub epoch_start_view: u64,
}

impl ConsensusStatus {
    /// Assembles a snapshot; the arguments are taken in field-declaration order.
    pub fn new(
        current_view: u64,
        last_committed_height: u64,
        epoch_number: u64,
        validator_count: usize,
        epoch_start_view: u64,
    ) -> Self {
        ConsensusStatus {
            epoch_start_view,
            validator_count,
            epoch_number,
            last_committed_height,
            current_view,
        }
    }
}
63
/// Shared, read-mostly state that every RPC request handler works against.
pub struct RpcState {
    /// Identifier echoed back in the `status` response.
    pub validator_id: u64,
    /// Mempool that `submit_tx` inserts into and whose size `status` reports.
    pub mempool: Arc<Mempool>,
    /// Latest consensus snapshot; read by `status`, `get_epoch` and `verify_header`.
    pub status_rx: watch::Receiver<ConsensusStatus>,
    /// Block store queried by the block, header, commit-QC, tx and
    /// block-results methods.
    pub store: Arc<parking_lot::RwLock<Box<dyn BlockStore>>>,
    /// Latest peer list, returned verbatim by `get_peers`.
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Latest validator set; returned by `get_validators` and used to build
    /// the validator set for `verify_header`.
    pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
    /// Optional application. When present it validates `submit_tx` payloads
    /// and answers `query`; when absent, `submit_tx` skips validation and
    /// `query` returns an error.
    pub app: Option<Arc<dyn Application>>,
    /// Optional channel that receives the raw bytes of every transaction the
    /// mempool accepted (sent with `try_send`, failures ignored).
    pub tx_gossip: Option<mpsc::Sender<Vec<u8>>>,
    /// Chain identifier hash passed to the light client in `verify_header`.
    pub chain_id_hash: [u8; 32],
}
82
83pub struct RpcServer {
85 state: Arc<RpcState>,
86 listener: TcpListener,
87}
88
89impl RpcServer {
90 pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
91 Self::bind_arc(addr, Arc::new(state)).await
92 }
93
94 pub async fn bind_arc(addr: &str, state: Arc<RpcState>) -> Result<Self> {
95 let listener = TcpListener::bind(addr)
96 .await
97 .c(d!("failed to bind RPC server"))?;
98 info!(addr = addr, "RPC server listening");
99 Ok(Self { state, listener })
100 }
101
102 pub fn local_addr(&self) -> SocketAddr {
103 self.listener.local_addr().expect("listener has local addr")
104 }
105
106 pub async fn run(self) {
107 let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
108 let ip_limiter = Arc::new(Mutex::new(PerIpRateLimiter::new()));
109 loop {
110 match self.listener.accept().await {
111 Ok((stream, addr)) => {
112 let permit = match semaphore.clone().try_acquire_owned() {
113 Ok(p) => p,
114 Err(_) => {
115 warn!("RPC connection limit reached, rejecting");
116 drop(stream);
117 continue;
118 }
119 };
120 let state = self.state.clone();
121 let ip_limiter = ip_limiter.clone();
122 let peer_ip = addr.ip();
123 tokio::spawn(async move {
124 let _permit = permit;
125 let (reader, mut writer) = stream.into_split();
126 let mut reader = BufReader::with_capacity(65_536, reader);
127 loop {
128 let line = match timeout(
129 RPC_READ_TIMEOUT,
130 read_line_limited(&mut reader, MAX_LINE_BYTES),
131 )
132 .await
133 {
134 Ok(Ok(Some(line))) => line,
135 Ok(Err(e)) => {
136 warn!(error = %e, "RPC read error (line too long?)");
137 break;
138 }
139 _ => break, };
141 let response =
142 handle_request(&state, &line, &ip_limiter, peer_ip).await;
143 let mut json = serde_json::to_string(&response).unwrap_or_default();
144 json.push('\n');
145 if writer.write_all(json.as_bytes()).await.is_err() {
146 break;
147 }
148 }
149 });
150 }
151 Err(e) => {
152 warn!(error = %e, "failed to accept connection");
153 }
154 }
155 }
156 }
157}
158
/// Simple windowed rate limiter: a budget of `max_tokens` that is restored in
/// full once at least one second has passed since the previous refill.
pub struct TxRateLimiter {
    /// Requests still admissible in the current window.
    tokens: u32,
    /// Budget restored on every refill.
    max_tokens: u32,
    /// Instant of the most recent refill (or of construction).
    last_refill: Instant,
}

impl TxRateLimiter {
    /// Creates a limiter that starts with a full budget of `rate_per_sec`.
    pub(crate) fn new(rate_per_sec: u32) -> Self {
        let created_at = Instant::now();
        Self {
            last_refill: created_at,
            max_tokens: rate_per_sec,
            tokens: rate_per_sec,
        }
    }

    /// Consumes one token and returns `true`, or returns `false` when the
    /// budget of the current window is exhausted.
    pub(crate) fn allow(&mut self) -> bool {
        let now = Instant::now();
        let window_expired = now.duration_since(self.last_refill) >= Duration::from_secs(1);
        if window_expired {
            self.last_refill = now;
            self.tokens = self.max_tokens;
        }
        match self.tokens.checked_sub(1) {
            Some(remaining) => {
                self.tokens = remaining;
                true
            }
            None => false,
        }
    }
}
190
191const MAX_IP_LIMITER_ENTRIES: usize = 100_000;
195
196pub struct PerIpRateLimiter {
199 buckets: HashMap<IpAddr, TxRateLimiter>,
200 last_prune: Instant,
201}
202
203impl Default for PerIpRateLimiter {
204 fn default() -> Self {
205 Self::new()
206 }
207}
208
209impl PerIpRateLimiter {
210 pub fn new() -> Self {
211 Self {
212 buckets: HashMap::new(),
213 last_prune: Instant::now(),
214 }
215 }
216
217 pub fn allow(&mut self, ip: IpAddr) -> bool {
219 self.maybe_prune();
220 if !self.buckets.contains_key(&ip) && self.buckets.len() >= MAX_IP_LIMITER_ENTRIES {
222 return false;
223 }
224 let bucket = self
225 .buckets
226 .entry(ip)
227 .or_insert_with(|| TxRateLimiter::new(TX_RATE_LIMIT_PER_SEC));
228 bucket.allow()
229 }
230
231 fn maybe_prune(&mut self) {
233 let now = Instant::now();
234 if now.duration_since(self.last_prune) < IP_LIMITER_PRUNE_INTERVAL {
235 return;
236 }
237 self.last_prune = now;
238 self.buckets
240 .retain(|_, v| now.duration_since(v.last_refill) < Duration::from_secs(30));
241 }
242}
243
244pub(crate) async fn handle_request(
245 state: &RpcState,
246 line: &str,
247 ip_limiter: &Mutex<PerIpRateLimiter>,
248 peer_ip: IpAddr,
249) -> RpcResponse {
250 let req: RpcRequest = match serde_json::from_str(line) {
251 Ok(r) => r,
252 Err(e) => {
253 return RpcResponse::err(0, -32700, format!("parse error: {e}"));
254 }
255 };
256
257 match req.method.as_str() {
258 "status" => {
259 let s = *state.status_rx.borrow();
260 let info = StatusInfo {
261 validator_id: state.validator_id,
262 current_view: s.current_view,
263 last_committed_height: s.last_committed_height,
264 epoch: s.epoch_number,
265 validator_count: s.validator_count,
266 mempool_size: state.mempool.size().await,
267 };
268 json_ok(req.id, &info)
269 }
270
271 "submit_tx" => {
272 {
274 let mut limiter = ip_limiter.lock().await;
275 if !limiter.allow(peer_ip) {
276 return RpcResponse::err(
277 req.id,
278 -32000,
279 "rate limit exceeded for submit_tx".to_string(),
280 );
281 }
282 }
283 let Some(tx_hex) = req.params.as_str() else {
284 return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
285 };
286 if tx_hex.is_empty() {
287 return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
288 }
289 let tx_bytes = match hex_decode(tx_hex) {
290 Some(b) if !b.is_empty() => b,
291 _ => {
292 return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
293 }
294 };
295 let (priority, gas_wanted) = if let Some(ref app) = state.app {
297 let result = app.validate_tx(&tx_bytes, None);
298 if !result.valid {
299 return RpcResponse::err(
300 req.id,
301 -32602,
302 "transaction validation failed".to_string(),
303 );
304 }
305 (result.priority, result.gas_wanted)
306 } else {
307 (0, 0)
308 };
309 let accepted = state
310 .mempool
311 .add_tx_with_gas(tx_bytes.clone(), priority, gas_wanted)
312 .await;
313 if accepted && let Some(ref gossip) = state.tx_gossip {
315 let _ = gossip.try_send(tx_bytes);
316 }
317 json_ok(req.id, &TxResult { accepted })
318 }
319
320 "get_block" => {
321 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
322 Some(h) => h,
323 None => {
324 return RpcResponse::err(
325 req.id,
326 -32602,
327 "missing or invalid 'height' parameter".to_string(),
328 );
329 }
330 };
331 let store = state.store.read();
332 match store.get_block_by_height(Height(height)) {
333 Some(block) => json_ok(req.id, &block_to_info(&block)),
334 None => RpcResponse::err(
335 req.id,
336 -32602,
337 format!("block at height {height} not found"),
338 ),
339 }
340 }
341
342 "get_block_by_hash" => {
343 let hash_hex = req.params.as_str().unwrap_or_default();
344 match hex_to_block_hash(hash_hex) {
345 Some(hash) => {
346 let store = state.store.read();
347 match store.get_block(&hash) {
348 Some(block) => json_ok(req.id, &block_to_info(&block)),
349 None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
350 }
351 }
352 None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
353 }
354 }
355
356 "get_validators" => {
357 let validators = state.validator_set_rx.borrow().clone();
358 json_ok(req.id, &validators)
359 }
360
361 "get_epoch" => {
362 let s = *state.status_rx.borrow();
363 let info = EpochInfo {
364 number: s.epoch_number,
365 start_view: s.epoch_start_view,
366 validator_count: s.validator_count,
367 };
368 json_ok(req.id, &info)
369 }
370
371 "get_peers" => {
372 let peers = state.peer_info_rx.borrow().clone();
373 json_ok(req.id, &peers)
374 }
375
376 "get_header" => {
377 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
378 Some(h) => h,
379 None => {
380 return RpcResponse::err(
381 req.id,
382 -32602,
383 "missing or invalid 'height' parameter".to_string(),
384 );
385 }
386 };
387 let store = state.store.read();
388 match store.get_block_by_height(Height(height)) {
389 Some(block) => {
390 let info = HeaderInfo {
391 height: block.height.as_u64(),
392 hash: hex_encode(&block.hash.0),
393 parent_hash: hex_encode(&block.parent_hash.0),
394 view: block.view.as_u64(),
395 proposer: block.proposer.0,
396 app_hash: hex_encode(&block.app_hash.0),
397 };
398 json_ok(req.id, &info)
399 }
400 None => RpcResponse::err(
401 req.id,
402 -32602,
403 format!("block at height {height} not found"),
404 ),
405 }
406 }
407
408 "get_commit_qc" => {
409 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
410 Some(h) => h,
411 None => {
412 return RpcResponse::err(
413 req.id,
414 -32602,
415 "missing or invalid 'height' parameter".to_string(),
416 );
417 }
418 };
419 let store = state.store.read();
420 match store.get_commit_qc(Height(height)) {
421 Some(qc) => {
422 let info = CommitQcInfo {
423 block_hash: hex_encode(&qc.block_hash.0),
424 view: qc.view.as_u64(),
425 signer_count: qc.aggregate_signature.count(),
426 epoch: qc.epoch.as_u64(),
427 };
428 json_ok(req.id, &info)
429 }
430 None => RpcResponse::err(
431 req.id,
432 -32602,
433 format!("commit QC at height {height} not found"),
434 ),
435 }
436 }
437
438 "get_tx" => {
439 let hash_hex = match req.params.as_str() {
440 Some(h) if !h.is_empty() => h,
441 _ => {
442 return RpcResponse::err(
443 req.id,
444 -32602,
445 "params must be a hex-encoded tx hash".to_string(),
446 );
447 }
448 };
449 let hash_bytes = match hex_decode(hash_hex) {
450 Some(b) if b.len() == 32 => {
451 let mut arr = [0u8; 32];
452 arr.copy_from_slice(&b);
453 arr
454 }
455 _ => {
456 return RpcResponse::err(
457 req.id,
458 -32602,
459 "invalid tx hash (expected 32-byte hex)".to_string(),
460 );
461 }
462 };
463 let store = state.store.read();
464 match store.get_tx_location(&hash_bytes) {
465 Some((height, index)) => match store.get_block_by_height(height) {
466 Some(block) => {
467 let txs = decode_payload(&block.payload);
468 match txs.get(index as usize) {
469 Some(tx_bytes) => {
470 let info = TxInfo {
471 tx_hash: hash_hex.to_string(),
472 height: height.as_u64(),
473 index,
474 data: hex_encode(tx_bytes),
475 };
476 json_ok(req.id, &info)
477 }
478 None => RpcResponse::err(
479 req.id,
480 -32602,
481 "tx index out of range in block".to_string(),
482 ),
483 }
484 }
485 None => RpcResponse::err(
486 req.id,
487 -32602,
488 format!("block at height {} not found", height.as_u64()),
489 ),
490 },
491 None => RpcResponse::err(req.id, -32602, "transaction not found".to_string()),
492 }
493 }
494
495 "get_block_results" => {
496 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
497 Some(h) => h,
498 None => {
499 return RpcResponse::err(
500 req.id,
501 -32602,
502 "missing or invalid 'height' parameter".to_string(),
503 );
504 }
505 };
506 let store = state.store.read();
507 match store.get_block_results(Height(height)) {
508 Some(results) => {
509 let tx_hashes = if let Some(block) = store.get_block_by_height(Height(height)) {
511 decode_payload(&block.payload)
512 .iter()
513 .map(|tx| hex_encode(blake3::hash(tx).as_bytes()))
514 .collect()
515 } else {
516 vec![]
517 };
518 let info = BlockResultsInfo {
519 height,
520 tx_hashes,
521 events: results
522 .events
523 .iter()
524 .map(|e| EventInfo {
525 r#type: e.r#type.clone(),
526 attributes: e
527 .attributes
528 .iter()
529 .map(|a| EventAttributeInfo {
530 key: a.key.clone(),
531 value: a.value.clone(),
532 })
533 .collect(),
534 })
535 .collect(),
536 app_hash: hex_encode(&results.app_hash.0),
537 };
538 json_ok(req.id, &info)
539 }
540 None => RpcResponse::err(
541 req.id,
542 -32602,
543 format!("block results at height {height} not found"),
544 ),
545 }
546 }
547
548 "query" => {
549 let path = match req.params.get("path").and_then(|v| v.as_str()) {
550 Some(p) => p,
551 None => {
552 return RpcResponse::err(
553 req.id,
554 -32602,
555 "missing 'path' parameter".to_string(),
556 );
557 }
558 };
559 let data_hex = req
560 .params
561 .get("data")
562 .and_then(|v| v.as_str())
563 .unwrap_or("");
564 let data = match hex_decode(data_hex) {
565 Some(d) => d,
566 None => {
567 return RpcResponse::err(
568 req.id,
569 -32602,
570 "invalid hex in 'data' parameter".to_string(),
571 );
572 }
573 };
574 match &state.app {
575 Some(app) => match app.query(path, &data) {
576 Ok(resp) => {
577 let info = QueryResponseInfo {
578 data: hex_encode(&resp.data),
579 proof: resp.proof.as_ref().map(|p| hex_encode(p)),
580 height: resp.height,
581 };
582 json_ok(req.id, &info)
583 }
584 Err(e) => RpcResponse::err(req.id, -32602, format!("query failed: {e}")),
585 },
586 None => RpcResponse::err(
587 req.id,
588 -32602,
589 "no application available for queries".to_string(),
590 ),
591 }
592 }
593
594 "verify_header" => {
595 let header_val = match req.params.get("header") {
598 Some(v) => v,
599 None => {
600 return RpcResponse::err(
601 req.id,
602 -32602,
603 "missing 'header' parameter".to_string(),
604 );
605 }
606 };
607 let qc_val = match req.params.get("qc") {
608 Some(v) => v,
609 None => {
610 return RpcResponse::err(req.id, -32602, "missing 'qc' parameter".to_string());
611 }
612 };
613 let header: hotmint_light::BlockHeader =
614 match serde_json::from_value(header_val.clone()) {
615 Ok(h) => h,
616 Err(e) => {
617 return RpcResponse::err(req.id, -32602, format!("invalid header: {e}"));
618 }
619 };
620 let qc: hotmint_types::QuorumCertificate = match serde_json::from_value(qc_val.clone())
621 {
622 Ok(q) => q,
623 Err(e) => {
624 return RpcResponse::err(req.id, -32602, format!("invalid qc: {e}"));
625 }
626 };
627 let validators_info = state.validator_set_rx.borrow().clone();
629 let validators: Vec<hotmint_types::ValidatorInfo> = validators_info
630 .iter()
631 .map(|v| hotmint_types::ValidatorInfo {
632 id: hotmint_types::ValidatorId(v.id),
633 public_key: {
634 let bytes = hex_decode(&v.public_key).unwrap_or_default();
635 let mut arr = [0u8; 32];
636 if bytes.len() == 32 {
637 arr.copy_from_slice(&bytes);
638 }
639 hotmint_types::PublicKey(arr.to_vec())
640 },
641 power: v.power,
642 })
643 .collect();
644 let vs = hotmint_types::ValidatorSet::new(validators);
645 let status = *state.status_rx.borrow();
646 let lc = hotmint_light::LightClient::new(
647 vs.clone(),
648 hotmint_types::Height(status.last_committed_height),
649 state.chain_id_hash,
650 );
651 let verifier = hotmint_crypto::Ed25519Verifier;
652 match lc.verify_header(&header, &qc, &verifier) {
653 Ok(()) => json_ok(
654 req.id,
655 &VerifyHeaderResult {
656 valid: true,
657 error: None,
658 },
659 ),
660 Err(e) => json_ok(
661 req.id,
662 &VerifyHeaderResult {
663 valid: false,
664 error: Some(e.to_string()),
665 },
666 ),
667 }
668 }
669
670 _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
671 }
672}
673
674fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
675 match serde_json::to_value(val) {
676 Ok(v) => RpcResponse::ok(id, v),
677 Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
678 }
679}
680
681fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
682 BlockInfo {
683 height: block.height.as_u64(),
684 hash: hex_encode(&block.hash.0),
685 parent_hash: hex_encode(&block.parent_hash.0),
686 view: block.view.as_u64(),
687 proposer: block.proposer.0,
688 payload_size: block.payload.len(),
689 }
690}
691
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
695
/// Decodes a hexadecimal string (upper- or lowercase digits, no prefix) into
/// bytes.
///
/// Returns `None` when the input has an odd length or contains anything other
/// than ASCII hex digits. The empty string decodes to an empty vector.
///
/// The input is processed byte-wise, so arbitrary (including multi-byte
/// UTF-8) input is rejected instead of panicking on a slice that splits a
/// character, and sign characters such as `+` are not accepted as digits.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit.
    fn nibble(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
705
706fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
707 let bytes = hex_decode(s)?;
708 if bytes.len() != 32 {
709 return None;
710 }
711 let mut arr = [0u8; 32];
712 arr.copy_from_slice(&bytes);
713 Some(BlockHash(arr))
714}
715
716async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
722 reader: &mut R,
723 max_bytes: usize,
724) -> io::Result<Option<String>> {
725 let mut buf = Vec::new();
726 loop {
727 let available = reader.fill_buf().await?;
728 if available.is_empty() {
729 return if buf.is_empty() {
730 Ok(None)
731 } else {
732 Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
733 };
734 }
735 if let Some(pos) = available.iter().position(|&b| b == b'\n') {
736 buf.extend_from_slice(&available[..pos]);
737 reader.consume(pos + 1);
738 return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
739 }
740 let to_consume = available.len();
741 buf.extend_from_slice(available);
742 reader.consume(to_consume);
743 if buf.len() > max_bytes {
744 return Err(io::Error::new(io::ErrorKind::InvalidData, "line too long"));
745 }
746 }
747}