Skip to main content

hotmint_api/
http_rpc.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::Router;
5use axum::extract::State;
6use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
7use axum::response::IntoResponse;
8use axum::routing::{get, post};
9use serde::Serialize;
10use tokio::sync::broadcast;
11use tower_http::cors::{Any, CorsLayer};
12use tracing::{info, warn};
13
14use tokio::sync::Mutex;
15
16use crate::rpc::{RpcState, TX_RATE_LIMIT_PER_SEC, TxRateLimiter, handle_request};
17use crate::types::RpcResponse;
18
19/// Events broadcast to WebSocket subscribers.
20#[derive(Clone, Debug, Serialize)]
21#[serde(tag = "type")]
22pub enum ChainEvent {
23    NewBlock {
24        height: u64,
25        hash: String,
26        view: u64,
27        proposer: u64,
28    },
29    // Future: TxCommitted, EpochChange, etc.
30}
31
32/// Shared state for the HTTP RPC server.
33pub struct HttpRpcState {
34    pub rpc: Arc<RpcState>,
35    pub event_tx: broadcast::Sender<ChainEvent>,
36    /// Global rate limiter for submit_tx across all HTTP requests (H-10).
37    pub tx_limiter: Mutex<TxRateLimiter>,
38}
39
40/// HTTP JSON-RPC server (runs alongside the existing TCP RPC server).
41pub struct HttpRpcServer {
42    state: Arc<HttpRpcState>,
43    addr: SocketAddr,
44}
45
46impl HttpRpcServer {
47    /// Create a new HTTP RPC server.
48    ///
49    /// `event_capacity` controls the broadcast channel buffer size for WebSocket events.
50    pub fn new(addr: SocketAddr, rpc: Arc<RpcState>, event_capacity: usize) -> Self {
51        let (event_tx, _) = broadcast::channel(event_capacity);
52        Self {
53            state: Arc::new(HttpRpcState {
54                rpc,
55                event_tx,
56                tx_limiter: Mutex::new(TxRateLimiter::new(TX_RATE_LIMIT_PER_SEC)),
57            }),
58            addr,
59        }
60    }
61
62    /// Get a `broadcast::Sender` so the node can publish chain events.
63    pub fn event_sender(&self) -> broadcast::Sender<ChainEvent> {
64        self.state.event_tx.clone()
65    }
66
67    /// Run the HTTP server (blocks until shutdown).
68    pub async fn run(self) {
69        let cors = CorsLayer::new()
70            .allow_origin(Any)
71            .allow_methods(Any)
72            .allow_headers(Any);
73
74        let app = Router::new()
75            .route("/", post(json_rpc_handler))
76            .route("/ws", get(ws_upgrade_handler))
77            .layer(cors)
78            .with_state(self.state.clone());
79
80        let listener = match tokio::net::TcpListener::bind(self.addr).await {
81            Ok(l) => l,
82            Err(e) => {
83                warn!(addr = %self.addr, error = %e, "HTTP RPC server failed to bind");
84                return;
85            }
86        };
87
88        let local_addr = listener.local_addr().expect("listener has local addr");
89        info!(addr = %local_addr, "HTTP RPC server listening");
90
91        if let Err(e) = axum::serve(listener, app).await {
92            warn!(error = %e, "HTTP RPC server exited with error");
93        }
94    }
95}
96
97/// POST / handler: parse JSON-RPC request body, dispatch, return JSON response.
98async fn json_rpc_handler(
99    State(state): State<Arc<HttpRpcState>>,
100    body: String,
101) -> impl IntoResponse {
102    // H-10: Use the shared rate limiter so submit_tx is globally rate-limited
103    // across all HTTP requests, not per-request (which would be useless).
104    let mut tx_limiter = state.tx_limiter.lock().await;
105    let response: RpcResponse = handle_request(&state.rpc, &body, &mut tx_limiter).await;
106
107    axum::Json(response)
108}
109
110/// GET /ws handler: upgrade to WebSocket and stream chain events.
111async fn ws_upgrade_handler(
112    State(state): State<Arc<HttpRpcState>>,
113    ws: WebSocketUpgrade,
114) -> impl IntoResponse {
115    ws.on_upgrade(move |socket| handle_ws(socket, state))
116}
117
118/// WebSocket connection handler: subscribe to chain events and forward them.
119async fn handle_ws(mut socket: WebSocket, state: Arc<HttpRpcState>) {
120    let mut rx = state.event_tx.subscribe();
121
122    loop {
123        tokio::select! {
124            // Forward broadcast events to the client
125            event = rx.recv() => {
126                match event {
127                    Ok(ev) => {
128                        let json = match serde_json::to_string(&ev) {
129                            Ok(j) => j,
130                            Err(e) => {
131                                warn!(error = %e, "failed to serialize chain event");
132                                continue;
133                            }
134                        };
135                        if socket.send(Message::Text(json.into())).await.is_err() {
136                            // Client disconnected
137                            break;
138                        }
139                    }
140                    Err(broadcast::error::RecvError::Lagged(n)) => {
141                        warn!(missed = n, "WebSocket client lagged, some events dropped");
142                    }
143                    Err(broadcast::error::RecvError::Closed) => {
144                        break;
145                    }
146                }
147            }
148            // Listen for client messages (e.g. close frames, pings)
149            msg = socket.recv() => {
150                match msg {
151                    Some(Ok(Message::Close(_))) | None => break,
152                    Some(Err(_)) => break,
153                    _ => {} // ignore text/binary from client for now
154                }
155            }
156        }
157    }
158}