brainwires_proxy/
builder.rs1use crate::config::{ListenerConfig, ProxyConfig, UpstreamConfig};
4use crate::convert::ConversionRegistry;
5use crate::error::{ProxyError, ProxyResult};
6use crate::inspector::{EventBroadcaster, EventStore};
7use crate::middleware::MiddlewareStack;
8use crate::middleware::inspector::InspectorLayer;
9use crate::middleware::logging::LoggingLayer;
10use crate::proxy::{ListenerFactory, ProxyService};
11use crate::transport::{TransportConnector, TransportListener};
12
13use std::net::SocketAddr;
14use std::sync::Arc;
15
16pub struct ProxyBuilder {
18 config: ProxyConfig,
19 middleware: MiddlewareStack,
20 conversions: ConversionRegistry,
21 custom_listener: Option<Box<dyn TransportListener>>,
22 custom_connector: Option<Box<dyn TransportConnector>>,
23 enable_logging: bool,
24 log_bodies: bool,
25 enable_inspector: bool,
26}
27
28impl ProxyBuilder {
29 pub fn new() -> Self {
30 Self {
31 config: ProxyConfig::default(),
32 middleware: MiddlewareStack::new(),
33 conversions: ConversionRegistry::new(),
34 custom_listener: None,
35 custom_connector: None,
36 enable_logging: false,
37 log_bodies: false,
38 enable_inspector: false,
39 }
40 }
41
42 pub fn listen_on(mut self, addr: &str) -> Self {
44 if let Ok(socket_addr) = addr.parse::<SocketAddr>() {
45 self.config.listener = ListenerConfig::Tcp { addr: socket_addr };
46 }
47 self
48 }
49
50 pub fn upstream_url(mut self, url: &str) -> Self {
52 self.config.upstream = UpstreamConfig::Url {
53 url: url.to_string(),
54 };
55 self
56 }
57
58 pub fn upstream_tcp(mut self, host: &str, port: u16) -> Self {
60 self.config.upstream = UpstreamConfig::Tcp {
61 host: host.to_string(),
62 port,
63 };
64 self
65 }
66
67 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
69 self.config.timeout = timeout;
70 self
71 }
72
73 pub fn max_body_size(mut self, size: usize) -> Self {
75 self.config.max_body_size = size;
76 self
77 }
78
79 pub fn with_logging(mut self) -> Self {
81 self.enable_logging = true;
82 self
83 }
84
85 pub fn with_body_logging(mut self) -> Self {
87 self.enable_logging = true;
88 self.log_bodies = true;
89 self
90 }
91
92 pub fn with_inspector(mut self) -> Self {
94 self.enable_inspector = true;
95 self.config.inspector.enabled = true;
96 self
97 }
98
99 #[cfg(feature = "inspector-api")]
101 pub fn with_inspector_api(mut self, addr: SocketAddr) -> Self {
102 self.enable_inspector = true;
103 self.config.inspector.enabled = true;
104 self.config.inspector.api_addr = Some(addr);
105 self
106 }
107
108 pub fn inspector_capacity(mut self, capacity: usize) -> Self {
110 self.config.inspector.event_capacity = capacity;
111 self
112 }
113
114 pub fn layer(mut self, layer: impl crate::middleware::ProxyLayer + 'static) -> Self {
116 self.middleware.push(layer);
117 self
118 }
119
120 pub fn listener(mut self, listener: impl TransportListener + 'static) -> Self {
122 self.custom_listener = Some(Box::new(listener));
123 self
124 }
125
126 pub fn connector(mut self, connector: impl TransportConnector + 'static) -> Self {
128 self.custom_connector = Some(Box::new(connector));
129 self
130 }
131
132 pub fn conversions(mut self, registry: ConversionRegistry) -> Self {
134 self.conversions = registry;
135 self
136 }
137
138 pub fn config(mut self, config: ProxyConfig) -> Self {
140 self.config = config;
141 self
142 }
143
144 pub fn build(mut self) -> ProxyResult<ProxyService> {
146 let event_store = Arc::new(EventStore::new(self.config.inspector.event_capacity));
147 let event_broadcaster = Arc::new(EventBroadcaster::new(
148 self.config.inspector.broadcast_capacity,
149 ));
150
151 if self.enable_inspector {
153 let inspector_layer =
154 InspectorLayer::new(event_store.clone(), event_broadcaster.clone());
155 let mut new_stack = MiddlewareStack::new();
157 new_stack.push(inspector_layer);
158 let old_stack = std::mem::replace(&mut self.middleware, new_stack);
161 self.middleware = {
164 let mut stack = MiddlewareStack::new();
165 stack.push(InspectorLayer::new(
166 event_store.clone(),
167 event_broadcaster.clone(),
168 ));
169 stack
170 };
171 let _ = old_stack; }
177
178 if self.enable_logging {
179 self.middleware
180 .push(LoggingLayer::new().with_bodies(self.log_bodies));
181 }
182
183 let connector: Box<dyn TransportConnector> = if let Some(c) = self.custom_connector {
185 c
186 } else {
187 build_connector(&self.config)?
188 };
189
190 let listener_factory: ListenerFactory = if let Some(listener) = self.custom_listener {
192 let listener = Arc::new(listener);
193 Box::new(move |tx, shutdown| {
194 let listener = listener.clone();
195 Box::pin(async move { listener.listen(tx, shutdown).await })
196 })
197 } else {
198 build_listener_factory(&self.config)?
199 };
200
201 Ok(ProxyService {
202 config: self.config,
203 middleware: self.middleware,
204 connector,
205 listener_factory,
206 conversions: self.conversions,
207 event_store,
208 event_broadcaster,
209 })
210 }
211}
212
213impl Default for ProxyBuilder {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219fn build_connector(config: &ProxyConfig) -> ProxyResult<Box<dyn TransportConnector>> {
220 match &config.upstream {
221 #[cfg(feature = "http")]
222 UpstreamConfig::Url { url } => {
223 let parsed = url::Url::parse(url)
224 .map_err(|e| ProxyError::Config(format!("invalid upstream URL: {e}")))?;
225 Ok(Box::new(crate::transport::http::HttpConnector::new(parsed)))
226 }
227 #[cfg(not(feature = "http"))]
228 UpstreamConfig::Url { .. } => Err(ProxyError::Config(
229 "HTTP transport requires the 'http' feature".into(),
230 )),
231 UpstreamConfig::Tcp { .. } => Err(ProxyError::Config(
232 "TCP upstream connector not yet implemented; use a custom connector".into(),
233 )),
234 UpstreamConfig::Unix { .. } => Err(ProxyError::Config(
235 "Unix upstream connector not yet implemented; use a custom connector".into(),
236 )),
237 }
238}
239
240fn build_listener_factory(config: &ProxyConfig) -> ProxyResult<ListenerFactory> {
241 match &config.listener {
242 #[cfg(feature = "http")]
243 ListenerConfig::Tcp { addr } => {
244 let addr = *addr;
245 Ok(Box::new(move |tx, shutdown| {
246 let listener = crate::transport::http::HttpListener::new(addr);
247 Box::pin(async move { listener.listen(tx, shutdown).await })
248 }))
249 }
250 #[cfg(not(feature = "http"))]
251 ListenerConfig::Tcp { .. } => Err(ProxyError::Config(
252 "TCP listener requires the 'http' feature".into(),
253 )),
254 ListenerConfig::Unix { path } => {
255 let path = path.clone();
256 Ok(Box::new(move |tx, shutdown| {
257 let listener = crate::transport::unix::UnixListener::new(path.clone());
258 Box::pin(async move { listener.listen(tx, shutdown).await })
259 }))
260 }
261 }
262}