use anyhow::Result;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use serde_json::{json, Value};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tracing::{debug, error, info, warn, Span};
use uuid::Uuid;
#[cfg(feature = "miniscript")]
use super::miniscript::miniscript_rpc;
#[cfg(feature = "bip70-http")]
use super::payment;
use super::{auth, blockchain, control, errors, mempool, mining, network, rawtx};
use crate::module::rpc::handler::ModuleRpcHandler;
use crate::network::dos_protection::ConnectionRateLimiter;
use crate::node::metrics::MetricsCollector;
use crate::utils::{RPC_CLIENT_READ_TIMEOUT, RPC_SERVER_STARTUP_WAIT};
use std::collections::HashMap;
pub const DEFAULT_MAX_REQUEST_SIZE: usize = 1_048_576;
#[derive(Clone)]
pub struct RpcServer {
addr: SocketAddr,
blockchain: Arc<blockchain::BlockchainRpc>,
network: Arc<network::NetworkRpc>,
mempool: Arc<mempool::MempoolRpc>,
mining: Arc<mining::MiningRpc>,
rawtx: Arc<rawtx::RawTxRpc>,
control: Arc<control::ControlRpc>,
#[cfg(feature = "bip70-http")]
payment: Option<Arc<payment::PaymentRpc>>,
auth_manager: Option<Arc<auth::RpcAuthManager>>,
metrics: Option<Arc<MetricsCollector>>,
module_endpoints: Arc<tokio::sync::RwLock<HashMap<String, Arc<dyn ModuleRpcHandler>>>>,
max_request_size: usize,
connection_limiter: Option<Arc<tokio::sync::Mutex<ConnectionRateLimiter>>>,
batch_rate_multiplier_cap: u32,
}
impl RpcServer {
fn default_handlers() -> (
Arc<blockchain::BlockchainRpc>,
Arc<network::NetworkRpc>,
Arc<mempool::MempoolRpc>,
Arc<mining::MiningRpc>,
Arc<rawtx::RawTxRpc>,
Arc<control::ControlRpc>,
) {
(
Arc::new(blockchain::BlockchainRpc::new()),
Arc::new(network::NetworkRpc::new()),
Arc::new(mempool::MempoolRpc::new()),
Arc::new(mining::MiningRpc::new()),
Arc::new(rawtx::RawTxRpc::new()),
Arc::new(control::ControlRpc::new()),
)
}
fn from_components(
addr: SocketAddr,
blockchain: Arc<blockchain::BlockchainRpc>,
network: Arc<network::NetworkRpc>,
mempool: Arc<mempool::MempoolRpc>,
mining: Arc<mining::MiningRpc>,
rawtx: Arc<rawtx::RawTxRpc>,
control: Arc<control::ControlRpc>,
auth_manager: Option<Arc<auth::RpcAuthManager>>,
metrics: Option<Arc<MetricsCollector>>,
connection_limiter: Option<Arc<tokio::sync::Mutex<ConnectionRateLimiter>>>,
max_request_size: usize,
batch_rate_multiplier_cap: u32,
) -> Self {
Self {
addr,
blockchain,
network,
mempool,
mining,
rawtx,
control,
#[cfg(feature = "bip70-http")]
payment: None,
auth_manager,
metrics,
module_endpoints: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
max_request_size,
connection_limiter,
batch_rate_multiplier_cap,
}
}
pub fn new(addr: SocketAddr) -> Self {
let (blockchain, network, mempool, mining, rawtx, control) = Self::default_handlers();
Self::from_components(
addr,
blockchain,
network,
mempool,
mining,
rawtx,
control,
None,
None,
None,
DEFAULT_MAX_REQUEST_SIZE,
10,
)
}
pub fn with_auth(addr: SocketAddr, auth_manager: Arc<auth::RpcAuthManager>) -> Self {
let (blockchain, network, mempool, mining, rawtx, control) = Self::default_handlers();
Self::from_components(
addr,
blockchain,
network,
mempool,
mining,
rawtx,
control,
Some(auth_manager),
None,
None,
DEFAULT_MAX_REQUEST_SIZE,
10,
)
}
pub fn with_connection_limiter(
mut self,
limiter: Arc<tokio::sync::Mutex<ConnectionRateLimiter>>,
) -> Self {
self.connection_limiter = Some(limiter);
self
}
pub fn with_batch_rate_multiplier_cap(mut self, cap: u32) -> Self {
self.batch_rate_multiplier_cap = cap;
self
}
pub fn with_dependencies(
addr: SocketAddr,
blockchain: Arc<blockchain::BlockchainRpc>,
network: Arc<network::NetworkRpc>,
mempool: Arc<mempool::MempoolRpc>,
mining: Arc<mining::MiningRpc>,
rawtx: Arc<rawtx::RawTxRpc>,
control: Arc<control::ControlRpc>,
max_request_size: usize,
) -> Self {
Self::from_components(
addr,
blockchain,
network,
mempool,
mining,
rawtx,
control,
None,
None,
None,
max_request_size,
10,
)
}
#[cfg(feature = "bip70-http")]
pub fn with_payment(mut self, payment: Arc<payment::PaymentRpc>) -> Self {
self.payment = Some(payment);
self
}
pub fn with_dependencies_and_metrics(
addr: SocketAddr,
blockchain: Arc<blockchain::BlockchainRpc>,
network: Arc<network::NetworkRpc>,
mempool: Arc<mempool::MempoolRpc>,
mining: Arc<mining::MiningRpc>,
rawtx: Arc<rawtx::RawTxRpc>,
control: Arc<control::ControlRpc>,
metrics: Arc<MetricsCollector>,
max_request_size: usize,
) -> Self {
Self::from_components(
addr,
blockchain,
network,
mempool,
mining,
rawtx,
control,
None,
Some(metrics),
None,
max_request_size,
10,
)
}
pub fn with_dependencies_and_auth(
addr: SocketAddr,
blockchain: Arc<blockchain::BlockchainRpc>,
network: Arc<network::NetworkRpc>,
mempool: Arc<mempool::MempoolRpc>,
mining: Arc<mining::MiningRpc>,
rawtx: Arc<rawtx::RawTxRpc>,
control: Arc<control::ControlRpc>,
auth_manager: Arc<auth::RpcAuthManager>,
max_request_size: usize,
) -> Self {
Self::from_components(
addr,
blockchain,
network,
mempool,
mining,
rawtx,
control,
Some(auth_manager),
None,
None,
max_request_size,
10,
)
}
pub fn with_dependencies_auth_and_metrics(
addr: SocketAddr,
blockchain: Arc<blockchain::BlockchainRpc>,
network: Arc<network::NetworkRpc>,
mempool: Arc<mempool::MempoolRpc>,
mining: Arc<mining::MiningRpc>,
rawtx: Arc<rawtx::RawTxRpc>,
control: Arc<control::ControlRpc>,
auth_manager: Arc<auth::RpcAuthManager>,
metrics: Arc<MetricsCollector>,
max_request_size: usize,
) -> Self {
Self::from_components(
addr,
blockchain,
network,
mempool,
mining,
rawtx,
control,
Some(auth_manager),
Some(metrics),
None,
max_request_size,
10,
)
}
pub async fn start(&self) -> Result<()> {
let listener = TcpListener::bind(self.addr).await?;
info!("RPC server listening on {}", self.addr);
let server = Arc::new(RpcServer {
addr: self.addr,
blockchain: Arc::clone(&self.blockchain),
network: Arc::clone(&self.network),
mempool: Arc::clone(&self.mempool),
mining: Arc::clone(&self.mining),
rawtx: Arc::clone(&self.rawtx),
control: Arc::clone(&self.control),
#[cfg(feature = "bip70-http")]
payment: self.payment.clone(),
auth_manager: self.auth_manager.clone(),
metrics: self.metrics.clone(),
module_endpoints: Arc::clone(&self.module_endpoints),
max_request_size: self.max_request_size,
connection_limiter: self.connection_limiter.clone(),
batch_rate_multiplier_cap: self.batch_rate_multiplier_cap,
});
loop {
match listener.accept().await {
Ok((stream, addr)) => {
if let Some(ref limiter) = server.connection_limiter {
let mut guard = limiter.lock().await;
if !guard.check_connection(addr.ip()) {
debug!("RPC connection rejected (rate limit) from {}", addr);
drop(stream);
continue;
}
}
debug!("New RPC connection from {}", addr);
let peer_addr = addr;
let server = Arc::clone(&server);
let server_for_spawn = Arc::clone(&server);
let peer_addr_for_spawn = peer_addr;
tokio::spawn(async move {
let io = TokioIo::new(stream);
let server_clone = Arc::clone(&server_for_spawn);
let peer_addr_clone = peer_addr_for_spawn;
let service = service_fn({
let server_for_service = Arc::clone(&server_clone);
let addr_for_service = peer_addr_clone;
move |req| {
let server_inner = Arc::clone(&server_for_service);
let addr_inner = addr_for_service;
Self::handle_http_request_with_server(server_inner, req, addr_inner)
}
});
if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
debug!(
"HTTP connection failed from {} (might be raw TCP): {}",
peer_addr_clone, e
);
}
});
}
Err(e) => {
error!("Failed to accept RPC connection: {}", e);
}
}
}
}
async fn handle_http_request_with_server(
server: Arc<Self>,
req: Request<Incoming>,
addr: SocketAddr,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
let headers = req.headers().clone();
if req.method() == Method::GET {
let path = req.uri().path();
if let Some(ref auth_manager) = server.auth_manager {
if path == "/metrics" {
let auth_result = auth_manager.authenticate_request(&headers, addr).await;
if let Some(error) = &auth_result.error {
warn!(
"Metrics endpoint authentication failed from {}: {}",
addr, error
);
return Ok(Self::http_error_response(
StatusCode::UNAUTHORIZED,
"Authentication required for metrics endpoint",
));
}
if let Some(ref user_id) = auth_result.user_id {
if !auth_manager
.check_rate_limit_with_endpoint(user_id, Some(addr), Some("/metrics"))
.await
{
return Ok(Self::http_error_response(
StatusCode::TOO_MANY_REQUESTS,
"Rate limit exceeded",
));
}
} else if !auth_manager
.check_ip_rate_limit_with_endpoint(addr, Some("/metrics"))
.await
{
return Ok(Self::http_error_response(
StatusCode::TOO_MANY_REQUESTS,
"IP rate limit exceeded",
));
}
} else if path.starts_with("/health") {
let auth_result = auth_manager.authenticate_request(&headers, addr).await;
if auth_result.user_id.is_none() {
if !auth_manager
.check_ip_rate_limit_with_endpoint(addr, Some(path))
.await
{
return Ok(Self::http_error_response(
StatusCode::TOO_MANY_REQUESTS,
"IP rate limit exceeded",
));
}
}
}
}
if path == "/metrics" {
return Self::handle_metrics_endpoint(server).await;
}
return Self::handle_health_endpoint(server, req).await;
}
if req.method() != Method::POST {
return Ok(Self::http_error_response(
StatusCode::METHOD_NOT_ALLOWED,
"Only POST method is supported for JSON-RPC",
));
}
let headers = req.headers().clone();
if let Some(content_type) = headers.get("content-type") {
if content_type != "application/json" {
warn!("Invalid Content-Type from {}: {:?}", addr, content_type);
}
}
let body = req.collect().await?;
let body_bytes = body.to_bytes();
if body_bytes.len() > server.max_request_size {
return Ok(Self::http_error_response(
StatusCode::PAYLOAD_TOO_LARGE,
&format!(
"Request body too large: {} bytes (max: {} bytes)",
body_bytes.len(),
server.max_request_size
),
));
}
let json_body = match std::str::from_utf8(&body_bytes) {
Ok(s) => s.to_string(),
Err(e) => {
return Ok(Self::http_error_response(
StatusCode::BAD_REQUEST,
&format!("Invalid UTF-8 in request body: {e}"),
));
}
};
let request_id = Uuid::new_v4().to_string();
let request_id_short = request_id.chars().take(8).collect::<String>();
let span = tracing::span!(
tracing::Level::DEBUG,
"rpc_request",
request_id = %request_id_short,
method = tracing::field::Empty,
client_addr = %addr,
request_size = json_body.len()
);
let _guard = span.enter();
debug!("HTTP RPC request from {}: {} bytes", addr, json_body.len());
let parsed = serde_json::from_str::<Value>(&json_body).ok();
let (method_name, rate_limit_n) = match &parsed {
Some(Value::Array(requests)) => {
let cap = server.batch_rate_multiplier_cap as usize;
let n = requests.len().min(cap) as u32;
("batch".to_string(), n)
}
Some(req) => (
req.get("method")
.and_then(|m| m.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown".to_string()),
1u32,
),
None => ("unknown".to_string(), 1u32),
};
Span::current().record("method", &method_name);
let auth_result = if let Some(ref auth_manager) = server.auth_manager {
Some(auth_manager.authenticate_request(&headers, addr).await)
} else {
None
};
if let Some(ref auth_manager) = server.auth_manager {
if let Some(ref auth_result) = auth_result {
if let Some(error) = &auth_result.error {
return Ok(Self::http_error_response(StatusCode::UNAUTHORIZED, error));
}
if let Some(ref user_id) = auth_result.user_id {
let endpoint = format!("rpc:{method_name}");
if !auth_manager
.check_rate_limit_with_endpoint_n(
user_id,
Some(addr),
Some(&endpoint),
rate_limit_n,
)
.await
{
return Ok(Self::http_error_response(
StatusCode::TOO_MANY_REQUESTS,
"User rate limit exceeded",
));
}
}
} else {
let endpoint = format!("rpc:{method_name}");
if !auth_manager
.check_ip_rate_limit_with_endpoint_n(addr, Some(&endpoint), rate_limit_n)
.await
{
return Ok(Self::http_error_response(
StatusCode::TOO_MANY_REQUESTS,
"IP rate limit exceeded",
));
}
}
if !auth_manager.check_method_rate_limit(&method_name).await {
return Ok(Self::http_error_response(
StatusCode::TOO_MANY_REQUESTS,
&format!("Method '{method_name}' rate limit exceeded"),
));
}
}
let start_time = std::time::Instant::now();
let response_json = Self::process_request_with_server(server, &json_body).await;
let duration = start_time.elapsed();
Span::current().record("duration_ms", duration.as_millis() as u64);
Span::current().record("response_size", response_json.len());
debug!(
"RPC request completed in {:?} (request_id: {})",
duration, request_id_short
);
Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.header("Content-Length", response_json.len())
.header("X-Request-ID", request_id_short)
.body(Full::new(Bytes::from(response_json)))
.expect("Failed to build HTTP response - this should never happen with valid inputs"))
}
async fn handle_metrics_endpoint(
server: Arc<Self>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
let metrics_text = if let Some(ref metrics_collector) = server.metrics {
Self::format_prometheus_metrics(metrics_collector.collect())
} else {
"# No metrics available\n".to_string()
};
let mut response = Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; version=0.0.4")
.header("Content-Length", metrics_text.len())
.body(Full::new(Bytes::from(metrics_text)))
.expect("Failed to build metrics response");
let headers = response.headers_mut();
headers.insert("X-Content-Type-Options", "nosniff".parse().unwrap());
headers.insert(
"Cache-Control",
"no-store, no-cache, must-revalidate".parse().unwrap(),
);
Ok(response)
}
fn format_prometheus_metrics(metrics: crate::node::metrics::NodeMetrics) -> String {
let mut output = String::new();
output.push_str("# HELP blvm_network_peers_total Total number of connected peers\n");
output.push_str("# TYPE blvm_network_peers_total gauge\n");
output.push_str(&format!(
"blvm_network_peers_total {}\n",
metrics.network.peer_count
));
output.push_str("# HELP blvm_network_bytes_sent_total Total bytes sent\n");
output.push_str("# TYPE blvm_network_bytes_sent_total counter\n");
output.push_str(&format!(
"blvm_network_bytes_sent_total {}\n",
metrics.network.bytes_sent
));
output.push_str("# HELP blvm_network_bytes_received_total Total bytes received\n");
output.push_str("# TYPE blvm_network_bytes_received_total counter\n");
output.push_str(&format!(
"blvm_network_bytes_received_total {}\n",
metrics.network.bytes_received
));
output.push_str("# HELP blvm_network_messages_sent_total Total messages sent\n");
output.push_str("# TYPE blvm_network_messages_sent_total counter\n");
output.push_str(&format!(
"blvm_network_messages_sent_total {}\n",
metrics.network.messages_sent
));
output.push_str("# HELP blvm_network_messages_received_total Total messages received\n");
output.push_str("# TYPE blvm_network_messages_received_total counter\n");
output.push_str(&format!(
"blvm_network_messages_received_total {}\n",
metrics.network.messages_received
));
output.push_str("# HELP blvm_network_active_connections Active network connections\n");
output.push_str("# TYPE blvm_network_active_connections gauge\n");
output.push_str(&format!(
"blvm_network_active_connections {}\n",
metrics.network.active_connections
));
output.push_str("# HELP blvm_network_banned_peers Banned peers count\n");
output.push_str("# TYPE blvm_network_banned_peers gauge\n");
output.push_str(&format!(
"blvm_network_banned_peers {}\n",
metrics.network.banned_peers
));
output.push_str("# HELP blvm_storage_blocks_total Total blocks stored\n");
output.push_str("# TYPE blvm_storage_blocks_total gauge\n");
output.push_str(&format!(
"blvm_storage_blocks_total {}\n",
metrics.storage.block_count
));
output.push_str("# HELP blvm_storage_utxos_total Total UTXOs\n");
output.push_str("# TYPE blvm_storage_utxos_total gauge\n");
output.push_str(&format!(
"blvm_storage_utxos_total {}\n",
metrics.storage.utxo_count
));
output.push_str("# HELP blvm_storage_transactions_total Total transactions indexed\n");
output.push_str("# TYPE blvm_storage_transactions_total gauge\n");
output.push_str(&format!(
"blvm_storage_transactions_total {}\n",
metrics.storage.transaction_count
));
output.push_str("# HELP blvm_storage_disk_size_bytes Estimated disk size in bytes\n");
output.push_str("# TYPE blvm_storage_disk_size_bytes gauge\n");
output.push_str(&format!(
"blvm_storage_disk_size_bytes {}\n",
metrics.storage.disk_size
));
output.push_str("# HELP blvm_storage_within_bounds Storage bounds status (1=within bounds, 0=exceeded)\n");
output.push_str("# TYPE blvm_storage_within_bounds gauge\n");
output.push_str(&format!(
"blvm_storage_within_bounds {}\n",
if metrics.storage.within_bounds { 1 } else { 0 }
));
output.push_str("# HELP blvm_rpc_requests_total Total RPC requests\n");
output.push_str("# TYPE blvm_rpc_requests_total counter\n");
output.push_str(&format!(
"blvm_rpc_requests_total {}\n",
metrics.rpc.requests_total
));
output.push_str("# HELP blvm_rpc_requests_success_total Successful RPC requests\n");
output.push_str("# TYPE blvm_rpc_requests_success_total counter\n");
output.push_str(&format!(
"blvm_rpc_requests_success_total {}\n",
metrics.rpc.requests_success
));
output.push_str("# HELP blvm_rpc_requests_failed_total Failed RPC requests\n");
output.push_str("# TYPE blvm_rpc_requests_failed_total counter\n");
output.push_str(&format!(
"blvm_rpc_requests_failed_total {}\n",
metrics.rpc.requests_failed
));
output.push_str("# HELP blvm_rpc_requests_per_second Current RPC requests per second\n");
output.push_str("# TYPE blvm_rpc_requests_per_second gauge\n");
output.push_str(&format!(
"blvm_rpc_requests_per_second {}\n",
metrics.rpc.requests_per_second
));
output.push_str(
"# HELP blvm_rpc_avg_response_time_ms Average RPC response time in milliseconds\n",
);
output.push_str("# TYPE blvm_rpc_avg_response_time_ms gauge\n");
output.push_str(&format!(
"blvm_rpc_avg_response_time_ms {}\n",
metrics.rpc.avg_response_time_ms
));
output.push_str("# HELP blvm_performance_avg_block_processing_time_ms Average block processing time in milliseconds\n");
output.push_str("# TYPE blvm_performance_avg_block_processing_time_ms gauge\n");
output.push_str(&format!(
"blvm_performance_avg_block_processing_time_ms {}\n",
metrics.performance.avg_block_processing_time_ms
));
output.push_str("# HELP blvm_performance_avg_tx_validation_time_ms Average transaction validation time in milliseconds\n");
output.push_str("# TYPE blvm_performance_avg_tx_validation_time_ms gauge\n");
output.push_str(&format!(
"blvm_performance_avg_tx_validation_time_ms {}\n",
metrics.performance.avg_tx_validation_time_ms
));
output.push_str("# HELP blvm_performance_blocks_per_second Blocks processed per second\n");
output.push_str("# TYPE blvm_performance_blocks_per_second gauge\n");
output.push_str(&format!(
"blvm_performance_blocks_per_second {}\n",
metrics.performance.blocks_per_second
));
output.push_str(
"# HELP blvm_performance_transactions_per_second Transactions processed per second\n",
);
output.push_str("# TYPE blvm_performance_transactions_per_second gauge\n");
output.push_str(&format!(
"blvm_performance_transactions_per_second {}\n",
metrics.performance.transactions_per_second
));
output.push_str("# HELP blvm_system_uptime_seconds Node uptime in seconds\n");
output.push_str("# TYPE blvm_system_uptime_seconds gauge\n");
output.push_str(&format!(
"blvm_system_uptime_seconds {}\n",
metrics.system.uptime_seconds
));
if let Some(memory) = metrics.system.memory_usage_bytes {
output.push_str("# HELP blvm_system_memory_usage_bytes Memory usage in bytes\n");
output.push_str("# TYPE blvm_system_memory_usage_bytes gauge\n");
output.push_str(&format!("blvm_system_memory_usage_bytes {memory}\n"));
}
if let Some(cpu) = metrics.system.cpu_usage_percent {
output.push_str("# HELP blvm_system_cpu_usage_percent CPU usage percentage\n");
output.push_str("# TYPE blvm_system_cpu_usage_percent gauge\n");
output.push_str(&format!("blvm_system_cpu_usage_percent {cpu}\n"));
}
output.push_str(
"# HELP blvm_dos_connection_rate_violations_total Connection rate violations\n",
);
output.push_str("# TYPE blvm_dos_connection_rate_violations_total counter\n");
output.push_str(&format!(
"blvm_dos_connection_rate_violations_total {}\n",
metrics.network.dos_protection.connection_rate_violations
));
output.push_str("# HELP blvm_dos_auto_bans_total Auto-bans triggered\n");
output.push_str("# TYPE blvm_dos_auto_bans_total counter\n");
output.push_str(&format!(
"blvm_dos_auto_bans_total {}\n",
metrics.network.dos_protection.auto_bans
));
output
}
async fn handle_health_endpoint(
server: Arc<Self>,
req: Request<Incoming>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
let path = req.uri().path();
let health_params = json!([]);
let health_result = server.control.gethealth(&health_params).await;
let (status_code, body) = match health_result {
Ok(health_value) => {
match path {
"/health" | "/health/live" => {
let status = health_value
.get("status")
.and_then(|s| s.as_str())
.unwrap_or("unknown");
let is_healthy = status == "healthy" || status == "degraded";
let response_body = json!({
"status": status,
"service": "blvm-node"
});
let status_code = if is_healthy {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
(
status_code,
serde_json::to_string(&response_body)
.unwrap_or_else(|_| "{}".to_string()),
)
}
"/health/ready" => {
let status = health_value
.get("status")
.and_then(|s| s.as_str())
.unwrap_or("unknown");
let is_ready = status == "healthy";
let response_body = json!({
"status": if is_ready { "ready" } else { "not_ready" },
"service": "blvm-node"
});
let status_code = if is_ready {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
(
status_code,
serde_json::to_string(&response_body)
.unwrap_or_else(|_| "{}".to_string()),
)
}
"/health/detailed" => {
(
StatusCode::OK,
serde_json::to_string(&health_value)
.unwrap_or_else(|_| "{}".to_string()),
)
}
_ => {
return Ok(Self::http_error_response(
StatusCode::NOT_FOUND,
"Health endpoint not found",
));
}
}
}
Err(_) => {
let response_body = json!({
"status": "unhealthy",
"service": "blvm-node",
"error": "Health check failed"
});
(
StatusCode::SERVICE_UNAVAILABLE,
serde_json::to_string(&response_body).unwrap_or_else(|_| "{}".to_string()),
)
}
};
let mut response = Response::builder()
.status(status_code)
.header("Content-Type", "application/json")
.header("Content-Length", body.len())
.body(Full::new(Bytes::from(body)))
.expect("Failed to build health response");
let headers = response.headers_mut();
headers.insert("X-Content-Type-Options", "nosniff".parse().unwrap());
headers.insert(
"Cache-Control",
"no-store, no-cache, must-revalidate".parse().unwrap(),
);
Ok(response)
}
fn http_error_response(status: StatusCode, message: &str) -> Response<Full<Bytes>> {
let body = json!({
"error": {
"code": status.as_u16(),
"message": message
}
});
let body_json = serde_json::to_string(&body).unwrap_or_else(|_| "{}".to_string());
Response::builder()
.status(status)
.header("Content-Type", "application/json")
.header("Content-Length", body_json.len())
.body(Full::new(Bytes::from(body_json)))
.unwrap_or_else(|e| {
error!("Failed to build error response: {}", e);
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Full::new(Bytes::from(
"{\"error\":\"Internal server error\"}",
)))
.expect("Fallback response should always succeed")
})
}
pub async fn process_request(request: &str) -> String {
let addr: SocketAddr = "127.0.0.1:0"
.parse()
.expect("127.0.0.1:0 should always parse as valid SocketAddr");
let server = Arc::new(Self::new(addr));
Self::process_request_with_server(server, request).await
}
async fn process_request_with_server(server: Arc<Self>, request: &str) -> String {
let request: Value = match serde_json::from_str(request) {
Ok(req) => req,
Err(e) => {
let err = errors::RpcError::parse_error(format!("Invalid JSON: {e}"));
return serde_json::to_string(&err.to_json(None))
.unwrap_or_else(|_| "{}".to_string());
}
};
if let Some(requests) = request.as_array() {
return Self::process_batch_request(server, requests).await;
}
let method = request.get("method").and_then(|m| m.as_str()).unwrap_or("");
let params = request.get("params").cloned().unwrap_or_else(|| json!([]));
let id = request.get("id");
let result = server.call_method(method, params).await;
match result {
Ok(response) => {
let response_str =
serde_json::to_string(&response).unwrap_or_else(|_| "null".to_string());
let id_str = match id {
Some(id_val) => {
serde_json::to_string(id_val).unwrap_or_else(|_| "null".to_string())
}
None => "null".to_string(),
};
format!(r#"{{"jsonrpc":"2.0","result":{response_str},"id":{id_str}}}"#)
}
Err(e) => {
serde_json::to_string(&e.to_json(id.cloned())).unwrap_or_else(|_| "{}".to_string())
}
}
}
async fn process_batch_request(server: Arc<Self>, requests: &[Value]) -> String {
if requests.is_empty() {
return "[]".to_string();
}
let handles: Vec<_> = requests
.iter()
.enumerate()
.map(|(index, req)| {
let server_clone = Arc::clone(&server);
let req_clone = req.clone();
tokio::spawn(async move {
let method = req_clone
.get("method")
.and_then(|m| m.as_str())
.unwrap_or("");
let params = req_clone
.get("params")
.cloned()
.unwrap_or_else(|| json!([]));
let id = req_clone.get("id").cloned();
let result = server_clone.call_method(method, params).await;
let response = match result {
Ok(response_value) => {
json!({
"jsonrpc": "2.0",
"result": response_value,
"id": id.unwrap_or(Value::Null)
})
}
Err(e) => {
e.to_json(id)
}
};
(index, response)
})
})
.collect();
let mut results = Vec::new();
for handle in handles {
match handle.await {
Ok(result) => results.push(result),
Err(e) => {
error!("Task join error in batch request: {}", e);
}
}
}
results.sort_by_key(|(index, _)| *index);
let responses: Vec<Value> = results.into_iter().map(|(_, response)| response).collect();
serde_json::to_string(&responses).unwrap_or_else(|_| "[]".to_string())
}
async fn call_method(&self, method: &str, params: Value) -> Result<Value, errors::RpcError> {
match method {
"getblockchaininfo" => self
.blockchain
.get_blockchain_info()
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getblock" => {
let hash = crate::rpc::params::param_str(¶ms, 0).unwrap_or("");
self.blockchain
.get_block(hash)
.await
.map_err(errors::rpc_error_from_blockchain_result)
}
"getblockhash" => {
let height = crate::rpc::params::param_u64_default(¶ms, 0, 0);
self.blockchain
.get_block_hash(height)
.await
.map_err(errors::rpc_error_from_blockchain_result)
}
"getblockheader" => {
let hash = crate::rpc::params::param_str(¶ms, 0).unwrap_or("");
let verbose = crate::rpc::params::param_bool_default(¶ms, 1, true);
self.blockchain
.get_block_header(hash, verbose)
.await
.map_err(errors::rpc_error_from_blockchain_result)
}
"getbestblockhash" => self
.blockchain
.get_best_block_hash()
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getblockcount" => self
.blockchain
.get_block_count()
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getdifficulty" => self
.blockchain
.get_difficulty()
.await
.map_err(errors::rpc_error_from_blockchain_result),
"gettxoutsetinfo" => self
.blockchain
.get_txoutset_info()
.await
.map_err(errors::rpc_error_from_blockchain_result),
"loadtxoutset" => self
.blockchain
.load_txout_set(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"verifychain" => {
let checklevel = crate::rpc::params::param_u64(¶ms, 0);
let numblocks = crate::rpc::params::param_u64(¶ms, 1);
self.blockchain
.verify_chain(checklevel, numblocks)
.await
.map_err(errors::rpc_error_from_blockchain_result)
}
"getchaintips" => self
.blockchain
.get_chain_tips()
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getchaintxstats" => self
.blockchain
.get_chain_tx_stats(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getblockstats" => self
.blockchain
.get_block_stats(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"pruneblockchain" => self
.blockchain
.prune_blockchain(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getpruneinfo" => self
.blockchain
.get_prune_info(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"invalidateblock" => self
.blockchain
.invalidate_block(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"reconsiderblock" => self
.blockchain
.reconsider_block(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"waitfornewblock" => self
.blockchain
.wait_for_new_block(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"waitforblock" => self
.blockchain
.wait_for_block(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"waitforblockheight" => self
.blockchain
.wait_for_block_height(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getrawtransaction" => self.rawtx.getrawtransaction(¶ms).await,
"sendrawtransaction" => self.rawtx.sendrawtransaction(¶ms).await,
"testmempoolaccept" => self.rawtx.testmempoolaccept(¶ms).await,
"decoderawtransaction" => self.rawtx.decoderawtransaction(¶ms).await,
"createrawtransaction" => self.rawtx.createrawtransaction(¶ms).await,
"gettxout" => self.rawtx.gettxout(¶ms).await,
"gettxoutproof" => self.rawtx.gettxoutproof(¶ms).await,
"verifytxoutproof" => self.rawtx.verifytxoutproof(¶ms).await,
"getmempoolinfo" => self.mempool.getmempoolinfo(¶ms).await,
"getrawmempool" => self.mempool.getrawmempool(¶ms).await,
"savemempool" => self.mempool.savemempool(¶ms).await,
"getmempoolancestors" => self.mempool.getmempoolancestors(¶ms).await,
"getmempooldescendants" => self.mempool.getmempooldescendants(¶ms).await,
"getmempoolentry" => self.mempool.getmempoolentry(¶ms).await,
"getnetworkinfo" => self.network.get_network_info().await,
"getpeerinfo" => self.network.get_peer_info().await,
"getconnectioncount" => self.network.get_connection_count(¶ms).await,
"ping" => self.network.ping(¶ms).await,
"addnode" => self.network.add_node(¶ms).await,
"disconnectnode" => self.network.disconnect_node(¶ms).await,
"getnettotals" => self.network.get_net_totals(¶ms).await,
"clearbanned" => self.network.clear_banned(¶ms).await,
"setban" => self.network.set_ban(¶ms).await,
"listbanned" => self.network.list_banned(¶ms).await,
"getaddednodeinfo" => self.network.getaddednodeinfo(¶ms).await,
"getnodeaddresses" => self.network.getnodeaddresses(¶ms).await,
"setnetworkactive" => self.network.setnetworkactive(¶ms).await,
"getmininginfo" => self.mining.get_mining_info().await,
"getblocktemplate" => self.mining.get_block_template(¶ms).await,
"generatetoaddress" => self.mining.generate_to_address(¶ms).await,
"submitblock" => self.mining.submit_block(¶ms).await,
"estimatesmartfee" => self.mining.estimate_smart_fee(¶ms).await,
"prioritisetransaction" => self.mining.prioritise_transaction(¶ms).await,
"getblockfilter" => self
.blockchain
.get_block_filter(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getindexinfo" => self
.blockchain
.get_index_info(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getblockchainstate" => self
.blockchain
.get_blockchain_state()
.await
.map_err(errors::rpc_error_from_blockchain_result),
"validateaddress" => self
.blockchain
.validate_address(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"getaddressinfo" => self
.blockchain
.get_address_info(¶ms)
.await
.map_err(errors::rpc_error_from_blockchain_result),
"gettransactiondetails" => self.rawtx.get_transaction_details(¶ms).await,
"stop" => self.control.stop(¶ms).await,
"uptime" => self.control.uptime(¶ms).await,
"getmemoryinfo" => self.control.getmemoryinfo(¶ms).await,
"getrpcinfo" => self.control.getrpcinfo(¶ms).await,
"help" => self.control.help(¶ms).await,
"logging" => self.control.logging(¶ms).await,
"gethealth" => self.control.gethealth(¶ms).await,
"getmetrics" => self.control.getmetrics(¶ms).await,
"loadmodule" => self.control.loadmodule(¶ms).await,
"unloadmodule" => self.control.unloadmodule(¶ms).await,
"reloadmodule" => self.control.reloadmodule(¶ms).await,
"listmodules" => self.control.listmodules(¶ms).await,
"getmoduleclispecs" => self.control.getmoduleclispecs(¶ms).await,
"runmodulecli" => self.control.runmodulecli(¶ms).await,
#[cfg(feature = "bip70-http")]
"createpaymentrequest" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.create_payment_request(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"createcovenantproof" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.create_covenant_proof(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
"getpaymentstate" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.get_payment_state(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
"listpayments" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.list_payments(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"createvault" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.create_vault(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"getvaultstate" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.get_vault_state(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"unvault" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.unvault(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"withdrawfromvault" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.withdraw_from_vault(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"createpool" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.create_pool(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"getpoolstate" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.get_pool_state(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"joinpool" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.join_pool(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"distributepool" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.distribute_pool(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"createbatch" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.create_batch(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"addtobatch" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.add_to_batch(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"getcongestion" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.get_congestion(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"getcongestionmetrics" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.get_congestion_metrics(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "bip70-http")]
#[cfg(feature = "ctv")]
"broadcastbatch" => {
if let Some(ref payment_rpc) = self.payment {
payment_rpc
.broadcast_batch(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string()))
} else {
Err(errors::RpcError::internal_error(
"Payment RPC not available".to_string(),
))
}
}
#[cfg(feature = "miniscript")]
"getdescriptorinfo" => miniscript_rpc::get_descriptor_info(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string())),
#[cfg(feature = "miniscript")]
"analyzepsbt" => miniscript_rpc::analyze_psbt(¶ms)
.await
.map_err(|e| errors::RpcError::internal_error(e.to_string())),
_ => {
let endpoints = self.module_endpoints.read().await;
if let Some(handler) = endpoints.get(method) {
handler.handle(params).await
} else {
Err(errors::RpcError::method_not_found(method))
}
}
}
}
pub async fn register_module_endpoint(
&self,
method: String,
handler: Arc<dyn ModuleRpcHandler>,
) -> Result<(), String> {
if crate::rpc::methods::CORE_RPC_METHODS.contains(&method.as_str()) {
return Err(format!("Cannot override core RPC method: {method}"));
}
if !method.contains('_') {
tracing::warn!(
"Module RPC method '{}' does not have module prefix (recommended: 'module_method')",
method
);
}
let mut endpoints = self.module_endpoints.write().await;
endpoints.insert(method.clone(), handler);
tracing::info!("Registered module RPC endpoint: {}", method);
Ok(())
}
pub async fn unregister_module_endpoint(&self, method: &str) -> Result<(), String> {
let mut endpoints = self.module_endpoints.write().await;
if endpoints.remove(method).is_some() {
tracing::info!("Unregistered module RPC endpoint: {}", method);
Ok(())
} else {
Err(format!("Module RPC endpoint not found: {method}"))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream as TokioTcpStream;
#[tokio::test]
async fn test_rpc_server_creation() {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let server = RpcServer::new(addr);
assert_eq!(server.addr, addr);
}
#[tokio::test]
async fn test_http_rpc_integration() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let server_addr = listener.local_addr().unwrap();
let server = Arc::new(RpcServer::new(server_addr));
let server_handle = tokio::spawn(async move {
while let Ok((stream, addr)) = listener.accept().await {
let peer_addr = addr;
let server_clone = server.clone();
tokio::spawn(async move {
let io = TokioIo::new(stream);
let service = service_fn(move |req| {
RpcServer::handle_http_request_with_server(
server_clone.clone(),
req,
peer_addr,
)
});
let _ = http1::Builder::new().serve_connection(io, service).await;
});
}
});
tokio::time::sleep(RPC_SERVER_STARTUP_WAIT).await;
let mut client = TokioTcpStream::connect(server_addr).await.unwrap();
let json_body = r#"{"jsonrpc":"2.0","method":"ping","params":[],"id":1}"#;
let http_request = format!(
"POST / HTTP/1.1\r\n\
Host: 127.0.0.1:18332\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
\r\n\
{}",
json_body.len(),
json_body
);
client.write_all(http_request.as_bytes()).await.unwrap();
let mut response = vec![0u8; 4096];
let n = tokio::time::timeout(RPC_CLIENT_READ_TIMEOUT, client.read(&mut response))
.await
.unwrap()
.unwrap();
let response_str = String::from_utf8_lossy(&response[..n]);
assert!(
response_str.contains("HTTP/1.1 200 OK") || response_str.contains("200 OK"),
"Response: {response_str}"
);
assert!(
response_str.contains("content-type: application/json")
|| response_str.contains("Content-Type: application/json")
);
assert!(response_str.contains("jsonrpc"));
assert!(response_str.contains("\"result\""));
server_handle.abort();
}
#[tokio::test]
async fn test_process_request_valid_json() {
let request = r#"{"jsonrpc":"2.0","method":"getblockchaininfo","params":[],"id":1}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert!(response["result"].is_object());
assert_eq!(response["id"], 1);
}
#[tokio::test]
async fn test_process_request_invalid_json() {
let request = "invalid json";
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert_eq!(response["error"]["code"], -32700);
assert!(
response["error"]["message"]
.as_str()
.unwrap()
.contains("Parse error")
|| response["error"]["message"]
.as_str()
.unwrap()
.contains("Invalid JSON")
);
}
#[tokio::test]
async fn test_process_request_unknown_method() {
let request = r#"{"jsonrpc":"2.0","method":"unknown_method","params":[],"id":1}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert_eq!(response["error"]["code"], -32601);
assert!(response["error"]["message"]
.as_str()
.unwrap()
.contains("Method not found"));
assert_eq!(response["id"], 1);
}
#[tokio::test]
async fn test_process_request_without_id() {
let request = r#"{"jsonrpc":"2.0","method":"getblockchaininfo","params":[]}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert!(response["result"].is_object());
assert_eq!(response["id"], serde_json::Value::Null);
}
#[tokio::test]
async fn test_process_request_with_params() {
let request = r#"{"jsonrpc":"2.0","method":"getblock","params":["000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"],"id":1}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
if response.get("error").is_some() {
assert!(response["error"].is_object());
} else {
assert!(response["result"].is_object() || response["result"].is_null());
}
assert_eq!(response["id"], 1);
}
#[tokio::test]
async fn test_call_method_getblockchaininfo() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let result = server.call_method("getblockchaininfo", json!([])).await;
assert!(result.is_ok());
let response = result.unwrap();
assert!(response.get("chain").is_some());
}
#[tokio::test]
async fn test_call_method_getblock() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let params = json!(["000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"]);
let result = server.call_method("getblock", params).await;
if let Ok(response) = result {
assert!(response.get("hash").is_some());
} else {
assert!(result.is_err());
}
}
#[tokio::test]
async fn test_call_method_getblockhash() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let params = json!([0]);
let result = server.call_method("getblockhash", params).await;
assert!(result.is_ok());
let response = result.unwrap();
assert!(response.is_string());
}
#[tokio::test]
async fn test_call_method_getrawtransaction() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let params = json!(["0000000000000000000000000000000000000000000000000000000000000000"]);
let result = server.call_method("getrawtransaction", params).await;
if let Ok(response) = result {
assert!(response.is_object() || response.is_string());
}
}
#[tokio::test]
async fn test_call_method_getnetworkinfo() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let result = server.call_method("getnetworkinfo", json!([])).await;
assert!(result.is_ok());
let response = result.unwrap();
assert!(response.get("version").is_some());
}
#[tokio::test]
async fn test_call_method_getpeerinfo() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let result = server.call_method("getpeerinfo", json!([])).await;
assert!(result.is_ok());
let response = result.unwrap();
assert!(response.is_array());
}
#[tokio::test]
async fn test_call_method_getmininginfo() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let result = server.call_method("getmininginfo", json!([])).await;
assert!(result.is_ok());
let response = result.unwrap();
assert!(response.get("blocks").is_some());
}
#[tokio::test]
async fn test_call_method_getblocktemplate() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let result = server.call_method("getblocktemplate", json!([])).await;
if let Ok(response) = result {
assert!(response.is_object() || response.is_string());
}
}
#[tokio::test]
async fn test_call_method_unknown_method() {
let server = RpcServer::new("127.0.0.1:0".parse().unwrap());
let result = server.call_method("unknown_method", json!([])).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Method not found"));
}
#[tokio::test]
async fn test_json_rpc_2_0_compliance() {
let request = r#"{"jsonrpc":"2.0","method":"getblockchaininfo","params":[],"id":1}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert!(response["result"].is_object());
assert_eq!(response["id"], 1);
}
#[tokio::test]
async fn test_error_response_format() {
let request = r#"{"jsonrpc":"2.0","method":"unknown_method","params":[],"id":1}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert!(response["error"].is_object());
assert!(response["error"]["code"].is_number());
assert!(response["error"]["message"].is_string());
assert_eq!(response["id"], 1);
}
#[tokio::test]
async fn test_parse_error_response() {
let request = "invalid json";
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert_eq!(response["error"]["code"], -32700);
assert!(
response["error"]["message"]
.as_str()
.unwrap()
.contains("Parse error")
|| response["error"]["message"]
.as_str()
.unwrap()
.contains("Invalid JSON")
);
}
#[tokio::test]
async fn test_method_not_found_response() {
let request = r#"{"jsonrpc":"2.0","method":"nonexistent","params":[],"id":42}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert_eq!(response["error"]["code"], -32601);
assert!(response["error"]["message"]
.as_str()
.unwrap()
.contains("Method not found"));
assert!(response["error"]["data"].is_string() || response["error"]["data"].is_null());
assert_eq!(response["id"], 42);
}
#[tokio::test]
async fn test_empty_params_handling() {
let request = r#"{"jsonrpc":"2.0","method":"getblockchaininfo","id":1}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert!(response["result"].is_object());
assert_eq!(response["id"], 1);
}
#[tokio::test]
async fn test_missing_method_handling() {
let request = r#"{"jsonrpc":"2.0","params":[],"id":1}"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert_eq!(response["error"]["code"], -32601);
assert!(response["error"]["message"]
.as_str()
.unwrap()
.contains("Method not found"));
assert_eq!(response["id"], 1);
}
#[tokio::test]
async fn test_blockchain_methods_integration() {
let methods = vec![
"getblockchaininfo",
"getblock",
"getblockhash",
"getrawtransaction",
];
for method in methods {
let request = format!(r#"{{"jsonrpc":"2.0","method":"{method}","params":[],"id":1}}"#);
let response_str = RpcServer::process_request(&request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert!(
response["result"].is_object()
|| response["result"].is_string()
|| response["result"].is_null()
);
assert_eq!(response["id"], 1);
}
}
#[tokio::test]
async fn test_network_methods_integration() {
let methods = vec!["getnetworkinfo", "getpeerinfo"];
for method in methods {
let request = format!(r#"{{"jsonrpc":"2.0","method":"{method}","params":[],"id":1}}"#);
let response_str = RpcServer::process_request(&request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert!(response["result"].is_object() || response["result"].is_array());
assert_eq!(response["id"], 1);
}
}
#[tokio::test]
async fn test_mining_methods_integration() {
let methods = vec!["getmininginfo", "getblocktemplate"];
for method in methods {
let request = format!(r#"{{"jsonrpc":"2.0","method":"{method}","params":[],"id":1}}"#);
let response_str = RpcServer::process_request(&request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert_eq!(response["jsonrpc"], "2.0");
assert!(
response["result"].is_object()
|| response["result"].is_string()
|| response["result"].is_null()
);
assert_eq!(response["id"], 1);
}
}
#[tokio::test]
async fn test_batch_request_empty() {
let request = "[]";
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert!(response.is_array());
assert_eq!(response.as_array().unwrap().len(), 0);
}
#[tokio::test]
async fn test_batch_request_single() {
let request = r#"[{"jsonrpc":"2.0","method":"getblockchaininfo","params":[],"id":1}]"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert!(response.is_array());
let responses = response.as_array().unwrap();
assert_eq!(responses.len(), 1);
assert_eq!(responses[0]["jsonrpc"], "2.0");
assert!(responses[0]["result"].is_object());
assert_eq!(responses[0]["id"], 1);
}
#[tokio::test]
async fn test_batch_request_multiple() {
let request = r#"[
{"jsonrpc":"2.0","method":"getblockchaininfo","params":[],"id":1},
{"jsonrpc":"2.0","method":"getblockcount","params":[],"id":2},
{"jsonrpc":"2.0","method":"getnetworkinfo","params":[],"id":3}
]"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert!(response.is_array());
let responses = response.as_array().unwrap();
assert_eq!(responses.len(), 3);
assert_eq!(responses[0]["id"], 1);
assert_eq!(responses[1]["id"], 2);
assert_eq!(responses[2]["id"], 3);
assert!(responses[0]["result"].is_object());
assert!(responses[1]["result"].is_number());
assert!(responses[2]["result"].is_object());
}
#[tokio::test]
async fn test_batch_request_with_errors() {
let request = r#"[
{"jsonrpc":"2.0","method":"getblockchaininfo","params":[],"id":1},
{"jsonrpc":"2.0","method":"unknown_method","params":[],"id":2},
{"jsonrpc":"2.0","method":"getblockcount","params":[],"id":3}
]"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert!(response.is_array());
let responses = response.as_array().unwrap();
assert_eq!(responses.len(), 3);
assert_eq!(responses[0]["id"], 1);
assert!(responses[0]["result"].is_object());
assert!(responses[0]["error"].is_null());
assert_eq!(responses[1]["id"], 2);
assert!(responses[1]["result"].is_null());
assert!(responses[1]["error"].is_object());
assert_eq!(responses[1]["error"]["code"], -32601);
assert_eq!(responses[2]["id"], 3);
assert!(responses[2]["result"].is_number());
assert!(responses[2]["error"].is_null());
}
#[tokio::test]
async fn test_batch_request_order_preserved() {
let request = r#"[
{"jsonrpc":"2.0","method":"getblockcount","params":[],"id":3},
{"jsonrpc":"2.0","method":"getblockchaininfo","params":[],"id":1},
{"jsonrpc":"2.0","method":"getnetworkinfo","params":[],"id":2}
]"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert!(response.is_array());
let responses = response.as_array().unwrap();
assert_eq!(responses.len(), 3);
assert_eq!(responses[0]["id"], 3);
assert_eq!(responses[1]["id"], 1);
assert_eq!(responses[2]["id"], 2);
}
#[tokio::test]
async fn test_batch_request_without_ids() {
let request = r#"[
{"jsonrpc":"2.0","method":"getblockchaininfo","params":[]},
{"jsonrpc":"2.0","method":"getblockcount","params":[]}
]"#;
let response_str = RpcServer::process_request(request).await;
let response: Value = serde_json::from_str(&response_str).unwrap();
assert!(response.is_array());
let responses = response.as_array().unwrap();
assert_eq!(responses.len(), 2);
assert_eq!(responses[0]["id"], Value::Null);
assert_eq!(responses[1]["id"], Value::Null);
assert!(responses[0]["result"].is_object());
assert!(responses[1]["result"].is_number());
}
}