Skip to main content

hotmint_api/
rpc.rs

1use ruc::*;
2
3use std::collections::HashMap;
4use std::io;
5use std::net::{IpAddr, SocketAddr};
6use std::sync::Arc;
7
8use tokio::sync::{Mutex, mpsc};
9
10use crate::types::{
11    BlockInfo, BlockResultsInfo, CommitQcInfo, EpochInfo, EventAttributeInfo, EventInfo,
12    HeaderInfo, QueryResponseInfo, RpcRequest, RpcResponse, StatusInfo, TxInfo, TxResult,
13    ValidatorInfoResponse, VerifyHeaderResult,
14};
15use hotmint_consensus::application::Application;
16use hotmint_consensus::commit::decode_payload;
17use hotmint_consensus::store::BlockStore;
18use hotmint_mempool::Mempool;
19use hotmint_network::service::PeerStatus;
20use hotmint_types::{BlockHash, Height};
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::net::TcpListener;
23use tokio::sync::{Semaphore, watch};
24use tokio::time::{Duration, Instant, timeout};
25use tracing::{info, warn};
26
/// Maximum number of connections served at the same time. `RpcServer::run` sizes its
/// semaphore with this value and rejects connections that cannot get a permit.
const MAX_RPC_CONNECTIONS: usize = 256;
/// Maximum time to wait for one complete request line; the connection is closed when
/// this elapses.
const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum bytes per RPC line. Prevents OOM from clients sending huge data without newlines.
const MAX_LINE_BYTES: usize = 1_048_576;
/// Maximum submit_tx calls per second per IP address.
pub(crate) const TX_RATE_LIMIT_PER_SEC: u32 = 100;
/// How often to prune stale per-IP rate limiter entries.
const IP_LIMITER_PRUNE_INTERVAL: Duration = Duration::from_secs(60);
35
/// Snapshot of consensus progress, shared with the RPC layer through a `watch` channel.
///
/// The type is `Copy`, so readers take a cheap copy out of the channel instead of
/// holding the borrow.
#[derive(Debug, Clone, Copy)]
pub struct ConsensusStatus {
    /// Reported as `current_view` by the `status` RPC method.
    pub current_view: u64,
    /// Reported as `last_committed_height` by the `status` RPC method.
    pub last_committed_height: u64,
    /// Reported as the epoch number by the `status` and `get_epoch` RPC methods.
    pub epoch_number: u64,
    /// Reported as `validator_count` by the `status` and `get_epoch` RPC methods.
    pub validator_count: usize,
    /// Reported as `start_view` by the `get_epoch` RPC method.
    pub epoch_start_view: u64,
}

impl ConsensusStatus {
    /// Creates a snapshot from its five components.
    ///
    /// Arguments are taken in the same order as the struct's fields and stored verbatim.
    pub fn new(
        current_view: u64,
        last_committed_height: u64,
        epoch_number: u64,
        validator_count: usize,
        epoch_start_view: u64,
    ) -> Self {
        Self {
            epoch_start_view,
            validator_count,
            epoch_number,
            last_committed_height,
            current_view,
        }
    }
}
63
/// Shared state accessible by the RPC server.
///
/// One instance is wrapped in an `Arc` by `RpcServer::bind` and shared by every
/// connection task.
pub struct RpcState {
    /// Reported as `validator_id` by the `status` RPC method.
    pub validator_id: u64,
    /// Mempool that `submit_tx` inserts into and whose size `status` reports.
    pub mempool: Arc<Mempool>,
    /// Latest consensus status snapshot, read by `status`, `get_epoch` and `verify_header`.
    pub status_rx: watch::Receiver<ConsensusStatus>,
    /// Shared block store for block queries
    pub store: Arc<parking_lot::RwLock<Box<dyn BlockStore>>>,
    /// Peer info channel
    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
    /// Live validator set for get_validators
    pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
    /// Application reference for tx validation (optional for backward compatibility).
    pub app: Option<Arc<dyn Application>>,
    /// Optional sender to gossip accepted transactions to peers.
    pub tx_gossip: Option<mpsc::Sender<Vec<u8>>>,
    /// Chain ID hash for light client verification.
    pub chain_id_hash: [u8; 32],
}
82
83/// Simple JSON-RPC server over TCP (one JSON object per line)
84pub struct RpcServer {
85    state: Arc<RpcState>,
86    listener: TcpListener,
87}
88
89impl RpcServer {
90    pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
91        let listener = TcpListener::bind(addr)
92            .await
93            .c(d!("failed to bind RPC server"))?;
94        info!(addr = addr, "RPC server listening");
95        Ok(Self {
96            state: Arc::new(state),
97            listener,
98        })
99    }
100
101    pub fn local_addr(&self) -> SocketAddr {
102        self.listener.local_addr().expect("listener has local addr")
103    }
104
105    pub async fn run(self) {
106        let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
107        let ip_limiter = Arc::new(Mutex::new(PerIpRateLimiter::new()));
108        loop {
109            match self.listener.accept().await {
110                Ok((stream, addr)) => {
111                    let permit = match semaphore.clone().try_acquire_owned() {
112                        Ok(p) => p,
113                        Err(_) => {
114                            warn!("RPC connection limit reached, rejecting");
115                            drop(stream);
116                            continue;
117                        }
118                    };
119                    let state = self.state.clone();
120                    let ip_limiter = ip_limiter.clone();
121                    let peer_ip = addr.ip();
122                    tokio::spawn(async move {
123                        let _permit = permit;
124                        let (reader, mut writer) = stream.into_split();
125                        let mut reader = BufReader::with_capacity(65_536, reader);
126                        loop {
127                            let line = match timeout(
128                                RPC_READ_TIMEOUT,
129                                read_line_limited(&mut reader, MAX_LINE_BYTES),
130                            )
131                            .await
132                            {
133                                Ok(Ok(Some(line))) => line,
134                                Ok(Err(e)) => {
135                                    warn!(error = %e, "RPC read error (line too long?)");
136                                    break;
137                                }
138                                _ => break, // EOF or timeout
139                            };
140                            let response =
141                                handle_request(&state, &line, &ip_limiter, peer_ip).await;
142                            let mut json = serde_json::to_string(&response).unwrap_or_default();
143                            json.push('\n');
144                            if writer.write_all(json.as_bytes()).await.is_err() {
145                                break;
146                            }
147                        }
148                    });
149                }
150                Err(e) => {
151                    warn!(error = %e, "failed to accept connection");
152                }
153            }
154        }
155    }
156}
157
/// Rate limiter for submit_tx with a fixed budget of calls per window.
///
/// The budget is not replenished gradually: once at least one second has passed
/// since the last refill, the next call restores the full budget.
pub struct TxRateLimiter {
    /// Calls still permitted before the next refill.
    tokens: u32,
    /// Budget restored on every refill.
    max_tokens: u32,
    /// When the budget was last restored (initially the creation time).
    last_refill: Instant,
}

impl TxRateLimiter {
    /// Creates a limiter that starts with a full budget of `rate_per_sec` calls.
    pub(crate) fn new(rate_per_sec: u32) -> Self {
        let created_at = Instant::now();
        Self {
            tokens: rate_per_sec,
            max_tokens: rate_per_sec,
            last_refill: created_at,
        }
    }

    /// Consumes one call from the budget.
    ///
    /// Returns `true` if the call is permitted and `false` if the budget is spent.
    pub(crate) fn allow(&mut self) -> bool {
        let now = Instant::now();
        if now.duration_since(self.last_refill) >= Duration::from_secs(1) {
            self.last_refill = now;
            self.tokens = self.max_tokens;
        }
        match self.tokens.checked_sub(1) {
            Some(remaining) => {
                self.tokens = remaining;
                true
            }
            None => false,
        }
    }
}
189
190/// Per-IP rate limiter that tracks token buckets per source IP.
191///
192/// Maximum number of tracked IPs in the per-IP rate limiter.
193const MAX_IP_LIMITER_ENTRIES: usize = 100_000;
194
195/// Prevents a single IP from monopolising `submit_tx` even when opening
196/// many TCP connections or HTTP requests.
197pub struct PerIpRateLimiter {
198    buckets: HashMap<IpAddr, TxRateLimiter>,
199    last_prune: Instant,
200}
201
202impl Default for PerIpRateLimiter {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
208impl PerIpRateLimiter {
209    pub fn new() -> Self {
210        Self {
211            buckets: HashMap::new(),
212            last_prune: Instant::now(),
213        }
214    }
215
216    /// Check whether `ip` is allowed to submit a transaction.
217    pub fn allow(&mut self, ip: IpAddr) -> bool {
218        self.maybe_prune();
219        // Cap the number of tracked IPs to prevent memory exhaustion.
220        if !self.buckets.contains_key(&ip) && self.buckets.len() >= MAX_IP_LIMITER_ENTRIES {
221            return false;
222        }
223        let bucket = self
224            .buckets
225            .entry(ip)
226            .or_insert_with(|| TxRateLimiter::new(TX_RATE_LIMIT_PER_SEC));
227        bucket.allow()
228    }
229
230    /// Remove entries that have not been touched for a while to avoid unbounded growth.
231    fn maybe_prune(&mut self) {
232        let now = Instant::now();
233        if now.duration_since(self.last_prune) < IP_LIMITER_PRUNE_INTERVAL {
234            return;
235        }
236        self.last_prune = now;
237        // Remove buckets that are fully refilled (idle for ≥1 s)
238        self.buckets
239            .retain(|_, v| now.duration_since(v.last_refill) < Duration::from_secs(30));
240    }
241}
242
243pub(crate) async fn handle_request(
244    state: &RpcState,
245    line: &str,
246    ip_limiter: &Mutex<PerIpRateLimiter>,
247    peer_ip: IpAddr,
248) -> RpcResponse {
249    let req: RpcRequest = match serde_json::from_str(line) {
250        Ok(r) => r,
251        Err(e) => {
252            return RpcResponse::err(0, -32700, format!("parse error: {e}"));
253        }
254    };
255
256    match req.method.as_str() {
257        "status" => {
258            let s = *state.status_rx.borrow();
259            let info = StatusInfo {
260                validator_id: state.validator_id,
261                current_view: s.current_view,
262                last_committed_height: s.last_committed_height,
263                epoch: s.epoch_number,
264                validator_count: s.validator_count,
265                mempool_size: state.mempool.size().await,
266            };
267            json_ok(req.id, &info)
268        }
269
270        "submit_tx" => {
271            // Per-IP rate limiting (C-2: prevents bypass via multiple connections)
272            {
273                let mut limiter = ip_limiter.lock().await;
274                if !limiter.allow(peer_ip) {
275                    return RpcResponse::err(
276                        req.id,
277                        -32000,
278                        "rate limit exceeded for submit_tx".to_string(),
279                    );
280                }
281            }
282            let Some(tx_hex) = req.params.as_str() else {
283                return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
284            };
285            if tx_hex.is_empty() {
286                return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
287            }
288            let tx_bytes = match hex_decode(tx_hex) {
289                Some(b) if !b.is_empty() => b,
290                _ => {
291                    return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
292                }
293            };
294            // Validate via Application if available
295            let (priority, gas_wanted) = if let Some(ref app) = state.app {
296                let result = app.validate_tx(&tx_bytes, None);
297                if !result.valid {
298                    return RpcResponse::err(
299                        req.id,
300                        -32602,
301                        "transaction validation failed".to_string(),
302                    );
303                }
304                (result.priority, result.gas_wanted)
305            } else {
306                (0, 0)
307            };
308            let accepted = state
309                .mempool
310                .add_tx_with_gas(tx_bytes.clone(), priority, gas_wanted)
311                .await;
312            // Gossip accepted transactions to peers.
313            if accepted && let Some(ref gossip) = state.tx_gossip {
314                let _ = gossip.try_send(tx_bytes);
315            }
316            json_ok(req.id, &TxResult { accepted })
317        }
318
319        "get_block" => {
320            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
321                Some(h) => h,
322                None => {
323                    return RpcResponse::err(
324                        req.id,
325                        -32602,
326                        "missing or invalid 'height' parameter".to_string(),
327                    );
328                }
329            };
330            let store = state.store.read();
331            match store.get_block_by_height(Height(height)) {
332                Some(block) => json_ok(req.id, &block_to_info(&block)),
333                None => RpcResponse::err(
334                    req.id,
335                    -32602,
336                    format!("block at height {height} not found"),
337                ),
338            }
339        }
340
341        "get_block_by_hash" => {
342            let hash_hex = req.params.as_str().unwrap_or_default();
343            match hex_to_block_hash(hash_hex) {
344                Some(hash) => {
345                    let store = state.store.read();
346                    match store.get_block(&hash) {
347                        Some(block) => json_ok(req.id, &block_to_info(&block)),
348                        None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
349                    }
350                }
351                None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
352            }
353        }
354
355        "get_validators" => {
356            let validators = state.validator_set_rx.borrow().clone();
357            json_ok(req.id, &validators)
358        }
359
360        "get_epoch" => {
361            let s = *state.status_rx.borrow();
362            let info = EpochInfo {
363                number: s.epoch_number,
364                start_view: s.epoch_start_view,
365                validator_count: s.validator_count,
366            };
367            json_ok(req.id, &info)
368        }
369
370        "get_peers" => {
371            let peers = state.peer_info_rx.borrow().clone();
372            json_ok(req.id, &peers)
373        }
374
375        "get_header" => {
376            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
377                Some(h) => h,
378                None => {
379                    return RpcResponse::err(
380                        req.id,
381                        -32602,
382                        "missing or invalid 'height' parameter".to_string(),
383                    );
384                }
385            };
386            let store = state.store.read();
387            match store.get_block_by_height(Height(height)) {
388                Some(block) => {
389                    let info = HeaderInfo {
390                        height: block.height.as_u64(),
391                        hash: hex_encode(&block.hash.0),
392                        parent_hash: hex_encode(&block.parent_hash.0),
393                        view: block.view.as_u64(),
394                        proposer: block.proposer.0,
395                        app_hash: hex_encode(&block.app_hash.0),
396                    };
397                    json_ok(req.id, &info)
398                }
399                None => RpcResponse::err(
400                    req.id,
401                    -32602,
402                    format!("block at height {height} not found"),
403                ),
404            }
405        }
406
407        "get_commit_qc" => {
408            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
409                Some(h) => h,
410                None => {
411                    return RpcResponse::err(
412                        req.id,
413                        -32602,
414                        "missing or invalid 'height' parameter".to_string(),
415                    );
416                }
417            };
418            let store = state.store.read();
419            match store.get_commit_qc(Height(height)) {
420                Some(qc) => {
421                    let info = CommitQcInfo {
422                        block_hash: hex_encode(&qc.block_hash.0),
423                        view: qc.view.as_u64(),
424                        signer_count: qc.aggregate_signature.count(),
425                        epoch: qc.epoch.as_u64(),
426                    };
427                    json_ok(req.id, &info)
428                }
429                None => RpcResponse::err(
430                    req.id,
431                    -32602,
432                    format!("commit QC at height {height} not found"),
433                ),
434            }
435        }
436
437        "get_tx" => {
438            let hash_hex = match req.params.as_str() {
439                Some(h) if !h.is_empty() => h,
440                _ => {
441                    return RpcResponse::err(
442                        req.id,
443                        -32602,
444                        "params must be a hex-encoded tx hash".to_string(),
445                    );
446                }
447            };
448            let hash_bytes = match hex_decode(hash_hex) {
449                Some(b) if b.len() == 32 => {
450                    let mut arr = [0u8; 32];
451                    arr.copy_from_slice(&b);
452                    arr
453                }
454                _ => {
455                    return RpcResponse::err(
456                        req.id,
457                        -32602,
458                        "invalid tx hash (expected 32-byte hex)".to_string(),
459                    );
460                }
461            };
462            let store = state.store.read();
463            match store.get_tx_location(&hash_bytes) {
464                Some((height, index)) => match store.get_block_by_height(height) {
465                    Some(block) => {
466                        let txs = decode_payload(&block.payload);
467                        match txs.get(index as usize) {
468                            Some(tx_bytes) => {
469                                let info = TxInfo {
470                                    tx_hash: hash_hex.to_string(),
471                                    height: height.as_u64(),
472                                    index,
473                                    data: hex_encode(tx_bytes),
474                                };
475                                json_ok(req.id, &info)
476                            }
477                            None => RpcResponse::err(
478                                req.id,
479                                -32602,
480                                "tx index out of range in block".to_string(),
481                            ),
482                        }
483                    }
484                    None => RpcResponse::err(
485                        req.id,
486                        -32602,
487                        format!("block at height {} not found", height.as_u64()),
488                    ),
489                },
490                None => RpcResponse::err(req.id, -32602, "transaction not found".to_string()),
491            }
492        }
493
494        "get_block_results" => {
495            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
496                Some(h) => h,
497                None => {
498                    return RpcResponse::err(
499                        req.id,
500                        -32602,
501                        "missing or invalid 'height' parameter".to_string(),
502                    );
503                }
504            };
505            let store = state.store.read();
506            match store.get_block_results(Height(height)) {
507                Some(results) => {
508                    // Also compute tx hashes from the block payload.
509                    let tx_hashes = if let Some(block) = store.get_block_by_height(Height(height)) {
510                        decode_payload(&block.payload)
511                            .iter()
512                            .map(|tx| hex_encode(blake3::hash(tx).as_bytes()))
513                            .collect()
514                    } else {
515                        vec![]
516                    };
517                    let info = BlockResultsInfo {
518                        height,
519                        tx_hashes,
520                        events: results
521                            .events
522                            .iter()
523                            .map(|e| EventInfo {
524                                r#type: e.r#type.clone(),
525                                attributes: e
526                                    .attributes
527                                    .iter()
528                                    .map(|a| EventAttributeInfo {
529                                        key: a.key.clone(),
530                                        value: a.value.clone(),
531                                    })
532                                    .collect(),
533                            })
534                            .collect(),
535                        app_hash: hex_encode(&results.app_hash.0),
536                    };
537                    json_ok(req.id, &info)
538                }
539                None => RpcResponse::err(
540                    req.id,
541                    -32602,
542                    format!("block results at height {height} not found"),
543                ),
544            }
545        }
546
547        "query" => {
548            let path = match req.params.get("path").and_then(|v| v.as_str()) {
549                Some(p) => p,
550                None => {
551                    return RpcResponse::err(
552                        req.id,
553                        -32602,
554                        "missing 'path' parameter".to_string(),
555                    );
556                }
557            };
558            let data_hex = req
559                .params
560                .get("data")
561                .and_then(|v| v.as_str())
562                .unwrap_or("");
563            let data = hex_decode(data_hex).unwrap_or_default();
564            match &state.app {
565                Some(app) => match app.query(path, &data) {
566                    Ok(resp) => {
567                        let info = QueryResponseInfo {
568                            data: hex_encode(&resp.data),
569                            proof: resp.proof.as_ref().map(|p| hex_encode(p)),
570                            height: resp.height,
571                        };
572                        json_ok(req.id, &info)
573                    }
574                    Err(e) => RpcResponse::err(req.id, -32602, format!("query failed: {e}")),
575                },
576                None => RpcResponse::err(
577                    req.id,
578                    -32602,
579                    "no application available for queries".to_string(),
580                ),
581            }
582        }
583
584        "verify_header" => {
585            // Accepts { header: BlockHeader JSON, qc: CommitQC JSON }.
586            // Uses the light client to verify the header.
587            let header_val = match req.params.get("header") {
588                Some(v) => v,
589                None => {
590                    return RpcResponse::err(
591                        req.id,
592                        -32602,
593                        "missing 'header' parameter".to_string(),
594                    );
595                }
596            };
597            let qc_val = match req.params.get("qc") {
598                Some(v) => v,
599                None => {
600                    return RpcResponse::err(req.id, -32602, "missing 'qc' parameter".to_string());
601                }
602            };
603            let header: hotmint_light::BlockHeader =
604                match serde_json::from_value(header_val.clone()) {
605                    Ok(h) => h,
606                    Err(e) => {
607                        return RpcResponse::err(req.id, -32602, format!("invalid header: {e}"));
608                    }
609                };
610            let qc: hotmint_types::QuorumCertificate = match serde_json::from_value(qc_val.clone())
611            {
612                Ok(q) => q,
613                Err(e) => {
614                    return RpcResponse::err(req.id, -32602, format!("invalid qc: {e}"));
615                }
616            };
617            // Build a LightClient from the current validator set.
618            let validators_info = state.validator_set_rx.borrow().clone();
619            let validators: Vec<hotmint_types::ValidatorInfo> = validators_info
620                .iter()
621                .map(|v| hotmint_types::ValidatorInfo {
622                    id: hotmint_types::ValidatorId(v.id),
623                    public_key: {
624                        let bytes = hex_decode(&v.public_key).unwrap_or_default();
625                        let mut arr = [0u8; 32];
626                        if bytes.len() == 32 {
627                            arr.copy_from_slice(&bytes);
628                        }
629                        hotmint_types::PublicKey(arr.to_vec())
630                    },
631                    power: v.power,
632                })
633                .collect();
634            let vs = hotmint_types::ValidatorSet::new(validators);
635            let status = *state.status_rx.borrow();
636            let lc = hotmint_light::LightClient::new(
637                vs.clone(),
638                hotmint_types::Height(status.last_committed_height),
639                state.chain_id_hash,
640            );
641            let verifier = hotmint_crypto::Ed25519Verifier;
642            match lc.verify_header(&header, &qc, &verifier) {
643                Ok(()) => json_ok(
644                    req.id,
645                    &VerifyHeaderResult {
646                        valid: true,
647                        error: None,
648                    },
649                ),
650                Err(e) => json_ok(
651                    req.id,
652                    &VerifyHeaderResult {
653                        valid: false,
654                        error: Some(e.to_string()),
655                    },
656                ),
657            }
658        }
659
660        _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
661    }
662}
663
664fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
665    match serde_json::to_value(val) {
666        Ok(v) => RpcResponse::ok(id, v),
667        Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
668    }
669}
670
671fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
672    BlockInfo {
673        height: block.height.as_u64(),
674        hash: hex_encode(&block.hash.0),
675        parent_hash: hex_encode(&block.parent_hash.0),
676        view: block.view.as_u64(),
677        proposer: block.proposer.0,
678        payload_size: block.payload.len(),
679    }
680}
681
/// Encodes `bytes` as lowercase hexadecimal, two digits per byte.
///
/// The output buffer is allocated once up front instead of formatting a temporary
/// `String` for every byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
685
/// Decodes a hexadecimal string (upper or lower case, no prefix) into bytes.
///
/// Returns `None` if the length is odd or if any character is not a hex digit.
/// The empty string decodes to an empty vector.
///
/// Decoding works on bytes rather than on `str` slices, so arbitrary (including
/// non-ASCII) input is rejected instead of panicking on a char boundary. Sign
/// characters are rejected too: `"+f"` is not valid hex.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(c: u8) -> Option<u8> {
        match c {
            b'0'..=b'9' => Some(c - b'0'),
            b'a'..=b'f' => Some(c - b'a' + 10),
            b'A'..=b'F' => Some(c - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
695
696fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
697    let bytes = hex_decode(s)?;
698    if bytes.len() != 32 {
699        return None;
700    }
701    let mut arr = [0u8; 32];
702    arr.copy_from_slice(&bytes);
703    Some(BlockHash(arr))
704}
705
706/// Read a line from `reader`, failing fast if it exceeds `max_bytes`.
707///
708/// Uses `fill_buf` + incremental scanning so memory allocation is bounded.
709/// Returns `Ok(None)` on EOF, `Ok(Some(line))` on success, or an error
710/// if the line exceeds the limit.
711async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
712    reader: &mut R,
713    max_bytes: usize,
714) -> io::Result<Option<String>> {
715    let mut buf = Vec::new();
716    loop {
717        let available = reader.fill_buf().await?;
718        if available.is_empty() {
719            return if buf.is_empty() {
720                Ok(None)
721            } else {
722                Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
723            };
724        }
725        if let Some(pos) = available.iter().position(|&b| b == b'\n') {
726            buf.extend_from_slice(&available[..pos]);
727            reader.consume(pos + 1);
728            return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
729        }
730        let to_consume = available.len();
731        buf.extend_from_slice(available);
732        reader.consume(to_consume);
733        if buf.len() > max_bytes {
734            return Err(io::Error::new(io::ErrorKind::InvalidData, "line too long"));
735        }
736    }
737}