mod error;
pub mod methods;
pub mod types;
pub use error::{RpcServerError, error_codes};
use crate::{Blockchain, TxPool};
use jsonrpsee::server::{
RandomStringIdProvider, ServerBuilder, ServerHandle,
middleware::rpc::{RpcServiceBuilder, RpcServiceT},
};
use std::{net::SocketAddr, pin::Pin, sync::Arc};
use subxt::config::substrate::H256;
use tokio_util::sync::CancellationToken;
#[derive(Clone)]
struct RpcLogger<S>(S);
impl<'a, S> RpcServiceT<'a> for RpcLogger<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
{
type Future = Pin<Box<dyn std::future::Future<Output = jsonrpsee::MethodResponse> + Send + 'a>>;
fn call(&self, req: jsonrpsee::types::Request<'a>) -> Self::Future {
log::debug!("JSON-RPC --> {}", req.method_name());
let inner = self.0.clone();
Box::pin(async move { inner.call(req).await })
}
}
pub fn parse_block_hash(hex: &str) -> Result<H256, RpcServerError> {
let bytes = hex::decode(hex.trim_start_matches("0x"))
.map_err(|e| RpcServerError::InvalidParam(format!("Invalid hex hash: {e}")))?;
if bytes.len() != 32 {
return Err(RpcServerError::Internal("Invalid block hash length.".to_string()));
}
Ok(H256::from_slice(&bytes))
}
pub fn parse_hex_bytes(hex: &str, field_name: &str) -> Result<Vec<u8>, RpcServerError> {
hex::decode(hex.trim_start_matches("0x"))
.map_err(|e| RpcServerError::InvalidParam(format!("Invalid hex {field_name}: {e}")))
}
pub const DEFAULT_RPC_PORT: u16 = 9944;
const MAX_PORT_ATTEMPTS: u16 = 20;
#[derive(Debug, Clone)]
pub struct RpcServerConfig {
pub port: Option<u16>,
pub max_connections: u32,
}
impl Default for RpcServerConfig {
fn default() -> Self {
Self { port: None, max_connections: 100 }
}
}
impl RpcServerConfig {
pub fn with_port(port: u16) -> Self {
Self { port: Some(port), ..Default::default() }
}
}
const SUBSCRIPTION_ID_LENGTH: usize = 16;
pub struct ForkRpcServer {
handle: ServerHandle,
addr: SocketAddr,
shutdown_token: CancellationToken,
}
impl ForkRpcServer {
pub async fn start(
blockchain: Arc<Blockchain>,
txpool: Arc<TxPool>,
config: RpcServerConfig,
) -> Result<Self, RpcServerError> {
let shutdown_token = CancellationToken::new();
let rpc_module = methods::create_rpc_module(blockchain, txpool, shutdown_token.clone())?;
let (server, addr) = if let Some(port) = config.port {
let addr: SocketAddr = ([127, 0, 0, 1], port).into();
let server = ServerBuilder::default()
.set_id_provider(RandomStringIdProvider::new(SUBSCRIPTION_ID_LENGTH))
.set_rpc_middleware(RpcServiceBuilder::new().layer_fn(RpcLogger))
.max_connections(config.max_connections)
.max_request_body_size(u32::MAX)
.max_response_body_size(u32::MAX)
.build(addr)
.await
.map_err(|e| RpcServerError::ServerStart(e.to_string()))?;
let addr =
server.local_addr().map_err(|e| RpcServerError::ServerStart(e.to_string()))?;
(server, addr)
} else {
let mut found = None;
for port in DEFAULT_RPC_PORT..DEFAULT_RPC_PORT.saturating_add(MAX_PORT_ATTEMPTS) {
let addr: SocketAddr = ([127, 0, 0, 1], port).into();
if let Ok(server) = ServerBuilder::default()
.set_id_provider(RandomStringIdProvider::new(SUBSCRIPTION_ID_LENGTH))
.set_rpc_middleware(RpcServiceBuilder::new().layer_fn(RpcLogger))
.max_connections(config.max_connections)
.max_request_body_size(u32::MAX)
.max_response_body_size(u32::MAX)
.build(addr)
.await
{
let bound_addr = server
.local_addr()
.map_err(|e| RpcServerError::ServerStart(e.to_string()))?;
found = Some((server, bound_addr));
break;
}
}
match found {
Some(result) => result,
None => {
let port = pop_common::resolve_port(None);
let addr: SocketAddr = ([127, 0, 0, 1], port).into();
let server = ServerBuilder::default()
.set_id_provider(RandomStringIdProvider::new(SUBSCRIPTION_ID_LENGTH))
.set_rpc_middleware(RpcServiceBuilder::new().layer_fn(RpcLogger))
.max_connections(config.max_connections)
.max_request_body_size(u32::MAX)
.max_response_body_size(u32::MAX)
.build(addr)
.await
.map_err(|e| RpcServerError::ServerStart(e.to_string()))?;
let bound_addr = server
.local_addr()
.map_err(|e| RpcServerError::ServerStart(e.to_string()))?;
(server, bound_addr)
},
}
};
let handle = server.start(rpc_module);
Ok(Self { handle, addr, shutdown_token })
}
pub fn addr(&self) -> SocketAddr {
self.addr
}
pub fn ws_url(&self) -> String {
format!("ws://{}", self.addr)
}
pub fn http_url(&self) -> String {
format!("http://{}", self.addr)
}
pub async fn stop(self) {
self.shutdown_token.cancel();
self.handle.stop().expect("Server stop should not fail");
self.handle.stopped().await;
}
pub fn handle(&self) -> &ServerHandle {
&self.handle
}
}