use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::num::{NonZeroU32, NonZeroU64};
use std::time::Duration;
use http::header::{ACCEPT, CONTENT_TYPE};
use http::{HeaderMap, HeaderValue};
use miden_node_proto::clients::{Builder, GrpcClient, Interceptor, RpcClient};
use miden_node_proto::generated::rpc::api_client::ApiClient as ProtoClient;
use miden_node_proto::generated::{self as proto};
use miden_node_store::genesis::config::GenesisConfig;
use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, Store};
use miden_node_utils::clap::{GrpcOptionsExternal, GrpcOptionsInternal, StorageOptions};
use miden_node_utils::fee::test_fee;
use miden_node_utils::limiter::{
QueryParamAccountIdLimit,
QueryParamLimiter,
QueryParamNoteIdLimit,
QueryParamNullifierLimit,
};
use miden_protocol::Word;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::account::{
Account,
AccountBuilder,
AccountDelta,
AccountId,
AccountIdVersion,
AccountStorageMode,
AccountType,
};
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;
use miden_protocol::testing::noop_auth_component::NoopAuthComponent;
use miden_protocol::transaction::{ProvenTransaction, TxAccountUpdate};
use miden_protocol::utils::serde::Serializable;
use miden_protocol::vm::ExecutionProof;
use miden_standards::account::wallets::BasicWallet;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::runtime::{self, Runtime};
use tokio::task;
use url::Url;
use crate::Rpc;
/// Byte offset at which the tests overwrite the 32-byte account delta commitment inside a
/// serialized `ProvenTransaction`.
///
/// NOTE(review): the sum presumably mirrors the serialized layout of the account update — a
/// 15-byte account id followed by two 32-byte state commitments — confirm against the
/// serialization format if it changes.
const DELTA_COMMITMENT_BYTE_OFFSET: usize = 15 + 32 + 32;
/// Request timeout applied to the endpoints created by `connect_rpc`.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
/// Builds a test account from `seed` and the [`AccountDelta`] obtained by converting it.
///
/// The account is a public `RegularAccountImmutableCode` account with a [`BasicWallet`]
/// component, a [`NoopAuthComponent`] and no assets.
///
/// # Panics
///
/// Panics if the account cannot be built or cannot be converted into a delta.
fn build_test_account(seed: [u8; 32]) -> (Account, AccountDelta) {
    let builder = AccountBuilder::new(seed)
        .account_type(AccountType::RegularAccountImmutableCode)
        .storage_mode(AccountStorageMode::Public)
        .with_assets(vec![])
        .with_component(BasicWallet)
        .with_auth_component(NoopAuthComponent);
    let wallet = builder.build_existing().unwrap();

    // The conversion consumes the account, so convert a copy and hand back the original.
    let wallet_delta: AccountDelta = wallet.clone().try_into().unwrap();

    (wallet, wallet_delta)
}
/// Assembles a [`ProvenTransaction`] with a dummy execution proof for use in submission tests.
///
/// The account update uses a dummy account id (not the id of `account`), the commitment of
/// `account`, and both the commitment and the full contents of `delta`. The transaction has no
/// input or output notes. `genesis` is forwarded as the `Word` argument of
/// [`ProvenTransaction::new`].
///
/// NOTE(review): `genesis` is presumably the reference block commitment — the reference-block
/// test passes a bogus word here and expects a chain-commitment mismatch — confirm.
///
/// # Panics
///
/// Panics if either the account update or the transaction fails to construct.
fn build_test_proven_tx(
    account: &Account,
    delta: &AccountDelta,
    genesis: Word,
) -> ProvenTransaction {
    let dummy_id = AccountId::dummy(
        [0; 15],
        AccountIdVersion::Version0,
        AccountType::RegularAccountImmutableCode,
        AccountStorageMode::Public,
    );

    let update_details = AccountUpdateDetails::Delta(delta.clone());
    let account_update = TxAccountUpdate::new(
        dummy_id,
        // Arbitrary filler word; presumably the initial state commitment — TODO confirm.
        [8; 32].try_into().unwrap(),
        account.to_commitment(),
        delta.to_commitment(),
        update_details,
    )
    .unwrap();

    let input_notes = Vec::<miden_protocol::transaction::InputNoteCommitment>::new();
    let output_notes = Vec::<miden_protocol::transaction::OutputNote>::new();

    ProvenTransaction::new(
        account_update,
        input_notes,
        output_notes,
        0.into(),
        genesis,
        test_fee(),
        u32::MAX.into(),
        ExecutionProof::new_dummy(),
    )
    .unwrap()
}
/// A request sent through the bare generated gRPC client (no interceptor attached) must be
/// accepted by the RPC server.
#[tokio::test]
async fn rpc_server_accepts_requests_without_accept_header() {
    let (_, rpc_addr, store_listener) = start_rpc().await;
    let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;

    // Bypass the client returned by `start_rpc` and talk to the server with the raw generated
    // client instead.
    let endpoint = tonic::transport::Endpoint::try_from(format!("http://{rpc_addr}")).unwrap();
    let mut bare_client = ProtoClient::connect(endpoint).await.unwrap();

    let header_request = proto::rpc::BlockHeaderByNumberRequest {
        block_num: Some(0),
        include_mmr_proof: None,
    };
    let outcome = bare_client.get_block_header_by_number(header_request).await;
    assert!(outcome.is_ok());

    shutdown_store(store_runtime).await;
}
#[tokio::test]
async fn rpc_rate_limits_per_ip() {
let grpc_options = GrpcOptionsExternal {
burst_size: NonZeroU32::new(8).unwrap(),
replenish_n_per_second_per_ip: NonZeroU64::new(1).unwrap(),
..GrpcOptionsExternal::test()
};
let (_, rpc_addr, store_listener) = start_rpc_with_options(grpc_options).await;
let (store_runtime, data_directory, _genesis, _store_addr) = start_store(store_listener).await;
let url = rpc_addr.to_string();
let url = Url::parse(format!("http://{}", &url).as_str()).unwrap();
let mut rpc_client = connect_rpc(url.clone(), Some(IpAddr::V4(Ipv4Addr::LOCALHOST))).await;
let mut results = Vec::new();
let mut last_error = None;
for _ in 0..256 {
let result = send_request(&mut rpc_client).await;
if let Err(err) = &result {
last_error = Some(err.code());
}
results.push(result);
}
assert!(results.iter().any(std::result::Result::is_ok));
assert!(
last_error.is_some_and(|code| code == tonic::Code::ResourceExhausted),
"expected rate limit error but got: {last_error:?}"
);
shutdown_store(store_runtime).await;
drop(data_directory);
}
/// A request sent through the client returned by `start_rpc` must be accepted.
///
/// NOTE(review): that client is built with `Interceptor::default()`, which presumably attaches
/// the accept header this test is named after — confirm against the interceptor implementation.
#[tokio::test]
async fn rpc_server_accepts_requests_with_accept_header() {
    let (mut default_client, _, store_listener) = start_rpc().await;
    let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;

    let outcome = send_request(&mut default_client).await;
    assert!(outcome.is_ok());

    shutdown_store(store_runtime).await;
}
/// Requests whose version metadata is not supported by the server must be rejected with
/// `InvalidArgument` and a message containing "server does not support".
///
/// Each version is exercised against a freshly started RPC server and store. Every assertion
/// names the version under test so a failure inside the loop is attributable.
#[tokio::test]
async fn rpc_server_rejects_requests_with_accept_header_invalid_version() {
    for version in ["1.9.0", "0.8.1", "0.8.0", "0.999.0", "99.0.0"] {
        let (_, rpc_addr, store_listener) = start_rpc().await;
        let (store_runtime, _data_directory, _genesis, _store_addr) =
            start_store(store_listener).await;

        // Build a client that advertises `version` and sends no genesis metadata.
        let url = Url::parse(&format!("http://{rpc_addr}")).unwrap();
        let mut rpc_client: RpcClient = Builder::new(url)
            .without_tls()
            .with_timeout(Duration::from_secs(10))
            .with_metadata_version(version.to_string())
            .without_metadata_genesis()
            .without_otel_context_injection()
            .connect::<RpcClient>()
            .await
            .unwrap();

        // Extract the error once instead of re-unwrapping the response for every check.
        let Err(status) = send_request(&mut rpc_client).await else {
            panic!("request advertising version {version} should have been rejected");
        };
        assert_eq!(
            status.code(),
            tonic::Code::InvalidArgument,
            "unexpected status code for version {version}"
        );
        assert!(
            status.message().contains("server does not support"),
            "unexpected error message for version {version}: {}",
            status.message()
        );

        shutdown_store(store_runtime).await;
    }
}
/// Exercises the RPC server across the store's lifecycle: requests fail while the store is
/// down, succeed once it is up, fail again after it is shut down, and succeed again after it is
/// restarted on the same address and data directory.
#[tokio::test]
async fn rpc_startup_is_robust_to_network_failures() {
    // Only the RPC server is running at this point; the store listener is bound but unserved.
    let (mut rpc_client, _, store_listener) = start_rpc().await;
    let before_store = send_request(&mut rpc_client).await;
    assert!(before_store.is_err());

    // Bring the store up: the same client must now get a block header back.
    let (store_runtime, data_directory, _genesis, store_addr) = start_store(store_listener).await;
    let with_store = send_request(&mut rpc_client).await;
    assert!(with_store.unwrap().into_inner().block_header.is_some());

    // Take the store down again: requests must fail.
    shutdown_store(store_runtime).await;
    let after_shutdown = send_request(&mut rpc_client).await;
    assert!(after_shutdown.is_err());

    // Restart the store on the same address and data: requests must succeed once more.
    let store_runtime = restart_store(store_addr, data_directory.path()).await;
    let after_restart = send_request(&mut rpc_client).await;
    assert_eq!(after_restart.unwrap().into_inner().block_header.unwrap().block_num, 0);

    shutdown_store(store_runtime).await;
}
/// Posts a `grpc-web` request to the `Status` endpoint over plain HTTP and checks that the
/// response carries the `access-control-allow-credentials`, `access-control-expose-headers`
/// and `vary` headers.
#[tokio::test]
async fn rpc_server_has_web_support() {
    let (_, rpc_addr, store_listener) = start_rpc().await;
    let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;

    let accept_header = concat!("application/vnd.miden; version=", env!("CARGO_PKG_VERSION"));
    let mut request_headers = HeaderMap::new();
    request_headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/grpc-web+proto"));
    request_headers.insert(ACCEPT, HeaderValue::from_static(accept_header));

    // Five-byte body: a single zero byte followed by a big-endian `u32` zero.
    // NOTE(review): presumably the gRPC frame prefix (compression flag + payload length) of an
    // empty message — confirm against the gRPC-web framing rules.
    let mut body = vec![0u8];
    body.extend_from_slice(&0u32.to_be_bytes());

    let http_client = reqwest::Client::new();
    let response = http_client
        .post(format!("http://{rpc_addr}/rpc.Api/Status"))
        .headers(request_headers)
        .body(body)
        .send()
        .await
        .unwrap();

    let response_headers = response.headers();
    for expected in ["access-control-allow-credentials", "access-control-expose-headers", "vary"] {
        assert!(response_headers.get(expected).is_some());
    }

    shutdown_store(store_runtime).await;
}
/// Submitting a proven transaction whose serialized delta commitment has been overwritten with
/// the commitment of a different account's delta must be rejected with a delta validation error.
#[tokio::test]
async fn rpc_server_rejects_proven_transactions_with_invalid_commitment() {
    let (_, rpc_addr, store_listener) = start_rpc().await;
    let (store_runtime, _data_directory, genesis, _store_addr) = start_store(store_listener).await;
    // Short pause after starting the store (presumably to let it become ready — TODO confirm).
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Lazily connected client that sends the genesis commitment but no version metadata.
    let rpc_url = Url::parse(&format!("http://{rpc_addr}")).unwrap();
    let mut rpc_client = miden_node_proto::clients::Builder::new(rpc_url)
        .without_tls()
        .with_timeout(Duration::from_secs(5))
        .without_metadata_version()
        .with_metadata_genesis(genesis.to_hex())
        .without_otel_context_injection()
        .connect_lazy::<miden_node_proto::clients::RpcClient>();

    // Start from a well-formed transaction for the first account.
    let (account, account_delta) = build_test_account([0; 32]);
    let tx = build_test_proven_tx(&account, &account_delta, genesis);

    // Take the delta commitment of a second, different account.
    let (other_account, _) = build_test_account([1; 32]);
    let incorrect_delta: AccountDelta = other_account.try_into().unwrap();
    let incorrect_commitment_bytes = incorrect_delta.to_commitment().as_bytes();

    // Splice the foreign commitment over the 32 bytes at the delta commitment offset.
    let mut tx_bytes = tx.to_bytes();
    let commitment_range = DELTA_COMMITMENT_BYTE_OFFSET..DELTA_COMMITMENT_BYTE_OFFSET + 32;
    tx_bytes[commitment_range].copy_from_slice(&incorrect_commitment_bytes);

    let request = proto::transaction::ProvenTransaction {
        transaction: tx_bytes,
        transaction_inputs: None,
    };
    let response = rpc_client.submit_proven_transaction(request).await;
    assert!(response.is_err());

    let status = response.unwrap_err();
    let err = status.message();
    assert!(
        err.contains("failed to validate account delta in transaction account update"),
        "expected error message to contain delta commitment error but got: {err}"
    );

    shutdown_store(store_runtime).await;
}
/// Submitting a proven transaction built with `Word::empty()` in place of the genesis
/// commitment must be rejected with a chain-commitment mismatch error.
#[tokio::test]
async fn rpc_server_rejects_proven_transactions_with_invalid_reference_block() {
    let (_, rpc_addr, store_listener) = start_rpc().await;
    let (store_runtime, _data_directory, genesis, _store_addr) = start_store(store_listener).await;
    // Short pause after starting the store (presumably to let it become ready — TODO confirm).
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Lazily connected client that sends the real genesis commitment but no version metadata.
    let rpc_url = Url::parse(&format!("http://{rpc_addr}")).unwrap();
    let mut rpc_client = miden_node_proto::clients::Builder::new(rpc_url)
        .without_tls()
        .with_timeout(Duration::from_secs(5))
        .without_metadata_version()
        .with_metadata_genesis(genesis.to_hex())
        .without_otel_context_injection()
        .connect_lazy::<miden_node_proto::clients::RpcClient>();

    // The transaction itself is built against an empty word instead of the genesis commitment.
    let bogus_reference = Word::empty();
    let (account, account_delta) = build_test_account([0; 32]);
    let tx = build_test_proven_tx(&account, &account_delta, bogus_reference);

    let request = proto::transaction::ProvenTransaction {
        transaction: tx.to_bytes(),
        transaction_inputs: None,
    };
    let response = rpc_client.submit_proven_transaction(request).await;
    assert!(response.is_err());

    let status = response.unwrap_err();
    let err = status.message();
    assert!(
        err.contains("does not match the chain's commitment of"),
        "expected error message to contain reference block error but got: {err}"
    );

    shutdown_store(store_runtime).await;
}
/// Submitting a transaction through a client that sends neither version nor genesis metadata
/// must be rejected with the "does not support any of the specified ... content types" error.
#[tokio::test]
async fn rpc_server_rejects_tx_submissions_without_genesis() {
    let (_, rpc_addr, store_listener) = start_rpc().await;
    let (store_runtime, _data_directory, genesis, _store_addr) = start_store(store_listener).await;

    // Lazily connected client with both the version and the genesis metadata disabled.
    let rpc_url = Url::parse(&format!("http://{rpc_addr}")).unwrap();
    let mut rpc_client = miden_node_proto::clients::Builder::new(rpc_url)
        .without_tls()
        .with_timeout(Duration::from_secs(5))
        .without_metadata_version()
        .without_metadata_genesis()
        .without_otel_context_injection()
        .connect_lazy::<miden_node_proto::clients::RpcClient>();

    // The transaction itself is built against the real genesis commitment.
    let (account, account_delta) = build_test_account([0; 32]);
    let tx = build_test_proven_tx(&account, &account_delta, genesis);

    let request = proto::transaction::ProvenTransaction {
        transaction: tx.to_bytes(),
        transaction_inputs: None,
    };
    let response = rpc_client.submit_proven_transaction(request).await;
    assert!(response.is_err());

    let status = response.unwrap_err();
    let err = status.message();
    assert!(
        err.contains(
            "server does not support any of the specified application/vnd.miden content types"
        ),
        "expected error message to reference incompatible content media types but got: {err:?}"
    );

    shutdown_store(store_runtime).await;
}
/// Requests the header of block 0 (without an MMR proof) through `rpc_client` and returns the
/// raw gRPC outcome so callers can assert on either success or failure.
async fn send_request(
    rpc_client: &mut RpcClient,
) -> std::result::Result<tonic::Response<proto::rpc::BlockHeaderByNumberResponse>, tonic::Status> {
    let block_zero = proto::rpc::BlockHeaderByNumberRequest {
        include_mmr_proof: None,
        block_num: Some(0),
    };

    rpc_client.get_block_header_by_number(block_zero).await
}
/// Eagerly connects to `url` and wraps the channel in an [`RpcClient`] that uses the default
/// [`Interceptor`].
///
/// The endpoint uses [`REQUEST_TIMEOUT`]. When `local_address` is `Some`, it is set as the
/// endpoint's local address; otherwise the endpoint is left untouched.
///
/// # Panics
///
/// Panics if the URL is not a valid endpoint or the connection cannot be established.
async fn connect_rpc(url: Url, local_address: Option<IpAddr>) -> RpcClient {
    let base_endpoint = tonic::transport::Endpoint::from_shared(url.to_string())
        .expect("Url type always results in valid endpoint")
        .timeout(REQUEST_TIMEOUT);

    let endpoint = match local_address {
        Some(address) => base_endpoint.local_address(Some(address)),
        None => base_endpoint,
    };

    let channel = endpoint.connect().await.expect("Failed to build channel");
    RpcClient::with_interceptor(channel, Interceptor::default())
}
/// Starts the RPC server with the test gRPC options.
///
/// Thin wrapper around `start_rpc_with_options`; see that function for the returned tuple.
async fn start_rpc() -> (RpcClient, std::net::SocketAddr, TcpListener) {
    let test_options = GrpcOptionsExternal::test();
    start_rpc_with_options(test_options).await
}
/// Spawns an [`Rpc`] server on an ephemeral localhost port and connects a client to it.
///
/// Returns the connected client, the address the RPC server listens on, and the listener that
/// was bound for the store. The store is NOT started here: the listener is only bound so the
/// RPC server can be configured with its address, and the caller decides when (or whether) to
/// serve a store on it.
///
/// # Panics
///
/// Panics if any listener cannot be bound or the client cannot connect. The spawned server task
/// panics if `serve` returns an error.
async fn start_rpc_with_options(
    grpc_options: GrpcOptionsExternal,
) -> (RpcClient, std::net::SocketAddr, TcpListener) {
    let store_listener = TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port");
    let store_addr = store_listener.local_addr().expect("store should get a local address");

    // Only the address is kept: the listener is dropped at the end of this block, so nothing is
    // listening on the block-producer address afterwards.
    let block_producer_addr = {
        let block_producer_listener =
            TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind block-producer");
        block_producer_listener
            .local_addr()
            .expect("Failed to get block-producer address")
    };

    let rpc_listener = TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind rpc");
    let rpc_addr = rpc_listener.local_addr().expect("Failed to get rpc address");

    task::spawn(async move {
        let store_url = Url::parse(&format!("http://{store_addr}")).unwrap();
        let block_producer_url = Url::parse(&format!("http://{block_producer_addr}")).unwrap();
        // No validator is started by these tests; the URL is a placeholder on port 0.
        let validator_url = Url::parse("http://127.0.0.1:0").unwrap();
        Rpc {
            listener: rpc_listener,
            store_url,
            block_producer_url: Some(block_producer_url),
            validator_url,
            ntx_builder_url: None,
            grpc_options,
        }
        .serve()
        .await
        // This task serves the RPC component, so name it in the panic message.
        .expect("Failed to start serving rpc");
    });

    let rpc_url = Url::parse(&format!("http://{rpc_addr}")).unwrap();
    let rpc_client = connect_rpc(rpc_url, None).await;
    (rpc_client, rpc_addr, store_listener)
}
/// Bootstraps a fresh store in a temporary directory and serves it on `store_listener` from a
/// dedicated multi-threaded runtime.
///
/// Returns, in order:
/// - the runtime hosting the store (pass it to `shutdown_store` to stop the store),
/// - the temporary data directory (must outlive the store),
/// - the commitment of the genesis block header that was bootstrapped,
/// - the address of `store_listener`, so the store can later be restarted on the same address.
///
/// # Panics
///
/// Panics if the temporary directory, genesis block, listeners or runtime cannot be created, or
/// if bootstrapping fails. The spawned store task panics if `serve` returns an error.
async fn start_store(store_listener: TcpListener) -> (Runtime, TempDir, Word, SocketAddr) {
    let data_directory = tempfile::tempdir().expect("tempdir should be created");

    let config = GenesisConfig::default();
    let signer = SecretKey::new();
    let (genesis_state, _) = config.into_state(signer).unwrap();

    // Build the genesis block once and take the commitment from the very block that gets
    // bootstrapped, instead of cloning the state and building a second block just for this.
    let genesis_block = genesis_state
        .into_block()
        .await
        .expect("genesis block should be created");
    let genesis_commitment = genesis_block.inner().header().commitment();

    Store::bootstrap(genesis_block, data_directory.path()).expect("store should bootstrap");

    let dir = data_directory.path().to_path_buf();
    let store_addr =
        store_listener.local_addr().expect("store listener should get a local address");
    let rpc_listener = store_listener;
    let ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("Failed to bind store ntx-builder gRPC endpoint");
    let block_producer_listener =
        TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port");

    // The store gets its own runtime so the tests can shut it down independently.
    let store_runtime =
        runtime::Builder::new_multi_thread().enable_time().enable_io().build().unwrap();
    store_runtime.spawn(async move {
        Store {
            rpc_listener,
            block_prover_url: None,
            ntx_builder_listener,
            block_producer_listener,
            data_directory: dir,
            grpc_options: GrpcOptionsInternal::test(),
            max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS,
            storage_options: StorageOptions::default(),
        }
        .serve()
        .await
        .expect("store should start serving");
    });

    (store_runtime, data_directory, genesis_commitment, store_addr)
}
/// Shuts down the runtime hosting the store, waiting up to three seconds for it to stop, and
/// then sleeps for a further 200 ms.
///
/// The shutdown runs on a blocking task because `shutdown_timeout` is a blocking call.
/// NOTE(review): the trailing sleep presumably gives the store's resources time to be released
/// before the caller proceeds — confirm.
///
/// # Panics
///
/// Panics if the blocking shutdown task fails to complete.
async fn shutdown_store(store_runtime: Runtime) {
    let shutdown_task = task::spawn_blocking(move || {
        store_runtime.shutdown_timeout(Duration::from_secs(3));
    });
    shutdown_task.await.expect("shutdown should complete");

    tokio::time::sleep(Duration::from_millis(200)).await;
}
/// Serves a store from an existing `data_directory` on `store_addr`, using a new dedicated
/// runtime, and returns that runtime.
///
/// Unlike `start_store`, nothing is bootstrapped here: the directory is used as it is. The
/// ntx-builder and block-producer listeners are bound to fresh ephemeral ports.
///
/// # Panics
///
/// Panics if a listener cannot be bound (for example when `store_addr` is still in use) or the
/// runtime cannot be built. The spawned store task panics if `serve` returns an error.
async fn restart_store(store_addr: SocketAddr, data_directory: &std::path::Path) -> Runtime {
    let dir = data_directory.to_path_buf();

    let rpc_listener = TcpListener::bind(store_addr).await.expect("Failed to bind store");
    let ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("Failed to bind store ntx-builder gRPC endpoint");
    let block_producer_listener =
        TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port");

    let store_runtime =
        runtime::Builder::new_multi_thread().enable_time().enable_io().build().unwrap();

    store_runtime.spawn(async move {
        let store = Store {
            rpc_listener,
            block_prover_url: None,
            ntx_builder_listener,
            block_producer_listener,
            data_directory: dir,
            grpc_options: GrpcOptionsInternal::test(),
            max_concurrent_proofs: DEFAULT_MAX_CONCURRENT_PROOFS,
            storage_options: StorageOptions::default(),
        };
        store.serve().await.expect("store should start serving");
    });

    store_runtime
}
/// Checks the `GetLimits` response: `CheckNullifiers`, `SyncTransactions` and `GetNotesById`
/// must each report the limit constant of their list parameter, while `SyncAccountVault` and
/// `SyncAccountStorageMaps` must not appear at all.
#[tokio::test]
async fn get_limits_endpoint() {
    let (mut rpc_client, _rpc_addr, store_listener) = start_rpc().await;
    let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;

    let limits = rpc_client
        .get_limits(())
        .await
        .expect("get_limits should succeed")
        .into_inner();
    let endpoints = &limits.endpoints;
    assert!(!endpoints.is_empty(), "endpoints should not be empty");

    // Endpoints that take a bounded list parameter.
    let check_nullifiers = endpoints.get("CheckNullifiers").expect("CheckNullifiers should exist");
    assert_eq!(
        check_nullifiers.parameters.get(QueryParamNullifierLimit::PARAM_NAME),
        Some(&(QueryParamNullifierLimit::LIMIT as u32)),
        "CheckNullifiers {} limit should be {}",
        QueryParamNullifierLimit::PARAM_NAME,
        QueryParamNullifierLimit::LIMIT
    );

    let sync_transactions =
        endpoints.get("SyncTransactions").expect("SyncTransactions should exist");
    assert_eq!(
        sync_transactions.parameters.get(QueryParamAccountIdLimit::PARAM_NAME),
        Some(&(QueryParamAccountIdLimit::LIMIT as u32)),
        "SyncTransactions {} limit should be {}",
        QueryParamAccountIdLimit::PARAM_NAME,
        QueryParamAccountIdLimit::LIMIT
    );

    // Endpoints that must be absent from the limits map.
    assert!(
        !endpoints.contains_key("SyncAccountVault"),
        "SyncAccountVault should not have list parameter limits"
    );
    assert!(
        !endpoints.contains_key("SyncAccountStorageMaps"),
        "SyncAccountStorageMaps should not have list parameter limits"
    );

    let get_notes_by_id = endpoints.get("GetNotesById").expect("GetNotesById should exist");
    assert_eq!(
        get_notes_by_id.parameters.get(QueryParamNoteIdLimit::PARAM_NAME),
        Some(&(QueryParamNoteIdLimit::LIMIT as u32)),
        "GetNotesById {} limit should be {}",
        QueryParamNoteIdLimit::PARAM_NAME,
        QueryParamNoteIdLimit::LIMIT
    );

    shutdown_store(store_runtime).await;
}
/// Calls `SyncChainMmr` from block 0 with an open-ended range and `Committed` finality against
/// a freshly bootstrapped store, and expects an MMR delta with forest 0 and no data.
#[tokio::test]
async fn sync_chain_mmr_returns_delta() {
    let (mut rpc_client, _rpc_addr, store_listener) = start_rpc().await;
    let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;

    let open_range = proto::rpc::BlockRange { block_from: 0, block_to: None };
    let request = proto::rpc::SyncChainMmrRequest {
        block_range: Some(open_range),
        finality: proto::rpc::Finality::Committed.into(),
    };

    let reply = rpc_client
        .sync_chain_mmr(request)
        .await
        .expect("sync_chain_mmr should succeed")
        .into_inner();
    let mmr_delta = reply.mmr_delta.expect("mmr_delta should exist");

    assert_eq!(mmr_delta.forest, 0);
    assert!(mmr_delta.data.is_empty());

    shutdown_store(store_runtime).await;
}
/// Checks that a partial MMR kept in sync by alternating "add a block commitment locally" and
/// "apply a delta from the full MMR" reproduces the chain commitments stored in block headers.
///
/// Five mock headers are generated; header `i` stores the peaks hash of the full MMR as it was
/// before header `i` was added. The partial MMR then:
/// 1. adds header 0, applies the delta from forest 1 to forest 2, and must match header 2's
///    chain commitment;
/// 2. adds header 2, applies the delta from forest 3 to forest 4, and must match header 4's
///    chain commitment;
/// 3. adds header 4 and must match the peaks of the full MMR.
#[test]
fn sync_chain_mmr_block_header_matches_chain_commitment() {
    use miden_protocol::block::BlockHeader;
    use miden_protocol::crypto::merkle::mmr::{Forest, Mmr, MmrPeaks, PartialMmr};

    // Full MMR plus the headers committed to it.
    let mut full_mmr = Mmr::new();
    let mut mock_headers = Vec::new();
    for block_num in 0..5u32 {
        // Snapshot the peaks hash before this block's commitment is added.
        let commitment_before = full_mmr.peaks().hash_peaks();
        let header =
            BlockHeader::mock(block_num, Some(commitment_before), None, &[], Word::default());
        full_mmr.add(header.commitment());
        mock_headers.push(header);
    }

    // The partial MMR starts out empty.
    let empty_peaks = MmrPeaks::new(Forest::new(0), vec![]).unwrap();
    let mut partial_mmr = PartialMmr::from_peaks(empty_peaks);

    // Step 1: add header 0, then apply the delta from forest 1 to forest 2.
    partial_mmr.add(mock_headers[0].commitment(), false);
    let first_delta = full_mmr.get_delta(Forest::new(1), Forest::new(2)).unwrap();
    partial_mmr.apply(first_delta).unwrap();
    assert_eq!(partial_mmr.peaks().hash_peaks(), mock_headers[2].chain_commitment());

    // Step 2: add header 2, then apply the delta from forest 3 to forest 4.
    partial_mmr.add(mock_headers[2].commitment(), false);
    let second_delta = full_mmr.get_delta(Forest::new(3), Forest::new(4)).unwrap();
    partial_mmr.apply(second_delta).unwrap();
    assert_eq!(partial_mmr.peaks().hash_peaks(), mock_headers[4].chain_commitment());

    // Step 3: add header 4; both MMRs must now agree.
    partial_mmr.add(mock_headers[4].commitment(), false);
    assert_eq!(partial_mmr.peaks().hash_peaks(), full_mmr.peaks().hash_peaks());
}