use std::net::SocketAddr;
use std::time::Duration;
use http::header::{ACCEPT, CONTENT_TYPE};
use http::{HeaderMap, HeaderValue};
use miden_node_proto::clients::{Builder, RpcClient};
use miden_node_proto::generated::rpc::api_client::ApiClient as ProtoClient;
use miden_node_proto::generated::{self as proto};
use miden_node_store::Store;
use miden_node_store::genesis::config::GenesisConfig;
use miden_node_utils::fee::test_fee;
use miden_node_utils::limiter::{
QueryParamAccountIdLimit,
QueryParamLimiter,
QueryParamNoteIdLimit,
QueryParamNullifierLimit,
};
use miden_protocol::Word;
use miden_protocol::account::delta::AccountUpdateDetails;
use miden_protocol::account::{
Account,
AccountBuilder,
AccountDelta,
AccountId,
AccountIdVersion,
AccountStorageMode,
AccountType,
};
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;
use miden_protocol::testing::noop_auth_component::NoopAuthComponent;
use miden_protocol::transaction::{ProvenTransaction, ProvenTransactionBuilder};
use miden_protocol::utils::Serializable;
use miden_protocol::vm::ExecutionProof;
use miden_standards::account::wallets::BasicWallet;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::runtime::{self, Runtime};
use tokio::task;
use url::Url;
use crate::Rpc;
const DELTA_COMMITMENT_BYTE_OFFSET: usize = 15 + 32 + 32;
fn build_test_account(seed: [u8; 32]) -> (Account, AccountDelta) {
let account = AccountBuilder::new(seed)
.account_type(AccountType::RegularAccountImmutableCode)
.storage_mode(AccountStorageMode::Public)
.with_assets(vec![])
.with_component(BasicWallet)
.with_auth_component(NoopAuthComponent)
.build_existing()
.unwrap();
let delta: AccountDelta = account.clone().try_into().unwrap();
(account, delta)
}
fn build_test_proven_tx(account: &Account, delta: &AccountDelta) -> ProvenTransaction {
let account_id = AccountId::dummy(
[0; 15],
AccountIdVersion::Version0,
AccountType::RegularAccountImmutableCode,
AccountStorageMode::Public,
);
ProvenTransactionBuilder::new(
account_id,
[8; 32].try_into().unwrap(),
account.to_commitment(),
delta.to_commitment(),
0.into(),
Word::default(),
test_fee(),
u32::MAX.into(),
ExecutionProof::new_dummy(),
)
.account_update_details(AccountUpdateDetails::Delta(delta.clone()))
.build()
.unwrap()
}
#[tokio::test]
async fn rpc_server_accepts_requests_without_accept_header() {
let (_, rpc_addr, store_listener) = start_rpc().await;
let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;
let mut rpc_client = {
let endpoint = tonic::transport::Endpoint::try_from(format!("http://{rpc_addr}")).unwrap();
ProtoClient::connect(endpoint).await.unwrap()
};
let request = proto::rpc::BlockHeaderByNumberRequest {
block_num: Some(0),
include_mmr_proof: None,
};
let response = rpc_client.get_block_header_by_number(request).await;
assert!(response.is_ok());
shutdown_store(store_runtime).await;
}
#[tokio::test]
async fn rpc_server_accepts_requests_with_accept_header() {
let (mut rpc_client, _, store_listener) = start_rpc().await;
let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;
let response = send_request(&mut rpc_client).await;
assert!(response.is_ok());
shutdown_store(store_runtime).await;
}
#[tokio::test]
async fn rpc_server_rejects_requests_with_accept_header_invalid_version() {
for version in ["1.9.0", "0.8.1", "0.8.0", "0.999.0", "99.0.0"] {
let (_, rpc_addr, store_listener) = start_rpc().await;
let (store_runtime, _data_directory, _genesis, _store_addr) =
start_store(store_listener).await;
let url = rpc_addr.to_string();
let url = Url::parse(format!("http://{}", &url).as_str()).unwrap();
let mut rpc_client: RpcClient = Builder::new(url)
.without_tls()
.with_timeout(Duration::from_secs(10))
.with_metadata_version(version.to_string())
.without_metadata_genesis()
.without_otel_context_injection()
.connect::<RpcClient>()
.await
.unwrap();
let response = send_request(&mut rpc_client).await;
assert!(response.is_err());
assert_eq!(response.as_ref().err().unwrap().code(), tonic::Code::InvalidArgument);
assert!(response.as_ref().err().unwrap().message().contains("server does not support"),);
shutdown_store(store_runtime).await;
}
}
#[tokio::test]
async fn rpc_startup_is_robust_to_network_failures() {
let (mut rpc_client, _, store_listener) = start_rpc().await;
let response = send_request(&mut rpc_client).await;
assert!(response.is_err());
let (store_runtime, data_directory, _genesis, store_addr) = start_store(store_listener).await;
let response = send_request(&mut rpc_client).await;
assert!(response.unwrap().into_inner().block_header.is_some());
shutdown_store(store_runtime).await;
let response = send_request(&mut rpc_client).await;
assert!(response.is_err());
let store_runtime = restart_store(store_addr, data_directory.path()).await;
let response = send_request(&mut rpc_client).await;
assert_eq!(response.unwrap().into_inner().block_header.unwrap().block_num, 0);
shutdown_store(store_runtime).await;
}
#[tokio::test]
async fn rpc_server_has_web_support() {
let (_, rpc_addr, store_listener) = start_rpc().await;
let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;
let client = reqwest::Client::new();
let mut headers = HeaderMap::new();
let accept_header = concat!("application/vnd.miden; version=", env!("CARGO_PKG_VERSION"));
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/grpc-web+proto"));
headers.insert(ACCEPT, HeaderValue::from_static(accept_header));
let mut message = Vec::new();
message.push(0);
message.extend_from_slice(&0u32.to_be_bytes());
let response = client
.post(format!("http://{rpc_addr}/rpc.Api/Status"))
.headers(headers)
.body(message)
.send()
.await
.unwrap();
let headers = response.headers();
assert!(headers.get("access-control-allow-credentials").is_some());
assert!(headers.get("access-control-expose-headers").is_some());
assert!(headers.get("vary").is_some());
shutdown_store(store_runtime).await;
}
#[tokio::test]
async fn rpc_server_rejects_proven_transactions_with_invalid_commitment() {
let (_, rpc_addr, store_listener) = start_rpc().await;
let (store_runtime, _data_directory, genesis, _store_addr) = start_store(store_listener).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let mut rpc_client =
miden_node_proto::clients::Builder::new(Url::parse(&format!("http://{rpc_addr}")).unwrap())
.without_tls()
.with_timeout(Duration::from_secs(5))
.without_metadata_version()
.with_metadata_genesis(genesis.to_hex())
.without_otel_context_injection()
.connect_lazy::<miden_node_proto::clients::RpcClient>();
let (account, account_delta) = build_test_account([0; 32]);
let tx = build_test_proven_tx(&account, &account_delta);
let (other_account, _) = build_test_account([1; 32]);
let incorrect_delta: AccountDelta = other_account.try_into().unwrap();
let incorrect_commitment_bytes = incorrect_delta.to_commitment().as_bytes();
let mut tx_bytes = tx.to_bytes();
tx_bytes[DELTA_COMMITMENT_BYTE_OFFSET..DELTA_COMMITMENT_BYTE_OFFSET + 32]
.copy_from_slice(&incorrect_commitment_bytes);
let request = proto::transaction::ProvenTransaction {
transaction: tx_bytes,
transaction_inputs: None,
};
let response = rpc_client.submit_proven_transaction(request).await;
assert!(response.is_err());
let err = response.as_ref().unwrap_err().message();
assert!(
err.contains("failed to validate account delta in transaction account update"),
"expected error message to contain delta commitment error but got: {err}"
);
shutdown_store(store_runtime).await;
}
#[tokio::test]
async fn rpc_server_rejects_tx_submissions_without_genesis() {
let (_, rpc_addr, store_listener) = start_rpc().await;
let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;
let mut rpc_client =
miden_node_proto::clients::Builder::new(Url::parse(&format!("http://{rpc_addr}")).unwrap())
.without_tls()
.with_timeout(Duration::from_secs(5))
.without_metadata_version()
.without_metadata_genesis()
.without_otel_context_injection()
.connect_lazy::<miden_node_proto::clients::RpcClient>();
let (account, account_delta) = build_test_account([0; 32]);
let tx = build_test_proven_tx(&account, &account_delta);
let request = proto::transaction::ProvenTransaction {
transaction: tx.to_bytes(),
transaction_inputs: None,
};
let response = rpc_client.submit_proven_transaction(request).await;
assert!(response.is_err());
let err = response.as_ref().unwrap_err().message();
assert!(
err.contains(
"server does not support any of the specified application/vnd.miden content types"
),
"expected error message to reference incompatible content media types but got: {err:?}"
);
shutdown_store(store_runtime).await;
}
async fn send_request(
rpc_client: &mut RpcClient,
) -> std::result::Result<tonic::Response<proto::rpc::BlockHeaderByNumberResponse>, tonic::Status> {
let request = proto::rpc::BlockHeaderByNumberRequest {
block_num: Some(0),
include_mmr_proof: None,
};
rpc_client.get_block_header_by_number(request).await
}
async fn start_rpc() -> (RpcClient, std::net::SocketAddr, TcpListener) {
let store_listener = TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port");
let store_addr = store_listener.local_addr().expect("store should get a local address");
let block_producer_addr = {
let block_producer_listener =
TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind block-producer");
block_producer_listener
.local_addr()
.expect("Failed to get block-producer address")
};
let rpc_listener = TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind rpc");
let rpc_addr = rpc_listener.local_addr().expect("Failed to get rpc address");
task::spawn(async move {
let store_url = Url::parse(&format!("http://{store_addr}")).unwrap();
let block_producer_url = Url::parse(&format!("http://{block_producer_addr}")).unwrap();
let validator_url = Url::parse("http://127.0.0.1:0").unwrap();
Rpc {
listener: rpc_listener,
store_url,
block_producer_url: Some(block_producer_url),
validator_url,
grpc_timeout: Duration::from_secs(30),
}
.serve()
.await
.expect("Failed to start serving store");
});
let url = rpc_addr.to_string();
let url = Url::parse(format!("http://{}", &url).as_str()).unwrap();
let rpc_client: RpcClient = Builder::new(url)
.without_tls()
.with_timeout(Duration::from_secs(10))
.without_metadata_version()
.without_metadata_genesis()
.without_otel_context_injection()
.connect::<RpcClient>()
.await
.expect("Failed to build client");
(rpc_client, rpc_addr, store_listener)
}
async fn start_store(store_listener: TcpListener) -> (Runtime, TempDir, Word, SocketAddr) {
let data_directory = tempfile::tempdir().expect("tempdir should be created");
let config = GenesisConfig::default();
let signer = SecretKey::new();
let (genesis_state, _) = config.into_state(signer).unwrap();
Store::bootstrap(genesis_state.clone(), data_directory.path())
.await
.expect("store should bootstrap");
let dir = data_directory.path().to_path_buf();
let store_addr =
store_listener.local_addr().expect("store listener should get a local address");
let rpc_listener = store_listener;
let ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("Failed to bind store ntx-builder gRPC endpoint");
let block_producer_listener =
TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port");
let store_runtime =
runtime::Builder::new_multi_thread().enable_time().enable_io().build().unwrap();
store_runtime.spawn(async move {
Store {
rpc_listener,
block_prover_url: None,
ntx_builder_listener,
block_producer_listener,
data_directory: dir,
grpc_timeout: Duration::from_secs(30),
}
.serve()
.await
.expect("store should start serving");
});
(
store_runtime,
data_directory,
genesis_state.into_block().await.unwrap().inner().header().commitment(),
store_addr,
)
}
async fn shutdown_store(store_runtime: Runtime) {
task::spawn_blocking(move || store_runtime.shutdown_timeout(Duration::from_secs(3)))
.await
.expect("shutdown should complete");
tokio::time::sleep(Duration::from_millis(200)).await;
}
async fn restart_store(store_addr: SocketAddr, data_directory: &std::path::Path) -> Runtime {
let rpc_listener = TcpListener::bind(store_addr).await.expect("Failed to bind store");
let ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("Failed to bind store ntx-builder gRPC endpoint");
let block_producer_listener =
TcpListener::bind("127.0.0.1:0").await.expect("store should bind a port");
let dir = data_directory.to_path_buf();
let store_runtime =
runtime::Builder::new_multi_thread().enable_time().enable_io().build().unwrap();
store_runtime.spawn(async move {
Store {
rpc_listener,
block_prover_url: None,
ntx_builder_listener,
block_producer_listener,
data_directory: dir,
grpc_timeout: Duration::from_secs(10),
}
.serve()
.await
.expect("store should start serving");
});
store_runtime
}
#[tokio::test]
async fn get_limits_endpoint() {
let (mut rpc_client, _rpc_addr, store_listener) = start_rpc().await;
let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;
let response = rpc_client.get_limits(()).await.expect("get_limits should succeed");
let limits = response.into_inner();
assert!(!limits.endpoints.is_empty(), "endpoints should not be empty");
let check_nullifiers =
limits.endpoints.get("CheckNullifiers").expect("CheckNullifiers should exist");
assert_eq!(
check_nullifiers.parameters.get(QueryParamNullifierLimit::PARAM_NAME),
Some(&(QueryParamNullifierLimit::LIMIT as u32)),
"CheckNullifiers {} limit should be {}",
QueryParamNullifierLimit::PARAM_NAME,
QueryParamNullifierLimit::LIMIT
);
let sync_transactions =
limits.endpoints.get("SyncTransactions").expect("SyncTransactions should exist");
assert_eq!(
sync_transactions.parameters.get(QueryParamAccountIdLimit::PARAM_NAME),
Some(&(QueryParamAccountIdLimit::LIMIT as u32)),
"SyncTransactions {} limit should be {}",
QueryParamAccountIdLimit::PARAM_NAME,
QueryParamAccountIdLimit::LIMIT
);
assert!(
!limits.endpoints.contains_key("SyncAccountVault"),
"SyncAccountVault should not have list parameter limits"
);
assert!(
!limits.endpoints.contains_key("SyncAccountStorageMaps"),
"SyncAccountStorageMaps should not have list parameter limits"
);
let get_notes_by_id = limits.endpoints.get("GetNotesById").expect("GetNotesById should exist");
assert_eq!(
get_notes_by_id.parameters.get(QueryParamNoteIdLimit::PARAM_NAME),
Some(&(QueryParamNoteIdLimit::LIMIT as u32)),
"GetNotesById {} limit should be {}",
QueryParamNoteIdLimit::PARAM_NAME,
QueryParamNoteIdLimit::LIMIT
);
shutdown_store(store_runtime).await;
}
#[tokio::test]
async fn sync_chain_mmr_returns_delta() {
let (mut rpc_client, _rpc_addr, store_listener) = start_rpc().await;
let (store_runtime, _data_directory, _genesis, _store_addr) = start_store(store_listener).await;
let request = proto::rpc::SyncChainMmrRequest {
block_range: Some(proto::rpc::BlockRange { block_from: 0, block_to: None }),
};
let response = rpc_client.sync_chain_mmr(request).await.expect("sync_chain_mmr should succeed");
let response = response.into_inner();
let mmr_delta = response.mmr_delta.expect("mmr_delta should exist");
assert_eq!(mmr_delta.forest, 0);
assert!(mmr_delta.data.is_empty());
shutdown_store(store_runtime).await;
}