rust_web_server/proxy_config/
builder.rs1use std::collections::HashMap;
5use std::sync::{Arc, RwLock};
6
7use crate::middleware::WithMiddleware;
8use crate::proxy_config::{
9 ActionConfig, AuthConfig, CompiledRoute, ConfigDrivenApp, DynamicProxy, MiddlewareConfig,
10 ProxyConfig, RedirectAdapter, RespondAdapter, RouteMatcher, arc_app, BearerAuthMiddleware,
11 PerRouteRateLimit,
12};
13
14pub fn build_from_file() -> (ConfigDrivenApp, Vec<std::thread::JoinHandle<()>>) {
17 let config = ProxyConfig::load();
18 build(config)
19}
20
21pub fn build(config: ProxyConfig) -> (ConfigDrivenApp, Vec<std::thread::JoinHandle<()>>) {
23 let mut upstream_lives: HashMap<String, Arc<RwLock<Vec<String>>>> = HashMap::new();
25
26 for upstream in &config.upstreams {
27 let live = Arc::new(RwLock::new(upstream.backends.clone()));
28 upstream_lives.insert(upstream.name.clone(), Arc::clone(&live));
29
30 if let Some(ref hc) = upstream.health_check {
31 crate::proxy_config::health::start_health_checker(
32 upstream.name.clone(),
33 upstream.backends.clone(),
34 Arc::clone(&live),
35 hc.clone(),
36 );
37 }
38 }
39
40 let mut compiled: Vec<CompiledRoute> = Vec::new();
42
43 for route in &config.routes {
44 let matcher = RouteMatcher::from_match_config(&route.match_);
45
46 let base_handler: Arc<dyn crate::application::Application + Send + Sync> =
48 match &route.action {
49 ActionConfig::Proxy {
50 upstream,
51 connect_timeout_ms,
52 read_timeout_ms,
53 strip_path_prefix,
54 add_path_prefix,
55 } => {
56 let live = upstream_lives
57 .get(upstream.as_str())
58 .cloned()
59 .unwrap_or_else(|| {
60 Arc::new(RwLock::new(vec![upstream.clone()]))
62 });
63 let upstream_tls = config.upstreams.iter()
64 .find(|u| u.name == *upstream)
65 .map(|u| u.tls)
66 .unwrap_or(false);
67 arc_app(DynamicProxy::new(
68 live,
69 *connect_timeout_ms,
70 *read_timeout_ms,
71 strip_path_prefix.clone(),
72 add_path_prefix.clone(),
73 upstream_tls,
74 ))
75 }
76
77 ActionConfig::Redirect { location, status } => {
78 arc_app(RedirectAdapter::new(location.clone(), *status))
79 }
80
81 ActionConfig::Respond { status, body, content_type } => {
82 arc_app(RespondAdapter::new(*status, body.clone(), content_type.clone()))
83 }
84
85 ActionConfig::Static { .. } => {
87 use crate::core::New;
88 arc_app(crate::app::App::new())
89 }
90
91 ActionConfig::Grpc { upstream, connect_timeout_ms, read_timeout_ms } => {
94 let live = upstream_lives
95 .get(upstream.as_str())
96 .cloned()
97 .unwrap_or_else(|| Arc::new(RwLock::new(vec![upstream.clone()])));
98 let upstream_tls = config.upstreams.iter()
99 .find(|u| u.name == *upstream)
100 .map(|u| u.tls)
101 .unwrap_or(false);
102 arc_app(DynamicProxy::new(live, *connect_timeout_ms, *read_timeout_ms, None, None, upstream_tls))
103 }
104
105 ActionConfig::Mcp | ActionConfig::Unknown(_) => {
106 arc_app(crate::proxy_config::NullApp)
108 }
109 };
110
111 let handler = apply_middleware(base_handler, &route.middleware);
113
114 compiled.push(CompiledRoute { matcher, handler });
115 }
116
117 let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
119
120 for tcp_cfg in &config.tcp_proxies {
121 let listen = tcp_cfg.listen.clone();
122 let backends = tcp_cfg.backends.clone();
123 let timeout_ms = tcp_cfg.connect_timeout_ms;
124 let name = tcp_cfg.name.clone();
125 let h = std::thread::Builder::new()
126 .name(format!("tcp-proxy-{}", name))
127 .spawn(move || {
128 let proxy = crate::tcp_proxy::TcpProxy::new(backends)
129 .connect_timeout_ms(timeout_ms);
130 if let Err(e) = proxy.bind(&listen) {
131 eprintln!("[tcp_proxy:{}] {}", name, e);
132 }
133 })
134 .expect("failed to spawn tcp proxy thread");
135 handles.push(h);
136 }
137
138 for udp_cfg in &config.udp_proxies {
139 let listen = udp_cfg.listen.clone();
140 let backends = udp_cfg.backends.clone();
141 let reply_timeout_ms = udp_cfg.reply_timeout_ms;
142 let buffer_size = udp_cfg.buffer_size;
143 let name = udp_cfg.name.clone();
144 let h = std::thread::Builder::new()
145 .name(format!("udp-proxy-{}", name))
146 .spawn(move || {
147 let proxy = crate::udp_proxy::UdpProxy::new(backends)
148 .reply_timeout_ms(reply_timeout_ms)
149 .buffer_size(buffer_size);
150 if let Err(e) = proxy.bind(&listen) {
151 eprintln!("[udp_proxy:{}] {}", name, e);
152 }
153 })
154 .expect("failed to spawn udp proxy thread");
155 handles.push(h);
156 }
157
158 for ws_cfg in &config.ws_proxies {
159 let listen = ws_cfg.listen.clone();
160 let backends = ws_cfg.backends.clone();
161 let connect_timeout_ms = ws_cfg.connect_timeout_ms;
162 let read_timeout_ms = ws_cfg.read_timeout_ms;
163 let name = ws_cfg.name.clone();
164 let h = std::thread::Builder::new()
165 .name(format!("ws-proxy-{}", name))
166 .spawn(move || {
167 let proxy = crate::ws_proxy::WsProxy::new(backends)
168 .connect_timeout_ms(connect_timeout_ms)
169 .read_timeout_ms(read_timeout_ms);
170 if let Err(e) = proxy.bind(&listen) {
171 eprintln!("[ws_proxy:{}] {}", name, e);
172 }
173 })
174 .expect("failed to spawn ws proxy thread");
175 handles.push(h);
176 }
177
178 (ConfigDrivenApp::new(compiled), handles)
179}
180
181fn apply_middleware(
189 handler: Arc<dyn crate::application::Application + Send + Sync>,
190 mw: &MiddlewareConfig,
191) -> Arc<dyn crate::application::Application + Send + Sync> {
192 let mut app: Arc<dyn crate::application::Application + Send + Sync> = handler;
195
196 if let Some(ref cache_cfg) = mw.cache {
198 let mut layer = crate::cache::CacheLayer::memory(1000).ttl(cache_cfg.ttl_secs);
199 for vh in &cache_cfg.vary_by {
200 layer = layer.vary_by_header(vh.as_str());
201 }
202 app = arc_app(WithMiddleware::new(ArcApp(Arc::clone(&app))).wrap(layer));
203 }
204
205 if !mw.rewrite_request.is_empty() || !mw.rewrite_response.is_empty() {
207 let mut layer = crate::rewrite::RewriteLayer::new();
208 for rule in &mw.rewrite_request {
209 layer = apply_request_rewrite_rule(layer, rule);
210 }
211 for rule in &mw.rewrite_response {
212 layer = apply_response_rewrite_rule(layer, rule);
213 }
214 app = arc_app(WithMiddleware::new(ArcApp(Arc::clone(&app))).wrap(layer));
215 }
216
217 if let Some(ref auth_cfg) = mw.auth {
219 match auth_cfg {
220 AuthConfig::Bearer { token_env } => {
221 let token = std::env::var(token_env).unwrap_or_default();
222 if !token.is_empty() {
223 app = arc_app(
224 WithMiddleware::new(ArcApp(Arc::clone(&app)))
225 .wrap(BearerAuthMiddleware { token: Arc::new(token) }),
226 );
227 }
228 }
229 #[cfg(feature = "auth")]
230 AuthConfig::Jwt { secret_env } => {
231 let _secret = std::env::var(secret_env).unwrap_or_default();
232 }
235 #[cfg(not(feature = "auth"))]
236 AuthConfig::Jwt { .. } => {
237 eprintln!("[proxy_config] JWT auth requires the 'auth' feature; skipping.");
238 }
239 AuthConfig::Basic { .. } => {
240 eprintln!("[proxy_config] Basic auth is not yet implemented; skipping.");
241 }
242 }
243 }
244
245 if let Some(ref rl_cfg) = mw.rate_limit {
247 let limiter = Arc::new(crate::rate_limit::RateLimiter::new(
248 rl_cfg.max_requests,
249 rl_cfg.window_secs,
250 ));
251 app = arc_app(
252 WithMiddleware::new(ArcApp(Arc::clone(&app)))
253 .wrap(PerRouteRateLimit(limiter)),
254 );
255 }
256
257 if !mw.ip_allow.is_empty() {
259 let filter = crate::ip_filter::IpFilter::allow(mw.ip_allow.iter().map(|s| s.as_str()));
260 app = arc_app(WithMiddleware::new(ArcApp(Arc::clone(&app))).wrap(filter));
261 } else if !mw.ip_deny.is_empty() {
262 let filter = crate::ip_filter::IpFilter::deny(mw.ip_deny.iter().map(|s| s.as_str()));
263 app = arc_app(WithMiddleware::new(ArcApp(Arc::clone(&app))).wrap(filter));
264 }
265
266 app
267}
268
269fn apply_request_rewrite_rule(
270 layer: crate::rewrite::RewriteLayer,
271 rule: &crate::proxy_config::RewriteRuleConfig,
272) -> crate::rewrite::RewriteLayer {
273 match rule.type_.as_str() {
274 "header_set" => {
275 if let (Some(name), Some(value)) = (&rule.name, &rule.value) {
276 return layer.request_header_set(name, value);
277 }
278 }
279 "header_remove" => {
280 if let Some(name) = &rule.name {
281 return layer.request_header_remove(name);
282 }
283 }
284 "uri_set" => {
285 if let Some(value) = &rule.value {
286 return layer.request_uri_set(value);
287 }
288 }
289 "uri_strip_prefix" | "strip_prefix" => {
290 if let Some(prefix) = rule.prefix.as_ref().or(rule.value.as_ref()) {
291 return layer.request_uri_strip_prefix(prefix);
292 }
293 }
294 "uri_add_prefix" | "add_prefix" => {
295 if let Some(prefix) = rule.prefix.as_ref().or(rule.value.as_ref()) {
296 return layer.request_uri_add_prefix(prefix);
297 }
298 }
299 _ => {}
300 }
301 layer
302}
303
304fn apply_response_rewrite_rule(
305 layer: crate::rewrite::RewriteLayer,
306 rule: &crate::proxy_config::RewriteRuleConfig,
307) -> crate::rewrite::RewriteLayer {
308 match rule.type_.as_str() {
309 "header_set" => {
310 if let (Some(name), Some(value)) = (&rule.name, &rule.value) {
311 return layer.response_header_set(name, value);
312 }
313 }
314 "header_remove" => {
315 if let Some(name) = &rule.name {
316 return layer.response_header_remove(name);
317 }
318 }
319 "status" => {
320 if let (Some(code), Some(reason)) = (&rule.code, &rule.reason) {
321 return layer.response_status(*code as i16, reason);
322 }
323 }
324 "body_replace" => {
325 if let (Some(from), Some(to)) = (&rule.from, &rule.to) {
326 return layer.response_body_replace(from, to);
327 }
328 }
329 _ => {}
330 }
331 layer
332}
333
334struct ArcApp(Arc<dyn crate::application::Application + Send + Sync>);
339
340impl crate::application::Application for ArcApp {
341 fn execute(
342 &self,
343 request: &crate::request::Request,
344 conn: &crate::server::ConnectionInfo,
345 ) -> Result<crate::response::Response, String> {
346 self.0.execute(request, conn)
347 }
348}
349
350impl Clone for ArcApp {
351 fn clone(&self) -> Self {
352 ArcApp(Arc::clone(&self.0))
353 }
354}