use std::net::TcpListener;
use http::{HeaderMap, HeaderName, HeaderValue};
use miden_remote_prover::error::RemoteProverError;
use pingora::http::ResponseHeader;
use pingora::protocols::http::ServerSession;
use pingora::{Error, ErrorType};
use pingora_proxy::Session;
use prost::Message;
use tonic::Code;
use tracing::debug;
use crate::COMPONENT;
use crate::commands::PROXY_HOST;
use crate::proxy::metrics::QUEUE_DROP_COUNT;
/// `Content-Type` value placed on every gRPC response header written by this module.
const GRPC_CONTENT_TYPE: HeaderValue = HeaderValue::from_static("application/grpc");
/// Name of the trailer that carries the numeric gRPC status code.
const GRPC_STATUS_HEADER: HeaderName = HeaderName::from_static("grpc-status");
/// Name of the trailer that carries the optional human-readable gRPC error message.
const GRPC_MESSAGE_HEADER: HeaderName = HeaderName::from_static("grpc-message");
fn build_grpc_trailers(
grpc_status: Code,
error_message: Option<&str>,
) -> pingora_core::Result<HeaderMap> {
let mut trailers = HeaderMap::new();
let status_code = (grpc_status as i32).to_string();
trailers.insert(
GRPC_STATUS_HEADER,
status_code.parse().map_err(|e| {
Error::because(ErrorType::InternalError, format!("Failed to parse grpc-status: {e}"), e)
})?,
);
if let Some(message) = error_message {
trailers.insert(
GRPC_MESSAGE_HEADER,
message.parse().map_err(|e| {
Error::because(
ErrorType::InternalError,
format!("Failed to parse grpc-message: {e}"),
e,
)
})?,
);
}
Ok(trailers)
}
pub async fn write_grpc_response_to_session<T>(
session: &mut Session,
message: T,
) -> pingora_core::Result<()>
where
T: Message,
{
let mut response_body = Vec::new();
message.encode(&mut response_body).map_err(|e| {
Error::because(ErrorType::InternalError, format!("Failed to encode proto response: {e}"), e)
})?;
let mut grpc_message = Vec::new();
grpc_message.push(0u8);
let msg_len = response_body.len() as u32;
grpc_message.extend_from_slice(&msg_len.to_be_bytes());
grpc_message.extend_from_slice(&response_body);
let mut header = ResponseHeader::build(200, None)?;
header.insert_header(http::header::CONTENT_TYPE, GRPC_CONTENT_TYPE)?;
session.set_keepalive(None);
session.write_response_header(Box::new(header), false).await?;
session.write_response_body(Some(grpc_message.into()), false).await?;
let trailers = build_grpc_trailers(Code::Ok, None)?;
session.write_response_trailers(trailers).await?;
Ok(())
}
/// Writes a gRPC error response to `session`.
///
/// The response is an HTTP 200 with the gRPC content type and no body; the failure is reported
/// through the trailers, which carry `grpc_status` and `error_message`.
///
/// The trailers are built before anything is written to the session, so a failure while
/// building them is returned without leaving a half-written response on the wire.
///
/// # Errors
///
/// Propagates errors from building the header or trailers and from writing to the session.
pub async fn write_grpc_error_to_session(
    session: &mut Session,
    grpc_status: Code,
    error_message: &str,
) -> pingora_core::Result<()> {
    // Validate everything that can fail locally before committing any bytes to the client.
    let trailers = build_grpc_trailers(grpc_status, Some(error_message))?;

    let mut header = ResponseHeader::build(200, None)?;
    header.insert_header(http::header::CONTENT_TYPE, GRPC_CONTENT_TYPE)?;

    session.set_keepalive(None);
    // Header and (empty) body are written with the end-of-stream flag unset so trailers can
    // follow.
    session.write_response_header(Box::new(header), false).await?;
    session.write_response_body(None, false).await?;
    session.write_response_trailers(trailers).await?;
    Ok(())
}
/// Rejects a request because the queue is full.
///
/// Increments [`QUEUE_DROP_COUNT`] and then replies with a gRPC error whose status is
/// [`Code::ResourceExhausted`].
///
/// # Errors
///
/// Propagates any error returned by [`write_grpc_error_to_session`].
pub(crate) async fn create_queue_full_response(session: &mut Session) -> pingora_core::Result<()> {
    const QUEUE_FULL_MESSAGE: &str = "Too many requests in the queue";

    QUEUE_DROP_COUNT.inc();
    write_grpc_error_to_session(session, Code::ResourceExhausted, QUEUE_FULL_MESSAGE).await
}
/// Rejects a request because the rate limit was exceeded.
///
/// Replies with a gRPC error whose status is [`Code::ResourceExhausted`] and whose message
/// includes `max_request_per_second`.
///
/// # Errors
///
/// Propagates any error returned by [`write_grpc_error_to_session`].
pub async fn create_too_many_requests_response(
    session: &mut Session,
    max_request_per_second: isize,
) -> pingora_core::Result<()> {
    let rate_limit_notice =
        format!("Rate limit exceeded: {max_request_per_second} requests per second");

    write_grpc_error_to_session(session, Code::ResourceExhausted, rate_limit_notice.as_str()).await
}
/// Sends an HTTP 400 response header whose `X-Error-Message` header carries `error_msg`.
///
/// Only the response header is written; this function writes no body.
///
/// # Errors
///
/// Propagates errors from building the response header, from inserting `error_msg` as a
/// header value, and from writing to the session.
pub async fn create_response_with_error_message(
    session: &mut ServerSession,
    error_msg: String,
) -> pingora_core::Result<()> {
    let mut bad_request = ResponseHeader::build(400, None)?;
    bad_request.insert_header("X-Error-Message", error_msg)?;

    session.set_keepalive(None);
    let boxed_header = Box::new(bad_request);
    session.write_response_header(boxed_header).await?;

    Ok(())
}
/// Tries to bind a TCP listener on `PROXY_HOST:port`.
///
/// On success a debug event tagged with `service`, `port` and the bound address is emitted and
/// the listener is returned, so the caller keeps the port reserved for as long as it holds it.
///
/// # Errors
///
/// Any bind failure is reported as [`RemoteProverError::PortAlreadyInUse`], wrapping the
/// underlying I/O error together with `port`.
pub fn check_port_availability(
    port: u16,
    service: &str,
) -> Result<std::net::TcpListener, RemoteProverError> {
    let addr = format!("{PROXY_HOST}:{port}");

    match TcpListener::bind(&addr) {
        Ok(listener) => {
            debug!(target: COMPONENT, %service, %port, %addr, "Port is available");
            Ok(listener)
        },
        Err(err) => Err(RemoteProverError::PortAlreadyInUse(err, port)),
    }
}