1use ruc::*;
2
3use std::io;
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use tokio::sync::RwLock;
8
9use crate::types::{
10 BlockInfo, EpochInfo, RpcRequest, RpcResponse, StatusInfo, TxResult, ValidatorInfoResponse,
11};
12use hotmint_consensus::application::Application;
13use hotmint_consensus::store::BlockStore;
14use hotmint_mempool::Mempool;
15use hotmint_network::service::PeerStatus;
16use hotmint_types::{BlockHash, Height};
17use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
18use tokio::net::TcpListener;
19use tokio::sync::{Semaphore, watch};
20use tokio::time::{Duration, timeout};
21use tracing::{info, warn};
22
/// Number of permits in the connection semaphore; connections accepted while
/// no permit is free are dropped immediately.
const MAX_RPC_CONNECTIONS: usize = 256;

/// Upper bound on how long a connection may take to deliver one request line
/// before it is closed.
const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Size limit (1 MiB) passed to `read_line_limited` for a single request line.
const MAX_LINE_BYTES: usize = 1024 * 1024;
27
/// Copyable snapshot of consensus progress that the RPC layer reports.
///
/// The RPC state receives values of this type through a `watch` channel and
/// copies the latest one out when answering `status` and `get_epoch`.
#[derive(Debug, Clone, Copy)]
pub struct ConsensusStatus {
    /// View number reported as `current_view` by the `status` method.
    pub current_view: u64,
    /// Height reported as `last_committed_height` by the `status` method.
    pub last_committed_height: u64,
    /// Epoch number reported by both `status` and `get_epoch`.
    pub epoch_number: u64,
    /// Validator count reported by both `status` and `get_epoch`.
    pub validator_count: usize,
    /// View reported as `start_view` by the `get_epoch` method.
    pub epoch_start_view: u64,
}

impl ConsensusStatus {
    /// Builds a snapshot from its five components, stored verbatim.
    pub fn new(
        current_view: u64,
        last_committed_height: u64,
        epoch_number: u64,
        validator_count: usize,
        epoch_start_view: u64,
    ) -> Self {
        ConsensusStatus {
            current_view,
            last_committed_height,
            epoch_number,
            validator_count,
            epoch_start_view,
        }
    }
}
55
56pub struct RpcState {
58 pub validator_id: u64,
59 pub mempool: Arc<Mempool>,
60 pub status_rx: watch::Receiver<ConsensusStatus>,
61 pub store: Arc<RwLock<Box<dyn BlockStore>>>,
63 pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
65 pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
67 pub app: Option<Arc<dyn Application>>,
69}
70
71pub struct RpcServer {
73 state: Arc<RpcState>,
74 listener: TcpListener,
75}
76
77impl RpcServer {
78 pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
79 let listener = TcpListener::bind(addr)
80 .await
81 .c(d!("failed to bind RPC server"))?;
82 info!(addr = addr, "RPC server listening");
83 Ok(Self {
84 state: Arc::new(state),
85 listener,
86 })
87 }
88
89 pub fn local_addr(&self) -> SocketAddr {
90 self.listener.local_addr().expect("listener has local addr")
91 }
92
93 pub async fn run(self) {
94 let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
95 loop {
96 match self.listener.accept().await {
97 Ok((stream, _addr)) => {
98 let permit = match semaphore.clone().try_acquire_owned() {
99 Ok(p) => p,
100 Err(_) => {
101 warn!("RPC connection limit reached, rejecting");
102 drop(stream);
103 continue;
104 }
105 };
106 let state = self.state.clone();
107 tokio::spawn(async move {
108 let _permit = permit;
109 let (reader, mut writer) = stream.into_split();
110 let mut reader = BufReader::with_capacity(65_536, reader);
111 loop {
112 let line = match timeout(
113 RPC_READ_TIMEOUT,
114 read_line_limited(&mut reader, MAX_LINE_BYTES),
115 )
116 .await
117 {
118 Ok(Ok(Some(line))) => line,
119 Ok(Err(e)) => {
120 warn!(error = %e, "RPC read error (line too long?)");
121 break;
122 }
123 _ => break, };
125 let response = handle_request(&state, &line).await;
126 let mut json = serde_json::to_string(&response).unwrap_or_default();
127 json.push('\n');
128 if writer.write_all(json.as_bytes()).await.is_err() {
129 break;
130 }
131 }
132 });
133 }
134 Err(e) => {
135 warn!(error = %e, "failed to accept connection");
136 }
137 }
138 }
139 }
140}
141
142async fn handle_request(state: &RpcState, line: &str) -> RpcResponse {
143 let req: RpcRequest = match serde_json::from_str(line) {
144 Ok(r) => r,
145 Err(e) => {
146 return RpcResponse::err(0, -32700, format!("parse error: {e}"));
147 }
148 };
149
150 match req.method.as_str() {
151 "status" => {
152 let s = *state.status_rx.borrow();
153 let info = StatusInfo {
154 validator_id: state.validator_id,
155 current_view: s.current_view,
156 last_committed_height: s.last_committed_height,
157 epoch: s.epoch_number,
158 validator_count: s.validator_count,
159 mempool_size: state.mempool.size().await,
160 };
161 json_ok(req.id, &info)
162 }
163
164 "submit_tx" => {
165 let Some(tx_hex) = req.params.as_str() else {
166 return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
167 };
168 if tx_hex.is_empty() {
169 return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
170 }
171 let tx_bytes = match hex_decode(tx_hex) {
172 Some(b) if !b.is_empty() => b,
173 _ => {
174 return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
175 }
176 };
177 if let Some(ref app) = state.app
179 && !app.validate_tx(&tx_bytes, None)
180 {
181 return RpcResponse::err(
182 req.id,
183 -32602,
184 "transaction validation failed".to_string(),
185 );
186 }
187 let accepted = state.mempool.add_tx(tx_bytes).await;
188 json_ok(req.id, &TxResult { accepted })
189 }
190
191 "get_block" => {
192 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
193 Some(h) => h,
194 None => {
195 return RpcResponse::err(
196 req.id,
197 -32602,
198 "missing or invalid 'height' parameter".to_string(),
199 );
200 }
201 };
202 let store = state.store.read().await;
203 match store.get_block_by_height(Height(height)) {
204 Some(block) => json_ok(req.id, &block_to_info(&block)),
205 None => RpcResponse::err(
206 req.id,
207 -32602,
208 format!("block at height {height} not found"),
209 ),
210 }
211 }
212
213 "get_block_by_hash" => {
214 let hash_hex = req.params.as_str().unwrap_or_default();
215 match hex_to_block_hash(hash_hex) {
216 Some(hash) => {
217 let store = state.store.read().await;
218 match store.get_block(&hash) {
219 Some(block) => json_ok(req.id, &block_to_info(&block)),
220 None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
221 }
222 }
223 None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
224 }
225 }
226
227 "get_validators" => {
228 let validators = state.validator_set_rx.borrow().clone();
229 json_ok(req.id, &validators)
230 }
231
232 "get_epoch" => {
233 let s = *state.status_rx.borrow();
234 let info = EpochInfo {
235 number: s.epoch_number,
236 start_view: s.epoch_start_view,
237 validator_count: s.validator_count,
238 };
239 json_ok(req.id, &info)
240 }
241
242 "get_peers" => {
243 let peers = state.peer_info_rx.borrow().clone();
244 json_ok(req.id, &peers)
245 }
246
247 _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
248 }
249}
250
251fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
252 match serde_json::to_value(val) {
253 Ok(v) => RpcResponse::ok(id, v),
254 Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
255 }
256}
257
258fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
259 BlockInfo {
260 height: block.height.as_u64(),
261 hash: hex_encode(&block.hash.0),
262 parent_hash: hex_encode(&block.parent_hash.0),
263 view: block.view.as_u64(),
264 proposer: block.proposer.0,
265 payload_size: block.payload.len(),
266 }
267}
268
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
272
/// Decodes a hexadecimal string into bytes.
///
/// Both upper- and lowercase digits are accepted. Returns `None` when the
/// input has an odd length or contains anything other than ASCII hex digits;
/// an empty string decodes to an empty vector.
///
/// Decoding works on raw bytes instead of `str` slices, so input containing
/// multi-byte UTF-8 characters is rejected rather than causing a slicing
/// panic, and sign characters such as `+` are not treated as digits.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    let pairs = s.as_bytes().chunks_exact(2);
    if !pairs.remainder().is_empty() {
        return None;
    }

    // `to_digit(16)` yields 0..=15, so narrowing to `u8` cannot truncate.
    // Bytes >= 0x80 map to non-ASCII chars and are rejected.
    let nibble = |b: u8| char::from(b).to_digit(16).map(|d| d as u8);

    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
282
283fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
284 let bytes = hex_decode(s)?;
285 if bytes.len() != 32 {
286 return None;
287 }
288 let mut arr = [0u8; 32];
289 arr.copy_from_slice(&bytes);
290 Some(BlockHash(arr))
291}
292
293async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
299 reader: &mut R,
300 max_bytes: usize,
301) -> io::Result<Option<String>> {
302 let mut buf = Vec::new();
303 loop {
304 let available = reader.fill_buf().await?;
305 if available.is_empty() {
306 return if buf.is_empty() {
307 Ok(None)
308 } else {
309 Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
310 };
311 }
312 if let Some(pos) = available.iter().position(|&b| b == b'\n') {
313 buf.extend_from_slice(&available[..pos]);
314 reader.consume(pos + 1);
315 return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
316 }
317 let to_consume = available.len();
318 buf.extend_from_slice(available);
319 reader.consume(to_consume);
320 if buf.len() > max_bytes {
321 return Err(io::Error::new(io::ErrorKind::InvalidData, "line too long"));
322 }
323 }
324}