use std::convert::Infallible;
use futures::TryFutureExt;
use miden_crypto::dsa::ecdsa_k256_keccak::Signature;
use miden_node_proto::errors::MissingFieldHelper;
use miden_node_proto::generated::store::block_producer_server;
use miden_node_proto::generated::{self as proto};
use miden_node_proto::try_convert;
use miden_node_utils::ErrorReport;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::Word;
use miden_protocol::batch::OrderedBatches;
use miden_protocol::block::{BlockBody, BlockHeader, BlockNumber, SignedBlock};
use miden_protocol::utils::Deserializable;
use tonic::{Request, Response, Status};
use tracing::Instrument;
use crate::errors::ApplyBlockError;
use crate::server::api::{
StoreApi,
conversion_error_to_status,
read_account_id,
read_account_ids,
read_block_numbers,
validate_note_commitments,
validate_nullifiers,
};
#[tonic::async_trait]
impl block_producer_server::BlockProducer for StoreApi {
async fn get_block_header_by_number(
&self,
request: Request<proto::rpc::BlockHeaderByNumberRequest>,
) -> Result<Response<proto::rpc::BlockHeaderByNumberResponse>, Status> {
self.get_block_header_by_number_inner(request).await
}
async fn apply_block(
&self,
request: Request<proto::store::ApplyBlockRequest>,
) -> Result<Response<()>, Status> {
let request = request.into_inner();
let ordered_batches =
OrderedBatches::read_from_bytes(&request.ordered_batches).map_err(|err| {
Status::invalid_argument(
err.as_report_context("failed to deserialize ordered batches"),
)
})?;
let block = request
.block
.ok_or(proto::store::ApplyBlockRequest::missing_field(stringify!(block)))?;
let header: BlockHeader = block
.header
.ok_or(proto::blockchain::SignedBlock::missing_field(stringify!(header)))?
.try_into()?;
let body: BlockBody = block
.body
.ok_or(proto::blockchain::SignedBlock::missing_field(stringify!(body)))?
.try_into()?;
let signature: Signature = block
.signature
.ok_or(proto::blockchain::SignedBlock::missing_field(stringify!(signature)))?
.try_into()?;
let block_inputs =
self.block_inputs_from_ordered_batches(&ordered_batches).await.map_err(|err| {
Status::invalid_argument(
err.as_report_context("failed to get block inputs from ordered batches"),
)
})?;
let span = tracing::Span::current();
span.set_attribute("block.number", header.block_num());
span.set_attribute("block.commitment", header.commitment());
span.set_attribute("block.accounts.count", body.updated_accounts().len());
span.set_attribute("block.output_notes.count", body.output_notes().count());
span.set_attribute("block.nullifiers.count", body.created_nullifiers().len());
let this = self.clone();
let _block_proof = tokio::spawn(
async move {
let signed_block = SignedBlock::new_unchecked(header.clone(), body, signature); this.state
.apply_block(signed_block)
.inspect_err(|err| {
span.set_error(err);
})
.map_err(|err| {
let code = match err {
ApplyBlockError::InvalidBlockError(_) => tonic::Code::InvalidArgument,
_ => tonic::Code::Internal,
};
Status::new(code, err.as_report())
})
.and_then(|_| {
this.block_prover
.prove(ordered_batches, block_inputs, &header)
.map_err(|err| Status::new(tonic::Code::Internal, err.as_report()))
})
.await
.map(Response::new)
}
.in_current_span(),
)
.await
.map_err(|err| {
tonic::Status::internal(err.as_report_context("joining apply_block task failed"))
})
.flatten()?;
Ok(Response::new(()))
}
async fn get_block_inputs(
&self,
request: Request<proto::store::BlockInputsRequest>,
) -> Result<Response<proto::store::BlockInputs>, Status> {
let request = request.into_inner();
let account_ids = read_account_ids::<Status>(&request.account_ids)?;
let nullifiers = validate_nullifiers(&request.nullifiers)
.map_err(|err| conversion_error_to_status(&err))?;
let unauthenticated_note_commitments =
validate_note_commitments(&request.unauthenticated_notes)?;
let reference_blocks = read_block_numbers(&request.reference_blocks);
let unauthenticated_note_commitments =
unauthenticated_note_commitments.into_iter().collect();
self.state
.get_block_inputs(
account_ids,
nullifiers,
unauthenticated_note_commitments,
reference_blocks,
)
.await
.map(proto::store::BlockInputs::from)
.map(Response::new)
.inspect_err(|err| tracing::Span::current().set_error(err))
.map_err(|err| tonic::Status::internal(err.as_report()))
}
async fn get_batch_inputs(
&self,
request: Request<proto::store::BatchInputsRequest>,
) -> Result<Response<proto::store::BatchInputs>, Status> {
let request = request.into_inner();
let note_commitments: Vec<Word> = try_convert(request.note_commitments)
.collect::<Result<_, _>>()
.map_err(|err| Status::invalid_argument(format!("Invalid note commitment: {err}")))?;
let reference_blocks: Vec<u32> =
try_convert::<_, Infallible, _, _>(request.reference_blocks)
.collect::<Result<Vec<_>, _>>()
.expect("operation should be infallible");
let reference_blocks = reference_blocks.into_iter().map(BlockNumber::from).collect();
self.state
.get_batch_inputs(reference_blocks, note_commitments.into_iter().collect())
.await
.map(Into::into)
.map(Response::new)
.inspect_err(|err| tracing::Span::current().set_error(err))
.map_err(|err| tonic::Status::internal(err.as_report()))
}
async fn get_transaction_inputs(
&self,
request: Request<proto::store::TransactionInputsRequest>,
) -> Result<Response<proto::store::TransactionInputs>, Status> {
let request = request.into_inner();
let account_id = read_account_id::<Status>(request.account_id)?;
let nullifiers = validate_nullifiers(&request.nullifiers)
.map_err(|err| conversion_error_to_status(&err))?;
let unauthenticated_note_commitments =
validate_note_commitments(&request.unauthenticated_notes)?;
let tx_inputs = self
.state
.get_transaction_inputs(account_id, &nullifiers, unauthenticated_note_commitments)
.await
.inspect_err(|err| tracing::Span::current().set_error(err))
.map_err(|err| tonic::Status::internal(err.as_report()))?;
let block_height = self.state.latest_block_num().await.as_u32();
Ok(Response::new(proto::store::TransactionInputs {
account_state: Some(proto::store::transaction_inputs::AccountTransactionInputRecord {
account_id: Some(account_id.into()),
account_commitment: Some(tx_inputs.account_commitment.into()),
}),
nullifiers: tx_inputs
.nullifiers
.into_iter()
.map(|nullifier| {
proto::store::transaction_inputs::NullifierTransactionInputRecord {
nullifier: Some(nullifier.nullifier.into()),
block_num: nullifier.block_num.as_u32(),
}
})
.collect(),
found_unauthenticated_notes: tx_inputs
.found_unauthenticated_notes
.into_iter()
.map(Into::into)
.collect(),
new_account_id_prefix_is_unique: tx_inputs.new_account_id_prefix_is_unique,
block_height,
}))
}
}