use std::num::NonZeroUsize;
use std::sync::Arc;
use accept::AcceptHeaderLayer;
use anyhow::Context;
use miden_node_block_producer::BlockProducerApi;
use miden_node_proto::clients::{NtxBuilderClient, RpcClient as SourceRpcClient, ValidatorClient};
use miden_node_proto::generated::rpc::api_server;
use miden_node_proto_build::rpc_api_descriptor;
use miden_node_store::state::State;
use miden_node_utils::clap::GrpcOptionsExternal;
use miden_node_utils::cors::cors_for_grpc_web_layer;
use miden_node_utils::grpc;
use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
use miden_node_utils::tracing::grpc::grpc_trace_fn;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::metadata::AsciiMetadataValue;
use tonic_reflection::server;
use tonic_web::GrpcWebLayer;
use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier};
use tower_http::trace::TraceLayer;
use tracing::info;
use crate::COMPONENT;
use crate::server::health::HealthCheckLayer;
mod accept;
pub(crate) mod api;
mod health;
/// Resources and configuration needed to run the RPC gRPC server.
///
/// The struct is consumed by [`Rpc::serve`], which wires these fields into the
/// [`api::RpcService`] and the tonic server.
pub struct Rpc {
/// TCP listener whose incoming connections are served by [`Rpc::serve`].
pub listener: TcpListener,
/// Shared store state handed to the RPC service.
pub store: Arc<State>,
/// Operating mode handed to the RPC service; also logged on startup.
pub mode: RpcMode,
/// Optional network transaction builder client handed to the RPC service.
pub ntx_builder: Option<NtxBuilderClient>,
/// gRPC options; `serve` reads the maximum connection age and request timeout from it and
/// passes it to the concurrent-connection and per-IP rate limiters.
pub grpc_options: GrpcOptionsExternal,
/// Optional ASCII metadata value, wrapped in [`NetworkTxAuth`] before being handed to the RPC
/// service.
// NOTE(review): presumably the expected auth header value for network transaction
// submission; confirm against `api::RpcService`.
pub network_tx_auth: Option<AsciiMetadataValue>,
}
/// Newtype around an [`AsciiMetadataValue`].
///
/// [`Rpc::serve`] builds it from [`Rpc::network_tx_auth`] and passes it to the RPC service.
// NOTE(review): presumably carries the auth value required for network transactions; the
// exact semantics are defined where `api::RpcService` consumes it.
#[derive(Clone, Debug)]
pub(crate) struct NetworkTxAuth(pub(crate) AsciiMetadataValue);
/// Operating mode of the RPC component, together with the clients each mode needs.
///
/// Payloads are boxed; use [`RpcMode::sequencer`] and [`RpcMode::full_node`] to construct the
/// variants without boxing by hand.
// NOTE(review): the boxing presumably keeps the enum small (cf. clippy's
// `large_enum_variant`); confirm before unboxing.
#[derive(Clone, Debug)]
pub enum RpcMode {
/// Mode backed by a block producer API and a validator client.
Sequencer {
block_producer: Box<BlockProducerApi>,
validator: Box<ValidatorClient>,
},
/// Mode backed by an RPC client pointing at another (source) RPC endpoint.
// NOTE(review): presumably requests are forwarded to `source_rpc`; verify in `api`.
FullNode { source_rpc: Box<SourceRpcClient> },
}
impl RpcMode {
pub fn sequencer(block_producer: BlockProducerApi, validator: ValidatorClient) -> Self {
Self::Sequencer {
block_producer: Box::new(block_producer),
validator: Box::new(validator),
}
}
pub fn full_node(source_rpc: SourceRpcClient) -> Self {
Self::FullNode { source_rpc: Box::new(source_rpc) }
}
}
impl Rpc {
    /// Builds the RPC service and its middleware stack, then serves it on [`Rpc::listener`].
    ///
    /// Steps performed:
    /// 1. Construct the [`api::RpcService`] from the store, mode and optional clients.
    /// 2. Fetch the genesis header via `get_genesis_header_with_retry` and register its
    ///    commitment on the service.
    /// 3. Build the gRPC reflection service from the RPC API descriptor.
    /// 4. Assemble the tonic server (panic catching, tracing, health check, CORS, gRPC-web,
    ///    rate limiting, accept-header validation) and serve incoming connections.
    ///
    /// The returned future completes once `serve_with_incoming` returns.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the genesis header cannot be fetched,
    /// - the genesis commitment cannot be set on the service,
    /// - the reflection service fails to build,
    /// - the crate version is not valid semver,
    /// - the per-IP rate limiter cannot be created, or
    /// - the server fails while serving.
    pub async fn serve(self) -> anyhow::Result<()> {
        // `self` is owned and `store` / `ntx_builder` are not used again below, so they are moved
        // rather than cloned. `mode` is still needed for the startup log and is therefore cloned.
        let mut api = api::RpcService::new(
            self.store,
            self.mode.clone(),
            self.ntx_builder,
            // NOTE(review): the meaning of this limit is defined by `RpcService::new`; consider
            // promoting it to a named constant or a configuration option.
            NonZeroUsize::new(1_000_000).expect("1_000_000 is non-zero"),
            self.network_tx_auth.map(NetworkTxAuth),
        );

        let genesis = api
            .get_genesis_header_with_retry()
            .await
            .context("Fetching genesis header from store")?;
        api.set_genesis_commitment(genesis.commitment())?;

        let api_service = api_server::ApiServer::new(api);
        let reflection_service = server::Builder::configure()
            .register_file_descriptor_set(rpc_api_descriptor())
            .build_v1()
            .context("failed to build reflection service")?;

        info!(target: COMPONENT, endpoint=?self.listener, mode=?self.mode, "Server initialized");

        // The crate version is handed to the accept-header layer below.
        let rpc_version = env!("CARGO_PKG_VERSION");
        let rpc_version =
            semver::Version::parse(rpc_version).context("failed to parse crate version")?;

        tonic::transport::Server::builder()
            .accept_http1(true)
            .max_connection_age(self.grpc_options.max_connection_age)
            .timeout(self.grpc_options.request_timeout)
            .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
            .layer(
                // The gRPC codes listed here are classified as successes by the trace layer,
                // i.e. responses carrying them are not reported as failures.
                //
                // NOTE(review): `Unknown` is commonly produced by unexpected server-side errors;
                // confirm that treating it as a success is intentional.
                TraceLayer::new(SharedClassifier::new(
                    GrpcErrorsAsFailures::new()
                        .with_success(GrpcCode::InvalidArgument)
                        .with_success(GrpcCode::NotFound)
                        .with_success(GrpcCode::ResourceExhausted)
                        .with_success(GrpcCode::Unimplemented)
                        .with_success(GrpcCode::Unknown),
                ))
                .make_span_with(grpc_trace_fn),
            )
            .layer(HealthCheckLayer)
            .layer(cors_for_grpc_web_layer())
            // Enables gRPC-web support.
            .layer(GrpcWebLayer::new())
            .layer(grpc::rate_limit_concurrent_connections(self.grpc_options))
            .layer(grpc::rate_limit_per_ip(self.grpc_options)?)
            .layer(
                // Accept-header validation against the crate version and genesis commitment;
                // the listed methods are registered as genesis-enforced.
                AcceptHeaderLayer::new(&rpc_version, genesis.commitment())
                    .with_genesis_enforced_method("SubmitProvenTx")
                    .with_genesis_enforced_method("SubmitProvenTxBatch"),
            )
            .add_service(api_service)
            .add_service(reflection_service)
            .serve_with_incoming(TcpListenerStream::new(self.listener))
            .await
            .context("failed to serve RPC API")
    }
}