Skip to main content

miden_node_validator/server/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::Context;
6use miden_node_db::Db;
7use miden_node_proto::generated::validator::api_server;
8use miden_node_proto::generated::{self as proto};
9use miden_node_proto_build::validator_api_descriptor;
10use miden_node_utils::ErrorReport;
11use miden_node_utils::clap::GrpcOptionsInternal;
12use miden_node_utils::panic::catch_panic_layer_fn;
13use miden_node_utils::tracing::OpenTelemetrySpanExt;
14use miden_node_utils::tracing::grpc::grpc_trace_fn;
15use miden_protocol::block::ProposedBlock;
16use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
17use miden_tx::utils::{Deserializable, Serializable};
18use tokio::net::TcpListener;
19use tokio::sync::Semaphore;
20use tokio_stream::wrappers::TcpListenerStream;
21use tonic::Status;
22use tower_http::catch_panic::CatchPanicLayer;
23use tower_http::trace::TraceLayer;
24use tracing::{info_span, instrument};
25
26use crate::block_validation::validate_block;
27use crate::db::{insert_transaction, load, load_chain_tip, upsert_block_header};
28use crate::tx_validation::validate_transaction;
29use crate::{COMPONENT, ValidatorSigner};
30
31// VALIDATOR
32// ================================================================================
33
34/// The handle into running the gRPC validator server.
35///
36/// Facilitates the running of the gRPC server which implements the validator API.
37pub struct Validator {
38    /// The address of the validator component.
39    pub address: SocketAddr,
40    /// gRPC server options for internal services (timeouts, connection caps).
41    ///
42    /// If the handler takes longer than this duration, the server cancels the call.
43    pub grpc_options: GrpcOptionsInternal,
44
45    /// The signer used to sign blocks.
46    pub signer: ValidatorSigner,
47
48    /// The data directory for the validator component's database files.
49    pub data_directory: PathBuf,
50}
51
52impl Validator {
53    /// Serves the validator RPC API.
54    ///
55    /// Executes in place (i.e. not spawned) and will run indefinitely until a fatal error is
56    /// encountered.
57    pub async fn serve(self) -> anyhow::Result<()> {
58        tracing::info!(target: COMPONENT, endpoint=?self.address, "Initializing server");
59
60        // Initialize database connection.
61        let db = load(self.data_directory.join("validator.sqlite3"))
62            .await
63            .context("failed to initialize validator database")?;
64
65        let listener = TcpListener::bind(self.address)
66            .await
67            .context("failed to bind to block producer address")?;
68
69        let reflection_service = tonic_reflection::server::Builder::configure()
70            .register_file_descriptor_set(validator_api_descriptor())
71            .build_v1()
72            .context("failed to build reflection service")?;
73
74        // Build the gRPC server with the API service and trace layer.
75        tonic::transport::Server::builder()
76            .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
77            .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
78            .timeout(self.grpc_options.request_timeout)
79            .add_service(api_server::ApiServer::new(ValidatorServer::new(self.signer, db)))
80            .add_service(reflection_service)
81            .serve_with_incoming(TcpListenerStream::new(listener))
82            .await
83            .context("failed to serve validator API")
84    }
85}
86
87// VALIDATOR SERVER
88// ================================================================================
89
90/// The underlying implementation of the gRPC validator server.
91///
92/// Implements the gRPC API for the validator.
93struct ValidatorServer {
94    signer: ValidatorSigner,
95    db: Arc<Db>,
96    /// Serializes `sign_block` requests so that concurrent calls are processed sequentially,
97    /// ensuring consistent chain tip reads and preventing race conditions.
98    sign_block_semaphore: Semaphore,
99}
100
101impl ValidatorServer {
102    fn new(signer: ValidatorSigner, db: Db) -> Self {
103        Self {
104            signer,
105            db: db.into(),
106            sign_block_semaphore: Semaphore::new(1),
107        }
108    }
109}
110
111#[tonic::async_trait]
112impl api_server::Api for ValidatorServer {
113    /// Returns the status of the validator.
114    async fn status(
115        &self,
116        _request: tonic::Request<()>,
117    ) -> Result<tonic::Response<proto::validator::ValidatorStatus>, tonic::Status> {
118        Ok(tonic::Response::new(proto::validator::ValidatorStatus {
119            version: env!("CARGO_PKG_VERSION").to_string(),
120            status: "OK".to_string(),
121        }))
122    }
123
124    /// Receives a proven transaction, then validates and stores it.
125    #[instrument(target = COMPONENT, skip_all, err)]
126    async fn submit_proven_transaction(
127        &self,
128        request: tonic::Request<proto::transaction::ProvenTransaction>,
129    ) -> Result<tonic::Response<()>, tonic::Status> {
130        let (tx, inputs) = info_span!("deserialize").in_scope(|| {
131            let request = request.into_inner();
132            let tx = ProvenTransaction::read_from_bytes(&request.transaction).map_err(|err| {
133                Status::invalid_argument(err.as_report_context("Invalid proven transaction"))
134            })?;
135            let inputs = request
136                .transaction_inputs
137                .ok_or(Status::invalid_argument("Missing transaction inputs"))?;
138            let inputs = TransactionInputs::read_from_bytes(&inputs).map_err(|err| {
139                Status::invalid_argument(err.as_report_context("Invalid transaction inputs"))
140            })?;
141
142            Result::<_, tonic::Status>::Ok((tx, inputs))
143        })?;
144
145        tracing::Span::current().set_attribute("transaction.id", tx.id());
146
147        // Validate the transaction.
148        let tx_info = validate_transaction(tx, inputs).await.map_err(|err| {
149            Status::invalid_argument(err.as_report_context("Invalid transaction"))
150        })?;
151
152        // Store the validated transaction.
153        self.db
154            .transact("insert_transaction", move |conn| insert_transaction(conn, &tx_info))
155            .await
156            .map_err(|err| {
157                Status::internal(err.as_report_context("Failed to insert transaction"))
158            })?;
159        Ok(tonic::Response::new(()))
160    }
161
162    /// Validates a proposed block, verifies chain continuity, signs the block header, and updates
163    /// the chain tip.
164    async fn sign_block(
165        &self,
166        request: tonic::Request<proto::blockchain::ProposedBlock>,
167    ) -> Result<tonic::Response<proto::blockchain::BlockSignature>, tonic::Status> {
168        let proposed_block = info_span!("deserialize").in_scope(|| {
169            let proposed_block_bytes = request.into_inner().proposed_block;
170
171            ProposedBlock::read_from_bytes(&proposed_block_bytes).map_err(|err| {
172                tonic::Status::invalid_argument(format!(
173                    "Failed to deserialize proposed block: {err}",
174                ))
175            })
176        })?;
177
178        // Serialize sign_block requests to prevent race conditions between loading the
179        // chain tip and persisting the validated block header.
180        let _permit = self.sign_block_semaphore.acquire().await.map_err(|err| {
181            tonic::Status::internal(format!("sign_block semaphore closed: {err}"))
182        })?;
183
184        // Load the current chain tip from the database.
185        let chain_tip = self
186            .db
187            .query("load_chain_tip", load_chain_tip)
188            .await
189            .map_err(|err| {
190                tonic::Status::internal(format!("Failed to load chain tip: {}", err.as_report()))
191            })?
192            .ok_or_else(|| tonic::Status::internal("Chain tip not found in database"))?;
193
194        // Validate the block against the current chain tip.
195        let (signature, header) = validate_block(proposed_block, &self.signer, &self.db, chain_tip)
196            .await
197            .map_err(|err| {
198                tonic::Status::invalid_argument(format!(
199                    "Failed to validate block: {}",
200                    err.as_report()
201                ))
202            })?;
203
204        // Persist the validated block header.
205        self.db
206            .transact("upsert_block_header", move |conn| upsert_block_header(conn, &header))
207            .await
208            .map_err(|err| {
209                tonic::Status::internal(format!(
210                    "Failed to persist block header: {}",
211                    err.as_report()
212                ))
213            })?;
214
215        // Send the signature.
216        let response = proto::blockchain::BlockSignature { signature: signature.to_bytes() };
217        Ok(tonic::Response::new(response))
218    }
219}