use core::fmt;
use std::sync::Arc;
use miden_node_utils::ErrorReport;
use miden_remote_prover::COMPONENT;
use pingora::apps::{HttpServerApp, HttpServerOptions};
use pingora::http::ResponseHeader;
use pingora::protocols::Stream;
use pingora::protocols::http::ServerSession;
use pingora::server::ShutdownWatch;
use tonic::async_trait;
use tracing::{error, info};
use super::LoadBalancerState;
use crate::commands::update_workers::UpdateWorkers;
use crate::utils::create_response_with_error_message;
pub(crate) struct LoadBalancerUpdateService {
lb_state: Arc<LoadBalancerState>,
server_opts: HttpServerOptions,
}
impl fmt::Debug for LoadBalancerUpdateService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LBUpdaterService")
.field("lb_state", &self.lb_state)
.finish_non_exhaustive()
}
}
impl LoadBalancerUpdateService {
pub(crate) fn new(lb_state: Arc<LoadBalancerState>) -> Self {
let mut server_opts = HttpServerOptions::default();
server_opts.h2c = true;
Self { lb_state, server_opts }
}
}
#[async_trait]
impl HttpServerApp for LoadBalancerUpdateService {
#[tracing::instrument(target = COMPONENT, name = "lb_updater_service.process_new_http", skip(http))]
async fn process_new_http(
self: &Arc<Self>,
mut http: ServerSession,
_shutdown: &ShutdownWatch,
) -> Option<Stream> {
match http.read_request().await {
Ok(res) => {
if !res {
error!("Failed to read request header");
create_response_with_error_message(
&mut http,
"Failed to read request header".to_string(),
)
.await
.ok();
return None;
}
},
Err(e) => {
error!("HTTP server fails to read from downstream: {e}");
create_response_with_error_message(
&mut http,
format!("HTTP server fails to read from downstream: {e}"),
)
.await
.ok();
return None;
},
}
info!("Successfully get a new request to update workers");
let Some(query_params) = http.req_header().as_ref().uri.query() else {
let error_message = "No query parameters provided".to_string();
error!("{}", error_message);
create_response_with_error_message(&mut http, error_message).await.ok();
return None;
};
let update_workers: Result<UpdateWorkers, _> = serde_qs::from_str(query_params);
let update_workers = match update_workers {
Ok(workers) => workers,
Err(err) => {
let error_message = err.as_report_context("failed to parse query parameters");
error!("{}", error_message);
create_response_with_error_message(&mut http, error_message).await.ok();
return None;
},
};
if let Err(err) = self.lb_state.update_workers(update_workers).await {
let error_message = err.as_report_context("failed to update workers");
error!("{}", error_message);
create_response_with_error_message(&mut http, error_message).await.ok();
return None;
}
create_workers_updated_response(&mut http, self.lb_state.num_workers().await)
.await
.ok();
info!("Successfully updated workers");
None
}
fn server_options(&self) -> Option<&HttpServerOptions> {
Some(&self.server_opts)
}
}
async fn create_workers_updated_response(
session: &mut ServerSession,
workers: usize,
) -> pingora_core::Result<bool> {
let mut header = ResponseHeader::build(200, None)?;
header.insert_header("X-Worker-Count", workers.to_string())?;
session.set_keepalive(None);
session.write_response_header(Box::new(header)).await?;
Ok(true)
}