1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::types::{
6 BlockInfo, EpochInfo, RpcRequest, RpcResponse, StatusInfo, TxResult, ValidatorInfoResponse,
7};
8use hotmint_consensus::application::Application;
9use hotmint_consensus::store::BlockStore;
10use hotmint_mempool::Mempool;
11use hotmint_network::service::PeerStatus;
12use hotmint_types::{BlockHash, Height};
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::net::TcpListener;
15use tokio::sync::{Semaphore, watch};
16use tokio::time::{Duration, timeout};
17use tracing::{info, warn};
18
/// Maximum number of RPC connections served at the same time; the accept
/// loop sizes its semaphore with this value and drops connections beyond it.
const MAX_RPC_CONNECTIONS: usize = 256;

/// How long a connection may wait for its next complete request line before
/// the server stops serving it.
const RPC_READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Byte limit passed to the line reader for a single request line (1 MiB).
const MAX_LINE_BYTES: usize = 1024 * 1024;
23
/// Copyable snapshot of consensus progress, delivered to the RPC layer
/// through a `watch` channel and read by the `status` and `get_epoch`
/// handlers.
#[derive(Debug, Clone, Copy)]
pub struct ConsensusStatus {
    /// View number reported as `current_view` by `status`.
    pub current_view: u64,
    /// Height reported as `last_committed_height` by `status`.
    pub last_committed_height: u64,
    /// Epoch number reported by both `status` and `get_epoch`.
    pub epoch_number: u64,
    /// Validator count reported by both `status` and `get_epoch`.
    pub validator_count: usize,
    /// View reported as `start_view` by `get_epoch`.
    pub epoch_start_view: u64,
}

impl ConsensusStatus {
    /// Assembles a snapshot from its five components.
    ///
    /// Arguments are taken in the same order as the struct's fields and are
    /// stored unchanged.
    pub fn new(
        current_view: u64,
        last_committed_height: u64,
        epoch_number: u64,
        validator_count: usize,
        epoch_start_view: u64,
    ) -> Self {
        ConsensusStatus {
            current_view,
            last_committed_height,
            epoch_number,
            validator_count,
            epoch_start_view,
        }
    }
}
51
52pub struct RpcState {
54 pub validator_id: u64,
55 pub mempool: Arc<Mempool>,
56 pub status_rx: watch::Receiver<ConsensusStatus>,
57 pub store: Arc<RwLock<Box<dyn BlockStore>>>,
59 pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
61 pub validator_set_rx: watch::Receiver<Vec<ValidatorInfoResponse>>,
63 pub app: Option<Arc<dyn Application>>,
65}
66
67pub struct RpcServer {
69 state: Arc<RpcState>,
70 listener: TcpListener,
71}
72
73impl RpcServer {
74 pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
75 let listener = TcpListener::bind(addr)
76 .await
77 .c(d!("failed to bind RPC server"))?;
78 info!(addr = addr, "RPC server listening");
79 Ok(Self {
80 state: Arc::new(state),
81 listener,
82 })
83 }
84
85 pub fn local_addr(&self) -> std::net::SocketAddr {
86 self.listener.local_addr().expect("listener has local addr")
87 }
88
89 pub async fn run(self) {
90 let semaphore = Arc::new(Semaphore::new(MAX_RPC_CONNECTIONS));
91 loop {
92 match self.listener.accept().await {
93 Ok((stream, _addr)) => {
94 let permit = match semaphore.clone().try_acquire_owned() {
95 Ok(p) => p,
96 Err(_) => {
97 warn!("RPC connection limit reached, rejecting");
98 drop(stream);
99 continue;
100 }
101 };
102 let state = self.state.clone();
103 tokio::spawn(async move {
104 let _permit = permit;
105 let (reader, mut writer) = stream.into_split();
106 let mut reader = BufReader::with_capacity(65_536, reader);
107 loop {
108 let line = match timeout(
109 RPC_READ_TIMEOUT,
110 read_line_limited(&mut reader, MAX_LINE_BYTES),
111 )
112 .await
113 {
114 Ok(Ok(Some(line))) => line,
115 Ok(Err(e)) => {
116 warn!(error = %e, "RPC read error (line too long?)");
117 break;
118 }
119 _ => break, };
121 let response = handle_request(&state, &line).await;
122 let mut json = serde_json::to_string(&response).unwrap_or_default();
123 json.push('\n');
124 if writer.write_all(json.as_bytes()).await.is_err() {
125 break;
126 }
127 }
128 });
129 }
130 Err(e) => {
131 warn!(error = %e, "failed to accept connection");
132 }
133 }
134 }
135 }
136}
137
138async fn handle_request(state: &RpcState, line: &str) -> RpcResponse {
139 let req: RpcRequest = match serde_json::from_str(line) {
140 Ok(r) => r,
141 Err(e) => {
142 return RpcResponse::err(0, -32700, format!("parse error: {e}"));
143 }
144 };
145
146 match req.method.as_str() {
147 "status" => {
148 let s = *state.status_rx.borrow();
149 let info = StatusInfo {
150 validator_id: state.validator_id,
151 current_view: s.current_view,
152 last_committed_height: s.last_committed_height,
153 epoch: s.epoch_number,
154 validator_count: s.validator_count,
155 mempool_size: state.mempool.size().await,
156 };
157 json_ok(req.id, &info)
158 }
159
160 "submit_tx" => {
161 let Some(tx_hex) = req.params.as_str() else {
162 return RpcResponse::err(req.id, -32602, "params must be a hex string".to_string());
163 };
164 if tx_hex.is_empty() {
165 return RpcResponse::err(req.id, -32602, "empty transaction".to_string());
166 }
167 let tx_bytes = match hex_decode(tx_hex) {
168 Some(b) if !b.is_empty() => b,
169 _ => {
170 return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
171 }
172 };
173 if let Some(ref app) = state.app
175 && !app.validate_tx(&tx_bytes, None)
176 {
177 return RpcResponse::err(
178 req.id,
179 -32602,
180 "transaction validation failed".to_string(),
181 );
182 }
183 let accepted = state.mempool.add_tx(tx_bytes).await;
184 json_ok(req.id, &TxResult { accepted })
185 }
186
187 "get_block" => {
188 let height = match req.params.get("height").and_then(|v| v.as_u64()) {
189 Some(h) => h,
190 None => {
191 return RpcResponse::err(
192 req.id,
193 -32602,
194 "missing or invalid 'height' parameter".to_string(),
195 );
196 }
197 };
198 let store = state.store.read().unwrap();
199 match store.get_block_by_height(Height(height)) {
200 Some(block) => json_ok(req.id, &block_to_info(&block)),
201 None => RpcResponse::err(
202 req.id,
203 -32602,
204 format!("block at height {height} not found"),
205 ),
206 }
207 }
208
209 "get_block_by_hash" => {
210 let hash_hex = req.params.as_str().unwrap_or_default();
211 match hex_to_block_hash(hash_hex) {
212 Some(hash) => {
213 let store = state.store.read().unwrap();
214 match store.get_block(&hash) {
215 Some(block) => json_ok(req.id, &block_to_info(&block)),
216 None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
217 }
218 }
219 None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
220 }
221 }
222
223 "get_validators" => {
224 let validators = state.validator_set_rx.borrow().clone();
225 json_ok(req.id, &validators)
226 }
227
228 "get_epoch" => {
229 let s = *state.status_rx.borrow();
230 let info = EpochInfo {
231 number: s.epoch_number,
232 start_view: s.epoch_start_view,
233 validator_count: s.validator_count,
234 };
235 json_ok(req.id, &info)
236 }
237
238 "get_peers" => {
239 let peers = state.peer_info_rx.borrow().clone();
240 json_ok(req.id, &peers)
241 }
242
243 _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
244 }
245}
246
247fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
248 match serde_json::to_value(val) {
249 Ok(v) => RpcResponse::ok(id, v),
250 Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
251 }
252}
253
254fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
255 BlockInfo {
256 height: block.height.as_u64(),
257 hash: hex_encode(&block.hash.0),
258 parent_hash: hex_encode(&block.parent_hash.0),
259 view: block.view.as_u64(),
260 proposer: block.proposer.0,
261 payload_size: block.payload.len(),
262 }
263}
264
/// Renders `bytes` as lowercase hexadecimal, two digits per byte.
///
/// An empty slice produces an empty string.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
268
/// Decodes a hexadecimal string into bytes.
///
/// Accepts upper- and lowercase digits. Returns `None` when the input has an
/// odd length or contains any character that is not a hex digit; an empty
/// string decodes to an empty vector.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    // Work on raw bytes: slicing the `str` two bytes at a time would panic
    // when an index lands inside a multi-byte UTF-8 character, and this
    // input comes straight from RPC clients. Digits are matched explicitly
    // because `u8::from_str_radix` would also accept a leading `+`.
    let pairs = s.as_bytes().chunks_exact(2);
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
278
279fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
280 let bytes = hex_decode(s)?;
281 if bytes.len() != 32 {
282 return None;
283 }
284 let mut arr = [0u8; 32];
285 arr.copy_from_slice(&bytes);
286 Some(BlockHash(arr))
287}
288
289async fn read_line_limited<R: AsyncBufReadExt + Unpin>(
295 reader: &mut R,
296 max_bytes: usize,
297) -> std::io::Result<Option<String>> {
298 let mut buf = Vec::new();
299 loop {
300 let available = reader.fill_buf().await?;
301 if available.is_empty() {
302 return if buf.is_empty() {
303 Ok(None)
304 } else {
305 Ok(Some(String::from_utf8_lossy(&buf).into_owned()))
306 };
307 }
308 if let Some(pos) = available.iter().position(|&b| b == b'\n') {
309 buf.extend_from_slice(&available[..pos]);
310 reader.consume(pos + 1);
311 return Ok(Some(String::from_utf8_lossy(&buf).into_owned()));
312 }
313 let to_consume = available.len();
314 buf.extend_from_slice(available);
315 reader.consume(to_consume);
316 if buf.len() > max_bytes {
317 return Err(std::io::Error::new(
318 std::io::ErrorKind::InvalidData,
319 "line too long",
320 ));
321 }
322 }
323}