1#[cfg(test)]
12mod tests;
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use std::{fmt, mem};
18use std::borrow::Cow;
19use thiserror::Error;
20use crate::security::{ProviderConfig, SecurityChain, SecurityProvider, SecurityStage};
21use tokio::sync::RwLock;
22use tokio::time::timeout;
23use serde::{Serialize, Deserialize};
24
25use crate::config::Config;
26
27#[cfg(feature = "opentelemetry")]
28use opentelemetry::{
29 global,
30 trace::Tracer,
31 KeyValue,
32 Context,
33 context::FutureExt,
34 trace::{Span, SpanBuilder, SpanKind, TraceContextExt, Status}
35};
36#[cfg(feature = "opentelemetry")]
37use opentelemetry_http::HeaderInjector;
38#[cfg(feature = "opentelemetry")]
39use opentelemetry_semantic_conventions::attribute::HTTP_RESPONSE_STATUS_CODE;
40use crate::log_info;
41
42#[derive(Error, Debug)]
44pub enum ProxyError {
45 #[error("HTTP client error: {0}")]
47 ClientError(#[from] reqwest::Error),
48
49 #[error("IO error: {0}")]
51 IoError(#[from] std::io::Error),
52
53 #[error("request timed out after {0:?}")]
55 Timeout(Duration),
56
57 #[error("routing error: {0}")]
59 RoutingError(String),
60
61 #[error("filter error: {0}")]
63 FilterError(String),
64
65 #[error("configuration error: {0}")]
67 ConfigError(String),
68
69 #[error("security error: {0}")]
71 SecurityError(String),
72
73 #[error("{0}")]
75 Other(String),
76}
77
78impl From<crate::config::error::ConfigError> for ProxyError {
79 fn from(err: crate::config::error::ConfigError) -> Self {
80 ProxyError::ConfigError(err.to_string())
81 }
82}
83
84impl From<globset::Error> for ProxyError {
85 fn from(e: globset::Error) -> Self {
86 ProxyError::SecurityError(e.to_string())
87 }
88}
89
90impl From<jsonwebtoken::errors::Error> for ProxyError {
91 fn from(e: jsonwebtoken::errors::Error) -> Self {
92 ProxyError::SecurityError(e.to_string())
93 }
94}
95
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
98#[serde(rename_all = "UPPERCASE")]
99pub enum HttpMethod {
100 Get,
101 Post,
102 Put,
103 Delete,
104 Head,
105 Options,
106 Patch,
107 Trace,
108 Connect,
109}
110
111impl fmt::Display for HttpMethod {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 match self {
114 HttpMethod::Get => write!(f, "GET"),
115 HttpMethod::Post => write!(f, "POST"),
116 HttpMethod::Put => write!(f, "PUT"),
117 HttpMethod::Delete => write!(f, "DELETE"),
118 HttpMethod::Head => write!(f, "HEAD"),
119 HttpMethod::Options => write!(f, "OPTIONS"),
120 HttpMethod::Patch => write!(f, "PATCH"),
121 HttpMethod::Trace => write!(f, "TRACE"),
122 HttpMethod::Connect => write!(f, "CONNECT"),
123 }
124 }
125}
126
127impl From<&reqwest::Method> for HttpMethod {
128 fn from(method: &reqwest::Method) -> Self {
129 match *method {
130 reqwest::Method::GET => HttpMethod::Get,
131 reqwest::Method::POST => HttpMethod::Post,
132 reqwest::Method::PUT => HttpMethod::Put,
133 reqwest::Method::DELETE => HttpMethod::Delete,
134 reqwest::Method::HEAD => HttpMethod::Head,
135 reqwest::Method::OPTIONS => HttpMethod::Options,
136 reqwest::Method::PATCH => HttpMethod::Patch,
137 reqwest::Method::TRACE => HttpMethod::Trace,
138 reqwest::Method::CONNECT => HttpMethod::Connect,
139 _ => HttpMethod::Get, }
141 }
142}
143
144impl From<HttpMethod> for reqwest::Method {
145 fn from(method: HttpMethod) -> Self {
146 match method {
147 HttpMethod::Get => reqwest::Method::GET,
148 HttpMethod::Post => reqwest::Method::POST,
149 HttpMethod::Put => reqwest::Method::PUT,
150 HttpMethod::Delete => reqwest::Method::DELETE,
151 HttpMethod::Head => reqwest::Method::HEAD,
152 HttpMethod::Options => reqwest::Method::OPTIONS,
153 HttpMethod::Patch => reqwest::Method::PATCH,
154 HttpMethod::Trace => reqwest::Method::TRACE,
155 HttpMethod::Connect => reqwest::Method::CONNECT,
156 }
157 }
158}
159
160#[derive(Debug)]
162pub struct ProxyRequest {
163 pub method: HttpMethod,
164 pub path: String,
165 pub query: Option<String>,
166 pub headers: reqwest::header::HeaderMap,
167 pub body: reqwest::Body,
168 pub context: Arc<RwLock<RequestContext>>,
169}
170
171impl Clone for ProxyRequest {
172 fn clone(&self) -> Self {
173 Self {
175 method: self.method,
176 path: self.path.clone(),
177 query: self.query.clone(),
178 headers: self.headers.clone(),
179 body: reqwest::Body::from(""),
180 context: self.context.clone(),
181 }
182 }
183}
184
185#[derive(Debug)]
187pub struct ProxyResponse {
188 pub status: u16,
189 pub headers: reqwest::header::HeaderMap,
190 pub body: reqwest::Body,
191 pub context: Arc<RwLock<ResponseContext>>,
192}
193
194#[derive(Debug, Default, Clone)]
196pub struct RequestContext {
197 pub client_ip: Option<String>,
199 pub start_time: Option<std::time::Instant>,
201 pub attributes: std::collections::HashMap<String, serde_json::Value>,
203}
204
205#[derive(Debug, Default, Clone)]
207pub struct ResponseContext {
208 pub receive_time: Option<std::time::Instant>,
210 pub attributes: std::collections::HashMap<String, serde_json::Value>,
212}
213
214#[derive(Debug)]
216pub struct ProxyCore {
217 pub config: Arc<Config>,
219 pub client: reqwest::Client,
221 pub router: Arc<dyn Router>,
223 pub global_filters: Arc<RwLock<Vec<Arc<dyn Filter>>>>,
225 pub security_chain: Arc<RwLock<SecurityChain>>,
227}
228
229impl ProxyCore {
230 pub async fn new(config: Arc<Config>, router: Arc<dyn Router>) -> Result<Self, ProxyError> {
232 let timeout_secs: u64 = config.get_or_default("proxy.timeout", 30_u64)?;
234
235 let client_builder = reqwest::Client::builder();
236 let client = client_builder
237 .timeout(Duration::from_secs(timeout_secs))
238 .build()
239 .map_err(ProxyError::ClientError)?;
240
241 let actual_security_config: Vec<ProviderConfig> = match config.get("proxy.security_chain") {
242 Ok(Some(sc)) => sc,
243 Ok(None) => Vec::new(), Err(e) => {
245 crate::warn!("Could not parse 'proxy.security_chain', defaulting to empty: {}", e);
246 Vec::new() }
248 };
249
250 let security_chain = SecurityChain::from_configs(actual_security_config).await?;
251
252 Ok(Self {
253 config,
254 client,
255 router,
256 global_filters: Arc::new(RwLock::new(Vec::new())),
257 security_chain: Arc::new(RwLock::new(security_chain)),
258 })
259 }
260
261 pub async fn add_global_filter(&self, filter: Arc<dyn Filter>) {
263 let mut filters = self.global_filters.write().await;
264 filters.push(filter);
265 }
266
267 pub async fn add_security_provider(&self, p: Arc<dyn SecurityProvider>) {
269 self.security_chain.write().await.add(p);
270 }
271
272 pub async fn process_request(
274 &self,
275 request: ProxyRequest,
276 #[cfg(feature = "opentelemetry")]
277 parent_context: Option<Context>,
278 ) -> Result<ProxyResponse, ProxyError> {
279 let overall_start = Instant::now();
280 let method = request.method.to_string();
281 let path = request.path.clone();
282
283 crate::trace!("Processing request: {} {}", method, path);
284
285 #[cfg(feature = "opentelemetry")]
286 let span_context = {
287 let parent = parent_context
288 .as_ref()
289 .cloned()
290 .unwrap_or_else(Context::current);
291
292 let mut span = global::tracer("foxy::proxy")
293 .build_with_context(SpanBuilder {
294 name: Cow::from(format!("{method} {path}")),
295 span_kind: Some(SpanKind::Client),
296 ..Default::default()
297 }, &parent);
298
299 let span_context = &Context::current_with_span(span);
300 span_context.clone()
301 };
302
303 let mut request = match self.security_chain.read().await.apply_pre(request).await {
305 Ok(req) => {
306 crate::trace!("Security pre-auth passed for {} {}", method, path);
307 req
308 },
309 Err(e) => {
310 crate::warn!("Security pre-auth failed for {} {}: {}", method, path, e);
311
312 #[cfg(feature = "opentelemetry")]
313 {
314 span_context.span().set_status(Status::Error {description: Cow::from(e.to_string()) });
315 span_context.span().end();
316 }
317
318 return Err(e);
319 }
320 };
321
322 for f in self.global_filters.read().await.iter() {
324 if f.filter_type().is_pre() || f.filter_type().is_both() {
325 crate::trace!("Applying global pre-filter: {}", f.name());
326 match f.pre_filter(request).await {
327 Ok(req) => request = req,
328 Err(e) => {
329 crate::error!("Global pre-filter '{}' failed: {}", f.name(), e);
330
331 #[cfg(feature = "opentelemetry")]
332 {
333 span_context.span().set_status(Status::Error {description: Cow::from(e.to_string()) });
334 span_context.span().end();
335 }
336
337 return Err(e);
338 }
339 }
340 }
341 }
342
343 let route = match self.router.route(&request).await {
344 Ok(r) => {
345 crate::debug!("Request {} {} matched route: {}", method, path, r.id);
346 r
347 },
348 Err(e) => {
349 crate::warn!("No route found for {} {}: {}", method, path, e);
350
351 #[cfg(feature = "opentelemetry")]
352 {
353 span_context.span().set_status(Status::Error {description: Cow::from(e.to_string()) });
354 span_context.span().end();
355 }
356
357 return Err(e);
358 }
359 };
360
361 let route_filters = route.filters.clone().unwrap_or_default();
362 for f in &route_filters {
363 if f.filter_type().is_pre() || f.filter_type().is_both() {
364 crate::trace!("Applying route pre-filter: {}", f.name());
365 match f.pre_filter(request).await {
366 Ok(req) => request = req,
367 Err(e) => {
368 crate::error!("Route pre-filter '{}' failed: {}", f.name(), e);
369 return Err(e);
370 }
371 }
372 }
373 }
374
375 let url = format!("{}{}", route.target_base_url, request.path);
377 crate::debug!("Forwarding to target: {}", url);
378 let outbound_body = mem::replace(&mut request.body, reqwest::Body::from(""));
379
380 let mut outbound_headers = request.headers.clone();
381 #[cfg(feature = "opentelemetry")]
382 {
383 span_context.span().set_attribute(KeyValue::new("target", url.clone()));
384
385 global::get_text_map_propagator(|prop| {
386 prop.inject_context(&span_context, &mut HeaderInjector(&mut outbound_headers));
387 });
388 }
389
390 let mut builder = self
391 .client
392 .request(request.method.into(), &url)
393 .headers(outbound_headers)
394 .body(outbound_body);
395
396 if let Some(q) = &request.query {
397 builder = builder.query(&[(q, "")]);
398 }
399
400 let request_specific_timeout_ms: Option<u64> = request.context.read().await
406 .attributes
407 .get("timeout_ms")
408 .and_then(|v| v.as_u64());
409
410 let timeout_duration = if let Some(ms) = request_specific_timeout_ms {
411 Duration::from_millis(ms)
412 } else {
413 self.config.get_or_default("proxy.timeout", 30_u64).map(Duration::from_secs)?
419 };
420
421 let upstream_start = Instant::now();
422 crate::trace!("Sending request to upstream with timeout: {:?}", timeout_duration);
423
424 let resp = match timeout(timeout_duration, builder.send()).await {
425 Ok(result) => match result {
426 Ok(response) => response,
427 Err(e) => {
428 crate::error!("Upstream request failed: {}", e);
429
430 #[cfg(feature = "opentelemetry")]
431 {
432 span_context.span().set_status(Status::Error {description: Cow::from(e.to_string()) });
433 span_context.span().end();
434 }
435
436 return Err(ProxyError::ClientError(e));
437 }
438 },
439 Err(_) => {
440 crate::warn!("Request to {} timed out after {:?}", url, timeout_duration);
441
442 #[cfg(feature = "opentelemetry")]
443 {
444 span_context.span().set_status(Status::Error {description: Cow::from("Request timed out") });
445 span_context.span().end();
446 }
447
448 return Err(ProxyError::Timeout(timeout_duration));
449 }
450 };
451
452 #[cfg(feature = "opentelemetry")]
453 {
454 let client_span = span_context.span();
455
456 client_span.set_attribute(KeyValue::new(
457 HTTP_RESPONSE_STATUS_CODE,
458 resp.status().as_u16() as i64,
459 ));
460 client_span.end();
461 }
462
463 let upstream_elapsed = upstream_start.elapsed();
464 crate::trace!("Received response from upstream in {:?}", upstream_elapsed);
465
466 let status = resp.status().as_u16();
468 let headers = resp.headers().clone();
469 let body = reqwest::Body::wrap_stream(resp.bytes_stream());
470
471 let mut proxy_resp = ProxyResponse {
472 status,
473 headers,
474 body,
475 context: Arc::new(RwLock::new(ResponseContext::default())),
476 };
477 proxy_resp.context.write().await.receive_time = Some(Instant::now());
478
479 crate::debug!("Upstream responded with status: {}", status);
480
481 for f in &route_filters {
483 if f.filter_type().is_post() || f.filter_type().is_both() {
484 crate::trace!("Applying route post-filter: {}", f.name());
485 match f.post_filter(request.clone(), proxy_resp).await {
486 Ok(resp) => proxy_resp = resp,
487 Err(e) => {
488 crate::error!("Route post-filter '{}' failed: {}", f.name(), e);
489 return Err(e);
490 }
491 }
492 }
493 }
494
495 for f in self.global_filters.read().await.iter() {
496 if f.filter_type().is_post() || f.filter_type().is_both() {
497 crate::trace!("Applying global post-filter: {}", f.name());
498 match f.post_filter(request.clone(), proxy_resp).await {
499 Ok(resp) => proxy_resp = resp,
500 Err(e) => {
501 crate::error!("Global post-filter '{}' failed: {}", f.name(), e);
502 return Err(e);
503 }
504 }
505 }
506 }
507
508 proxy_resp = match self.security_chain.read().await.apply_post(request.clone(), proxy_resp).await {
510 Ok(resp) => {
511 crate::trace!("Security post-auth passed for {} {}", method, path);
512 resp
513 },
514 Err(e) => {
515 crate::warn!("Security post-auth failed for {} {}: {}", method, path, e);
516 return Err(e);
517 }
518 };
519
520 let overall_elapsed = overall_start.elapsed();
522 let internal_elapsed = overall_elapsed.saturating_sub(upstream_elapsed);
523
524 crate::debug!(
525 "[timing] {} {} -> {} | total={:?} upstream={:?} internal={:?}",
526 request.method,
527 request.path,
528 proxy_resp.status,
529 overall_elapsed,
530 upstream_elapsed,
531 internal_elapsed
532 );
533
534 Ok(proxy_resp)
535 }
536}
537
/// Phase(s) of request processing in which a filter participates.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FilterType {
    /// Participates only in the pre-forwarding phase.
    Pre,
    /// Participates only in the post-response phase.
    Post,
    /// Participates in both phases.
    Both,
}

impl FilterType {
    /// `true` for [`FilterType::Pre`] and [`FilterType::Both`].
    pub fn is_pre(&self) -> bool {
        match self {
            FilterType::Pre | FilterType::Both => true,
            FilterType::Post => false,
        }
    }

    /// `true` for [`FilterType::Post`] and [`FilterType::Both`].
    pub fn is_post(&self) -> bool {
        match self {
            FilterType::Post | FilterType::Both => true,
            FilterType::Pre => false,
        }
    }

    /// `true` only for [`FilterType::Both`].
    pub fn is_both(&self) -> bool {
        *self == FilterType::Both
    }
}
565
566#[async_trait::async_trait]
568pub trait Filter: fmt::Debug + Send + Sync {
569 fn filter_type(&self) -> FilterType;
571
572 fn name(&self) -> &str;
574
575 async fn pre_filter(&self, request: ProxyRequest) -> Result<ProxyRequest, ProxyError> {
577 Ok(request)
579 }
580
581 async fn post_filter(&self, _request: ProxyRequest, response: ProxyResponse) -> Result<ProxyResponse, ProxyError> {
583 Ok(response)
585 }
586}
587
588#[derive(Debug, Clone)]
590pub struct Route {
591 pub id: String,
593 pub target_base_url: String,
595 pub path_pattern: String,
597 pub filters: Option<Vec<Arc<dyn Filter>>>,
599}
600
601#[async_trait::async_trait]
603pub trait Router: fmt::Debug + Send + Sync {
604 async fn route(&self, request: &ProxyRequest) -> Result<Route, ProxyError>;
606
607 async fn get_routes(&self) -> Vec<Route>;
609
610 async fn add_route(&self, route: Route) -> Result<(), ProxyError>;
612
613 async fn remove_route(&self, route_id: &str) -> Result<(), ProxyError>;
615}