cardinal_proxy/
lib.rs

1mod utils;
2
3use crate::utils::requests::{
4    compose_upstream_url, execution_context_from_request, parse_origin, rewrite_request_path,
5    set_upstream_host_headers,
6};
7use bytes::Bytes;
8use cardinal_base::context::CardinalContext;
9use cardinal_base::destinations::container::DestinationContainer;
10use cardinal_plugins::request_context::RequestContext;
11use cardinal_plugins::runner::MiddlewareResult;
12use pingora::http::ResponseHeader;
13use pingora::prelude::*;
14use pingora::protocols::Digest;
15use pingora::upstreams::peer::Peer;
16use std::sync::Arc;
17use tracing::{debug, error, info, warn};
18
19pub mod pingora {
20    pub use pingora::*;
21}
22
/// Outcome of a provider's health-check inspection of an incoming request.
///
/// `PartialEq`/`Eq` are derived so providers and tests can compare statuses
/// directly instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthCheckStatus {
    /// The request is not a health check; normal proxying continues.
    None,
    /// The request is a health check and the service is ready; the proxy
    /// answers it directly with a `200` response.
    Ready,
    /// The request is a health check and the service is not ready; the proxy
    /// answers it with an error response carrying `status_code`.
    Unavailable {
        /// HTTP status code sent back to the client.
        status_code: u16,
        /// Optional explanation; it is only written to the log.
        reason: Option<String>,
    },
}
32
33pub trait CardinalContextProvider: Send + Sync {
34    fn resolve(&self, session: &Session) -> Option<Arc<CardinalContext>>;
35    fn health_check(&self, _session: &Session) -> HealthCheckStatus {
36        HealthCheckStatus::None
37    }
38}
39
40#[derive(Clone)]
41pub struct StaticContextProvider {
42    context: Arc<CardinalContext>,
43}
44
45impl StaticContextProvider {
46    pub fn new(context: Arc<CardinalContext>) -> Self {
47        Self { context }
48    }
49}
50
51impl CardinalContextProvider for StaticContextProvider {
52    fn resolve(&self, _session: &Session) -> Option<Arc<CardinalContext>> {
53        Some(self.context.clone())
54    }
55}
56
57pub struct CardinalProxy {
58    provider: Arc<dyn CardinalContextProvider>,
59}
60
61impl CardinalProxy {
62    pub fn new(context: Arc<CardinalContext>) -> Self {
63        Self::builder(context).build()
64    }
65
66    pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
67        Self { provider }
68    }
69
70    pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
71        CardinalProxyBuilder::new(context)
72    }
73}
74
75pub struct CardinalProxyBuilder {
76    provider: Arc<dyn CardinalContextProvider>,
77}
78
79impl CardinalProxyBuilder {
80    pub fn new(context: Arc<CardinalContext>) -> Self {
81        Self {
82            provider: Arc::new(StaticContextProvider::new(context)),
83        }
84    }
85
86    pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
87        Self { provider }
88    }
89
90    pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
91        self.provider = provider;
92        self
93    }
94
95    pub fn build(self) -> CardinalProxy {
96        CardinalProxy::with_provider(self.provider)
97    }
98}
99
100#[async_trait::async_trait]
101impl ProxyHttp for CardinalProxy {
102    type CTX = Option<RequestContext>;
103
104    fn new_ctx(&self) -> Self::CTX {
105        None
106    }
107
108    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
109        let path = session.req_header().uri.path().to_string();
110        info!(%path, "Request received");
111
112        match self.provider.health_check(session) {
113            HealthCheckStatus::None => {}
114            HealthCheckStatus::Ready => {
115                debug!(%path, "Health check ready");
116                // Build a 200 OK header
117                let mut resp = ResponseHeader::build(200, None)?;
118                resp.insert_header("Content-Type", "text/plain")?;
119                resp.set_content_length("healthy\n".len())?;
120
121                // Send header + body to the client
122                session
123                    .write_response_header(Box::new(resp), /*end_of_stream*/ false)
124                    .await?;
125                session
126                    .write_response_body(Some(Bytes::from_static(b"healthy\n")), /*end*/ true)
127                    .await?;
128
129                // Returning Ok(true) means "handled", stop further processing.
130                return Ok(true);
131            }
132            HealthCheckStatus::Unavailable {
133                status_code,
134                reason,
135            } => {
136                if let Some(reason) = reason {
137                    warn!(%path, status = status_code, reason = %reason, "Health check failed");
138                } else {
139                    warn!(%path, status = status_code, "Health check failed");
140                }
141                let _ = session.respond_error(status_code).await;
142                return Ok(true);
143            }
144        }
145
146        let context = match self.provider.resolve(session) {
147            Some(ctx) => ctx,
148            None => {
149                warn!(%path, "No context found for request host");
150                let _ = session.respond_error(421).await;
151                return Ok(true);
152            }
153        };
154
155        let destination_container = context
156            .get::<DestinationContainer>()
157            .await
158            .map_err(|_| Error::new_str("Destination Container is not present"))?;
159
160        let force_path = context.config.server.force_path_parameter;
161        let backend =
162            match destination_container.get_backend_for_request(session.req_header(), force_path) {
163                Some(b) => b,
164                None => {
165                    warn!(%path, "No matching backend, returning 404");
166                    let _ = session.respond_error(404).await;
167                    return Ok(true);
168                }
169            };
170
171        let destination_name = backend.destination.name.clone();
172        let _ = set_upstream_host_headers(session, &backend);
173        info!(backend_id = %destination_name, "Routing to backend");
174
175        rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
176
177        let mut request_state = RequestContext::new(
178            context.clone(),
179            backend,
180            execution_context_from_request(session),
181        );
182
183        let plugin_runner = request_state.plugin_runner.clone();
184
185        let run_filters = plugin_runner
186            .run_request_filters(session, &mut request_state)
187            .await;
188
189        let res = match run_filters {
190            Ok(filter_result) => filter_result,
191            Err(err) => {
192                error!(%err, "Error running request filters");
193                let _ = session.respond_error(500).await;
194                return Ok(true);
195            }
196        };
197
198        *ctx = Some(request_state);
199
200        match res {
201            MiddlewareResult::Continue(resp_headers) => {
202                ctx.as_mut().unwrap().response_headers = Some(resp_headers);
203
204                Ok(false)
205            }
206            MiddlewareResult::Responded => Ok(true),
207        }
208    }
209
210    async fn upstream_peer(
211        &self,
212        _session: &mut Session,
213        ctx: &mut Self::CTX,
214    ) -> Result<Box<HttpPeer>> {
215        if let Some(state) = ctx.as_ref() {
216            // Determine origin parts for TLS and SNI
217            let (host, port, is_tls) = parse_origin(&state.backend.destination.url)
218                .map_err(|_| Error::new_str("Origin could not be parsed "))?;
219            let hostport = format!("{host}:{port}");
220
221            // Compose full upstream URL for logging with normalized scheme
222            let path_and_query = _session
223                .req_header()
224                .uri
225                .path_and_query()
226                .map(|pq| pq.as_str())
227                .unwrap_or("/");
228            let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
229
230            info!(%upstream_url, backend_id = %state.backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
231            debug!(upstream_origin = %hostport, "Connecting to upstream origin");
232
233            let mut peer = HttpPeer::new(&hostport, is_tls, host);
234            if let Some(opts) = peer.get_mut_peer_options() {
235                // Allow both HTTP/1.1 and HTTP/2 so plain HTTP backends keep working.
236                opts.set_http_version(2, 1);
237            }
238            let peer = Box::new(peer);
239            Ok(peer)
240        } else {
241            Err(Error::new(ErrorType::InternalError))
242        }
243    }
244
245    async fn connected_to_upstream(
246        &self,
247        _session: &mut Session,
248        reused: bool,
249        peer: &HttpPeer,
250        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
251        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
252        _digest: Option<&Digest>,
253        ctx: &mut Self::CTX,
254    ) -> Result<()> {
255        let backend_id = ctx
256            .as_ref()
257            .map(|b| b.backend.destination.name.as_str())
258            .unwrap_or("<unknown>");
259        info!(backend_id, reused, peer = %peer, "Connected to upstream");
260        Ok(())
261    }
262
263    async fn response_filter(
264        &self,
265        session: &mut Session,
266        upstream_response: &mut ResponseHeader,
267        ctx: &mut Self::CTX,
268    ) -> Result<()> {
269        if let Some(state) = ctx.as_mut() {
270            if let Some(resp_headers) = state.response_headers.take() {
271                for (key, val) in resp_headers {
272                    let _ = upstream_response.insert_header(key, val);
273                }
274            }
275
276            let runner = state.plugin_runner.clone();
277
278            runner
279                .run_response_filters(session, state, upstream_response)
280                .await;
281
282            if !state.cardinal_context.config.server.log_upstream_response {
283                return Ok(());
284            }
285
286            let status = upstream_response.status.as_u16();
287            let location = upstream_response
288                .headers
289                .get("location")
290                .and_then(|v| v.to_str().ok())
291                .map(|s| s.to_string());
292            let backend_id = &state.backend.destination.name;
293            if let Some(loc) = location {
294                info!(backend_id, status, location = %loc, "Upstream responded");
295            } else {
296                info!(backend_id, status, "Upstream responded");
297            }
298        }
299
300        Ok(())
301    }
302}