Skip to main content

hotmint_api/
rpc.rs

1use ruc::*;
2
3use std::io;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use tokio::sync::RwLock;
8
9use crate::types::{
10    BlockInfo, EpochInfo, RpcRequest, RpcResponse, StatusInfo, TxResult, ValidatorInfoResponse,
11};
12use hotmint_consensus::application::Application;
13use hotmint_consensus::store::BlockStore;
14use hotmint_mempool::Mempool;
15use hotmint_network::service::PeerStatus;
16use hotmint_types::{BlockHash, Height};
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::net::TcpListener;
19use tokio::sync::{Semaphore, watch};
20use tokio::time::{Duration, timeout};
21use tracing::{info, warn};
22
/// Upper bound on simultaneously served RPC connections; further connections are rejected.
const MAX_RPC_CONNECTIONS: usize = 256;
/// How long the server waits for a complete request line before closing the connection.
const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);
/// Largest accepted request line (1 MiB), so a client that never sends a newline
/// cannot make the server buffer unbounded data.
const MAX_LINE_BYTES: usize = 1 << 20;
27
/// Snapshot of consensus progress, published through a `watch` channel and read
/// by the `status` and `get_epoch` RPC handlers.
#[derive(Debug, Clone, Copy)]
pub struct ConsensusStatus {
    /// View number reported as `current_view` by the `status` RPC.
    pub current_view: u64,
    /// Height reported as `last_committed_height` by the `status` RPC.
    pub last_committed_height: u64,
    /// Epoch number reported by the `status` and `get_epoch` RPCs.
    pub epoch_number: u64,
    /// Validator count reported by the `status` and `get_epoch` RPCs.
    pub validator_count: usize,
    /// View reported as `start_view` by the `get_epoch` RPC.
    pub epoch_start_view: u64,
}

impl ConsensusStatus {
    /// Builds a status snapshot from its five components, in field order.
    pub fn new(
        current_view: u64,
        last_committed_height: u64,
        epoch_number: u64,
        validator_count: usize,
        epoch_start_view: u64,
    ) -> Self {
        ConsensusStatus {
            current_view,
            last_committed_height,
            epoch_number,
            validator_count,
            epoch_start_view,
        }
    }
}
55
56/// Shared state accessible by the RPC server
57pub struct RpcState {
58    pub validator_id: u64,
59    pub mempool: Arc<Mempool>,
60    pub status_rx: watch::Receiver<ConsensusStatus>,
61    /// Shared block store for block queries
62    pub store: Arc<RwLock<Box<dyn BlockStore>>>,
63    /// Peer info channel
64    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
65    /// Live validator set for get_validators
66    pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
67    /// Application reference for tx validation (optional for backward compatibility).
68    pub app: Option<Arc<dyn Application>>,
69}
70
71/// Simple JSON-RPC server over TCP (one JSON object per line)
72pub struct RpcServer {
73    state: Arc<RpcState>,
74    listener: TcpListener,
75}
76
77impl RpcServer {
78    pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
79        let listener = TcpListener::bind(addr)
80            .await
81            .c(d!("failed to bind RPC server"))?;
82        info!(addr = addr, "RPC server listening");
83        Ok(Self {
84            state: Arc::new(state),
85            listener,
86        })
87    }
88
89    pub fn local_addr(&self) -> SocketAddr {
90        self.listener.local_addr().expect("listener has local addr")
91    }
92
93    pub async fn run(self) {
94        let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
95        loop {
96            match self.listener.accept().await {
97                Ok((stream, _addr)) => {
98                    let permit = match semaphore.clone().try_acquire_owned() {
99                        Ok(p) => p,
100                        Err(_) => {
101                            warn!("RPC connection limit reached, rejecting");
102                            drop(stream);
103                            continue;
104                        }
105                    };
106                    let state = self.state.clone();
107                    tokio::spawn(async move {
108                        let _permit = permit;
109                        let (reader, mut writer) = stream.into_split();
110                        let mut reader = BufReader::with_capacity(65_536, reader);
111                        loop {
112                            let line = match timeout(
113                                RPC_READ_TIMEOUT,
114                                read_line_limited(&mut reader, MAX_LINE_BYTES),
115                            )
116                            .await
117                            {
118                                Ok(Ok(Some(line))) => line,
119                                Ok(Err(e)) => {
120                                    warn!(error = %e, "RPC read error (line too long?)");
121                                    break;
122                                }
123                                _ => break, // EOF or timeout
124                            };
125                            let response = handle_request(&state, &line).await;
126                            let mut json = serde_json::to_string(&response).unwrap_or_default();
127                            json.push('\n');
128                            if writer.write_all(json.as_bytes()).await.is_err() {
129                                break;
130                            }
131                        }
132                    });
133                }
134                Err(e) => {
135                    warn!(error = %e, "failed to accept connection");
136                }
137            }
138        }
139    }
140}
141
142async fn handle_request(state: &RpcState, line: &str) -> RpcResponse {
143    let req: RpcRequest = match serde_json::from_str(line) {
144        Ok(r) => r,
145        Err(e) => {
146            return RpcResponse::err(0, -32700, format!("parse error: {e}"));
147        }
148    };
149
150    match req.method.as_str() {
151        "status" => {
152            let s = *state.status_rx.borrow();
153            let info = StatusInfo {
154                validator_id: state.validator_id,
155                current_view: s.current_view,
156                last_committed_height: s.last_committed_height,
157                epoch: s.epoch_number,
158                validator_count: s.validator_count,
159                mempool_size: state.mempool.size().await,
160            };
161            json_ok(req.id, &info)
162        }
163
164        "submit_tx" => {
165            let Some(tx_hex) = req.params.as_str() else {
166                return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
167            };
168            if tx_hex.is_empty() {
169                return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
170            }
171            let tx_bytes = match hex_decode(tx_hex) {
172                Some(b) if !b.is_empty() => b,
173                _ => {
174                    return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
175                }
176            };
177            // Validate via Application if available
178            if let Some(ref app) = state.app
179                && !app.validate_tx(&tx_bytes, None)
180            {
181                return RpcResponse::err(
182                    req.id,
183                    -32602,
184                    "transaction validation failed".to_string(),
185                );
186            }
187            let accepted = state.mempool.add_tx(tx_bytes).await;
188            json_ok(req.id, &TxResult { accepted })
189        }
190
191        "get_block" => {
192            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
193                Some(h) => h,
194                None => {
195                    return RpcResponse::err(
196                        req.id,
197                        -32602,
198                        "missing or invalid 'height' parameter".to_string(),
199                    );
200                }
201            };
202            let store = state.store.read().await;
203            match store.get_block_by_height(Height(height)) {
204                Some(block) => json_ok(req.id, &block_to_info(&block)),
205                None => RpcResponse::err(
206                    req.id,
207                    -32602,
208                    format!("block at height {height} not found"),
209                ),
210            }
211        }
212
213        "get_block_by_hash" => {
214            let hash_hex = req.params.as_str().unwrap_or_default();
215            match hex_to_block_hash(hash_hex) {
216                Some(hash) => {
217                    let store = state.store.read().await;
218                    match store.get_block(&hash) {
219                        Some(block) => json_ok(req.id, &block_to_info(&block)),
220                        None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
221                    }
222                }
223                None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
224            }
225        }
226
227        "get_validators" => {
228            let validators = state.validator_set_rx.borrow().clone();
229            json_ok(req.id, &validators)
230        }
231
232        "get_epoch" => {
233            let s = *state.status_rx.borrow();
234            let info = EpochInfo {
235                number: s.epoch_number,
236                start_view: s.epoch_start_view,
237                validator_count: s.validator_count,
238            };
239            json_ok(req.id, &info)
240        }
241
242        "get_peers" => {
243            let peers = state.peer_info_rx.borrow().clone();
244            json_ok(req.id, &peers)
245        }
246
247        _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
248    }
249}
250
251fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
252    match serde_json::to_value(val) {
253        Ok(v) => RpcResponse::ok(id, v),
254        Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
255    }
256}
257
258fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
259    BlockInfo {
260        height: block.height.as_u64(),
261        hash: hex_encode(&block.hash.0),
262        parent_hash: hex_encode(&block.parent_hash.0),
263        view: block.view.as_u64(),
264        proposer: block.proposer.0,
265        payload_size: block.payload.len(),
266    }
267}
268
/// Renders `bytes` as lowercase hexadecimal, two digits per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
272
/// Decodes a hexadecimal string (upper- or lowercase) into bytes.
///
/// Returns `None` if the input has an odd number of bytes or contains anything
/// other than ASCII hex digits. An empty string decodes to an empty vector.
///
/// The input comes straight from RPC clients, so decoding works on raw bytes:
/// slicing the `str` at fixed byte offsets would panic on a multi-byte UTF-8
/// character, and `u8::from_str_radix` would accept a leading `+` sign.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(c: u8) -> Option<u8> {
        match c {
            b'0'..=b'9' => Some(c - b'0'),
            b'a'..=b'f' => Some(c - b'a' + 10),
            b'A'..=b'F' => Some(c - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
282
283fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
284    let bytes = hex_decode(s)?;
285    if bytes.len() != 32 {
286        return None;
287    }
288    let mut arr = [0u8; 32];
289    arr.copy_from_slice(&bytes);
290    Some(BlockHash(arr))
291}
292
293/// Read a line from `reader`, failing fast if it exceeds `max_bytes`.
294///
295/// Uses `fill_buf` + incremental scanning so memory allocation is bounded.
296/// Returns `Ok(None)` on EOF, `Ok(Some(line))` on success, or an error
297/// if the line exceeds the limit.
298async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
299    reader: &mut R,
300    max_bytes: usize,
301) -> io::Result<Option<String>> {
302    let mut buf = Vec::new();
303    loop {
304        let available = reader.fill_buf().await?;
305        if available.is_empty() {
306            return if buf.is_empty() {
307                Ok(None)
308            } else {
309                Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
310            };
311        }
312        if let Some(pos) = available.iter().position(|&b| b == b'\n') {
313            buf.extend_from_slice(&available[..pos]);
314            reader.consume(pos + 1);
315            return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
316        }
317        let to_consume = available.len();
318        buf.extend_from_slice(available);
319        reader.consume(to_consume);
320        if buf.len() > max_bytes {
321            return Err(io::Error::new(io::ErrorKind::InvalidData, "line too long"));
322        }
323    }
324}