miden_node_validator/server/
mod.rs1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::Context;
6use miden_node_db::Db;
7use miden_node_proto::generated::validator::api_server;
8use miden_node_proto::generated::{self as proto};
9use miden_node_proto_build::validator_api_descriptor;
10use miden_node_utils::ErrorReport;
11use miden_node_utils::clap::GrpcOptionsInternal;
12use miden_node_utils::panic::catch_panic_layer_fn;
13use miden_node_utils::tracing::OpenTelemetrySpanExt;
14use miden_node_utils::tracing::grpc::grpc_trace_fn;
15use miden_protocol::block::ProposedBlock;
16use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
17use miden_tx::utils::{Deserializable, Serializable};
18use tokio::net::TcpListener;
19use tokio::sync::Semaphore;
20use tokio_stream::wrappers::TcpListenerStream;
21use tonic::Status;
22use tower_http::catch_panic::CatchPanicLayer;
23use tower_http::trace::TraceLayer;
24use tracing::{info_span, instrument};
25
26use crate::block_validation::validate_block;
27use crate::db::{insert_transaction, load, load_chain_tip, upsert_block_header};
28use crate::tx_validation::validate_transaction;
29use crate::{COMPONENT, ValidatorSigner};
30
31pub struct Validator {
38 pub address: SocketAddr,
40 pub grpc_options: GrpcOptionsInternal,
44
45 pub signer: ValidatorSigner,
47
48 pub data_directory: PathBuf,
50}
51
52impl Validator {
53 pub async fn serve(self) -> anyhow::Result<()> {
58 tracing::info!(target: COMPONENT, endpoint=?self.address, "Initializing server");
59
60 let db = load(self.data_directory.join("validator.sqlite3"))
62 .await
63 .context("failed to initialize validator database")?;
64
65 let listener = TcpListener::bind(self.address)
66 .await
67 .context("failed to bind to block producer address")?;
68
69 let reflection_service = tonic_reflection::server::Builder::configure()
70 .register_file_descriptor_set(validator_api_descriptor())
71 .build_v1()
72 .context("failed to build reflection service")?;
73
74 tonic::transport::Server::builder()
76 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
77 .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
78 .timeout(self.grpc_options.request_timeout)
79 .add_service(api_server::ApiServer::new(ValidatorServer::new(self.signer, db)))
80 .add_service(reflection_service)
81 .serve_with_incoming(TcpListenerStream::new(listener))
82 .await
83 .context("failed to serve validator API")
84 }
85}
86
87struct ValidatorServer {
94 signer: ValidatorSigner,
95 db: Arc<Db>,
96 sign_block_semaphore: Semaphore,
99}
100
101impl ValidatorServer {
102 fn new(signer: ValidatorSigner, db: Db) -> Self {
103 Self {
104 signer,
105 db: db.into(),
106 sign_block_semaphore: Semaphore::new(1),
107 }
108 }
109}
110
111#[tonic::async_trait]
112impl api_server::Api for ValidatorServer {
113 async fn status(
115 &self,
116 _request: tonic::Request<()>,
117 ) -> Result<tonic::Response<proto::validator::ValidatorStatus>, tonic::Status> {
118 Ok(tonic::Response::new(proto::validator::ValidatorStatus {
119 version: env!("CARGO_PKG_VERSION").to_string(),
120 status: "OK".to_string(),
121 }))
122 }
123
124 #[instrument(target = COMPONENT, skip_all, err)]
126 async fn submit_proven_transaction(
127 &self,
128 request: tonic::Request<proto::transaction::ProvenTransaction>,
129 ) -> Result<tonic::Response<()>, tonic::Status> {
130 let (tx, inputs) = info_span!("deserialize").in_scope(|| {
131 let request = request.into_inner();
132 let tx = ProvenTransaction::read_from_bytes(&request.transaction).map_err(|err| {
133 Status::invalid_argument(err.as_report_context("Invalid proven transaction"))
134 })?;
135 let inputs = request
136 .transaction_inputs
137 .ok_or(Status::invalid_argument("Missing transaction inputs"))?;
138 let inputs = TransactionInputs::read_from_bytes(&inputs).map_err(|err| {
139 Status::invalid_argument(err.as_report_context("Invalid transaction inputs"))
140 })?;
141
142 Result::<_, tonic::Status>::Ok((tx, inputs))
143 })?;
144
145 tracing::Span::current().set_attribute("transaction.id", tx.id());
146
147 let tx_info = validate_transaction(tx, inputs).await.map_err(|err| {
149 Status::invalid_argument(err.as_report_context("Invalid transaction"))
150 })?;
151
152 self.db
154 .transact("insert_transaction", move |conn| insert_transaction(conn, &tx_info))
155 .await
156 .map_err(|err| {
157 Status::internal(err.as_report_context("Failed to insert transaction"))
158 })?;
159 Ok(tonic::Response::new(()))
160 }
161
162 async fn sign_block(
165 &self,
166 request: tonic::Request<proto::blockchain::ProposedBlock>,
167 ) -> Result<tonic::Response<proto::blockchain::BlockSignature>, tonic::Status> {
168 let proposed_block = info_span!("deserialize").in_scope(|| {
169 let proposed_block_bytes = request.into_inner().proposed_block;
170
171 ProposedBlock::read_from_bytes(&proposed_block_bytes).map_err(|err| {
172 tonic::Status::invalid_argument(format!(
173 "Failed to deserialize proposed block: {err}",
174 ))
175 })
176 })?;
177
178 let _permit = self.sign_block_semaphore.acquire().await.map_err(|err| {
181 tonic::Status::internal(format!("sign_block semaphore closed: {err}"))
182 })?;
183
184 let chain_tip = self
186 .db
187 .query("load_chain_tip", load_chain_tip)
188 .await
189 .map_err(|err| {
190 tonic::Status::internal(format!("Failed to load chain tip: {}", err.as_report()))
191 })?
192 .ok_or_else(|| tonic::Status::internal("Chain tip not found in database"))?;
193
194 let (signature, header) = validate_block(proposed_block, &self.signer, &self.db, chain_tip)
196 .await
197 .map_err(|err| {
198 tonic::Status::invalid_argument(format!(
199 "Failed to validate block: {}",
200 err.as_report()
201 ))
202 })?;
203
204 self.db
206 .transact("upsert_block_header", move |conn| upsert_block_header(conn, &header))
207 .await
208 .map_err(|err| {
209 tonic::Status::internal(format!(
210 "Failed to persist block header: {}",
211 err.as_report()
212 ))
213 })?;
214
215 let response = proto::blockchain::BlockSignature { signature: signature.to_bytes() };
217 Ok(tonic::Response::new(response))
218 }
219}