cardinal_proxy/
lib.rs

1mod utils;
2
3use crate::utils::requests::{
4    compose_upstream_url, execution_context_from_request, parse_origin, rewrite_request_path,
5    set_upstream_host_headers,
6};
7use cardinal_base::context::CardinalContext;
8use cardinal_base::destinations::container::DestinationContainer;
9use cardinal_plugins::request_context::RequestContext;
10use cardinal_plugins::runner::MiddlewareResult;
11use pingora::http::ResponseHeader;
12use pingora::prelude::*;
13use pingora::protocols::Digest;
14use pingora::upstreams::peer::Peer;
15use std::sync::Arc;
16use tracing::{debug, error, info, warn};
17
/// Re-exports every public item of the `pingora` crate under this crate's `pingora` module,
/// so dependents can name pingora types without adding their own `pingora` dependency.
pub mod pingora {
    pub use pingora::*;
}
21
/// Outcome of a [`CardinalContextProvider::health_check`] call.
///
/// Derives `PartialEq`/`Eq` so provider implementations can be verified with `assert_eq!`
/// and compared with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthCheckStatus {
    /// The request may be processed normally.
    Ready,
    /// The request must be rejected before any routing takes place.
    Unavailable {
        /// HTTP status code sent back to the client.
        status_code: u16,
        /// Optional explanation; only written to the log, never sent to the client.
        reason: Option<String>,
    },
}
30
/// Supplies the [`CardinalContext`] used to serve an incoming request.
///
/// Implementations must be `Send + Sync` because [`CardinalProxy`] stores the provider as an
/// `Arc<dyn CardinalContextProvider>`.
pub trait CardinalContextProvider: Send + Sync {
    /// Returns the context that should handle `session`, or `None` when no context applies.
    ///
    /// `CardinalProxy::request_filter` answers the client with status 421 when this returns
    /// `None`.
    fn resolve(&self, session: &Session) -> Option<Arc<CardinalContext>>;

    /// Reports whether the request may be served at all.
    ///
    /// `CardinalProxy::request_filter` calls this before [`resolve`](Self::resolve); on
    /// [`HealthCheckStatus::Unavailable`] it responds with the carried status code and does
    /// not resolve a context. The default implementation always returns
    /// [`HealthCheckStatus::Ready`].
    fn health_check(&self, _session: &Session) -> HealthCheckStatus {
        HealthCheckStatus::Ready
    }
}
37
/// A [`CardinalContextProvider`] that hands out one fixed context for every request.
#[derive(Clone)]
pub struct StaticContextProvider {
    // Shared context returned (as a new `Arc` handle) from every `resolve` call.
    context: Arc<CardinalContext>,
}
42
43impl StaticContextProvider {
44    pub fn new(context: Arc<CardinalContext>) -> Self {
45        Self { context }
46    }
47}
48
49impl CardinalContextProvider for StaticContextProvider {
50    fn resolve(&self, _session: &Session) -> Option<Arc<CardinalContext>> {
51        Some(self.context.clone())
52    }
53}
54
/// Proxy service implementing pingora's `ProxyHttp`; it obtains the context for each request
/// from a [`CardinalContextProvider`].
pub struct CardinalProxy {
    // Consulted in `request_filter` for the health check and for context resolution.
    provider: Arc<dyn CardinalContextProvider>,
}
58
59impl CardinalProxy {
60    pub fn new(context: Arc<CardinalContext>) -> Self {
61        Self::builder(context).build()
62    }
63
64    pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
65        Self { provider }
66    }
67
68    pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
69        CardinalProxyBuilder::new(context)
70    }
71}
72
/// Builder for [`CardinalProxy`] that lets the caller choose the context provider.
pub struct CardinalProxyBuilder {
    // Provider handed to the proxy when `build` is called.
    provider: Arc<dyn CardinalContextProvider>,
}
76
77impl CardinalProxyBuilder {
78    pub fn new(context: Arc<CardinalContext>) -> Self {
79        Self {
80            provider: Arc::new(StaticContextProvider::new(context)),
81        }
82    }
83
84    pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
85        Self { provider }
86    }
87
88    pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
89        self.provider = provider;
90        self
91    }
92
93    pub fn build(self) -> CardinalProxy {
94        CardinalProxy::with_provider(self.provider)
95    }
96}
97
98#[async_trait::async_trait]
99impl ProxyHttp for CardinalProxy {
100    type CTX = Option<RequestContext>;
101
102    fn new_ctx(&self) -> Self::CTX {
103        None
104    }
105
106    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
107        let path = session.req_header().uri.path().to_string();
108        info!(%path, "Request received");
109
110        match self.provider.health_check(session) {
111            HealthCheckStatus::Ready => {}
112            HealthCheckStatus::Unavailable {
113                status_code,
114                reason,
115            } => {
116                if let Some(reason) = reason {
117                    warn!(%path, status = status_code, reason = %reason, "Health check failed");
118                } else {
119                    warn!(%path, status = status_code, "Health check failed");
120                }
121                let _ = session.respond_error(status_code).await;
122                return Ok(true);
123            }
124        }
125
126        let context = match self.provider.resolve(session) {
127            Some(ctx) => ctx,
128            None => {
129                warn!(%path, "No context found for request host");
130                let _ = session.respond_error(421).await;
131                return Ok(true);
132            }
133        };
134
135        let destination_container = context
136            .get::<DestinationContainer>()
137            .await
138            .map_err(|_| Error::new_str("Destination Container is not present"))?;
139
140        let force_path = context.config.server.force_path_parameter;
141        let backend =
142            match destination_container.get_backend_for_request(session.req_header(), force_path) {
143                Some(b) => b,
144                None => {
145                    warn!(%path, "No matching backend, returning 404");
146                    let _ = session.respond_error(404).await;
147                    return Ok(true);
148                }
149            };
150
151        let destination_name = backend.destination.name.clone();
152        let _ = set_upstream_host_headers(session, &backend);
153        info!(backend_id = %destination_name, "Routing to backend");
154
155        rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
156
157        let mut request_state = RequestContext::new(
158            context.clone(),
159            backend,
160            execution_context_from_request(session),
161        );
162
163        let plugin_runner = request_state.plugin_runner.clone();
164
165        let run_filters = plugin_runner
166            .run_request_filters(session, &mut request_state)
167            .await;
168
169        let res = match run_filters {
170            Ok(filter_result) => filter_result,
171            Err(err) => {
172                error!(%err, "Error running request filters");
173                let _ = session.respond_error(500).await;
174                return Ok(true);
175            }
176        };
177
178        *ctx = Some(request_state);
179
180        match res {
181            MiddlewareResult::Continue(resp_headers) => {
182                ctx.as_mut().unwrap().response_headers = Some(resp_headers);
183
184                Ok(false)
185            }
186            MiddlewareResult::Responded => Ok(true),
187        }
188    }
189
190    async fn upstream_peer(
191        &self,
192        _session: &mut Session,
193        ctx: &mut Self::CTX,
194    ) -> Result<Box<HttpPeer>> {
195        if let Some(state) = ctx.as_ref() {
196            // Determine origin parts for TLS and SNI
197            let (host, port, is_tls) = parse_origin(&state.backend.destination.url)
198                .map_err(|_| Error::new_str("Origin could not be parsed "))?;
199            let hostport = format!("{host}:{port}");
200
201            // Compose full upstream URL for logging with normalized scheme
202            let path_and_query = _session
203                .req_header()
204                .uri
205                .path_and_query()
206                .map(|pq| pq.as_str())
207                .unwrap_or("/");
208            let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
209
210            info!(%upstream_url, backend_id = %state.backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
211            debug!(upstream_origin = %hostport, "Connecting to upstream origin");
212
213            let mut peer = HttpPeer::new(&hostport, is_tls, host);
214            if let Some(opts) = peer.get_mut_peer_options() {
215                // Allow both HTTP/1.1 and HTTP/2 so plain HTTP backends keep working.
216                opts.set_http_version(2, 1);
217            }
218            let peer = Box::new(peer);
219            Ok(peer)
220        } else {
221            Err(Error::new(ErrorType::InternalError))
222        }
223    }
224
225    async fn connected_to_upstream(
226        &self,
227        _session: &mut Session,
228        reused: bool,
229        peer: &HttpPeer,
230        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
231        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
232        _digest: Option<&Digest>,
233        ctx: &mut Self::CTX,
234    ) -> Result<()> {
235        let backend_id = ctx
236            .as_ref()
237            .map(|b| b.backend.destination.name.as_str())
238            .unwrap_or("<unknown>");
239        info!(backend_id, reused, peer = %peer, "Connected to upstream");
240        Ok(())
241    }
242
243    async fn response_filter(
244        &self,
245        session: &mut Session,
246        upstream_response: &mut ResponseHeader,
247        ctx: &mut Self::CTX,
248    ) -> Result<()> {
249        if let Some(state) = ctx.as_mut() {
250            if let Some(resp_headers) = state.response_headers.take() {
251                for (key, val) in resp_headers {
252                    let _ = upstream_response.insert_header(key, val);
253                }
254            }
255
256            let runner = state.plugin_runner.clone();
257
258            runner
259                .run_response_filters(session, state, upstream_response)
260                .await;
261
262            if !state.cardinal_context.config.server.log_upstream_response {
263                return Ok(());
264            }
265
266            let status = upstream_response.status.as_u16();
267            let location = upstream_response
268                .headers
269                .get("location")
270                .and_then(|v| v.to_str().ok())
271                .map(|s| s.to_string());
272            let backend_id = &state.backend.destination.name;
273            if let Some(loc) = location {
274                info!(backend_id, status, location = %loc, "Upstream responded");
275            } else {
276                info!(backend_id, status, "Upstream responded");
277            }
278        }
279
280        Ok(())
281    }
282}