Skip to main content

hotmint_api/
rpc.rs

1use ruc::*;
2
3use std::sync::{Arc, RwLock};
4
5use crate::types::{
6    BlockInfo, EpochInfo, RpcRequest, RpcResponse, StatusInfo, TxResult, ValidatorInfoResponse,
7};
8use hotmint_consensus::store::BlockStore;
9use hotmint_mempool::Mempool;
10use hotmint_network::service::PeerStatus;
11use hotmint_types::{BlockHash, Height};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
13use tokio::net::TcpListener;
14use tokio::sync::watch;
15use tracing::{info, warn};
16
17/// Shared state accessible by the RPC server
18pub struct RpcState {
19    pub validator_id: u64,
20    pub mempool: Arc<Mempool>,
21    /// (current_view, last_committed_height, epoch, validator_count)
22    pub status_rx: watch::Receiver<(u64, u64, u64, usize)>,
23    /// Shared block store for block queries
24    pub store: Arc<RwLock<Box<dyn BlockStore>>>,
25    /// Peer info channel
26    pub peer_info_rx: watch::Receiver<Vec<PeerStatus>>,
27}
28
29/// Simple JSON-RPC server over TCP (one JSON object per line)
30pub struct RpcServer {
31    state: Arc<RpcState>,
32    listener: TcpListener,
33}
34
35impl RpcServer {
36    pub async fn bind(addr: &str, state: RpcState) -> Result<Self> {
37        let listener = TcpListener::bind(addr)
38            .await
39            .c(d!("failed to bind RPC server"))?;
40        info!(addr = addr, "RPC server listening");
41        Ok(Self {
42            state: Arc::new(state),
43            listener,
44        })
45    }
46
47    pub fn local_addr(&self) -> std::net::SocketAddr {
48        self.listener.local_addr().expect("listener has local addr")
49    }
50
51    pub async fn run(self) {
52        loop {
53            match self.listener.accept().await {
54                Ok((stream, _addr)) => {
55                    let state = self.state.clone();
56                    tokio::spawn(async move {
57                        let (reader, mut writer) = stream.into_split();
58                        let mut lines = BufReader::new(reader).lines();
59                        while let Ok(Some(line)) = lines.next_line().await {
60                            let response = handle_request(&state, &line).await;
61                            let mut json = serde_json::to_string(&response).unwrap_or_default();
62                            json.push('\n');
63                            if writer.write_all(json.as_bytes()).await.is_err() {
64                                break;
65                            }
66                        }
67                    });
68                }
69                Err(e) => {
70                    warn!(error = %e, "failed to accept connection");
71                }
72            }
73        }
74    }
75}
76
77async fn handle_request(state: &RpcState, line: &str) -> RpcResponse {
78    let req: RpcRequest = match serde_json::from_str(line) {
79        Ok(r) => r,
80        Err(e) => {
81            return RpcResponse::err(0, -32700, format!("parse error: {e}"));
82        }
83    };
84
85    match req.method.as_str() {
86        "status" => {
87            let (view, height, epoch, validator_count) = *state.status_rx.borrow();
88            let info = StatusInfo {
89                validator_id: state.validator_id,
90                current_view: view,
91                last_committed_height: height,
92                epoch,
93                validator_count,
94                mempool_size: state.mempool.size().await,
95            };
96            json_ok(req.id, &info)
97        }
98
99        "submit_tx" => {
100            let tx_hex = req.params.as_str().unwrap_or_default();
101            let tx_bytes = match hex_decode(tx_hex) {
102                Some(b) => b,
103                None => {
104                    return RpcResponse::err(req.id, -32602, "invalid hex".to_string());
105                }
106            };
107            let accepted = state.mempool.add_tx(tx_bytes).await;
108            json_ok(req.id, &TxResult { accepted })
109        }
110
111        "get_block" => {
112            let height = match req.params.get("height").and_then(|v| v.as_u64()) {
113                Some(h) => h,
114                None => {
115                    return RpcResponse::err(
116                        req.id,
117                        -32602,
118                        "missing or invalid 'height' parameter".to_string(),
119                    );
120                }
121            };
122            let store = state.store.read().unwrap();
123            match store.get_block_by_height(Height(height)) {
124                Some(block) => json_ok(req.id, &block_to_info(&block)),
125                None => RpcResponse::err(
126                    req.id,
127                    -32602,
128                    format!("block at height {height} not found"),
129                ),
130            }
131        }
132
133        "get_block_by_hash" => {
134            let hash_hex = req.params.as_str().unwrap_or_default();
135            match hex_to_block_hash(hash_hex) {
136                Some(hash) => {
137                    let store = state.store.read().unwrap();
138                    match store.get_block(&hash) {
139                        Some(block) => json_ok(req.id, &block_to_info(&block)),
140                        None => RpcResponse::err(req.id, -32602, "block not found".to_string()),
141                    }
142                }
143                None => RpcResponse::err(req.id, -32602, "invalid hash hex".to_string()),
144            }
145        }
146
147        "get_validators" => {
148            let (_, _, _, _) = *state.status_rx.borrow();
149            // Read validator set from the store's genesis or from status
150            // For now, return from the peer info (validators are the ones we know about)
151            let store = state.store.read().unwrap();
152            // Get the latest committed block to find proposer info
153            let tip = store.tip_height();
154            drop(store);
155
156            // We don't have direct access to ValidatorSet from RPC.
157            // Return what we know: the peer list as validator info.
158            // A more complete implementation would pass ValidatorSet via watch channel.
159            let peers = state.peer_info_rx.borrow().clone();
160            let validators: Vec<ValidatorInfoResponse> = peers
161                .iter()
162                .map(|p| ValidatorInfoResponse {
163                    id: p.validator_id.0,
164                    power: 0, // not available from peer info
165                    public_key: String::new(),
166                })
167                .collect();
168
169            // If no peers, at minimum return our own validator
170            let result = if validators.is_empty() {
171                vec![ValidatorInfoResponse {
172                    id: state.validator_id,
173                    power: 0,
174                    public_key: String::new(),
175                }]
176            } else {
177                validators
178            };
179            let _ = tip; // silence unused warning
180            json_ok(req.id, &result)
181        }
182
183        "get_epoch" => {
184            let (_, _, epoch, validator_count) = *state.status_rx.borrow();
185            let info = EpochInfo {
186                number: epoch,
187                start_view: 0, // not available from status channel
188                validator_count,
189            };
190            json_ok(req.id, &info)
191        }
192
193        "get_peers" => {
194            let peers = state.peer_info_rx.borrow().clone();
195            json_ok(req.id, &peers)
196        }
197
198        _ => RpcResponse::err(req.id, -32601, format!("unknown method: {}", req.method)),
199    }
200}
201
202fn json_ok<T: serde::Serialize>(id: u64, val: &T) -> RpcResponse {
203    match serde_json::to_value(val) {
204        Ok(v) => RpcResponse::ok(id, v),
205        Err(e) => RpcResponse::err(id, -32603, format!("serialization error: {e}")),
206    }
207}
208
209fn block_to_info(block: &hotmint_types::Block) -> BlockInfo {
210    BlockInfo {
211        height: block.height.as_u64(),
212        hash: hex_encode(&block.hash.0),
213        parent_hash: hex_encode(&block.parent_hash.0),
214        view: block.view.as_u64(),
215        proposer: block.proposer.0,
216        payload_size: block.payload.len(),
217    }
218}
219
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
223
/// Decodes a hexadecimal string (upper or lower case, no prefix) into bytes.
///
/// Returns `None` when the input has an odd length or contains anything other
/// than ASCII hex digits. An empty string decodes to an empty vector.
///
/// The input is processed as raw bytes, so arbitrary (including non-ASCII)
/// text is rejected cleanly and never panics on a UTF-8 character boundary.
/// Sign characters such as `+` are not accepted as part of a digit pair.
fn hex_decode(s: &str) -> Option<Vec<u8>> {
    /// Value of a single ASCII hex digit, or `None` for any other byte.
    fn nibble(c: u8) -> Option<u8> {
        match c {
            b'0'..=b'9' => Some(c - b'0'),
            b'a'..=b'f' => Some(c - b'a' + 10),
            b'A'..=b'F' => Some(c - b'A' + 10),
            _ => None,
        }
    }

    let pairs = s.as_bytes().chunks_exact(2);
    // A leftover byte means the input had an odd length.
    if !pairs.remainder().is_empty() {
        return None;
    }
    pairs
        .map(|pair| Some((nibble(pair[0])? << 4) | nibble(pair[1])?))
        .collect()
}
233
234fn hex_to_block_hash(s: &str) -> Option<BlockHash> {
235    let bytes = hex_decode(s)?;
236    if bytes.len() != 32 {
237        return None;
238    }
239    let mut arr = [0u8; 32];
240    arr.copy_from_slice(&bytes);
241    Some(BlockHash(arr))
242}