foxy/core/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5//! Core primitives – requests, responses, filters & routing.
6//!
7//! Everything that physically moves through the proxy pipeline is defined
8//! in this module.  No protocol-level logic lives here; that sits in
9//! `server.rs` (IO) and `filters.rs` (behaviour).
10
11#[cfg(test)]
12mod tests;
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use std::{fmt, mem};
18use std::borrow::Cow;
19use thiserror::Error;
20use crate::security::{ProviderConfig, SecurityChain, SecurityProvider, SecurityStage};
21use tokio::sync::RwLock;
22use tokio::time::timeout;
23use serde::{Serialize, Deserialize};
24
25use crate::config::Config;
26
27#[cfg(feature = "opentelemetry")]
28use opentelemetry::{
29    global,
30    trace::Tracer,
31    KeyValue,
32    Context,
33    context::FutureExt,
34    trace::{Span, SpanBuilder, SpanKind, TraceContextExt, Status}
35};
36#[cfg(feature = "opentelemetry")]
37use opentelemetry_http::HeaderInjector;
38#[cfg(feature = "opentelemetry")]
39use opentelemetry_semantic_conventions::attribute::HTTP_RESPONSE_STATUS_CODE;
40use crate::log_info;
41
42/// Errors that can occur during proxy operations.
43#[derive(Error, Debug)]
44pub enum ProxyError {
45    /// HTTP client error
46    #[error("HTTP client error: {0}")]
47    ClientError(#[from] reqwest::Error),
48
49    /// IO error
50    #[error("IO error: {0}")]
51    IoError(#[from] std::io::Error),
52
53    /// Timeout error
54    #[error("request timed out after {0:?}")]
55    Timeout(Duration),
56
57    /// Router error
58    #[error("routing error: {0}")]
59    RoutingError(String),
60
61    /// Filter error
62    #[error("filter error: {0}")]
63    FilterError(String),
64
65    /// Configuration error
66    #[error("configuration error: {0}")]
67    ConfigError(String),
68
69    /// Security provider error
70    #[error("security error: {0}")]
71    SecurityError(String),
72
73    /// Generic error
74    #[error("{0}")]
75    Other(String),
76}
77
78impl From<crate::config::error::ConfigError> for ProxyError {
79    fn from(err: crate::config::error::ConfigError) -> Self {
80        ProxyError::ConfigError(err.to_string())
81    }
82}
83
84impl From<globset::Error> for ProxyError {
85    fn from(e: globset::Error) -> Self {
86        ProxyError::SecurityError(e.to_string())
87    }
88}
89
90impl From<jsonwebtoken::errors::Error> for ProxyError {
91    fn from(e: jsonwebtoken::errors::Error) -> Self {
92        ProxyError::SecurityError(e.to_string())
93    }
94}
95
96/// HTTP methods supported by the proxy.
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
98#[serde(rename_all = "UPPERCASE")]
99pub enum HttpMethod {
100    Get,
101    Post,
102    Put,
103    Delete,
104    Head,
105    Options,
106    Patch,
107    Trace,
108    Connect,
109}
110
111impl fmt::Display for HttpMethod {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        match self {
114            HttpMethod::Get => write!(f, "GET"),
115            HttpMethod::Post => write!(f, "POST"),
116            HttpMethod::Put => write!(f, "PUT"),
117            HttpMethod::Delete => write!(f, "DELETE"),
118            HttpMethod::Head => write!(f, "HEAD"),
119            HttpMethod::Options => write!(f, "OPTIONS"),
120            HttpMethod::Patch => write!(f, "PATCH"),
121            HttpMethod::Trace => write!(f, "TRACE"),
122            HttpMethod::Connect => write!(f, "CONNECT"),
123        }
124    }
125}
126
127impl From<&reqwest::Method> for HttpMethod {
128    fn from(method: &reqwest::Method) -> Self {
129        match *method {
130            reqwest::Method::GET => HttpMethod::Get,
131            reqwest::Method::POST => HttpMethod::Post,
132            reqwest::Method::PUT => HttpMethod::Put,
133            reqwest::Method::DELETE => HttpMethod::Delete,
134            reqwest::Method::HEAD => HttpMethod::Head,
135            reqwest::Method::OPTIONS => HttpMethod::Options,
136            reqwest::Method::PATCH => HttpMethod::Patch,
137            reqwest::Method::TRACE => HttpMethod::Trace,
138            reqwest::Method::CONNECT => HttpMethod::Connect,
139            _ => HttpMethod::Get, // Default to GET for unsupported methods
140        }
141    }
142}
143
144impl From<HttpMethod> for reqwest::Method {
145    fn from(method: HttpMethod) -> Self {
146        match method {
147            HttpMethod::Get => reqwest::Method::GET,
148            HttpMethod::Post => reqwest::Method::POST,
149            HttpMethod::Put => reqwest::Method::PUT,
150            HttpMethod::Delete => reqwest::Method::DELETE,
151            HttpMethod::Head => reqwest::Method::HEAD,
152            HttpMethod::Options => reqwest::Method::OPTIONS,
153            HttpMethod::Patch => reqwest::Method::PATCH,
154            HttpMethod::Trace => reqwest::Method::TRACE,
155            HttpMethod::Connect => reqwest::Method::CONNECT,
156        }
157    }
158}
159
160/// Represents an HTTP request that can be processed by the proxy.
161#[derive(Debug)]
162pub struct ProxyRequest {
163    pub method: HttpMethod,
164    pub path: String,
165    pub query: Option<String>,
166    pub headers: reqwest::header::HeaderMap,
167    pub body: reqwest::Body,
168    pub context: Arc<RwLock<RequestContext>>,
169}
170
171impl Clone for ProxyRequest {
172    fn clone(&self) -> Self {
173        // A streaming body can't be duplicated.  Give filters an empty one.
174        Self {
175            method:   self.method,
176            path:     self.path.clone(),
177            query:    self.query.clone(),
178            headers:  self.headers.clone(),
179            body:     reqwest::Body::from(""),
180            context:  self.context.clone(),
181        }
182    }
183}
184
185/// Represents an HTTP response returned by the proxy.
186#[derive(Debug)]
187pub struct ProxyResponse {
188    pub status: u16,
189    pub headers: reqwest::header::HeaderMap,
190    pub body: reqwest::Body,
191    pub context: Arc<RwLock<ResponseContext>>,
192}
193
194/// Context data that can be attached to a request and accessed by filters.
195#[derive(Debug, Default, Clone)]
196pub struct RequestContext {
197    /// The original client's IP address
198    pub client_ip: Option<String>,
199    /// The start time of the request
200    pub start_time: Option<std::time::Instant>,
201    /// Custom attributes that can be set by filters
202    pub attributes: std::collections::HashMap<String, serde_json::Value>,
203}
204
205/// Context data that can be attached to a response and accessed by filters.
206#[derive(Debug, Default, Clone)]
207pub struct ResponseContext {
208    /// The time when the response was received from the target
209    pub receive_time: Option<std::time::Instant>,
210    /// Custom attributes that can be set by filters
211    pub attributes: std::collections::HashMap<String, serde_json::Value>,
212}
213
214/// Core proxy server implementation.
215#[derive(Debug)]
216pub struct ProxyCore {
217    /// Configuration for the proxy
218    pub config: Arc<Config>,
219    /// HTTP client for making outbound requests
220    pub client: reqwest::Client,
221    /// Router for matching requests to routes
222    pub router: Arc<dyn Router>,
223    /// Global filters that apply to all routes
224    pub global_filters: Arc<RwLock<Vec<Arc<dyn Filter>>>>,
225    /// Security chain that applies to all routes
226    pub security_chain: Arc<RwLock<SecurityChain>>,
227}
228
229impl ProxyCore {
230    /// Create a new proxy core with the given configuration and router.
231    pub async fn new(config: Arc<Config>, router: Arc<dyn Router>) -> Result<Self, ProxyError> {
232        // Configure the HTTP client based on the configuration
233        let timeout_secs: u64 = config.get_or_default("proxy.timeout", 30_u64)?;
234
235        let client_builder = reqwest::Client::builder();
236        let client = client_builder
237            .timeout(Duration::from_secs(timeout_secs))
238            .build()
239            .map_err(ProxyError::ClientError)?;
240
241        let actual_security_config: Vec<ProviderConfig> = match config.get("proxy.security_chain") {
242            Ok(Some(sc)) => sc,
243            Ok(None) => Vec::new(), // No security chain configured
244            Err(e) => {
245                crate::warn!("Could not parse 'proxy.security_chain', defaulting to empty: {}", e);
246                Vec::new() // Default to empty on error
247            }
248        };
249
250        let security_chain = SecurityChain::from_configs(actual_security_config).await?;
251
252        Ok(Self {
253            config,
254            client,
255            router,
256            global_filters: Arc::new(RwLock::new(Vec::new())),
257            security_chain: Arc::new(RwLock::new(security_chain)),
258        })
259    }
260
261    /// Add a global filter.
262    pub async fn add_global_filter(&self, filter: Arc<dyn Filter>) {
263        let mut filters = self.global_filters.write().await;
264        filters.push(filter);
265    }
266
267    /// Add a security filter to the chain.
268    pub async fn add_security_provider(&self, p: Arc<dyn SecurityProvider>) {
269        self.security_chain.write().await.add(p);
270    }
271
272    /// Process a request through the proxy.
273    pub async fn process_request(
274        &self,
275        request: ProxyRequest,
276        #[cfg(feature = "opentelemetry")]
277        parent_context: Option<Context>,
278    ) -> Result<ProxyResponse, ProxyError> {
279        let overall_start = Instant::now();
280        let method = request.method.to_string();
281        let path = request.path.clone();
282
283        crate::trace!("Processing request: {} {}", method, path);
284
285        #[cfg(feature = "opentelemetry")]
286        let span_context = {
287            let parent  = parent_context
288                .as_ref()
289                .cloned()
290                .unwrap_or_else(Context::current);
291
292            let mut span = global::tracer("foxy::proxy")
293                .build_with_context(SpanBuilder {
294                    name: Cow::from(format!("{method} {path}")),
295                    span_kind: Some(SpanKind::Client),
296                    ..Default::default()
297                }, &parent);
298
299            let span_context = &Context::current_with_span(span);
300            span_context.clone()
301        };
302
303        /* ---------- Security chain pre auth ---------- */
304        let mut request = match self.security_chain.read().await.apply_pre(request).await {
305            Ok(req) => {
306                crate::trace!("Security pre-auth passed for {} {}", method, path);
307                req
308            },
309            Err(e) => {
310                crate::warn!("Security pre-auth failed for {} {}: {}", method, path, e);
311
312                #[cfg(feature = "opentelemetry")]
313                {
314                    span_context.span().set_status(Status::Error {description: Cow::from(e.to_string()) });
315                    span_context.span().end();
316                }
317
318                return Err(e);
319            }
320        };
321
322        /* ---------- PRE-filters ---------- */
323        for f in self.global_filters.read().await.iter() {
324            if f.filter_type().is_pre() || f.filter_type().is_both() {
325                crate::trace!("Applying global pre-filter: {}", f.name());
326                match f.pre_filter(request).await {
327                    Ok(req) => request = req,
328                    Err(e) => {
329                        crate::error!("Global pre-filter '{}' failed: {}", f.name(), e);
330
331                        #[cfg(feature = "opentelemetry")]
332                        {
333                            span_context.span().set_status(Status::Error {description: Cow::from(e.to_string()) });
334                            span_context.span().end();
335                        }
336
337                        return Err(e);
338                    }
339                }
340            }
341        }
342        
343        let route = match self.router.route(&request).await {
344            Ok(r) => {
345                crate::debug!("Request {} {} matched route: {}", method, path, r.id);
346                r
347            },
348            Err(e) => {
349                crate::warn!("No route found for {} {}: {}", method, path, e);
350
351                #[cfg(feature = "opentelemetry")]
352                {
353                    span_context.span().set_status(Status::Error {description: Cow::from(e.to_string()) });
354                    span_context.span().end();
355                }
356
357                return Err(e);
358            }
359        };
360        
361        let route_filters = route.filters.clone().unwrap_or_default();
362        for f in &route_filters {
363            if f.filter_type().is_pre() || f.filter_type().is_both() {
364                crate::trace!("Applying route pre-filter: {}", f.name());
365                match f.pre_filter(request).await {
366                    Ok(req) => request = req,
367                    Err(e) => {
368                        crate::error!("Route pre-filter '{}' failed: {}", f.name(), e);
369                        return Err(e);
370                    }
371                }
372            }
373        }
374
375        /* ---------- build outbound req ---------- */
376        let url = format!("{}{}", route.target_base_url, request.path);
377        crate::debug!("Forwarding to target: {}", url);
378        let outbound_body = mem::replace(&mut request.body, reqwest::Body::from(""));
379
380        let mut outbound_headers = request.headers.clone();
381        #[cfg(feature = "opentelemetry")]
382        {
383            span_context.span().set_attribute(KeyValue::new("target", url.clone()));
384
385            global::get_text_map_propagator(|prop| {
386                prop.inject_context(&span_context, &mut HeaderInjector(&mut outbound_headers));
387            });
388        }
389
390        let mut builder = self
391            .client
392            .request(request.method.into(), &url)
393            .headers(outbound_headers)
394            .body(outbound_body);
395
396        if let Some(q) = &request.query {
397            builder = builder.query(&[(q, "")]);
398        }
399
400        /* ---------- send with timeout ---------- */
401        // The client already has a timeout configured.
402        // If a per-request timeout from a TimeoutFilter is present in context, it should override.
403        // For now, let's rely on the client's global timeout.
404        // A more advanced implementation could check request.context for a specific timeout.
405        let request_specific_timeout_ms: Option<u64> = request.context.read().await
406            .attributes
407            .get("timeout_ms")
408            .and_then(|v| v.as_u64());
409
410        let timeout_duration = if let Some(ms) = request_specific_timeout_ms {
411            Duration::from_millis(ms)
412        } else {
413            // Fallback to the client's configured timeout, or a default if not available.
414            // However, the client *is* configured with a timeout in ProxyCore::new.
415            // For consistency, we could fetch it from config again or store it in ProxyCore.
416            // For now, let's assume the client's timeout is sufficient.
417            // If we want to re-fetch:
418            self.config.get_or_default("proxy.timeout", 30_u64).map(Duration::from_secs)?
419        };
420
421        let upstream_start = Instant::now();
422        crate::trace!("Sending request to upstream with timeout: {:?}", timeout_duration);
423        
424        let resp = match timeout(timeout_duration, builder.send()).await {
425            Ok(result) => match result {
426                Ok(response) => response,
427                Err(e) => {
428                    crate::error!("Upstream request failed: {}", e);
429
430                    #[cfg(feature = "opentelemetry")]
431                    {
432                        span_context.span().set_status(Status::Error {description: Cow::from(e.to_string()) });
433                        span_context.span().end();
434                    }
435
436                    return Err(ProxyError::ClientError(e));
437                }
438            },
439            Err(_) => {
440                crate::warn!("Request to {} timed out after {:?}", url, timeout_duration);
441
442                #[cfg(feature = "opentelemetry")]
443                {
444                    span_context.span().set_status(Status::Error {description: Cow::from("Request timed out") });
445                    span_context.span().end();
446                }
447
448                return Err(ProxyError::Timeout(timeout_duration));
449            }
450        };
451
452        #[cfg(feature = "opentelemetry")]
453        {
454            let client_span = span_context.span();
455
456            client_span.set_attribute(KeyValue::new(
457                HTTP_RESPONSE_STATUS_CODE,
458                resp.status().as_u16() as i64,
459            ));
460            client_span.end();
461        }
462        
463        let upstream_elapsed = upstream_start.elapsed();
464        crate::trace!("Received response from upstream in {:?}", upstream_elapsed);
465
466        /* ---------- wrap streaming response ---------- */
467        let status = resp.status().as_u16();
468        let headers = resp.headers().clone();
469        let body = reqwest::Body::wrap_stream(resp.bytes_stream());
470
471        let mut proxy_resp = ProxyResponse {
472            status,
473            headers,
474            body,
475            context: Arc::new(RwLock::new(ResponseContext::default())),
476        };
477        proxy_resp.context.write().await.receive_time = Some(Instant::now());
478
479        crate::debug!("Upstream responded with status: {}", status);
480
481        /* ---------- POST-filters ---------- */
482        for f in &route_filters {
483            if f.filter_type().is_post() || f.filter_type().is_both() {
484                crate::trace!("Applying route post-filter: {}", f.name());
485                match f.post_filter(request.clone(), proxy_resp).await {
486                    Ok(resp) => proxy_resp = resp,
487                    Err(e) => {
488                        crate::error!("Route post-filter '{}' failed: {}", f.name(), e);
489                        return Err(e);
490                    }
491                }
492            }
493        }
494        
495        for f in self.global_filters.read().await.iter() {
496            if f.filter_type().is_post() || f.filter_type().is_both() {
497                crate::trace!("Applying global post-filter: {}", f.name());
498                match f.post_filter(request.clone(), proxy_resp).await {
499                    Ok(resp) => proxy_resp = resp,
500                    Err(e) => {
501                        crate::error!("Global post-filter '{}' failed: {}", f.name(), e);
502                        return Err(e);
503                    }
504                }
505            }
506        }
507
508        /* ---------- Security chain post auth ---------- */
509        proxy_resp = match self.security_chain.read().await.apply_post(request.clone(), proxy_resp).await {
510            Ok(resp) => {
511                crate::trace!("Security post-auth passed for {} {}", method, path);
512                resp
513            },
514            Err(e) => {
515                crate::warn!("Security post-auth failed for {} {}: {}", method, path, e);
516                return Err(e);
517            }
518        };
519        
520        /* ---------- timing log ---------- */
521        let overall_elapsed = overall_start.elapsed();
522        let internal_elapsed = overall_elapsed.saturating_sub(upstream_elapsed);
523
524        crate::debug!(
525            "[timing] {} {} -> {} | total={:?} upstream={:?} internal={:?}",
526            request.method,
527            request.path,
528            proxy_resp.status,
529            overall_elapsed,
530            upstream_elapsed,
531            internal_elapsed
532        );
533
534        Ok(proxy_resp)
535    }
536}
537
/// The pipeline stage(s) at which a filter runs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FilterType {
    /// Runs before the request is sent to the target.
    Pre,
    /// Runs after the response is received from the target.
    Post,
    /// Runs at both stages.
    Both,
}

impl FilterType {
    /// `true` for [`FilterType::Pre`] and [`FilterType::Both`].
    pub fn is_pre(&self) -> bool {
        match self {
            FilterType::Pre | FilterType::Both => true,
            FilterType::Post => false,
        }
    }

    /// `true` for [`FilterType::Post`] and [`FilterType::Both`].
    pub fn is_post(&self) -> bool {
        match self {
            FilterType::Post | FilterType::Both => true,
            FilterType::Pre => false,
        }
    }

    /// `true` only for [`FilterType::Both`].
    pub fn is_both(&self) -> bool {
        *self == FilterType::Both
    }
}
565
566/// A filter that processes requests and responses.
567#[async_trait::async_trait]
568pub trait Filter: fmt::Debug + Send + Sync {
569    /// Get the filter type.
570    fn filter_type(&self) -> FilterType;
571
572    /// Get the filter name.
573    fn name(&self) -> &str;
574
575    /// Process a request before it is sent to the target.
576    async fn pre_filter(&self, request: ProxyRequest) -> Result<ProxyRequest, ProxyError> {
577        // Default implementation: pass through the request unchanged
578        Ok(request)
579    }
580
581    /// Process a response after it is received from the target.
582    async fn post_filter(&self, _request: ProxyRequest, response: ProxyResponse) -> Result<ProxyResponse, ProxyError> {
583        // Default implementation: pass through the response unchanged
584        Ok(response)
585    }
586}
587
588/// A route that the proxy can forward requests to.
589#[derive(Debug, Clone)]
590pub struct Route {
591    /// The ID of the route (for logging and reference)
592    pub id: String,
593    /// The base URL of the target
594    pub target_base_url: String,
595    /// The path pattern that this route matches
596    pub path_pattern: String,
597    /// The filters that should be applied to this route
598    pub filters: Option<Vec<Arc<dyn Filter>>>,
599}
600
601/// A router that matches requests to routes.
602#[async_trait::async_trait]
603pub trait Router: fmt::Debug + Send + Sync {
604    /// Find a route for the given request.
605    async fn route(&self, request: &ProxyRequest) -> Result<Route, ProxyError>;
606
607    /// Get all routes managed by this router.
608    async fn get_routes(&self) -> Vec<Route>;
609
610    /// Add a new route to the router.
611    async fn add_route(&self, route: Route) -> Result<(), ProxyError>;
612
613    /// Remove a route from the router.
614    async fn remove_route(&self, route_id: &str) -> Result<(), ProxyError>;
615}