Skip to main content

hotmint_api/
rpc.rs

1use ruc::*;
2
3use std::io;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use tokio::sync::RwLock;
8
9use crate::types::{
10    BlockInfo, CommitQcInfo, EpochInfo, HeaderInfo, RpcRequest, RpcResponse, StatusInfo, TxResult,
11    ValidatorInfoResponse,
12};
13use hotmint_consensus::application::Application;
14use hotmint_consensus::store::BlockStore;
15use hotmint_mempool::Mempool;
16use hotmint_network::service::PeerStatus;
17use hotmint_types::{BlockHash, Height};
18use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
19use tokio::net::TcpListener;
20use tokio::sync::{Semaphore, watch};
21use tokio::time::{Duration, timeout};
22use tracing::{info, warn};
23
/// Upper bound on simultaneously served RPC connections; `RpcServer::run`
/// rejects any connection accepted while this many are already active.
const MAX_RPC_CONNECTIONS: usize = 256;
/// Time a connection is given to deliver one complete request line before
/// the server stops serving it.
const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum bytes per RPC line. Prevents OOM from clients sending huge data without newlines.
const MAX_LINE_BYTES: usize = 1024 * 1024;
/// Maximum submit_tx calls per second per connection (token bucket).
pub(crate) const TX_RATE_LIMIT_PER_SEC: u32 = 100;
30
/// Snapshot of consensus progress, published to the RPC layer through a watch channel.
///
/// The `status` and `get_epoch` RPC methods copy these values straight into
/// their responses.
#[derive(Debug, Clone, Copy)]
pub struct ConsensusStatus {
    /// Reported as `current_view` by the `status` method.
    pub current_view: u64,
    /// Reported as `last_committed_height` by the `status` method.
    pub last_committed_height: u64,
    /// Reported as `epoch` by `status` and as `number` by `get_epoch`.
    pub epoch_number: u64,
    /// Reported as `validator_count` by both `status` and `get_epoch`.
    pub validator_count: usize,
    /// Reported as `start_view` by the `get_epoch` method.
    pub epoch_start_view: u64,
}

impl ConsensusStatus {
    /// Builds a status snapshot from its five components.
    ///
    /// Arguments are taken in field declaration order and stored unchanged.
    pub fn new(
        current_view: u64,
        last_committed_height: u64,
        epoch_number: u64,
        validator_count: usize,
        epoch_start_view: u64,
    ) -> Self {
        Self {
            current_view,
            last_committed_height,
            epoch_number,
            validator_count,
            epoch_start_view,
        }
    }
}
58
59/// Shared state accessible by the RPC server
60pub struct RpcState {
61    pub validator_id: u64,
62    pub mempool: Arc<Mempool>,
63    pub status_rx: watch::Receiver<ConsensusStatus>,
64    /// Shared block store for block queries
65    pub store: Arc<RwLock<Box<dyn BlockStore>>>,
66    /// Peer info channel
67    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
68    /// Live validator set for get_validators
69    pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
70    /// Application reference for tx validation (optional for backward compatibility).
71    pub app: Option<Arc<dyn Application>>,
72}
73
74/// Simple JSON-RPC server over TCP (one JSON object per line)
75pub struct RpcServer {
76    state: Arc<RpcState>,
77    listener: TcpListener,
78}
79
80impl RpcServer {
81    pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
82        let listener = TcpListener::bind(addr)
83            .await
84            .c(d!("failed to bind RPC server"))?;
85        info!(addr = addr, "RPC server listening");
86        Ok(Self {
87            state: Arc::new(state),
88            listener,
89        })
90    }
91
92    pub fn local_addr(&self) -> SocketAddr {
93        self.listener.local_addr().expect("listener has local addr")
94    }
95
96    pub async fn run(self) {
97        let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
98        loop {
99            match self.listener.accept().await {
100                Ok((stream, _addr)) => {
101                    let permit = match semaphore.clone().try_acquire_owned() {
102                        Ok(p) => p,
103                        Err(_) => {
104                            warn!("RPC connection limit reached, rejecting");
105                            drop(stream);
106                            continue;
107                        }
108                    };
109                    let state = self.state.clone();
110                    tokio::spawn(async move {
111                        let _permit = permit;
112                        let (reader, mut writer) = stream.into_split();
113                        let mut reader = BufReader::with_capacity(65_536, reader);
114                        let mut tx_limiter = TxRateLimiter::new(TX_RATE_LIMIT_PER_SEC);
115                        loop {
116                            let line = match timeout(
117                                RPC_READ_TIMEOUT,
118                                read_line_limited(&mut reader, MAX_LINE_BYTES),
119                            )
120                            .await
121                            {
122                                Ok(Ok(Some(line))) => line,
123                                Ok(Err(e)) => {
124                                    warn!(error = %e, "RPC read error (line too long?)");
125                                    break;
126                                }
127                                _ => break, // EOF or timeout
128                            };
129                            let response = handle_request(&state, &line, &mut tx_limiter).await;
130                            let mut json = serde_json::to_string(&response).unwrap_or_default();
131                            json.push('\n');
132                            if writer.write_all(json.as_bytes()).await.is_err() {
133                                break;
134                            }
135                        }
136                    });
137                }
138                Err(e) => {
139                    warn!(error = %e, "failed to accept connection");
140                }
141            }
142        }
143    }
144}
145
146/// Token-bucket rate limiter for submit_tx.
147pub struct TxRateLimiter {
148    tokens: u32,
149    max_tokens: u32,
150    last_refill: tokio::time::Instant,
151}
152
153impl TxRateLimiter {
154    pub(crate) fn new(rate_per_sec: u32) -> Self {
155        Self {
156            tokens: rate_per_sec,
157            max_tokens: rate_per_sec,
158            last_refill: tokio::time::Instant::now(),
159        }
160    }
161
162    pub(crate) fn allow(&mut self) -> bool {
163        let now = tokio::time::Instant::now();
164        let elapsed = now.duration_since(self.last_refill);
165        if elapsed >= Duration::from_secs(1) {
166            self.tokens = self.max_tokens;
167            self.last_refill = now;
168        }
169        if self.tokens > 0 {
170            self.tokens -= 1;
171            true
172        } else {
173            false
174        }
175    }
176}
177
178pub(crate) async fn handle_request(
179    state: &RpcState,
180    line: &str,
181    tx_limiter: &mut TxRateLimiter,
182) -> RpcResponse {
183    let req: RpcRequest = match serde_json::from_str(line) {
184        Ok(r) => r,
185        Err(e) => {
186            return RpcResponse::err(0, -32700, format!("parse error: {e}"));
187        }
188    };
189
190    match req.method.as_str() {
191        "status" => {
192            let s = *state.status_rx.borrow();
193            let info = StatusInfo {
194                validator_id: state.validator_id,
195                current_view: s.current_view,
196                last_committed_height: s.last_committed_height,
197                epoch: s.epoch_number,
198                validator_count: s.validator_count,
199                mempool_size: state.mempool.size().await,
200            };
201            json_ok(req.id, &info)
202        }
203
204        "submit_tx" => {
205            // Per-connection rate limiting
206            if !tx_limiter.allow() {
207                return RpcResponse::err(
208                    req.id,
209                    -32000,
210                    "rate limit exceeded for submit_tx".to_string(),
211                );
212            }
213            let Some(tx_hex) = req.params.as_str() else {
214                return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
215            };
216            if tx_hex.is_empty() {
217                return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
218            }
219            let tx_bytes = match hex_decode(tx_hex) {
220                Some(b) if !b.is_empty() => b,
221                _ => {
222                    return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
223                }
224            };
225            // Validate via Application if available
226            let (priority, gas_wanted) = if let Some(ref app) = state.app {
227                let result = app.validate_tx(&tx_bytes, None);
228                if !result.valid {
229                    return RpcResponse::err(
230                        req.id,
231                        -32602,
232                        "transaction validation failed".to_string(),
233                    );
234                }
235                (result.priority, result.gas_wanted)
236            } else {
237                (0, 0)
238            };
239            let accepted = state
240                .mempool
241                .add_tx_with_gas(tx_bytes, priority, gas_wanted)
242                .await;
243            json_ok(req.id, &TxResult { accepted })
244        }
245
246        "get_block" => {
247            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
248                Some(h) => h,
249                None => {
250                    return RpcResponse::err(
251                        req.id,
252                        -32602,
253                        "missing or invalid 'height' parameter".to_string(),
254                    );
255                }
256            };
257            let store = state.store.read().await;
258            match store.get_block_by_height(Height(height)) {
259                Some(block) => json_ok(req.id, &block_to_info(&block)),
260                None => RpcResponse::err(
261                    req.id,
262                    -32602,
263                    format!("block at height {height} not found"),
264                ),
265            }
266        }
267
268        "get_block_by_hash" => {
269            let hash_hex = req.params.as_str().unwrap_or_default();
270            match hex_to_block_hash(hash_hex) {
271                Some(hash) => {
272                    let store = state.store.read().await;
273                    match store.get_block(&hash) {
274                        Some(block) => json_ok(req.id, &block_to_info(&block)),
275                        None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
276                    }
277                }
278                None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
279            }
280        }
281
282        "get_validators" => {
283            let validators = state.validator_set_rx.borrow().clone();
284            json_ok(req.id, &validators)
285        }
286
287        "get_epoch" => {
288            let s = *state.status_rx.borrow();
289            let info = EpochInfo {
290                number: s.epoch_number,
291                start_view: s.epoch_start_view,
292                validator_count: s.validator_count,
293            };
294            json_ok(req.id, &info)
295        }
296
297        "get_peers" => {
298            let peers = state.peer_info_rx.borrow().clone();
299            json_ok(req.id, &peers)
300        }
301
302        "get_header" => {
303            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
304                Some(h) => h,
305                None => {
306                    return RpcResponse::err(
307                        req.id,
308                        -32602,
309                        "missing or invalid 'height' parameter".to_string(),
310                    );
311                }
312            };
313            let store = state.store.read().await;
314            match store.get_block_by_height(Height(height)) {
315                Some(block) => {
316                    let info = HeaderInfo {
317                        height: block.height.as_u64(),
318                        hash: hex_encode(&block.hash.0),
319                        parent_hash: hex_encode(&block.parent_hash.0),
320                        view: block.view.as_u64(),
321                        proposer: block.proposer.0,
322                        app_hash: hex_encode(&block.app_hash.0),
323                    };
324                    json_ok(req.id, &info)
325                }
326                None => RpcResponse::err(
327                    req.id,
328                    -32602,
329                    format!("block at height {height} not found"),
330                ),
331            }
332        }
333
334        "get_commit_qc" => {
335            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
336                Some(h) => h,
337                None => {
338                    return RpcResponse::err(
339                        req.id,
340                        -32602,
341                        "missing or invalid 'height' parameter".to_string(),
342                    );
343                }
344            };
345            let store = state.store.read().await;
346            match store.get_commit_qc(Height(height)) {
347                Some(qc) => {
348                    let info = CommitQcInfo {
349                        block_hash: hex_encode(&qc.block_hash.0),
350                        view: qc.view.as_u64(),
351                        signer_count: qc.aggregate_signature.count(),
352                        epoch: qc.epoch.as_u64(),
353                    };
354                    json_ok(req.id, &info)
355                }
356                None => RpcResponse::err(
357                    req.id,
358                    -32602,
359                    format!("commit QC at height {height} not found"),
360                ),
361            }
362        }
363
364        _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
365    }
366}
367
368fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
369    match serde_json::to_value(val) {
370        Ok(v) => RpcResponse::ok(id, v),
371        Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
372    }
373}
374
375fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
376    BlockInfo {
377        height: block.height.as_u64(),
378        hash: hex_encode(&block.hash.0),
379        parent_hash: hex_encode(&block.parent_hash.0),
380        view: block.view.as_u64(),
381        proposer: block.proposer.0,
382        payload_size: block.payload.len(),
383    }
384}
385
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// The output is written into a single preallocated buffer rather than being
/// assembled from one temporary `String` per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
389
/// Decodes a hexadecimal string into bytes.
///
/// Accepts upper- and lowercase digits. Returns `None` when the input has an
/// odd number of bytes or contains anything other than ASCII hex digits; an
/// empty input decodes to an empty vector.
///
/// Decoding works on raw bytes rather than on `str` slices, so input that
/// contains multi-byte UTF-8 characters is rejected instead of panicking on a
/// slice that splits a character. Sign characters such as `+` are rejected
/// as well.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    // A leftover byte means the input length is odd.
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
399
400fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
401    let bytes = hex_decode(s)?;
402    if bytes.len() != 32 {
403        return None;
404    }
405    let mut arr = [0u8; 32];
406    arr.copy_from_slice(&bytes);
407    Some(BlockHash(arr))
408}
409
410/// Read a line from `reader`, failing fast if it exceeds `max_bytes`.
411///
412/// Uses `fill_buf` + incremental scanning so memory allocation is bounded.
413/// Returns `Ok(None)` on EOF, `Ok(Some(line))` on success, or an error
414/// if the line exceeds the limit.
415async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
416    reader: &mut R,
417    max_bytes: usize,
418) -> io::Result<Option<String>> {
419    let mut buf = Vec::new();
420    loop {
421        let available = reader.fill_buf().await?;
422        if available.is_empty() {
423            return if buf.is_empty() {
424                Ok(None)
425            } else {
426                Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
427            };
428        }
429        if let Some(pos) = available.iter().position(|&b| b == b'\n') {
430            buf.extend_from_slice(&available[..pos]);
431            reader.consume(pos + 1);
432            return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
433        }
434        let to_consume = available.len();
435        buf.extend_from_slice(available);
436        reader.consume(to_consume);
437        if buf.len() > max_bytes {
438            return Err(io::Error::new(io::ErrorKind::InvalidData, "line too long"));
439        }
440    }
441}