use crate::config::{ListenerConfig, ProxyConfig, UpstreamConfig};
use crate::convert::ConversionRegistry;
use crate::error::{ProxyError, ProxyResult};
use crate::inspector::{EventBroadcaster, EventStore};
use crate::middleware::MiddlewareStack;
use crate::middleware::inspector::InspectorLayer;
use crate::middleware::logging::LoggingLayer;
use crate::proxy::{ListenerFactory, ProxyService};
use crate::transport::{TransportConnector, TransportListener};
use std::net::SocketAddr;
use std::sync::Arc;
pub struct ProxyBuilder {
config: ProxyConfig,
middleware: MiddlewareStack,
conversions: ConversionRegistry,
custom_listener: Option<Box<dyn TransportListener>>,
custom_connector: Option<Box<dyn TransportConnector>>,
enable_logging: bool,
log_bodies: bool,
enable_inspector: bool,
}
impl ProxyBuilder {
pub fn new() -> Self {
Self {
config: ProxyConfig::default(),
middleware: MiddlewareStack::new(),
conversions: ConversionRegistry::new(),
custom_listener: None,
custom_connector: None,
enable_logging: false,
log_bodies: false,
enable_inspector: false,
}
}
pub fn listen_on(mut self, addr: &str) -> Self {
if let Ok(socket_addr) = addr.parse::<SocketAddr>() {
self.config.listener = ListenerConfig::Tcp { addr: socket_addr };
}
self
}
pub fn upstream_url(mut self, url: &str) -> Self {
self.config.upstream = UpstreamConfig::Url {
url: url.to_string(),
};
self
}
pub fn upstream_tcp(mut self, host: &str, port: u16) -> Self {
self.config.upstream = UpstreamConfig::Tcp {
host: host.to_string(),
port,
};
self
}
pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
self.config.timeout = timeout;
self
}
pub fn max_body_size(mut self, size: usize) -> Self {
self.config.max_body_size = size;
self
}
pub fn with_logging(mut self) -> Self {
self.enable_logging = true;
self
}
pub fn with_body_logging(mut self) -> Self {
self.enable_logging = true;
self.log_bodies = true;
self
}
pub fn with_inspector(mut self) -> Self {
self.enable_inspector = true;
self.config.inspector.enabled = true;
self
}
#[cfg(feature = "inspector-api")]
pub fn with_inspector_api(mut self, addr: SocketAddr) -> Self {
self.enable_inspector = true;
self.config.inspector.enabled = true;
self.config.inspector.api_addr = Some(addr);
self
}
pub fn inspector_capacity(mut self, capacity: usize) -> Self {
self.config.inspector.event_capacity = capacity;
self
}
pub fn layer(mut self, layer: impl crate::middleware::ProxyLayer + 'static) -> Self {
self.middleware.push(layer);
self
}
pub fn listener(mut self, listener: impl TransportListener + 'static) -> Self {
self.custom_listener = Some(Box::new(listener));
self
}
pub fn connector(mut self, connector: impl TransportConnector + 'static) -> Self {
self.custom_connector = Some(Box::new(connector));
self
}
pub fn conversions(mut self, registry: ConversionRegistry) -> Self {
self.conversions = registry;
self
}
pub fn config(mut self, config: ProxyConfig) -> Self {
self.config = config;
self
}
pub fn build(mut self) -> ProxyResult<ProxyService> {
let event_store = Arc::new(EventStore::new(self.config.inspector.event_capacity));
let event_broadcaster = Arc::new(EventBroadcaster::new(
self.config.inspector.broadcast_capacity,
));
if self.enable_inspector {
let inspector_layer =
InspectorLayer::new(event_store.clone(), event_broadcaster.clone());
let mut new_stack = MiddlewareStack::new();
new_stack.push(inspector_layer);
let old_stack = std::mem::replace(&mut self.middleware, new_stack);
self.middleware = {
let mut stack = MiddlewareStack::new();
stack.push(InspectorLayer::new(
event_store.clone(),
event_broadcaster.clone(),
));
stack
};
let _ = old_stack; }
if self.enable_logging {
self.middleware
.push(LoggingLayer::new().with_bodies(self.log_bodies));
}
let connector: Box<dyn TransportConnector> = if let Some(c) = self.custom_connector {
c
} else {
build_connector(&self.config)?
};
let listener_factory: ListenerFactory = if let Some(listener) = self.custom_listener {
let listener = Arc::new(listener);
Box::new(move |tx, shutdown| {
let listener = listener.clone();
Box::pin(async move { listener.listen(tx, shutdown).await })
})
} else {
build_listener_factory(&self.config)?
};
Ok(ProxyService {
config: self.config,
middleware: self.middleware,
connector,
listener_factory,
conversions: self.conversions,
event_store,
event_broadcaster,
})
}
}
impl Default for ProxyBuilder {
fn default() -> Self {
Self::new()
}
}
fn build_connector(config: &ProxyConfig) -> ProxyResult<Box<dyn TransportConnector>> {
match &config.upstream {
#[cfg(feature = "http")]
UpstreamConfig::Url { url } => {
let parsed = url::Url::parse(url)
.map_err(|e| ProxyError::Config(format!("invalid upstream URL: {e}")))?;
Ok(Box::new(crate::transport::http::HttpConnector::new(parsed)))
}
#[cfg(not(feature = "http"))]
UpstreamConfig::Url { .. } => Err(ProxyError::Config(
"HTTP transport requires the 'http' feature".into(),
)),
UpstreamConfig::Tcp { .. } => Err(ProxyError::Config(
"TCP upstream connector unsupported; use a custom connector".into(),
)),
UpstreamConfig::Unix { .. } => Err(ProxyError::Config(
"Unix upstream connector unsupported; use a custom connector".into(),
)),
}
}
fn build_listener_factory(config: &ProxyConfig) -> ProxyResult<ListenerFactory> {
match &config.listener {
#[cfg(feature = "http")]
ListenerConfig::Tcp { addr } => {
let addr = *addr;
Ok(Box::new(move |tx, shutdown| {
let listener = crate::transport::http::HttpListener::new(addr);
Box::pin(async move { listener.listen(tx, shutdown).await })
}))
}
#[cfg(not(feature = "http"))]
ListenerConfig::Tcp { .. } => Err(ProxyError::Config(
"TCP listener requires the 'http' feature".into(),
)),
ListenerConfig::Unix { path } => {
let path = path.clone();
Ok(Box::new(move |tx, shutdown| {
let listener = crate::transport::unix::UnixListener::new(path.clone());
Box::pin(async move { listener.listen(tx, shutdown).await })
}))
}
}
}