miden_node_validator/server/
mod.rs1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::Context;
6use miden_node_db::Db;
7use miden_node_proto::generated::validator::api_server;
8use miden_node_proto::generated::{self as proto};
9use miden_node_proto_build::validator_api_descriptor;
10use miden_node_utils::ErrorReport;
11use miden_node_utils::clap::GrpcOptionsInternal;
12use miden_node_utils::panic::catch_panic_layer_fn;
13use miden_node_utils::tracing::OpenTelemetrySpanExt;
14use miden_node_utils::tracing::grpc::grpc_trace_fn;
15use miden_protocol::block::ProposedBlock;
16use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
17use miden_protocol::utils::serde::{Deserializable, Serializable};
18use tokio::net::TcpListener;
19use tokio::sync::Semaphore;
20use tokio_stream::wrappers::TcpListenerStream;
21use tonic::Status;
22use tower_http::catch_panic::CatchPanicLayer;
23use tower_http::trace::TraceLayer;
24use tracing::{info_span, instrument};
25
26use crate::block_validation::validate_block;
27use crate::db::{insert_transaction, load, load_chain_tip, upsert_block_header};
28use crate::tx_validation::validate_transaction;
29use crate::{COMPONENT, ValidatorSigner};
30
31#[cfg(test)]
32mod tests;
33
34pub struct Validator {
41 pub address: SocketAddr,
43 pub grpc_options: GrpcOptionsInternal,
47
48 pub signer: ValidatorSigner,
50
51 pub data_directory: PathBuf,
53}
54
55impl Validator {
56 pub async fn serve(self) -> anyhow::Result<()> {
61 tracing::info!(target: COMPONENT, endpoint=?self.address, "Initializing server");
62
63 let db = load(self.data_directory.join("validator.sqlite3"))
65 .await
66 .context("failed to initialize validator database")?;
67
68 let listener = TcpListener::bind(self.address)
69 .await
70 .context("failed to bind to block producer address")?;
71
72 let reflection_service = tonic_reflection::server::Builder::configure()
73 .register_file_descriptor_set(validator_api_descriptor())
74 .build_v1()
75 .context("failed to build reflection service")?;
76
77 tonic::transport::Server::builder()
79 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
80 .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
81 .timeout(self.grpc_options.request_timeout)
82 .add_service(api_server::ApiServer::new(ValidatorServer::new(self.signer, db)))
83 .add_service(reflection_service)
84 .serve_with_incoming(TcpListenerStream::new(listener))
85 .await
86 .context("failed to serve validator API")
87 }
88}
89
90struct ValidatorServer {
97 signer: ValidatorSigner,
98 db: Arc<Db>,
99 sign_block_semaphore: Semaphore,
102}
103
104impl ValidatorServer {
105 fn new(signer: ValidatorSigner, db: Db) -> Self {
106 Self {
107 signer,
108 db: db.into(),
109 sign_block_semaphore: Semaphore::new(1),
110 }
111 }
112}
113
114#[tonic::async_trait]
115impl api_server::Api for ValidatorServer {
116 async fn status(
118 &self,
119 _request: tonic::Request<()>,
120 ) -> Result<tonic::Response<proto::validator::ValidatorStatus>, tonic::Status> {
121 Ok(tonic::Response::new(proto::validator::ValidatorStatus {
122 version: env!("CARGO_PKG_VERSION").to_string(),
123 status: "OK".to_string(),
124 }))
125 }
126
127 #[instrument(target = COMPONENT, skip_all, err)]
129 async fn submit_proven_transaction(
130 &self,
131 request: tonic::Request<proto::transaction::ProvenTransaction>,
132 ) -> Result<tonic::Response<()>, tonic::Status> {
133 let (tx, inputs) = info_span!("deserialize").in_scope(|| {
134 let request = request.into_inner();
135 let tx = ProvenTransaction::read_from_bytes(&request.transaction).map_err(|err| {
136 Status::invalid_argument(err.as_report_context("Invalid proven transaction"))
137 })?;
138 let inputs = request
139 .transaction_inputs
140 .ok_or(Status::invalid_argument("Missing transaction inputs"))?;
141 let inputs = TransactionInputs::read_from_bytes(&inputs).map_err(|err| {
142 Status::invalid_argument(err.as_report_context("Invalid transaction inputs"))
143 })?;
144
145 Result::<_, tonic::Status>::Ok((tx, inputs))
146 })?;
147
148 tracing::Span::current().set_attribute("transaction.id", tx.id());
149
150 let tx_info = validate_transaction(tx, inputs).await.map_err(|err| {
152 Status::invalid_argument(err.as_report_context("Invalid transaction"))
153 })?;
154
155 self.db
157 .transact("insert_transaction", move |conn| insert_transaction(conn, &tx_info))
158 .await
159 .map_err(|err| {
160 Status::internal(err.as_report_context("Failed to insert transaction"))
161 })?;
162 Ok(tonic::Response::new(()))
163 }
164
165 async fn sign_block(
168 &self,
169 request: tonic::Request<proto::blockchain::ProposedBlock>,
170 ) -> Result<tonic::Response<proto::blockchain::BlockSignature>, tonic::Status> {
171 let proposed_block = info_span!("deserialize").in_scope(|| {
172 let proposed_block_bytes = request.into_inner().proposed_block;
173
174 ProposedBlock::read_from_bytes(&proposed_block_bytes).map_err(|err| {
175 tonic::Status::invalid_argument(format!(
176 "Failed to deserialize proposed block: {err}",
177 ))
178 })
179 })?;
180
181 let _permit = self.sign_block_semaphore.acquire().await.map_err(|err| {
184 tonic::Status::internal(format!("sign_block semaphore closed: {err}"))
185 })?;
186
187 let chain_tip = self
189 .db
190 .query("load_chain_tip", load_chain_tip)
191 .await
192 .map_err(|err| {
193 tonic::Status::internal(format!("Failed to load chain tip: {}", err.as_report()))
194 })?
195 .ok_or_else(|| tonic::Status::internal("Chain tip not found in database"))?;
196
197 let (signature, header) = validate_block(proposed_block, &self.signer, &self.db, chain_tip)
199 .await
200 .map_err(|err| {
201 tonic::Status::invalid_argument(format!(
202 "Failed to validate block: {}",
203 err.as_report()
204 ))
205 })?;
206
207 self.db
209 .transact("upsert_block_header", move |conn| upsert_block_header(conn, &header))
210 .await
211 .map_err(|err| {
212 tonic::Status::internal(format!(
213 "Failed to persist block header: {}",
214 err.as_report()
215 ))
216 })?;
217
218 let response = proto::blockchain::BlockSignature { signature: signature.to_bytes() };
220 Ok(tonic::Response::new(response))
221 }
222}