cardinal_proxy/
lib.rs

1pub mod req;
2pub mod retry;
3mod utils;
4
5use crate::req::ReqCtx;
6use crate::retry::RetryState;
7use crate::utils::requests::{
8    compose_upstream_url, execution_context_from_request, parse_origin, rewrite_request_path,
9    set_upstream_host_headers,
10};
11use bytes::Bytes;
12use cardinal_base::context::CardinalContext;
13use cardinal_base::destinations::container::DestinationContainer;
14use cardinal_config::DestinationRetryBackoffType;
15use cardinal_plugins::request_context::{RequestContext, RequestContextBase};
16use cardinal_plugins::runner::MiddlewareResult;
17use pingora::http::ResponseHeader;
18use pingora::prelude::*;
19use pingora::protocols::Digest;
20use pingora::upstreams::peer::Peer;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{debug, error, info, warn};
24
25pub mod pingora {
26    pub use pingora::*;
27}
28
29#[derive(Debug, Clone)]
30pub enum HealthCheckStatus {
31    None,
32    Ready,
33    Unavailable {
34        status_code: u16,
35        reason: Option<String>,
36    },
37}
38
39pub trait CardinalContextProvider: Send + Sync {
40    fn ctx(&self) -> ReqCtx {
41        ReqCtx::default()
42    }
43
44    fn resolve(&self, session: &Session, ctx: &mut ReqCtx) -> Option<Arc<CardinalContext>>;
45    fn health_check(&self, _session: &Session) -> HealthCheckStatus {
46        HealthCheckStatus::None
47    }
48
49    fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut ReqCtx) {}
50}
51
52#[derive(Clone)]
53pub struct StaticContextProvider {
54    context: Arc<CardinalContext>,
55}
56
57impl StaticContextProvider {
58    pub fn new(context: Arc<CardinalContext>) -> Self {
59        Self { context }
60    }
61}
62
63impl CardinalContextProvider for StaticContextProvider {
64    fn resolve(&self, _session: &Session, _ctx: &mut ReqCtx) -> Option<Arc<CardinalContext>> {
65        Some(self.context.clone())
66    }
67}
68
69pub struct CardinalProxy {
70    provider: Arc<dyn CardinalContextProvider>,
71}
72
73impl CardinalProxy {
74    pub fn new(context: Arc<CardinalContext>) -> Self {
75        Self::builder(context).build()
76    }
77
78    pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
79        Self { provider }
80    }
81
82    pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
83        CardinalProxyBuilder::new(context)
84    }
85}
86
87pub struct CardinalProxyBuilder {
88    provider: Arc<dyn CardinalContextProvider>,
89}
90
91impl CardinalProxyBuilder {
92    pub fn new(context: Arc<CardinalContext>) -> Self {
93        Self {
94            provider: Arc::new(StaticContextProvider::new(context)),
95        }
96    }
97
98    pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
99        Self { provider }
100    }
101
102    pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
103        self.provider = provider;
104        self
105    }
106
107    pub fn build(self) -> CardinalProxy {
108        CardinalProxy::with_provider(self.provider)
109    }
110}
111
112#[async_trait::async_trait]
113impl ProxyHttp for CardinalProxy {
114    type CTX = ReqCtx;
115
116    fn new_ctx(&self) -> Self::CTX {
117        self.provider.ctx()
118    }
119
120    async fn logging(&self, _session: &mut Session, _e: Option<&Error>, ctx: &mut Self::CTX)
121    where
122        Self::CTX: Send + Sync,
123    {
124        self.provider.logging(_session, _e, ctx);
125    }
126
127    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
128        let path = session.req_header().uri.path().to_string();
129        info!(%path, "Request received");
130
131        match self.provider.health_check(session) {
132            HealthCheckStatus::None => {}
133            HealthCheckStatus::Ready => {
134                debug!(%path, "Health check ready");
135                // Build a 200 OK header
136                let mut resp = ResponseHeader::build(200, None)?;
137                resp.insert_header("Content-Type", "text/plain")?;
138                resp.set_content_length("healthy\n".len())?;
139
140                // Send header + body to the client
141                session
142                    .write_response_header(Box::new(resp), /*end_of_stream*/ false)
143                    .await?;
144                session
145                    .write_response_body(Some(Bytes::from_static(b"healthy\n")), /*end*/ true)
146                    .await?;
147
148                // Returning Ok(true) means "handled", stop further processing.
149                return Ok(true);
150            }
151            HealthCheckStatus::Unavailable {
152                status_code,
153                reason,
154            } => {
155                if let Some(reason) = reason {
156                    warn!(%path, status = status_code, reason = %reason, "Health check failed");
157                } else {
158                    warn!(%path, status = status_code, "Health check failed");
159                }
160                let _ = session.respond_error(status_code).await;
161                return Ok(true);
162            }
163        }
164
165        let context = match self.provider.resolve(session, ctx) {
166            Some(ctx) => ctx,
167            None => {
168                warn!(%path, "No context found for request host");
169                let _ = session.respond_error(421).await;
170                return Ok(true);
171            }
172        };
173
174        let destination_container = context
175            .get::<DestinationContainer>()
176            .await
177            .map_err(|_| Error::new_str("Destination Container is not present"))?;
178
179        let force_path = context.config.server.force_path_parameter;
180        let backend =
181            match destination_container.get_backend_for_request(session.req_header(), force_path) {
182                Some(b) => b,
183                None => {
184                    warn!(%path, "No matching backend, returning 404");
185                    let _ = session.respond_error(404).await;
186                    return Ok(true);
187                }
188            };
189
190        let destination_name = backend.destination.name.clone();
191        let _ = set_upstream_host_headers(session, &backend);
192        info!(backend_id = %destination_name, "Routing to backend");
193
194        rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
195
196        let mut request_state = RequestContext::new(
197            context.clone(),
198            backend,
199            execution_context_from_request(session),
200        );
201
202        let plugin_runner = request_state.plugin_runner.clone();
203
204        let run_filters = plugin_runner
205            .run_request_filters(session, &mut request_state)
206            .await;
207
208        let res = match run_filters {
209            Ok(filter_result) => filter_result,
210            Err(err) => {
211                error!(%err, "Error running request filters");
212                let _ = session.respond_error(500).await;
213                return Ok(true);
214            }
215        };
216
217        ctx.set_resolved_request(request_state);
218
219        match res {
220            MiddlewareResult::Continue(resp_headers) => {
221                ctx.ctx_base
222                    .resolved_request
223                    .as_mut()
224                    .unwrap()
225                    .response_headers = Some(resp_headers);
226
227                Ok(false)
228            }
229            MiddlewareResult::Responded => Ok(true),
230        }
231    }
232
233    fn fail_to_connect(
234        &self,
235        _session: &mut Session,
236        _peer: &HttpPeer,
237        ctx: &mut Self::CTX,
238        mut e: Box<Error>,
239    ) -> Box<Error> {
240        let backend_config = ctx.req_unsafe().backend.destination.retry.clone();
241        if let Some(mut retry_state) = ctx.retry_state.take() {
242            retry_state.register_attempt();
243            if retry_state.can_retry() {
244                e.set_retry(true);
245                ctx.retry_state = Some(retry_state);
246            } else {
247                ctx.retry_state = None;
248            }
249        } else if let Some(retry_config) = backend_config {
250            let mut retry_state = RetryState::from(retry_config);
251            retry_state.register_attempt();
252            if retry_state.can_retry() {
253                e.set_retry(true);
254                ctx.retry_state = Some(retry_state);
255            } else {
256                ctx.retry_state = None;
257            }
258        }
259
260        e
261    }
262
263    async fn upstream_peer(
264        &self,
265        _session: &mut Session,
266        ctx: &mut Self::CTX,
267    ) -> Result<Box<HttpPeer>> {
268        if let Some(retry_state) = ctx.retry_state.as_mut() {
269            if !retry_state.sleep_if_retry_allowed().await {
270                ctx.retry_state = None;
271                return Err(Error::new_str("Retry attempts exhausted"));
272            }
273        }
274
275        let backend = &ctx.req_unsafe().backend;
276        // Determine origin parts for TLS and SNI
277        let (host, port, is_tls) = parse_origin(&backend.destination.url)
278            .map_err(|_| Error::new_str("Origin could not be parsed "))?;
279        let hostport = format!("{host}:{port}");
280
281        // Compose full upstream URL for logging with normalized scheme
282        let path_and_query = _session
283            .req_header()
284            .uri
285            .path_and_query()
286            .map(|pq| pq.as_str())
287            .unwrap_or("/");
288        let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
289
290        info!(%upstream_url, backend_id = %&backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
291        debug!(upstream_origin = %hostport, "Connecting to upstream origin");
292
293        let mut peer = HttpPeer::new(&hostport, is_tls, host);
294        if let Some(opts) = peer.get_mut_peer_options() {
295            // Allow both HTTP/1.1 and HTTP/2 so plain HTTP backends keep working.
296            opts.set_http_version(2, 1);
297            if let Some(timeout) = &backend.destination.timeout {
298                opts.idle_timeout = timeout
299                    .idle
300                    .as_ref()
301                    .map(|idle| Duration::from_millis(*idle));
302                opts.write_timeout = timeout
303                    .write
304                    .as_ref()
305                    .map(|idle| Duration::from_millis(*idle));
306                opts.total_connection_timeout = timeout
307                    .connect
308                    .as_ref()
309                    .map(|idle| Duration::from_millis(*idle));
310                opts.read_timeout = timeout
311                    .read
312                    .as_ref()
313                    .map(|idle| Duration::from_millis(*idle));
314            }
315        }
316        let peer = Box::new(peer);
317        Ok(peer)
318    }
319
320    async fn connected_to_upstream(
321        &self,
322        _session: &mut Session,
323        reused: bool,
324        peer: &HttpPeer,
325        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
326        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
327        _digest: Option<&Digest>,
328        ctx: &mut Self::CTX,
329    ) -> Result<()> {
330        ctx.retry_state = None;
331        let backend_id = ctx.req_unsafe().backend.destination.name.to_string();
332
333        info!(backend_id, reused, peer = %peer, "Connected to upstream");
334        Ok(())
335    }
336
337    async fn response_filter(
338        &self,
339        session: &mut Session,
340        upstream_response: &mut ResponseHeader,
341        ctx: &mut Self::CTX,
342    ) -> Result<()> {
343        if let Some(resp_headers) = ctx.req_unsafe_mut().response_headers.take() {
344            for (key, val) in resp_headers {
345                let _ = upstream_response.insert_header(key, val);
346            }
347        }
348
349        {
350            // Run response filters first
351            {
352                let runner = {
353                    let req = ctx.req_unsafe_mut();
354                    req.plugin_runner.clone()
355                };
356
357                runner
358                    .run_response_filters(
359                        session,
360                        {
361                            let req = ctx.req_unsafe_mut();
362                            req
363                        },
364                        upstream_response,
365                    )
366                    .await;
367            }
368
369            ctx.set("status", upstream_response.status.as_str());
370
371            // Safe to get another mutable reference now
372            let req = ctx.req_unsafe_mut();
373
374            if !req.cardinal_context.config.server.log_upstream_response {
375                return Ok(());
376            }
377
378            let status = upstream_response.status.as_u16();
379            let location = upstream_response
380                .headers
381                .get("location")
382                .and_then(|v| v.to_str().ok())
383                .map(str::to_string);
384            let backend_id = &req.backend.destination.name;
385
386            match location {
387                Some(loc) => info!(backend_id, status, location = %loc, "Upstream responded"),
388                None => info!(backend_id, status, "Upstream responded"),
389            }
390        }
391
392        Ok(())
393    }
394}