Skip to main content

miden_node_validator/server/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::Context;
6use miden_node_db::Db;
7use miden_node_proto::generated::validator::api_server;
8use miden_node_proto::generated::{self as proto};
9use miden_node_proto_build::validator_api_descriptor;
10use miden_node_utils::ErrorReport;
11use miden_node_utils::clap::GrpcOptionsInternal;
12use miden_node_utils::panic::catch_panic_layer_fn;
13use miden_node_utils::tracing::OpenTelemetrySpanExt;
14use miden_node_utils::tracing::grpc::grpc_trace_fn;
15use miden_protocol::block::ProposedBlock;
16use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
17use miden_protocol::utils::serde::{Deserializable, Serializable};
18use tokio::net::TcpListener;
19use tokio::sync::Semaphore;
20use tokio_stream::wrappers::TcpListenerStream;
21use tonic::Status;
22use tower_http::catch_panic::CatchPanicLayer;
23use tower_http::trace::TraceLayer;
24use tracing::{info_span, instrument};
25
26use crate::block_validation::validate_block;
27use crate::db::{insert_transaction, load, load_chain_tip, upsert_block_header};
28use crate::tx_validation::validate_transaction;
29use crate::{COMPONENT, ValidatorSigner};
30
31#[cfg(test)]
32mod tests;
33
34// VALIDATOR
35// ================================================================================
36
37/// The handle into running the gRPC validator server.
38///
39/// Facilitates the running of the gRPC server which implements the validator API.
40pub struct Validator {
41    /// The address of the validator component.
42    pub address: SocketAddr,
43    /// gRPC server options for internal services (timeouts, connection caps).
44    ///
45    /// If the handler takes longer than this duration, the server cancels the call.
46    pub grpc_options: GrpcOptionsInternal,
47
48    /// The signer used to sign blocks.
49    pub signer: ValidatorSigner,
50
51    /// The data directory for the validator component's database files.
52    pub data_directory: PathBuf,
53}
54
55impl Validator {
56    /// Serves the validator RPC API.
57    ///
58    /// Executes in place (i.e. not spawned) and will run indefinitely until a fatal error is
59    /// encountered.
60    pub async fn serve(self) -> anyhow::Result<()> {
61        tracing::info!(target: COMPONENT, endpoint=?self.address, "Initializing server");
62
63        // Initialize database connection.
64        let db = load(self.data_directory.join("validator.sqlite3"))
65            .await
66            .context("failed to initialize validator database")?;
67
68        let listener = TcpListener::bind(self.address)
69            .await
70            .context("failed to bind to block producer address")?;
71
72        let reflection_service = tonic_reflection::server::Builder::configure()
73            .register_file_descriptor_set(validator_api_descriptor())
74            .build_v1()
75            .context("failed to build reflection service")?;
76
77        // Build the gRPC server with the API service and trace layer.
78        tonic::transport::Server::builder()
79            .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
80            .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
81            .timeout(self.grpc_options.request_timeout)
82            .add_service(api_server::ApiServer::new(ValidatorServer::new(self.signer, db)))
83            .add_service(reflection_service)
84            .serve_with_incoming(TcpListenerStream::new(listener))
85            .await
86            .context("failed to serve validator API")
87    }
88}
89
90// VALIDATOR SERVER
91// ================================================================================
92
93/// The underlying implementation of the gRPC validator server.
94///
95/// Implements the gRPC API for the validator.
96struct ValidatorServer {
97    signer: ValidatorSigner,
98    db: Arc<Db>,
99    /// Serializes `sign_block` requests so that concurrent calls are processed sequentially,
100    /// ensuring consistent chain tip reads and preventing race conditions.
101    sign_block_semaphore: Semaphore,
102}
103
104impl ValidatorServer {
105    fn new(signer: ValidatorSigner, db: Db) -> Self {
106        Self {
107            signer,
108            db: db.into(),
109            sign_block_semaphore: Semaphore::new(1),
110        }
111    }
112}
113
114#[tonic::async_trait]
115impl api_server::Api for ValidatorServer {
116    /// Returns the status of the validator.
117    async fn status(
118        &self,
119        _request: tonic::Request<()>,
120    ) -> Result<tonic::Response<proto::validator::ValidatorStatus>, tonic::Status> {
121        Ok(tonic::Response::new(proto::validator::ValidatorStatus {
122            version: env!("CARGO_PKG_VERSION").to_string(),
123            status: "OK".to_string(),
124        }))
125    }
126
127    /// Receives a proven transaction, then validates and stores it.
128    #[instrument(target = COMPONENT, skip_all, err)]
129    async fn submit_proven_transaction(
130        &self,
131        request: tonic::Request<proto::transaction::ProvenTransaction>,
132    ) -> Result<tonic::Response<()>, tonic::Status> {
133        let (tx, inputs) = info_span!("deserialize").in_scope(|| {
134            let request = request.into_inner();
135            let tx = ProvenTransaction::read_from_bytes(&request.transaction).map_err(|err| {
136                Status::invalid_argument(err.as_report_context("Invalid proven transaction"))
137            })?;
138            let inputs = request
139                .transaction_inputs
140                .ok_or(Status::invalid_argument("Missing transaction inputs"))?;
141            let inputs = TransactionInputs::read_from_bytes(&inputs).map_err(|err| {
142                Status::invalid_argument(err.as_report_context("Invalid transaction inputs"))
143            })?;
144
145            Result::<_, tonic::Status>::Ok((tx, inputs))
146        })?;
147
148        tracing::Span::current().set_attribute("transaction.id", tx.id());
149
150        // Validate the transaction.
151        let tx_info = validate_transaction(tx, inputs).await.map_err(|err| {
152            Status::invalid_argument(err.as_report_context("Invalid transaction"))
153        })?;
154
155        // Store the validated transaction.
156        self.db
157            .transact("insert_transaction", move |conn| insert_transaction(conn, &tx_info))
158            .await
159            .map_err(|err| {
160                Status::internal(err.as_report_context("Failed to insert transaction"))
161            })?;
162        Ok(tonic::Response::new(()))
163    }
164
165    /// Validates a proposed block, verifies chain continuity, signs the block header, and updates
166    /// the chain tip.
167    async fn sign_block(
168        &self,
169        request: tonic::Request<proto::blockchain::ProposedBlock>,
170    ) -> Result<tonic::Response<proto::blockchain::BlockSignature>, tonic::Status> {
171        let proposed_block = info_span!("deserialize").in_scope(|| {
172            let proposed_block_bytes = request.into_inner().proposed_block;
173
174            ProposedBlock::read_from_bytes(&proposed_block_bytes).map_err(|err| {
175                tonic::Status::invalid_argument(format!(
176                    "Failed to deserialize proposed block: {err}",
177                ))
178            })
179        })?;
180
181        // Serialize sign_block requests to prevent race conditions between loading the
182        // chain tip and persisting the validated block header.
183        let _permit = self.sign_block_semaphore.acquire().await.map_err(|err| {
184            tonic::Status::internal(format!("sign_block semaphore closed: {err}"))
185        })?;
186
187        // Load the current chain tip from the database.
188        let chain_tip = self
189            .db
190            .query("load_chain_tip", load_chain_tip)
191            .await
192            .map_err(|err| {
193                tonic::Status::internal(format!("Failed to load chain tip: {}", err.as_report()))
194            })?
195            .ok_or_else(|| tonic::Status::internal("Chain tip not found in database"))?;
196
197        // Validate the block against the current chain tip.
198        let (signature, header) = validate_block(proposed_block, &self.signer, &self.db, chain_tip)
199            .await
200            .map_err(|err| {
201                tonic::Status::invalid_argument(format!(
202                    "Failed to validate block: {}",
203                    err.as_report()
204                ))
205            })?;
206
207        // Persist the validated block header.
208        self.db
209            .transact("upsert_block_header", move |conn| upsert_block_header(conn, &header))
210            .await
211            .map_err(|err| {
212                tonic::Status::internal(format!(
213                    "Failed to persist block header: {}",
214                    err.as_report()
215                ))
216            })?;
217
218        // Send the signature.
219        let response = proto::blockchain::BlockSignature { signature: signature.to_bytes() };
220        Ok(tonic::Response::new(response))
221    }
222}