Skip to main content

hotmint_api/
rpc.rs

1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::types::{
6    BlockInfo, EpochInfo, RpcRequest, RpcResponse, StatusInfo, TxResult, ValidatorInfoResponse,
7};
8use hotmint_consensus::application::Application;
9use hotmint_consensus::store::BlockStore;
10use hotmint_mempool::Mempool;
11use hotmint_network::service::PeerStatus;
12use hotmint_types::{BlockHash, Height};
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::net::TcpListener;
15use tokio::sync::{Semaphore, watch};
16use tokio::time::{Duration, timeout};
17use tracing::{info, warn};
18
19const MAX_RPC_CONNECTIONS: usize = 256;
20const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
21/// Maximum bytes per RPC line. Prevents OOM from clients sending huge data without newlines.
22const MAX_LINE_BYTES: usize = 1_048_576;
23
24/// Named consensus status shared via watch channel.
25#[derive(Debug, Clone, Copy)]
26pub struct ConsensusStatus {
27    pub current_view: u64,
28    pub last_committed_height: u64,
29    pub epoch_number: u64,
30    pub validator_count: usize,
31    pub epoch_start_view: u64,
32}
33
34impl ConsensusStatus {
35    pub fn new(
36        current_view: u64,
37        last_committed_height: u64,
38        epoch_number: u64,
39        validator_count: usize,
40        epoch_start_view: u64,
41    ) -> Self {
42        Self {
43            current_view,
44            last_committed_height,
45            epoch_number,
46            validator_count,
47            epoch_start_view,
48        }
49    }
50}
51
52/// Shared state accessible by the RPC server
53pub struct RpcState {
54    pub validator_id: u64,
55    pub mempool: Arc<Mempool>,
56    pub status_rx: watch::Receiver<ConsensusStatus>,
57    /// Shared block store for block queries
58    pub store: Arc<RwLock<Box<dyn BlockStore>>>,
59    /// Peer info channel
60    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
61    /// Live validator set for get_validators
62    pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
63    /// Application reference for tx validation (optional for backward compatibility).
64    pub app: Option<Arc<dyn Application>>,
65}
66
67/// Simple JSON-RPC server over TCP (one JSON object per line)
68pub struct RpcServer {
69    state: Arc<RpcState>,
70    listener: TcpListener,
71}
72
73impl RpcServer {
74    pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
75        let listener = TcpListener::bind(addr)
76            .await
77            .c(d!("failed to bind RPC server"))?;
78        info!(addr = addr, "RPC server listening");
79        Ok(Self {
80            state: Arc::new(state),
81            listener,
82        })
83    }
84
85    pub fn local_addr(&self) -> std::net::SocketAddr {
86        self.listener.local_addr().expect("listener has local addr")
87    }
88
89    pub async fn run(self) {
90        let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
91        loop {
92            match self.listener.accept().await {
93                Ok((stream, _addr)) => {
94                    let permit = match semaphore.clone().try_acquire_owned() {
95                        Ok(p) => p,
96                        Err(_) => {
97                            warn!("RPC connection limit reached, rejecting");
98                            drop(stream);
99                            continue;
100                        }
101                    };
102                    let state = self.state.clone();
103                    tokio::spawn(async move {
104                        let _permit = permit;
105                        let (reader, mut writer) = stream.into_split();
106                        let mut reader = BufReader::with_capacity(65_536, reader);
107                        loop {
108                            let line = match timeout(
109                                RPC_READ_TIMEOUT,
110                                read_line_limited(&mut reader, MAX_LINE_BYTES),
111                            )
112                            .await
113                            {
114                                Ok(Ok(Some(line))) => line,
115                                Ok(Err(e)) => {
116                                    warn!(error = %e, "RPC read error (line too long?)");
117                                    break;
118                                }
119                                _ => break, // EOF or timeout
120                            };
121                            let response = handle_request(&state, &line).await;
122                            let mut json = serde_json::to_string(&response).unwrap_or_default();
123                            json.push('\n');
124                            if writer.write_all(json.as_bytes()).await.is_err() {
125                                break;
126                            }
127                        }
128                    });
129                }
130                Err(e) => {
131                    warn!(error = %e, "failed to accept connection");
132                }
133            }
134        }
135    }
136}
137
138async fn handle_request(state: &RpcState, line: &str) -> RpcResponse {
139    let req: RpcRequest = match serde_json::from_str(line) {
140        Ok(r) => r,
141        Err(e) => {
142            return RpcResponse::err(0, -32700, format!("parse error: {e}"));
143        }
144    };
145
146    match req.method.as_str() {
147        "status" => {
148            let s = *state.status_rx.borrow();
149            let info = StatusInfo {
150                validator_id: state.validator_id,
151                current_view: s.current_view,
152                last_committed_height: s.last_committed_height,
153                epoch: s.epoch_number,
154                validator_count: s.validator_count,
155                mempool_size: state.mempool.size().await,
156            };
157            json_ok(req.id, &info)
158        }
159
160        "submit_tx" => {
161            let Some(tx_hex) = req.params.as_str() else {
162                return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
163            };
164            if tx_hex.is_empty() {
165                return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
166            }
167            let tx_bytes = match hex_decode(tx_hex) {
168                Some(b) if !b.is_empty() => b,
169                _ => {
170                    return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
171                }
172            };
173            // Validate via Application if available
174            if let Some(ref app) = state.app
175                && !app.validate_tx(&tx_bytes, None)
176            {
177                return RpcResponse::err(
178                    req.id,
179                    -32602,
180                    "transaction validation failed".to_string(),
181                );
182            }
183            let accepted = state.mempool.add_tx(tx_bytes).await;
184            json_ok(req.id, &TxResult { accepted })
185        }
186
187        "get_block" => {
188            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
189                Some(h) => h,
190                None => {
191                    return RpcResponse::err(
192                        req.id,
193                        -32602,
194                        "missing or invalid 'height' parameter".to_string(),
195                    );
196                }
197            };
198            let store = state.store.read().unwrap();
199            match store.get_block_by_height(Height(height)) {
200                Some(block) => json_ok(req.id, &block_to_info(&block)),
201                None => RpcResponse::err(
202                    req.id,
203                    -32602,
204                    format!("block at height {height} not found"),
205                ),
206            }
207        }
208
209        "get_block_by_hash" => {
210            let hash_hex = req.params.as_str().unwrap_or_default();
211            match hex_to_block_hash(hash_hex) {
212                Some(hash) => {
213                    let store = state.store.read().unwrap();
214                    match store.get_block(&hash) {
215                        Some(block) => json_ok(req.id, &block_to_info(&block)),
216                        None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
217                    }
218                }
219                None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
220            }
221        }
222
223        "get_validators" => {
224            let validators = state.validator_set_rx.borrow().clone();
225            json_ok(req.id, &validators)
226        }
227
228        "get_epoch" => {
229            let s = *state.status_rx.borrow();
230            let info = EpochInfo {
231                number: s.epoch_number,
232                start_view: s.epoch_start_view,
233                validator_count: s.validator_count,
234            };
235            json_ok(req.id, &info)
236        }
237
238        "get_peers" => {
239            let peers = state.peer_info_rx.borrow().clone();
240            json_ok(req.id, &peers)
241        }
242
243        _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
244    }
245}
246
247fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
248    match serde_json::to_value(val) {
249        Ok(v) => RpcResponse::ok(id, v),
250        Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
251    }
252}
253
254fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
255    BlockInfo {
256        height: block.height.as_u64(),
257        hash: hex_encode(&block.hash.0),
258        parent_hash: hex_encode(&block.parent_hash.0),
259        view: block.view.as_u64(),
260        proposer: block.proposer.0,
261        payload_size: block.payload.len(),
262    }
263}
264
265fn hex_encode(bytes: &[u8]) -> String {
266    bytes.iter().map(|b| format!("{b:02x}")).collect()
267}
268
269fn hex_decode(s: &str) -> Option<Vec<u8>> {
270    if !s.len().is_multiple_of(2) {
271        return None;
272    }
273    (0..s.len())
274        .step_by(2)
275        .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
276        .collect()
277}
278
279fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
280    let bytes = hex_decode(s)?;
281    if bytes.len() != 32 {
282        return None;
283    }
284    let mut arr = [0u8; 32];
285    arr.copy_from_slice(&bytes);
286    Some(BlockHash(arr))
287}
288
289/// Read a line from `reader`, failing fast if it exceeds `max_bytes`.
290///
291/// Uses `fill_buf` + incremental scanning so memory allocation is bounded.
292/// Returns `Ok(None)` on EOF, `Ok(Some(line))` on success, or an error
293/// if the line exceeds the limit.
294async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
295    reader: &mut R,
296    max_bytes: usize,
297) -> std::io::Result<Option<String>> {
298    let mut buf = Vec::new();
299    loop {
300        let available = reader.fill_buf().await?;
301        if available.is_empty() {
302            return if buf.is_empty() {
303                Ok(None)
304            } else {
305                Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
306            };
307        }
308        if let Some(pos) = available.iter().position(|&b| b == b'\n') {
309            buf.extend_from_slice(&available[..pos]);
310            reader.consume(pos + 1);
311            return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
312        }
313        let to_consume = available.len();
314        buf.extend_from_slice(available);
315        reader.consume(to_consume);
316        if buf.len() > max_bytes {
317            return Err(std::io::Error::new(
318                std::io::ErrorKind::InvalidData,
319                "line too long",
320            ));
321        }
322    }
323}