1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::Router;
5use axum::extract::State;
6use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
7use axum::response::IntoResponse;
8use axum::routing::{get, post};
9use serde::Serialize;
10use tokio::sync::broadcast;
11use tower_http::cors::{Any, CorsLayer};
12use tracing::{info, warn};
13
14use tokio::sync::Mutex;
15
16use crate::rpc::{RpcState, TX_RATE_LIMIT_PER_SEC, TxRateLimiter, handle_request};
17use crate::types::RpcResponse;
18
19#[derive(Clone, Debug, Serialize)]
21#[serde(tag = "type")]
22pub enum ChainEvent {
23 NewBlock {
24 height: u64,
25 hash: String,
26 view: u64,
27 proposer: u64,
28 },
29 }
31
32pub struct HttpRpcState {
34 pub rpc: Arc<RpcState>,
35 pub event_tx: broadcast::Sender<ChainEvent>,
36 pub tx_limiter: Mutex<TxRateLimiter>,
38}
39
40pub struct HttpRpcServer {
42 state: Arc<HttpRpcState>,
43 addr: SocketAddr,
44}
45
46impl HttpRpcServer {
47 pub fn new(addr: SocketAddr, rpc: Arc<RpcState>, event_capacity: usize) -> Self {
51 let (event_tx, _) = broadcast::channel(event_capacity);
52 Self {
53 state: Arc::new(HttpRpcState {
54 rpc,
55 event_tx,
56 tx_limiter: Mutex::new(TxRateLimiter::new(TX_RATE_LIMIT_PER_SEC)),
57 }),
58 addr,
59 }
60 }
61
62 pub fn event_sender(&self) -> broadcast::Sender<ChainEvent> {
64 self.state.event_tx.clone()
65 }
66
67 pub async fn run(self) {
69 let cors = CorsLayer::new()
70 .allow_origin(Any)
71 .allow_methods(Any)
72 .allow_headers(Any);
73
74 let app = Router::new()
75 .route("/", post(json_rpc_handler))
76 .route("/ws", get(ws_upgrade_handler))
77 .layer(cors)
78 .with_state(self.state.clone());
79
80 let listener = match tokio::net::TcpListener::bind(self.addr).await {
81 Ok(l) => l,
82 Err(e) => {
83 warn!(addr = %self.addr, error = %e, "HTTP RPC server failed to bind");
84 return;
85 }
86 };
87
88 let local_addr = listener.local_addr().expect("listener has local addr");
89 info!(addr = %local_addr, "HTTP RPC server listening");
90
91 if let Err(e) = axum::serve(listener, app).await {
92 warn!(error = %e, "HTTP RPC server exited with error");
93 }
94 }
95}
96
97async fn json_rpc_handler(
99 State(state): State<Arc<HttpRpcState>>,
100 body: String,
101) -> impl IntoResponse {
102 let mut tx_limiter = state.tx_limiter.lock().await;
105 let response: RpcResponse = handle_request(&state.rpc, &body, &mut tx_limiter).await;
106
107 axum::Json(response)
108}
109
110async fn ws_upgrade_handler(
112 State(state): State<Arc<HttpRpcState>>,
113 ws: WebSocketUpgrade,
114) -> impl IntoResponse {
115 ws.on_upgrade(move |socket| handle_ws(socket, state))
116}
117
118async fn handle_ws(mut socket: WebSocket, state: Arc<HttpRpcState>) {
120 let mut rx = state.event_tx.subscribe();
121
122 loop {
123 tokio::select! {
124 event = rx.recv() => {
126 match event {
127 Ok(ev) => {
128 let json = match serde_json::to_string(&ev) {
129 Ok(j) => j,
130 Err(e) => {
131 warn!(error = %e, "failed to serialize chain event");
132 continue;
133 }
134 };
135 if socket.send(Message::Text(json.into())).await.is_err() {
136 break;
138 }
139 }
140 Err(broadcast::error::RecvError::Lagged(n)) => {
141 warn!(missed = n, "WebSocket client lagged, some events dropped");
142 }
143 Err(broadcast::error::RecvError::Closed) => {
144 break;
145 }
146 }
147 }
148 msg = socket.recv() => {
150 match msg {
151 Some(Ok(Message::Close(_))) | None => break,
152 Some(Err(_)) => break,
153 _ => {} }
155 }
156 }
157 }
158}