use alloy_primitives::Address;
use alloy_provider::{Provider, ProviderBuilder};
use blueprint_client_tangle::{TangleClientConfig, TangleSettings};
use blueprint_core::info;
use clap::Parser;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};
use url::Url;
use blueprint_pricing_engine_lib::{
cleanup,
error::{PricingError, Result},
handle_blueprint_update, init_benchmark_cache, init_job_pricing_config, init_operator_signer,
init_pricing_config, init_subscription_pricing_config, init_tee_pricing_config,
load_operator_config,
service::blockchain::event::BlockchainEvent,
service::rpc::server::{JobPricingConfig, run_rpc_server_with_tee},
signer::QuoteSigningDomain,
spawn_event_processor, start_blockchain_listener, wait_for_shutdown,
};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Cli {
#[arg(short, long, value_name = "FILE", env = "OPERATOR_CONFIG_PATH")]
pub config: PathBuf,
#[arg(long, value_name = "FILE", env = "PRICING_CONFIG_PATH")]
pub pricing_config: PathBuf,
#[arg(long, value_name = "FILE", env = "JOB_PRICING_CONFIG_PATH")]
pub job_pricing_config: Option<PathBuf>,
#[arg(long, value_name = "URL", env = "OPERATOR_HTTP_RPC")]
pub http_rpc_endpoint: String,
#[arg(long, value_name = "URL", env = "OPERATOR_WS_RPC")]
pub ws_rpc_endpoint: String,
#[arg(long, env = "OPERATOR_BLUEPRINT_ID")]
pub blueprint_id: u64,
#[arg(long, env = "OPERATOR_SERVICE_ID")]
pub service_id: Option<u64>,
#[arg(long, env = "OPERATOR_TANGLE_CONTRACT")]
pub tangle_contract: String,
#[arg(long, env = "OPERATOR_STAKING_CONTRACT")]
pub staking_contract: String,
#[arg(long, env = "OPERATOR_STATUS_REGISTRY_CONTRACT")]
pub status_registry_contract: String,
#[arg(long, value_name = "LEVEL", env = "RUST_LOG", default_value = "info")]
pub log_level: String,
}
pub async fn run_app(cli: Cli) -> Result<()> {
info!("Starting Tangle Cloud Pricing Engine");
let config = load_operator_config(&cli.config).await?;
let tangle_contract = parse_address(&cli.tangle_contract)?;
let staking_contract = parse_address(&cli.staking_contract)?;
let status_registry_contract = parse_address(&cli.status_registry_contract)?;
if tangle_contract == Address::ZERO {
return Err(PricingError::Config(
"missing OPERATOR_TANGLE_CONTRACT (required for EIP-712 quote signatures)".to_string(),
));
}
let evm_settings = TangleSettings {
blueprint_id: cli.blueprint_id,
service_id: cli.service_id,
tangle_contract,
staking_contract,
status_registry_contract,
};
let http_rpc_endpoint = Url::parse(&cli.http_rpc_endpoint).map_err(|e| {
PricingError::Config(format!(
"invalid HTTP RPC endpoint {}: {e}",
cli.http_rpc_endpoint
))
})?;
let ws_rpc_endpoint = Url::parse(&cli.ws_rpc_endpoint).map_err(|e| {
PricingError::Config(format!(
"invalid WS RPC endpoint {}: {e}",
cli.ws_rpc_endpoint
))
})?;
let evm_config = TangleClientConfig::new(
http_rpc_endpoint,
ws_rpc_endpoint,
config.keystore_path.to_string_lossy().to_string(),
evm_settings,
);
let provider = ProviderBuilder::new()
.connect(cli.http_rpc_endpoint.as_str())
.await
.map_err(|e| PricingError::Config(format!("failed to connect HTTP RPC: {e}")))?;
let chain_id = provider
.get_chain_id()
.await
.map_err(|e| PricingError::Config(format!("failed to read chain id: {e}")))?;
let (event_tx, event_rx) = mpsc::channel::<BlockchainEvent>(100);
let listener_handle = start_blockchain_listener(evm_config, event_tx).await;
let benchmark_cache = init_benchmark_cache(&config).await?;
match benchmark_cache.get_profile(cli.blueprint_id) {
Ok(Some(_)) => {
info!(
"Benchmark profile already exists for blueprint {}",
cli.blueprint_id
);
}
_ => {
info!(
"No benchmark profile for blueprint {}, running initial benchmark...",
cli.blueprint_id
);
if let Err(e) =
handle_blueprint_update(cli.blueprint_id, benchmark_cache.clone(), config.clone())
.await
{
blueprint_core::error!(
"Initial benchmark failed for blueprint {}: {e}",
cli.blueprint_id
);
}
}
}
let pricing_config_path = cli.pricing_config.to_str().ok_or_else(|| {
PricingError::Config("pricing config path is not valid UTF-8".to_string())
})?;
let pricing_config = init_pricing_config(pricing_config_path).await?;
let job_pricing_config = match &cli.job_pricing_config {
Some(path) => init_job_pricing_config(path).await?,
None => {
info!("No job pricing config provided; GetJobPrice will return NOT_FOUND");
Arc::new(Mutex::new(JobPricingConfig::new()))
}
};
let subscription_config = init_subscription_pricing_config(pricing_config_path).await?;
let tee_config = init_tee_pricing_config(pricing_config_path).await?;
let operator_signer = init_operator_signer(
&config,
&config.keystore_path,
QuoteSigningDomain {
chain_id,
verifying_contract: tangle_contract,
},
)?;
info!("Operator signer initialized successfully");
let _event_processor = spawn_event_processor(event_rx, benchmark_cache.clone(), config.clone());
let server_handle = tokio::spawn(async move {
if let Err(e) = run_rpc_server_with_tee(
config,
benchmark_cache,
pricing_config,
job_pricing_config,
subscription_config,
operator_signer,
tee_config,
)
.await
{
blueprint_core::error!("gRPC server error: {}", e);
}
});
wait_for_shutdown().await;
cleanup(listener_handle).await;
server_handle.abort();
Ok(())
}
fn parse_address(input: &str) -> Result<Address> {
Address::from_str(input)
.map_err(|e| PricingError::Config(format!("invalid address {}: {e}", input)))
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
run_app(cli).await
}