Skip to main content

hotmint_api/
rpc.rs

1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::types::{
6    BlockInfo, EpochInfo, RpcRequest, RpcResponse, StatusInfo, TxResult, ValidatorInfoResponse,
7};
8use hotmint_consensus::application::Application;
9use hotmint_consensus::store::BlockStore;
10use hotmint_mempool::Mempool;
11use hotmint_network::service::PeerStatus;
12use hotmint_types::{BlockHash, Height};
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::net::TcpListener;
15use tokio::sync::{Semaphore, watch};
16use tokio::time::{Duration, timeout};
17use tracing::{info, warn};
18
19const MAX_RPC_CONNECTIONS: usize = 256;
20const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
21/// Maximum bytes per RPC line. Prevents OOM from clients sending huge data without newlines.
22const MAX_LINE_BYTES: usize = 1_048_576;
23
24/// Status tuple: (current_view, last_committed_height, epoch_number, validator_count, epoch_start_view)
25pub type StatusTuple = (u64, u64, u64, usize, u64);
26
27/// Shared state accessible by the RPC server
28pub struct RpcState {
29    pub validator_id: u64,
30    pub mempool: Arc<Mempool>,
31    pub status_rx: watch::Receiver<StatusTuple>,
32    /// Shared block store for block queries
33    pub store: Arc<RwLock<Box<dyn BlockStore>>>,
34    /// Peer info channel
35    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
36    /// Live validator set for get_validators
37    pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
38    /// Application reference for tx validation (optional for backward compatibility).
39    pub app: Option<Arc<dyn Application>>,
40}
41
42/// Simple JSON-RPC server over TCP (one JSON object per line)
43pub struct RpcServer {
44    state: Arc<RpcState>,
45    listener: TcpListener,
46}
47
48impl RpcServer {
49    pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
50        let listener = TcpListener::bind(addr)
51            .await
52            .c(d!("failed to bind RPC server"))?;
53        info!(addr = addr, "RPC server listening");
54        Ok(Self {
55            state: Arc::new(state),
56            listener,
57        })
58    }
59
60    pub fn local_addr(&self) -> std::net::SocketAddr {
61        self.listener.local_addr().expect("listener has local addr")
62    }
63
64    pub async fn run(self) {
65        let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
66        loop {
67            match self.listener.accept().await {
68                Ok((stream, _addr)) => {
69                    let permit = match semaphore.clone().try_acquire_owned() {
70                        Ok(p) => p,
71                        Err(_) => {
72                            warn!("RPC connection limit reached, rejecting");
73                            drop(stream);
74                            continue;
75                        }
76                    };
77                    let state = self.state.clone();
78                    tokio::spawn(async move {
79                        let _permit = permit;
80                        let (reader, mut writer) = stream.into_split();
81                        let mut reader = BufReader::with_capacity(65_536, reader);
82                        loop {
83                            let line = match timeout(
84                                RPC_READ_TIMEOUT,
85                                read_line_limited(&mut reader, MAX_LINE_BYTES),
86                            )
87                            .await
88                            {
89                                Ok(Ok(Some(line))) => line,
90                                Ok(Err(e)) => {
91                                    warn!(error = %e, "RPC read error (line too long?)");
92                                    break;
93                                }
94                                _ => break, // EOF or timeout
95                            };
96                            let response = handle_request(&state, &line).await;
97                            let mut json = serde_json::to_string(&response).unwrap_or_default();
98                            json.push('\n');
99                            if writer.write_all(json.as_bytes()).await.is_err() {
100                                break;
101                            }
102                        }
103                    });
104                }
105                Err(e) => {
106                    warn!(error = %e, "failed to accept connection");
107                }
108            }
109        }
110    }
111}
112
113async fn handle_request(state: &RpcState, line: &str) -> RpcResponse {
114    let req: RpcRequest = match serde_json::from_str(line) {
115        Ok(r) => r,
116        Err(e) => {
117            return RpcResponse::err(0, -32700, format!("parse error: {e}"));
118        }
119    };
120
121    match req.method.as_str() {
122        "status" => {
123            let (view, height, epoch, validator_count, _) = *state.status_rx.borrow();
124            let info = StatusInfo {
125                validator_id: state.validator_id,
126                current_view: view,
127                last_committed_height: height,
128                epoch,
129                validator_count,
130                mempool_size: state.mempool.size().await,
131            };
132            json_ok(req.id, &info)
133        }
134
135        "submit_tx" => {
136            let Some(tx_hex) = req.params.as_str() else {
137                return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
138            };
139            if tx_hex.is_empty() {
140                return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
141            }
142            let tx_bytes = match hex_decode(tx_hex) {
143                Some(b) if !b.is_empty() => b,
144                _ => {
145                    return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
146                }
147            };
148            // Validate via Application if available
149            if let Some(ref app) = state.app
150                && !app.validate_tx(&tx_bytes, None)
151            {
152                return RpcResponse::err(
153                    req.id,
154                    -32602,
155                    "transaction validation failed".to_string(),
156                );
157            }
158            let accepted = state.mempool.add_tx(tx_bytes).await;
159            json_ok(req.id, &TxResult { accepted })
160        }
161
162        "get_block" => {
163            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
164                Some(h) => h,
165                None => {
166                    return RpcResponse::err(
167                        req.id,
168                        -32602,
169                        "missing or invalid 'height' parameter".to_string(),
170                    );
171                }
172            };
173            let store = state.store.read().unwrap();
174            match store.get_block_by_height(Height(height)) {
175                Some(block) => json_ok(req.id, &block_to_info(&block)),
176                None => RpcResponse::err(
177                    req.id,
178                    -32602,
179                    format!("block at height {height} not found"),
180                ),
181            }
182        }
183
184        "get_block_by_hash" => {
185            let hash_hex = req.params.as_str().unwrap_or_default();
186            match hex_to_block_hash(hash_hex) {
187                Some(hash) => {
188                    let store = state.store.read().unwrap();
189                    match store.get_block(&hash) {
190                        Some(block) => json_ok(req.id, &block_to_info(&block)),
191                        None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
192                    }
193                }
194                None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
195            }
196        }
197
198        "get_validators" => {
199            let validators = state.validator_set_rx.borrow().clone();
200            json_ok(req.id, &validators)
201        }
202
203        "get_epoch" => {
204            let (_, _, epoch, validator_count, start_view) = *state.status_rx.borrow();
205            let info = EpochInfo {
206                number: epoch,
207                start_view,
208                validator_count,
209            };
210            json_ok(req.id, &info)
211        }
212
213        "get_peers" => {
214            let peers = state.peer_info_rx.borrow().clone();
215            json_ok(req.id, &peers)
216        }
217
218        _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
219    }
220}
221
222fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
223    match serde_json::to_value(val) {
224        Ok(v) => RpcResponse::ok(id, v),
225        Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
226    }
227}
228
229fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
230    BlockInfo {
231        height: block.height.as_u64(),
232        hash: hex_encode(&block.hash.0),
233        parent_hash: hex_encode(&block.parent_hash.0),
234        view: block.view.as_u64(),
235        proposer: block.proposer.0,
236        payload_size: block.payload.len(),
237    }
238}
239
240fn hex_encode(bytes: &[u8]) -> String {
241    bytes.iter().map(|b| format!("{b:02x}")).collect()
242}
243
244fn hex_decode(s: &str) -> Option<Vec<u8>> {
245    if !s.len().is_multiple_of(2) {
246        return None;
247    }
248    (0..s.len())
249        .step_by(2)
250        .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
251        .collect()
252}
253
254fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
255    let bytes = hex_decode(s)?;
256    if bytes.len() != 32 {
257        return None;
258    }
259    let mut arr = [0u8; 32];
260    arr.copy_from_slice(&bytes);
261    Some(BlockHash(arr))
262}
263
264/// Read a line from `reader`, failing fast if it exceeds `max_bytes`.
265///
266/// Uses `fill_buf` + incremental scanning so memory allocation is bounded.
267/// Returns `Ok(None)` on EOF, `Ok(Some(line))` on success, or an error
268/// if the line exceeds the limit.
269async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
270    reader: &mut R,
271    max_bytes: usize,
272) -> std::io::Result<Option<String>> {
273    let mut buf = Vec::new();
274    loop {
275        let available = reader.fill_buf().await?;
276        if available.is_empty() {
277            return if buf.is_empty() {
278                Ok(None)
279            } else {
280                Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
281            };
282        }
283        if let Some(pos) = available.iter().position(|&b| b == b'\n') {
284            buf.extend_from_slice(&available[..pos]);
285            reader.consume(pos + 1);
286            return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
287        }
288        let to_consume = available.len();
289        buf.extend_from_slice(available);
290        reader.consume(to_consume);
291        if buf.len() > max_bytes {
292            return Err(std::io::Error::new(
293                std::io::ErrorKind::InvalidData,
294                "line too long",
295            ));
296        }
297    }
298}