use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use miden_node_db::Db;
use miden_node_proto::generated::validator::api_server;
use miden_node_proto::generated::{self as proto};
use miden_node_proto_build::validator_api_descriptor;
use miden_node_utils::ErrorReport;
use miden_node_utils::clap::GrpcOptionsInternal;
use miden_node_utils::panic::catch_panic_layer_fn;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_node_utils::tracing::grpc::grpc_trace_fn;
use miden_protocol::block::ProposedBlock;
use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
use miden_protocol::utils::serde::{Deserializable, Serializable};
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::Status;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::trace::TraceLayer;
use tracing::{info_span, instrument};
use crate::block_validation::validate_block;
use crate::db::{insert_transaction, load, load_chain_tip, upsert_block_header};
use crate::tx_validation::validate_transaction;
use crate::{COMPONENT, ValidatorSigner};
#[cfg(test)]
mod tests;
pub struct Validator {
pub address: SocketAddr,
pub grpc_options: GrpcOptionsInternal,
pub signer: ValidatorSigner,
pub data_directory: PathBuf,
}
impl Validator {
pub async fn serve(self) -> anyhow::Result<()> {
tracing::info!(target: COMPONENT, endpoint=?self.address, "Initializing server");
let db = load(self.data_directory.join("validator.sqlite3"))
.await
.context("failed to initialize validator database")?;
let listener = TcpListener::bind(self.address)
.await
.context("failed to bind to block producer address")?;
let reflection_service = tonic_reflection::server::Builder::configure()
.register_file_descriptor_set(validator_api_descriptor())
.build_v1()
.context("failed to build reflection service")?;
tonic::transport::Server::builder()
.layer(CatchPanicLayer::custom(catch_panic_layer_fn))
.layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
.timeout(self.grpc_options.request_timeout)
.add_service(api_server::ApiServer::new(ValidatorServer::new(self.signer, db)))
.add_service(reflection_service)
.serve_with_incoming(TcpListenerStream::new(listener))
.await
.context("failed to serve validator API")
}
}
struct ValidatorServer {
signer: ValidatorSigner,
db: Arc<Db>,
sign_block_semaphore: Semaphore,
}
impl ValidatorServer {
fn new(signer: ValidatorSigner, db: Db) -> Self {
Self {
signer,
db: db.into(),
sign_block_semaphore: Semaphore::new(1),
}
}
}
#[tonic::async_trait]
impl api_server::Api for ValidatorServer {
async fn status(
&self,
_request: tonic::Request<()>,
) -> Result<tonic::Response<proto::validator::ValidatorStatus>, tonic::Status> {
Ok(tonic::Response::new(proto::validator::ValidatorStatus {
version: env!("CARGO_PKG_VERSION").to_string(),
status: "OK".to_string(),
}))
}
#[instrument(target = COMPONENT, skip_all, err)]
async fn submit_proven_transaction(
&self,
request: tonic::Request<proto::transaction::ProvenTransaction>,
) -> Result<tonic::Response<()>, tonic::Status> {
let (tx, inputs) = info_span!("deserialize").in_scope(|| {
let request = request.into_inner();
let tx = ProvenTransaction::read_from_bytes(&request.transaction).map_err(|err| {
Status::invalid_argument(err.as_report_context("Invalid proven transaction"))
})?;
let inputs = request
.transaction_inputs
.ok_or(Status::invalid_argument("Missing transaction inputs"))?;
let inputs = TransactionInputs::read_from_bytes(&inputs).map_err(|err| {
Status::invalid_argument(err.as_report_context("Invalid transaction inputs"))
})?;
Result::<_, tonic::Status>::Ok((tx, inputs))
})?;
tracing::Span::current().set_attribute("transaction.id", tx.id());
let tx_info = validate_transaction(tx, inputs).await.map_err(|err| {
Status::invalid_argument(err.as_report_context("Invalid transaction"))
})?;
self.db
.transact("insert_transaction", move |conn| insert_transaction(conn, &tx_info))
.await
.map_err(|err| {
Status::internal(err.as_report_context("Failed to insert transaction"))
})?;
Ok(tonic::Response::new(()))
}
async fn sign_block(
&self,
request: tonic::Request<proto::blockchain::ProposedBlock>,
) -> Result<tonic::Response<proto::blockchain::BlockSignature>, tonic::Status> {
let proposed_block = info_span!("deserialize").in_scope(|| {
let proposed_block_bytes = request.into_inner().proposed_block;
ProposedBlock::read_from_bytes(&proposed_block_bytes).map_err(|err| {
tonic::Status::invalid_argument(format!(
"Failed to deserialize proposed block: {err}",
))
})
})?;
let _permit = self.sign_block_semaphore.acquire().await.map_err(|err| {
tonic::Status::internal(format!("sign_block semaphore closed: {err}"))
})?;
let chain_tip = self
.db
.query("load_chain_tip", load_chain_tip)
.await
.map_err(|err| {
tonic::Status::internal(format!("Failed to load chain tip: {}", err.as_report()))
})?
.ok_or_else(|| tonic::Status::internal("Chain tip not found in database"))?;
let (signature, header) = validate_block(proposed_block, &self.signer, &self.db, chain_tip)
.await
.map_err(|err| {
tonic::Status::invalid_argument(format!(
"Failed to validate block: {}",
err.as_report()
))
})?;
self.db
.transact("upsert_block_header", move |conn| upsert_block_header(conn, &header))
.await
.map_err(|err| {
tonic::Status::internal(format!(
"Failed to persist block header: {}",
err.as_report()
))
})?;
let response = proto::blockchain::BlockSignature { signature: signature.to_bytes() };
Ok(tonic::Response::new(response))
}
}