use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use anyhow::Context;
use miden_node_block_producer::BlockProducer;
use miden_node_rpc::Rpc;
use miden_node_store::{DEFAULT_MAX_CONCURRENT_PROOFS, Store};
use miden_node_utils::clap::{GrpcOptionsExternal, StorageOptions};
use miden_node_utils::grpc::UrlExt;
use miden_node_validator::{Validator, ValidatorSigner};
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;
use miden_protocol::utils::serde::Deserializable;
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use url::Url;
use super::{ENV_DATA_DIRECTORY, ENV_RPC_URL};
use crate::commands::{
BlockProducerConfig,
BundledValidatorConfig,
ENV_BLOCK_PROVER_URL,
ENV_ENABLE_OTEL,
ENV_GENESIS_CONFIG_FILE,
NtxBuilderConfig,
ValidatorKey,
};
// CLI subcommands for the bundled node. `Bootstrap` prepares the on-disk state and `Start`
// launches the components handled in `BundledCommand::start`.
//
// NOTE: plain `//` comments are used on purpose throughout this enum. With the clap derive,
// `///` doc comments become the generated `--help` text, so adding them would change the CLI's
// runtime output.
#[derive(clap::Subcommand)]
#[expect(clippy::large_enum_variant, reason = "This is a single use enum")]
pub enum BundledCommand {
// Bootstraps the genesis block and then the store (see `BundledCommand::handle`).
Bootstrap {
// `--data-directory` (or `ENV_DATA_DIRECTORY`). Passed to the validator's genesis bootstrap
// and to the store bootstrap; the genesis block file is looked up inside this directory.
#[arg(long, env = ENV_DATA_DIRECTORY, value_name = "DIR")]
data_directory: PathBuf,
// `--accounts-directory`. Forwarded to the genesis bootstrap.
// NOTE(review): presumably where the genesis account files are written - confirm against
// `ValidatorCommand::bootstrap_genesis`.
#[arg(long, value_name = "DIR")]
accounts_directory: PathBuf,
// `--genesis-config-file` (or `ENV_GENESIS_CONFIG_FILE`). Optional; forwarded to the genesis
// bootstrap as `Option<&PathBuf>`.
#[arg(long, env = ENV_GENESIS_CONFIG_FILE, value_name = "FILE")]
genesis_config_file: Option<PathBuf>,
// Validator key options, flattened into this subcommand's arguments.
#[command(flatten)]
validator_key: ValidatorKey,
},
// Starts the bundled components in a single process.
Start {
// `--rpc.url` (or `ENV_RPC_URL`). Converted to a socket address and bound as the RPC
// component's listener.
#[arg(long = "rpc.url", env = ENV_RPC_URL, value_name = "URL")]
rpc_url: Url,
// `--block-prover.url` (or `ENV_BLOCK_PROVER_URL`). Optional; handed to the store component.
#[arg(long = "block-prover.url", env = ENV_BLOCK_PROVER_URL, value_name = "URL")]
block_prover_url: Option<Url>,
// `--data-directory` (or `ENV_DATA_DIRECTORY`). Handed to the store, the ntx-builder
// configuration and the validator.
#[arg(long = "data-directory", env = ENV_DATA_DIRECTORY, value_name = "DIR")]
data_directory: PathBuf,
// Block-producer options (batch/block intervals, limits, batch prover URL, mempool capacity).
#[command(flatten)]
block_producer: BlockProducerConfig,
// Network transaction builder options; its `disabled` flag decides whether it is started.
#[command(flatten)]
ntx_builder: NtxBuilderConfig,
// Validator options; provides the validator URL, an optional local socket address and the
// hex-encoded validator key.
#[command(flatten)]
validator: BundledValidatorConfig,
// `--enable-otel` (or `ENV_ENABLE_OTEL`), defaults to `false`. Only read through
// `is_open_telemetry_enabled`; `handle` ignores it.
#[arg(long = "enable-otel", default_value_t = false, env = ENV_ENABLE_OTEL, value_name = "BOOL")]
enable_otel: bool,
// `--max-concurrent-proofs`, defaults to `DEFAULT_MAX_CONCURRENT_PROOFS`. Must be non-zero;
// handed to the store component.
#[arg(
long = "max-concurrent-proofs",
default_value_t = DEFAULT_MAX_CONCURRENT_PROOFS,
value_name = "NUM"
)]
max_concurrent_proofs: NonZeroUsize,
// gRPC options shared by every spawned component.
#[command(flatten)]
grpc_options: GrpcOptionsExternal,
// Storage options handed to the store component.
#[command(flatten)]
storage_options: StorageOptions,
},
}
impl BundledCommand {
/// Executes the selected subcommand.
///
/// * `Bootstrap` first runs the validator's genesis bootstrap and then bootstraps the store from
///   the genesis block file located inside the data directory.
/// * `Start` forwards its arguments to [`Self::start`]; the `enable_otel` flag is not used here.
///
/// # Errors
///
/// Returns an error if either bootstrap step fails, or whatever error [`Self::start`] produces.
pub async fn handle(self) -> anyhow::Result<()> {
    match self {
        Self::Bootstrap {
            data_directory: data_dir,
            accounts_directory: accounts_dir,
            genesis_config_file: genesis_config,
            validator_key,
        } => {
            // Step 1: genesis bootstrap. The data directory is deliberately passed twice
            // (first and third argument).
            crate::commands::validator::ValidatorCommand::bootstrap_genesis(
                &data_dir,
                &accounts_dir,
                &data_dir,
                genesis_config.as_ref(),
                validator_key,
            )
            .await
            .context("failed to bootstrap genesis block")?;

            // Step 2: bootstrap the store from the genesis block file in the data directory.
            let genesis_block =
                data_dir.join(crate::commands::validator::GENESIS_BLOCK_FILENAME);
            let store_outcome =
                crate::commands::store::bootstrap_store(&data_dir, &genesis_block);
            store_outcome.context("failed to bootstrap the store component")
        },
        Self::Start {
            rpc_url,
            block_prover_url,
            data_directory,
            block_producer,
            ntx_builder,
            validator,
            enable_otel: _,
            grpc_options,
            max_concurrent_proofs,
            storage_options,
        } => {
            let running = Self::start(
                rpc_url,
                block_prover_url,
                data_directory,
                block_producer,
                ntx_builder,
                validator,
                grpc_options,
                max_concurrent_proofs,
                storage_options,
            );
            running.await
        },
    }
}
/// Starts all bundled components in this process and waits for the first one to stop.
///
/// The RPC listener is bound on `rpc_url`; every internal endpoint is bound to an OS-assigned
/// port on `127.0.0.1`. The store, block-producer and RPC components are always spawned. The
/// network transaction builder is spawned unless `ntx_builder.disabled` is set, and the validator
/// is spawned only when `validator.to_addresses()` yields a local socket address.
///
/// # Errors
///
/// This function never returns `Ok`. It fails early if an address cannot be bound or parsed, or
/// if the validator key cannot be decoded. Otherwise it returns as soon as the first component
/// task finishes, with an error naming that component; a clean exit is reported as an error too.
#[expect(clippy::too_many_lines, clippy::too_many_arguments)]
async fn start(
    rpc_url: Url,
    block_prover_url: Option<Url>,
    data_directory: PathBuf,
    block_producer: BlockProducerConfig,
    ntx_builder: NtxBuilderConfig,
    validator: BundledValidatorConfig,
    grpc_options: GrpcOptionsExternal,
    max_concurrent_proofs: NonZeroUsize,
    storage_options: StorageOptions,
) -> anyhow::Result<()> {
    // Bind every listener before spawning anything, so that all addresses are known when the
    // components are wired together below.
    let grpc_rpc = rpc_url
        .to_socket()
        .context("Failed to convert RPC URL to gRPC socket")?;
    let grpc_rpc = TcpListener::bind(grpc_rpc)
        .await
        .context("Failed to bind to RPC gRPC endpoint")?;

    // Only the address is kept here: the temporary listener is dropped at the end of the
    // statement and `BlockProducer` receives the bare socket address.
    // NOTE(review): presumably the block-producer re-binds this port itself, which leaves a
    // short window in which another process could claim it - confirm against `BlockProducer`.
    let (block_producer_url, block_producer_address) = {
        let socket_addr = TcpListener::bind("127.0.0.1:0")
            .await
            .context("Failed to bind to block-producer gRPC endpoint")?
            .local_addr()
            .context("Failed to retrieve the block-producer's gRPC address")?;
        let url = Url::parse(&format!("http://{socket_addr}"))
            .context("Failed to parse Block Producer URL")?;
        (url, socket_addr)
    };

    let (validator_url, validator_socket_address) = validator.to_addresses().await?;

    // The store exposes one listener per client component.
    let store_rpc_listener = TcpListener::bind("127.0.0.1:0")
        .await
        .context("Failed to bind to store RPC gRPC endpoint")?;
    let store_ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
        .await
        .context("Failed to bind to store ntx-builder gRPC endpoint")?;
    let store_block_producer_listener = TcpListener::bind("127.0.0.1:0")
        .await
        .context("Failed to bind to store block-producer gRPC endpoint")?;

    let store_rpc_address = store_rpc_listener
        .local_addr()
        .context("Failed to retrieve the store's RPC gRPC address")?;
    let store_block_producer_address = store_block_producer_listener
        .local_addr()
        .context("Failed to retrieve the store's block-producer gRPC address")?;
    let store_ntx_builder_address = store_ntx_builder_listener
        .local_addr()
        .context("Failed to retrieve the store's ntx-builder gRPC address")?;

    let mut join_set = JoinSet::new();

    // Store. `data_directory` is cloned because the ntx-builder configuration and the validator
    // still need it further down.
    let data_directory_clone = data_directory.clone();
    let store_id = join_set
        .spawn(async move {
            Store {
                rpc_listener: store_rpc_listener,
                block_producer_listener: store_block_producer_listener,
                ntx_builder_listener: store_ntx_builder_listener,
                data_directory: data_directory_clone,
                block_prover_url,
                grpc_options: grpc_options.into(),
                max_concurrent_proofs,
                storage_options,
            }
            .serve()
            .await
            .context("failed while serving store component")
        })
        .id();

    let should_start_ntx_builder = !ntx_builder.disabled;

    // Block-producer.
    let block_producer_id = {
        let store_url = Url::parse(&format!("http://{store_block_producer_address}"))
            .context("Failed to parse URL")?;
        let validator_url = validator_url.clone();
        join_set
            .spawn(async move {
                BlockProducer {
                    block_producer_address,
                    store_url,
                    validator_url,
                    batch_prover_url: block_producer.batch_prover_url,
                    batch_interval: block_producer.batch_interval,
                    block_interval: block_producer.block_interval,
                    max_batches_per_block: block_producer.max_batches_per_block,
                    max_txs_per_batch: block_producer.max_txs_per_batch,
                    grpc_options: grpc_options.into(),
                    mempool_tx_capacity: block_producer.mempool_tx_capacity,
                }
                .serve()
                .await
                .context("failed while serving block-producer component")
            })
            .id()
    };

    // Prepare the network transaction builder (configuration + listener) before the RPC
    // component is spawned, because the RPC component is handed the ntx-builder's URL. The
    // builder task itself is spawned after the RPC task.
    let (ntx_builder_prepared, ntx_builder_url) = if should_start_ntx_builder {
        let store_ntx_builder_url = Url::parse(&format!("http://{store_ntx_builder_address}"))
            .context("Failed to parse URL")?;
        let builder_config = ntx_builder.into_builder_config(
            store_ntx_builder_url,
            block_producer_url.clone(),
            validator_url.clone(),
            &data_directory,
        );
        let ntx_builder_listener = TcpListener::bind("127.0.0.1:0")
            .await
            .context("Failed to bind to ntx-builder gRPC endpoint")?;
        let ntx_builder_address = ntx_builder_listener
            .local_addr()
            .context("Failed to retrieve the ntx-builder's gRPC address")?;
        let ntx_builder_url = Url::parse(&format!("http://{ntx_builder_address}"))
            .context("Failed to parse ntx-builder URL")?;
        (Some((builder_config, ntx_builder_listener)), Some(ntx_builder_url))
    } else {
        (None, None)
    };

    // RPC. This is the last user of `block_producer_url` and `validator_url`, so both are moved
    // into the task instead of being cloned.
    let rpc_id = join_set
        .spawn(async move {
            let store_url = Url::parse(&format!("http://{store_rpc_address}"))
                .context("Failed to parse URL")?;
            Rpc {
                listener: grpc_rpc,
                store_url,
                block_producer_url: Some(block_producer_url),
                validator_url,
                ntx_builder_url,
                grpc_options,
            }
            .serve()
            .await
            .context("failed while serving RPC component")
        })
        .id();

    // Lookup table used to name the component whose task finished.
    let mut component_ids = HashMap::from([
        (store_id, "store"),
        (block_producer_id, "block-producer"),
        (rpc_id, "rpc"),
    ]);

    if let Some((builder_config, ntx_builder_listener)) = ntx_builder_prepared {
        let id = join_set
            .spawn(async move {
                builder_config
                    .build()
                    .await
                    .context("failed to initialize ntx builder")?
                    .run(Some(ntx_builder_listener))
                    .await
                    .context("failed while serving ntx builder component")
            })
            .id();
        component_ids.insert(id, "ntx-builder");
    }

    // The validator only runs in-process when a local socket address was provided for it.
    if let Some(address) = validator_socket_address {
        let secret_key_bytes = hex::decode(validator.validator_key)
            .context("Failed to hex-decode the validator key")?;
        let signer = SecretKey::read_from_bytes(&secret_key_bytes)
            .context("Failed to deserialize the validator secret key")?;
        let signer = ValidatorSigner::new_local(signer);
        let id = join_set
            .spawn(async move {
                Validator {
                    address,
                    grpc_options: grpc_options.into(),
                    signer,
                    data_directory,
                }
                .serve()
                .await
                .context("failed while serving validator component")
            })
            .id();
        component_ids.insert(id, "validator");
    }

    let component_result = join_set
        .join_next_with_id()
        .await
        .expect("join set is never empty: store, block-producer and RPC are always spawned");

    // Every outcome is mapped to an error, including a task that returned `Ok`.
    let (id, failure) = match component_result {
        Ok((id, Ok(_))) => (id, Err(anyhow::anyhow!("Component completed unexpectedly"))),
        Ok((id, Err(err))) => (id, Err(err)),
        Err(join_err) => (join_err.id(), Err(join_err).context("Joining component task")),
    };
    let component = component_ids.get(&id).unwrap_or(&"unknown");

    // Returning drops `join_set`, which aborts the tasks that are still running.
    failure.context(format!("Component {component} failed"))
}
/// Reports whether OpenTelemetry was requested on the command line.
///
/// Returns the value of `enable_otel` for the `Start` subcommand and `false` for `Bootstrap`,
/// which has no such flag.
pub fn is_open_telemetry_enabled(&self) -> bool {
    match self {
        Self::Start { enable_otel, .. } => *enable_otel,
        Self::Bootstrap { .. } => false,
    }
}
}