Skip to main content

hotmint_api/
rpc.rs

1use ruc::*;
2
3use std::collections::HashMap;
4use std::io;
5use std::net::{IpAddr, SocketAddr};
6use std::sync::Arc;
7
8use tokio::sync::{Mutex, mpsc};
9
10use crate::types::{
11    BlockInfo, BlockResultsInfo, CommitQcInfo, EpochInfo, EventAttributeInfo, EventInfo,
12    HeaderInfo, QueryResponseInfo, RpcRequest, RpcResponse, StatusInfo, TxInfo, TxResult,
13    ValidatorInfoResponse, VerifyHeaderResult,
14};
15use hotmint_consensus::application::Application;
16use hotmint_consensus::commit::decode_payload;
17use hotmint_consensus::store::BlockStore;
18use hotmint_mempool::Mempool;
19use hotmint_network::service::PeerStatus;
20use hotmint_types::{BlockHash, Height};
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::net::TcpListener;
23use tokio::sync::{Semaphore, watch};
24use tokio::time::{Duration, Instant, timeout};
25use tracing::{info, warn};
26
27const MAX_RPC_CONNECTIONS: usize = 256;
28const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
29/// Maximum bytes per RPC line. Prevents OOM from clients sending huge data without newlines.
30const MAX_LINE_BYTES: usize = 1_048_576;
31/// Maximum submit_tx calls per second per IP address.
32pub(crate) const TX_RATE_LIMIT_PER_SEC: u32 = 100;
33/// How often to prune stale per-IP rate limiter entries.
34const IP_LIMITER_PRUNE_INTERVAL: Duration = Duration::from_secs(60);
35
36/// Named consensus status shared via watch channel.
37#[derive(Debug, Clone, Copy)]
38pub struct ConsensusStatus {
39    pub current_view: u64,
40    pub last_committed_height: u64,
41    pub epoch_number: u64,
42    pub validator_count: usize,
43    pub epoch_start_view: u64,
44}
45
46impl ConsensusStatus {
47    pub fn new(
48        current_view: u64,
49        last_committed_height: u64,
50        epoch_number: u64,
51        validator_count: usize,
52        epoch_start_view: u64,
53    ) -> Self {
54        Self {
55            current_view,
56            last_committed_height,
57            epoch_number,
58            validator_count,
59            epoch_start_view,
60        }
61    }
62}
63
64/// Shared state accessible by the RPC server
65pub struct RpcState {
66    pub validator_id: u64,
67    pub mempool: Arc<Mempool>,
68    pub status_rx: watch::Receiver<ConsensusStatus>,
69    /// Shared block store for block queries
70    pub store: Arc<parking_lot::RwLock<Box<dyn BlockStore>>>,
71    /// Peer info channel
72    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
73    /// Live validator set for get_validators
74    pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
75    /// Application reference for tx validation (optional for backward compatibility).
76    pub app: Option<Arc<dyn Application>>,
77    /// Optional sender to gossip accepted transactions to peers.
78    pub tx_gossip: Option<mpsc::Sender<Vec<u8>>>,
79    /// Chain ID hash for light client verification.
80    pub chain_id_hash: [u8; 32],
81}
82
83/// Simple JSON-RPC server over TCP (one JSON object per line)
84pub struct RpcServer {
85    state: Arc<RpcState>,
86    listener: TcpListener,
87}
88
89impl RpcServer {
90    pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
91        Self::bind_arc(addr, Arc::new(state)).await
92    }
93
94    pub async fn bind_arc(addr: &str, state: Arc<RpcState>) -> Result<Self> {
95        let listener = TcpListener::bind(addr)
96            .await
97            .c(d!("failed to bind RPC server"))?;
98        info!(addr = addr, "RPC server listening");
99        Ok(Self { state, listener })
100    }
101
102    pub fn local_addr(&self) -> SocketAddr {
103        self.listener.local_addr().expect("listener has local addr")
104    }
105
106    pub async fn run(self) {
107        let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
108        let ip_limiter = Arc::new(Mutex::new(PerIpRateLimiter::new()));
109        loop {
110            match self.listener.accept().await {
111                Ok((stream, addr)) => {
112                    let permit = match semaphore.clone().try_acquire_owned() {
113                        Ok(p) => p,
114                        Err(_) => {
115                            warn!("RPC connection limit reached, rejecting");
116                            drop(stream);
117                            continue;
118                        }
119                    };
120                    let state = self.state.clone();
121                    let ip_limiter = ip_limiter.clone();
122                    let peer_ip = addr.ip();
123                    tokio::spawn(async move {
124                        let _permit = permit;
125                        let (reader, mut writer) = stream.into_split();
126                        let mut reader = BufReader::with_capacity(65_536, reader);
127                        loop {
128                            let line = match timeout(
129                                RPC_READ_TIMEOUT,
130                                read_line_limited(&mut reader, MAX_LINE_BYTES),
131                            )
132                            .await
133                            {
134                                Ok(Ok(Some(line))) => line,
135                                Ok(Err(e)) => {
136                                    warn!(error = %e, "RPC read error (line too long?)");
137                                    break;
138                                }
139                                _ => break, // EOF or timeout
140                            };
141                            let response =
142                                handle_request(&state, &line, &ip_limiter, peer_ip).await;
143                            let mut json = serde_json::to_string(&response).unwrap_or_default();
144                            json.push('\n');
145                            if writer.write_all(json.as_bytes()).await.is_err() {
146                                break;
147                            }
148                        }
149                    });
150                }
151                Err(e) => {
152                    warn!(error = %e, "failed to accept connection");
153                }
154            }
155        }
156    }
157}
158
159/// Token-bucket rate limiter for submit_tx.
160pub struct TxRateLimiter {
161    tokens: u32,
162    max_tokens: u32,
163    last_refill: Instant,
164}
165
166impl TxRateLimiter {
167    pub(crate) fn new(rate_per_sec: u32) -> Self {
168        Self {
169            tokens: rate_per_sec,
170            max_tokens: rate_per_sec,
171            last_refill: Instant::now(),
172        }
173    }
174
175    pub(crate) fn allow(&mut self) -> bool {
176        let now = Instant::now();
177        let elapsed = now.duration_since(self.last_refill);
178        if elapsed >= Duration::from_secs(1) {
179            self.tokens = self.max_tokens;
180            self.last_refill = now;
181        }
182        if self.tokens > 0 {
183            self.tokens -= 1;
184            true
185        } else {
186            false
187        }
188    }
189}
190
191/// Per-IP rate limiter that tracks token buckets per source IP.
192///
193/// Maximum number of tracked IPs in the per-IP rate limiter.
194const MAX_IP_LIMITER_ENTRIES: usize = 100_000;
195
196/// Prevents a single IP from monopolising `submit_tx` even when opening
197/// many TCP connections or HTTP requests.
198pub struct PerIpRateLimiter {
199    buckets: HashMap<IpAddr, TxRateLimiter>,
200    last_prune: Instant,
201}
202
203impl Default for PerIpRateLimiter {
204    fn default() -> Self {
205        Self::new()
206    }
207}
208
209impl PerIpRateLimiter {
210    pub fn new() -> Self {
211        Self {
212            buckets: HashMap::new(),
213            last_prune: Instant::now(),
214        }
215    }
216
217    /// Check whether `ip` is allowed to submit a transaction.
218    pub fn allow(&mut self, ip: IpAddr) -> bool {
219        self.maybe_prune();
220        // Cap the number of tracked IPs to prevent memory exhaustion.
221        if !self.buckets.contains_key(&ip) && self.buckets.len() >= MAX_IP_LIMITER_ENTRIES {
222            return false;
223        }
224        let bucket = self
225            .buckets
226            .entry(ip)
227            .or_insert_with(|| TxRateLimiter::new(TX_RATE_LIMIT_PER_SEC));
228        bucket.allow()
229    }
230
231    /// Remove entries that have not been touched for a while to avoid unbounded growth.
232    fn maybe_prune(&mut self) {
233        let now = Instant::now();
234        if now.duration_since(self.last_prune) < IP_LIMITER_PRUNE_INTERVAL {
235            return;
236        }
237        self.last_prune = now;
238        // Remove buckets that are fully refilled (idle for ≥1 s)
239        self.buckets
240            .retain(|_, v| now.duration_since(v.last_refill) < Duration::from_secs(30));
241    }
242}
243
244pub(crate) async fn handle_request(
245    state: &RpcState,
246    line: &str,
247    ip_limiter: &Mutex<PerIpRateLimiter>,
248    peer_ip: IpAddr,
249) -> RpcResponse {
250    let req: RpcRequest = match serde_json::from_str(line) {
251        Ok(r) => r,
252        Err(e) => {
253            return RpcResponse::err(0, -32700, format!("parse error: {e}"));
254        }
255    };
256
257    match req.method.as_str() {
258        "status" => {
259            let s = *state.status_rx.borrow();
260            let info = StatusInfo {
261                validator_id: state.validator_id,
262                current_view: s.current_view,
263                last_committed_height: s.last_committed_height,
264                epoch: s.epoch_number,
265                validator_count: s.validator_count,
266                mempool_size: state.mempool.size().await,
267            };
268            json_ok(req.id, &info)
269        }
270
271        "submit_tx" => {
272            // Per-IP rate limiting (C-2: prevents bypass via multiple connections)
273            {
274                let mut limiter = ip_limiter.lock().await;
275                if !limiter.allow(peer_ip) {
276                    return RpcResponse::err(
277                        req.id,
278                        -32000,
279                        "rate limit exceeded for submit_tx".to_string(),
280                    );
281                }
282            }
283            let Some(tx_hex) = req.params.as_str() else {
284                return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
285            };
286            if tx_hex.is_empty() {
287                return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
288            }
289            let tx_bytes = match hex_decode(tx_hex) {
290                Some(b) if !b.is_empty() => b,
291                _ => {
292                    return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
293                }
294            };
295            // Validate via Application if available
296            let (priority, gas_wanted) = if let Some(ref app) = state.app {
297                let result = app.validate_tx(&tx_bytes, None);
298                if !result.valid {
299                    return RpcResponse::err(
300                        req.id,
301                        -32602,
302                        "transaction validation failed".to_string(),
303                    );
304                }
305                (result.priority, result.gas_wanted)
306            } else {
307                (0, 0)
308            };
309            let accepted = state
310                .mempool
311                .add_tx_with_gas(tx_bytes.clone(), priority, gas_wanted)
312                .await;
313            // Gossip accepted transactions to peers.
314            if accepted && let Some(ref gossip) = state.tx_gossip {
315                let _ = gossip.try_send(tx_bytes);
316            }
317            json_ok(req.id, &TxResult { accepted })
318        }
319
320        "get_block" => {
321            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
322                Some(h) => h,
323                None => {
324                    return RpcResponse::err(
325                        req.id,
326                        -32602,
327                        "missing or invalid 'height' parameter".to_string(),
328                    );
329                }
330            };
331            let store = state.store.read();
332            match store.get_block_by_height(Height(height)) {
333                Some(block) => json_ok(req.id, &block_to_info(&block)),
334                None => RpcResponse::err(
335                    req.id,
336                    -32602,
337                    format!("block at height {height} not found"),
338                ),
339            }
340        }
341
342        "get_block_by_hash" => {
343            let hash_hex = req.params.as_str().unwrap_or_default();
344            match hex_to_block_hash(hash_hex) {
345                Some(hash) => {
346                    let store = state.store.read();
347                    match store.get_block(&hash) {
348                        Some(block) => json_ok(req.id, &block_to_info(&block)),
349                        None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
350                    }
351                }
352                None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
353            }
354        }
355
356        "get_validators" => {
357            let validators = state.validator_set_rx.borrow().clone();
358            json_ok(req.id, &validators)
359        }
360
361        "get_epoch" => {
362            let s = *state.status_rx.borrow();
363            let info = EpochInfo {
364                number: s.epoch_number,
365                start_view: s.epoch_start_view,
366                validator_count: s.validator_count,
367            };
368            json_ok(req.id, &info)
369        }
370
371        "get_peers" => {
372            let peers = state.peer_info_rx.borrow().clone();
373            json_ok(req.id, &peers)
374        }
375
376        "get_header" => {
377            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
378                Some(h) => h,
379                None => {
380                    return RpcResponse::err(
381                        req.id,
382                        -32602,
383                        "missing or invalid 'height' parameter".to_string(),
384                    );
385                }
386            };
387            let store = state.store.read();
388            match store.get_block_by_height(Height(height)) {
389                Some(block) => {
390                    let info = HeaderInfo {
391                        height: block.height.as_u64(),
392                        hash: hex_encode(&block.hash.0),
393                        parent_hash: hex_encode(&block.parent_hash.0),
394                        view: block.view.as_u64(),
395                        proposer: block.proposer.0,
396                        app_hash: hex_encode(&block.app_hash.0),
397                    };
398                    json_ok(req.id, &info)
399                }
400                None => RpcResponse::err(
401                    req.id,
402                    -32602,
403                    format!("block at height {height} not found"),
404                ),
405            }
406        }
407
408        "get_commit_qc" => {
409            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
410                Some(h) => h,
411                None => {
412                    return RpcResponse::err(
413                        req.id,
414                        -32602,
415                        "missing or invalid 'height' parameter".to_string(),
416                    );
417                }
418            };
419            let store = state.store.read();
420            match store.get_commit_qc(Height(height)) {
421                Some(qc) => {
422                    let info = CommitQcInfo {
423                        block_hash: hex_encode(&qc.block_hash.0),
424                        view: qc.view.as_u64(),
425                        signer_count: qc.aggregate_signature.count(),
426                        epoch: qc.epoch.as_u64(),
427                    };
428                    json_ok(req.id, &info)
429                }
430                None => RpcResponse::err(
431                    req.id,
432                    -32602,
433                    format!("commit QC at height {height} not found"),
434                ),
435            }
436        }
437
438        "get_tx" => {
439            let hash_hex = match req.params.as_str() {
440                Some(h) if !h.is_empty() => h,
441                _ => {
442                    return RpcResponse::err(
443                        req.id,
444                        -32602,
445                        "params must be a hex-encoded tx hash".to_string(),
446                    );
447                }
448            };
449            let hash_bytes = match hex_decode(hash_hex) {
450                Some(b) if b.len() == 32 => {
451                    let mut arr = [0u8; 32];
452                    arr.copy_from_slice(&b);
453                    arr
454                }
455                _ => {
456                    return RpcResponse::err(
457                        req.id,
458                        -32602,
459                        "invalid tx hash (expected 32-byte hex)".to_string(),
460                    );
461                }
462            };
463            let store = state.store.read();
464            match store.get_tx_location(&hash_bytes) {
465                Some((height, index)) => match store.get_block_by_height(height) {
466                    Some(block) => {
467                        let txs = decode_payload(&block.payload);
468                        match txs.get(index as usize) {
469                            Some(tx_bytes) => {
470                                let info = TxInfo {
471                                    tx_hash: hash_hex.to_string(),
472                                    height: height.as_u64(),
473                                    index,
474                                    data: hex_encode(tx_bytes),
475                                };
476                                json_ok(req.id, &info)
477                            }
478                            None => RpcResponse::err(
479                                req.id,
480                                -32602,
481                                "tx index out of range in block".to_string(),
482                            ),
483                        }
484                    }
485                    None => RpcResponse::err(
486                        req.id,
487                        -32602,
488                        format!("block at height {} not found", height.as_u64()),
489                    ),
490                },
491                None => RpcResponse::err(req.id, -32602, "transaction not found".to_string()),
492            }
493        }
494
495        "get_block_results" => {
496            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
497                Some(h) => h,
498                None => {
499                    return RpcResponse::err(
500                        req.id,
501                        -32602,
502                        "missing or invalid 'height' parameter".to_string(),
503                    );
504                }
505            };
506            let store = state.store.read();
507            match store.get_block_results(Height(height)) {
508                Some(results) => {
509                    // Also compute tx hashes from the block payload.
510                    let tx_hashes = if let Some(block) = store.get_block_by_height(Height(height)) {
511                        decode_payload(&block.payload)
512                            .iter()
513                            .map(|tx| hex_encode(blake3::hash(tx).as_bytes()))
514                            .collect()
515                    } else {
516                        vec![]
517                    };
518                    let info = BlockResultsInfo {
519                        height,
520                        tx_hashes,
521                        events: results
522                            .events
523                            .iter()
524                            .map(|e| EventInfo {
525                                r#type: e.r#type.clone(),
526                                attributes: e
527                                    .attributes
528                                    .iter()
529                                    .map(|a| EventAttributeInfo {
530                                        key: a.key.clone(),
531                                        value: a.value.clone(),
532                                    })
533                                    .collect(),
534                            })
535                            .collect(),
536                        app_hash: hex_encode(&results.app_hash.0),
537                    };
538                    json_ok(req.id, &info)
539                }
540                None => RpcResponse::err(
541                    req.id,
542                    -32602,
543                    format!("block results at height {height} not found"),
544                ),
545            }
546        }
547
548        "query" => {
549            let path = match req.params.get("path").and_then(|v| v.as_str()) {
550                Some(p) => p,
551                None => {
552                    return RpcResponse::err(
553                        req.id,
554                        -32602,
555                        "missing 'path' parameter".to_string(),
556                    );
557                }
558            };
559            let data_hex = req
560                .params
561                .get("data")
562                .and_then(|v| v.as_str())
563                .unwrap_or("");
564            let data = match hex_decode(data_hex) {
565                Some(d) => d,
566                None => {
567                    return RpcResponse::err(
568                        req.id,
569                        -32602,
570                        "invalid hex in 'data' parameter".to_string(),
571                    );
572                }
573            };
574            match &state.app {
575                Some(app) => match app.query(path, &data) {
576                    Ok(resp) => {
577                        let info = QueryResponseInfo {
578                            data: hex_encode(&resp.data),
579                            proof: resp.proof.as_ref().map(|p| hex_encode(p)),
580                            height: resp.height,
581                        };
582                        json_ok(req.id, &info)
583                    }
584                    Err(e) => RpcResponse::err(req.id, -32602, format!("query failed: {e}")),
585                },
586                None => RpcResponse::err(
587                    req.id,
588                    -32602,
589                    "no application available for queries".to_string(),
590                ),
591            }
592        }
593
594        "verify_header" => {
595            // Accepts { header: BlockHeader JSON, qc: CommitQC JSON }.
596            // Uses the light client to verify the header.
597            let header_val = match req.params.get("header") {
598                Some(v) => v,
599                None => {
600                    return RpcResponse::err(
601                        req.id,
602                        -32602,
603                        "missing 'header' parameter".to_string(),
604                    );
605                }
606            };
607            let qc_val = match req.params.get("qc") {
608                Some(v) => v,
609                None => {
610                    return RpcResponse::err(req.id, -32602, "missing 'qc' parameter".to_string());
611                }
612            };
613            let header: hotmint_light::BlockHeader =
614                match serde_json::from_value(header_val.clone()) {
615                    Ok(h) => h,
616                    Err(e) => {
617                        return RpcResponse::err(req.id, -32602, format!("invalid header: {e}"));
618                    }
619                };
620            let qc: hotmint_types::QuorumCertificate = match serde_json::from_value(qc_val.clone())
621            {
622                Ok(q) => q,
623                Err(e) => {
624                    return RpcResponse::err(req.id, -32602, format!("invalid qc: {e}"));
625                }
626            };
627            // Build a LightClient from the current validator set.
628            let validators_info = state.validator_set_rx.borrow().clone();
629            let validators: Vec<hotmint_types::ValidatorInfo> = validators_info
630                .iter()
631                .map(|v| hotmint_types::ValidatorInfo {
632                    id: hotmint_types::ValidatorId(v.id),
633                    public_key: {
634                        let bytes = hex_decode(&v.public_key).unwrap_or_default();
635                        let mut arr = [0u8; 32];
636                        if bytes.len() == 32 {
637                            arr.copy_from_slice(&bytes);
638                        }
639                        hotmint_types::PublicKey(arr.to_vec())
640                    },
641                    power: v.power,
642                })
643                .collect();
644            let vs = hotmint_types::ValidatorSet::new(validators);
645            let status = *state.status_rx.borrow();
646            let lc = hotmint_light::LightClient::new(
647                vs.clone(),
648                hotmint_types::Height(status.last_committed_height),
649                state.chain_id_hash,
650            );
651            let verifier = hotmint_crypto::Ed25519Verifier;
652            match lc.verify_header(&header, &qc, &verifier) {
653                Ok(()) => json_ok(
654                    req.id,
655                    &VerifyHeaderResult {
656                        valid: true,
657                        error: None,
658                    },
659                ),
660                Err(e) => json_ok(
661                    req.id,
662                    &VerifyHeaderResult {
663                        valid: false,
664                        error: Some(e.to_string()),
665                    },
666                ),
667            }
668        }
669
670        _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
671    }
672}
673
674fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
675    match serde_json::to_value(val) {
676        Ok(v) => RpcResponse::ok(id, v),
677        Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
678    }
679}
680
681fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
682    BlockInfo {
683        height: block.height.as_u64(),
684        hash: hex_encode(&block.hash.0),
685        parent_hash: hex_encode(&block.parent_hash.0),
686        view: block.view.as_u64(),
687        proposer: block.proposer.0,
688        payload_size: block.payload.len(),
689    }
690}
691
692fn hex_encode(bytes: &[u8]) -> String {
693    bytes.iter().map(|b| format!("{b:02x}")).collect()
694}
695
696fn hex_decode(s: &str) -> Option<Vec<u8>> {
697    if !s.len().is_multiple_of(2) {
698        return None;
699    }
700    (0..s.len())
701        .step_by(2)
702        .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
703        .collect()
704}
705
706fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
707    let bytes = hex_decode(s)?;
708    if bytes.len() != 32 {
709        return None;
710    }
711    let mut arr = [0u8; 32];
712    arr.copy_from_slice(&bytes);
713    Some(BlockHash(arr))
714}
715
716/// Read a line from `reader`, failing fast if it exceeds `max_bytes`.
717///
718/// Uses `fill_buf` + incremental scanning so memory allocation is bounded.
719/// Returns `Ok(None)` on EOF, `Ok(Some(line))` on success, or an error
720/// if the line exceeds the limit.
721async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
722    reader: &mut R,
723    max_bytes: usize,
724) -> io::Result<Option<String>> {
725    let mut buf = Vec::new();
726    loop {
727        let available = reader.fill_buf().await?;
728        if available.is_empty() {
729            return if buf.is_empty() {
730                Ok(None)
731            } else {
732                Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
733            };
734        }
735        if let Some(pos) = available.iter().position(|&b| b == b'\n') {
736            buf.extend_from_slice(&available[..pos]);
737            reader.consume(pos + 1);
738            return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
739        }
740        let to_consume = available.len();
741        buf.extend_from_slice(available);
742        reader.consume(to_consume);
743        if buf.len() > max_bytes {
744            return Err(io::Error::new(io::ErrorKind::InvalidData, "line too long"));
745        }
746    }
747}