use clap::Parser;
use miden_remote_prover::COMPONENT;
use miden_remote_prover::error::RemoteProverError;
use pingora::apps::HttpServerOptions;
use pingora::prelude::{Opt, background_service};
use pingora::server::Server;
use pingora::server::configuration::ServerConf;
use pingora::services::listening::Service;
use pingora_proxy::http_proxy_service;
use tracing::{info, warn};
use super::ProxyConfig;
use crate::commands::PROXY_HOST;
use crate::proxy::update_workers::LoadBalancerUpdateService;
use crate::proxy::{LoadBalancer, LoadBalancerState};
use crate::utils::check_port_availability;
/// Command that starts the proxy.
///
/// Holds the initial set of workers together with the shared proxy configuration;
/// see [`StartProxy::execute`] for what happens when the command runs.
#[derive(Debug, Parser)]
pub struct StartProxy {
    /// Comma-separated list of workers to register with the load balancer at startup.
    ///
    /// May also be supplied through the `MRP_PROXY_WORKERS_LIST` environment variable.
    /// The list may be empty, in which case the proxy starts without any workers.
    // NOTE(review): entries are presumably `host:port` addresses -- confirm against
    // `LoadBalancerState::new`, which is the only consumer visible here.
    #[arg(long, env = "MRP_PROXY_WORKERS_LIST", value_delimiter = ',')]
    workers: Vec<String>,
    /// Proxy configuration (ports, shutdown timings, metrics settings).
    #[command(flatten)]
    proxy_config: ProxyConfig,
}
impl StartProxy {
    /// Configures the proxy server, registers its services and runs it.
    ///
    /// The steps are performed in this order:
    /// 1. Check that the proxy port, the control port and (when configured) the metrics port
    ///    are available.
    /// 2. Build the Pingora server configuration, applying the grace period and graceful
    ///    shutdown timeout from the proxy configuration, then bootstrap the server.
    /// 3. Create the load balancer state from the configured workers and wrap it in the
    ///    `health_check` background service.
    /// 4. Create the `update_workers` service, listening on the control port.
    /// 5. Create the HTTP proxy service, listening on the proxy port, with h2c enabled.
    /// 6. When a metrics port is configured, add the Prometheus HTTP service on that port.
    /// 7. Register all services and run the server on Tokio's blocking thread pool.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - any of the required ports is reported as unavailable,
    /// - the Pingora server configuration cannot be created,
    /// - the load balancer state cannot be created,
    /// - the proxy service exposes no application logic to configure,
    /// - the blocking task running the server fails to join.
    #[tracing::instrument(target = COMPONENT, name = "proxy.execute")]
    pub async fn execute(&self) -> anyhow::Result<()> {
        let config = &self.proxy_config;

        // Fail early with a descriptive error instead of letting a listener fail later.
        check_port_availability(config.port, "Proxy")?;
        check_port_availability(config.control_port, "Control")?;
        if let Some(metrics_port) = config.metrics_config.metrics_port {
            check_port_availability(metrics_port, "Metrics")?;
        }

        // `ok_or_else` keeps the error (and its `String` allocation) off the success path.
        let mut conf = ServerConf::new().ok_or_else(|| {
            RemoteProverError::PingoraConfigFailed("Failed to create server conf".to_string())
        })?;
        conf.grace_period_seconds = Some(config.grace_period.as_secs());
        conf.graceful_shutdown_timeout_seconds =
            Some(config.graceful_shutdown_timeout.as_secs());

        let mut server = Server::new_with_opt_and_conf(Some(Opt::default()), conf);
        server.bootstrap();

        if self.workers.is_empty() {
            warn!(target: COMPONENT, "Starting proxy without any workers");
        } else {
            info!(target: COMPONENT,
                worker_count = %self.workers.len(),
                workers = ?self.workers,
                "Proxy starting with workers"
            );
        }

        // The load balancer state is owned by the background service; `task()` returns the
        // handle that the update service and the proxy service below are built from.
        let worker_lb = LoadBalancerState::new(self.workers.clone(), config).await?;
        let health_check_service = background_service("health_check", worker_lb);
        let worker_lb = health_check_service.task();

        // Service that updates the worker set, reachable through the control port.
        let updater_service = LoadBalancerUpdateService::new(worker_lb.clone());
        let mut update_workers_service =
            Service::new("update_workers".to_string(), updater_service);
        let control_addr = format!("{PROXY_HOST}:{}", config.control_port);
        update_workers_service.add_tcp(&control_addr);

        // The proxy service itself. The listen address is built once and reused for logging.
        let mut lb = http_proxy_service(&server.configuration, LoadBalancer(worker_lb.clone()));
        let proxy_addr = format!("{PROXY_HOST}:{}", config.port);
        lb.add_tcp(&proxy_addr);
        info!(target: COMPONENT,
            endpoint = %proxy_addr,
            "Proxy service listening"
        );

        // Enable h2c (HTTP/2 over cleartext) on the proxy's HTTP server options.
        let logic = lb.app_logic_mut().ok_or_else(|| {
            RemoteProverError::PingoraConfigFailed("app logic not found".to_string())
        })?;
        let mut http_server_options = HttpServerOptions::default();
        http_server_options.h2c = true;
        logic.server_options = Some(http_server_options);

        // The metrics endpoint is optional and only added when a port is configured.
        if let Some(metrics_port) = config.metrics_config.metrics_port {
            let metrics_addr = format!("{PROXY_HOST}:{metrics_port}");
            info!(target: COMPONENT,
                endpoint = %metrics_addr,
                "Metrics service initialized"
            );
            let mut prometheus_service =
                pingora::services::listening::Service::prometheus_http_service();
            prometheus_service.add_tcp(&metrics_addr);
            server.add_service(prometheus_service);
        } else {
            info!(target: COMPONENT, "Metrics service disabled");
        }

        server.add_service(health_check_service);
        server.add_service(update_workers_service);
        server.add_service(lb);

        // Run the server on the blocking pool so this async task's worker thread is not
        // tied up while the server is running.
        tokio::task::spawn_blocking(|| server.run_forever()).await?;
        Ok(())
    }
}