1pub mod req;
2pub mod retry;
3mod utils;
4
5use crate::req::ReqCtx;
6use crate::retry::RetryState;
7use crate::utils::requests::{
8 compose_upstream_url, execution_context_from_request, parse_origin, rewrite_request_path,
9 set_upstream_host_headers,
10};
11use bytes::Bytes;
12use cardinal_base::context::CardinalContext;
13use cardinal_base::destinations::container::DestinationContainer;
14use cardinal_config::DestinationRetryBackoffType;
15use cardinal_plugins::request_context::{RequestContext, RequestContextBase};
16use cardinal_plugins::runner::MiddlewareResult;
17use pingora::http::ResponseHeader;
18use pingora::prelude::*;
19use pingora::protocols::Digest;
20use pingora::upstreams::peer::Peer;
21use std::sync::Arc;
22use std::time::Duration;
23use tracing::{debug, error, info, warn};
24
25pub mod pingora {
26 pub use pingora::*;
27}
28
/// Outcome of [`CardinalContextProvider::health_check`] for an incoming request.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare statuses directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthCheckStatus {
    /// No health-check handling applies; the request is proxied normally.
    None,
    /// The proxy answers the request itself with `200` and the body `healthy\n`.
    Ready,
    /// The proxy answers the request itself with an error response.
    Unavailable {
        /// HTTP status code sent to the client.
        status_code: u16,
        /// Optional explanation; it is only logged, never sent to the client.
        reason: Option<String>,
    },
}
38
/// Hooks through which [`CardinalProxy`] obtains per-request state.
///
/// Only [`resolve`](CardinalContextProvider::resolve) is required; every other
/// method has a default implementation.
pub trait CardinalContextProvider: Send + Sync {
    /// Creates the per-request [`ReqCtx`]. Defaults to `ReqCtx::default()`.
    fn ctx(&self) -> ReqCtx {
        ReqCtx::default()
    }

    /// Picks the [`CardinalContext`] that should serve this request.
    ///
    /// Returning `None` makes the proxy answer the request with `421`.
    fn resolve(&self, session: &Session, ctx: &mut ReqCtx) -> Option<Arc<CardinalContext>>;
    /// Decides whether the proxy should answer this request itself as a health
    /// check. Defaults to [`HealthCheckStatus::None`], i.e. normal proxying.
    fn health_check(&self, _session: &Session) -> HealthCheckStatus {
        HealthCheckStatus::None
    }

    /// Invoked from the proxy's `logging` hook with the optional request error.
    /// The default implementation does nothing.
    fn logging(&self, _session: &mut Session, _e: Option<&Error>, _ctx: &mut ReqCtx) {}
}
51
52#[derive(Clone)]
53pub struct StaticContextProvider {
54 context: Arc<CardinalContext>,
55}
56
57impl StaticContextProvider {
58 pub fn new(context: Arc<CardinalContext>) -> Self {
59 Self { context }
60 }
61}
62
63impl CardinalContextProvider for StaticContextProvider {
64 fn resolve(&self, _session: &Session, _ctx: &mut ReqCtx) -> Option<Arc<CardinalContext>> {
65 Some(self.context.clone())
66 }
67}
68
69pub struct CardinalProxy {
70 provider: Arc<dyn CardinalContextProvider>,
71}
72
73impl CardinalProxy {
74 pub fn new(context: Arc<CardinalContext>) -> Self {
75 Self::builder(context).build()
76 }
77
78 pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
79 Self { provider }
80 }
81
82 pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
83 CardinalProxyBuilder::new(context)
84 }
85}
86
87pub struct CardinalProxyBuilder {
88 provider: Arc<dyn CardinalContextProvider>,
89}
90
91impl CardinalProxyBuilder {
92 pub fn new(context: Arc<CardinalContext>) -> Self {
93 Self {
94 provider: Arc::new(StaticContextProvider::new(context)),
95 }
96 }
97
98 pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
99 Self { provider }
100 }
101
102 pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
103 self.provider = provider;
104 self
105 }
106
107 pub fn build(self) -> CardinalProxy {
108 CardinalProxy::with_provider(self.provider)
109 }
110}
111
112#[async_trait::async_trait]
113impl ProxyHttp for CardinalProxy {
114 type CTX = ReqCtx;
115
116 fn new_ctx(&self) -> Self::CTX {
117 self.provider.ctx()
118 }
119
120 async fn logging(&self, _session: &mut Session, _e: Option<&Error>, ctx: &mut Self::CTX)
121 where
122 Self::CTX: Send + Sync,
123 {
124 self.provider.logging(_session, _e, ctx);
125 }
126
127 async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
128 let path = session.req_header().uri.path().to_string();
129 info!(%path, "Request received");
130
131 match self.provider.health_check(session) {
132 HealthCheckStatus::None => {}
133 HealthCheckStatus::Ready => {
134 debug!(%path, "Health check ready");
135 let mut resp = ResponseHeader::build(200, None)?;
137 resp.insert_header("Content-Type", "text/plain")?;
138 resp.set_content_length("healthy\n".len())?;
139
140 session
142 .write_response_header(Box::new(resp), false)
143 .await?;
144 session
145 .write_response_body(Some(Bytes::from_static(b"healthy\n")), true)
146 .await?;
147
148 return Ok(true);
150 }
151 HealthCheckStatus::Unavailable {
152 status_code,
153 reason,
154 } => {
155 if let Some(reason) = reason {
156 warn!(%path, status = status_code, reason = %reason, "Health check failed");
157 } else {
158 warn!(%path, status = status_code, "Health check failed");
159 }
160 let _ = session.respond_error(status_code).await;
161 return Ok(true);
162 }
163 }
164
165 let context = match self.provider.resolve(session, ctx) {
166 Some(ctx) => ctx,
167 None => {
168 warn!(%path, "No context found for request host");
169 let _ = session.respond_error(421).await;
170 return Ok(true);
171 }
172 };
173
174 let destination_container = context
175 .get::<DestinationContainer>()
176 .await
177 .map_err(|_| Error::new_str("Destination Container is not present"))?;
178
179 let force_path = context.config.server.force_path_parameter;
180 let backend =
181 match destination_container.get_backend_for_request(session.req_header(), force_path) {
182 Some(b) => b,
183 None => {
184 warn!(%path, "No matching backend, returning 404");
185 let _ = session.respond_error(404).await;
186 return Ok(true);
187 }
188 };
189
190 let destination_name = backend.destination.name.clone();
191 let _ = set_upstream_host_headers(session, &backend);
192 info!(backend_id = %destination_name, "Routing to backend");
193
194 rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
195
196 let mut request_state = RequestContext::new(
197 context.clone(),
198 backend,
199 execution_context_from_request(session),
200 );
201
202 let plugin_runner = request_state.plugin_runner.clone();
203
204 let run_filters = plugin_runner
205 .run_request_filters(session, &mut request_state)
206 .await;
207
208 let res = match run_filters {
209 Ok(filter_result) => filter_result,
210 Err(err) => {
211 error!(%err, "Error running request filters");
212 let _ = session.respond_error(500).await;
213 return Ok(true);
214 }
215 };
216
217 ctx.set_resolved_request(request_state);
218
219 match res {
220 MiddlewareResult::Continue(resp_headers) => {
221 ctx.ctx_base
222 .resolved_request
223 .as_mut()
224 .unwrap()
225 .response_headers = Some(resp_headers);
226
227 Ok(false)
228 }
229 MiddlewareResult::Responded => Ok(true),
230 }
231 }
232
233 fn fail_to_connect(
234 &self,
235 _session: &mut Session,
236 _peer: &HttpPeer,
237 ctx: &mut Self::CTX,
238 mut e: Box<Error>,
239 ) -> Box<Error> {
240 let backend_config = ctx.req_unsafe().backend.destination.retry.clone();
241 if let Some(mut retry_state) = ctx.retry_state.take() {
242 retry_state.register_attempt();
243 if retry_state.can_retry() {
244 e.set_retry(true);
245 ctx.retry_state = Some(retry_state);
246 } else {
247 ctx.retry_state = None;
248 }
249 } else if let Some(retry_config) = backend_config {
250 let mut retry_state = RetryState::from(retry_config);
251 retry_state.register_attempt();
252 if retry_state.can_retry() {
253 e.set_retry(true);
254 ctx.retry_state = Some(retry_state);
255 } else {
256 ctx.retry_state = None;
257 }
258 }
259
260 e
261 }
262
263 async fn upstream_peer(
264 &self,
265 _session: &mut Session,
266 ctx: &mut Self::CTX,
267 ) -> Result<Box<HttpPeer>> {
268 if let Some(retry_state) = ctx.retry_state.as_mut() {
269 if !retry_state.sleep_if_retry_allowed().await {
270 ctx.retry_state = None;
271 return Err(Error::new_str("Retry attempts exhausted"));
272 }
273 }
274
275 let backend = &ctx.req_unsafe().backend;
276 let (host, port, is_tls) = parse_origin(&backend.destination.url)
278 .map_err(|_| Error::new_str("Origin could not be parsed "))?;
279 let hostport = format!("{host}:{port}");
280
281 let path_and_query = _session
283 .req_header()
284 .uri
285 .path_and_query()
286 .map(|pq| pq.as_str())
287 .unwrap_or("/");
288 let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
289
290 info!(%upstream_url, backend_id = %&backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
291 debug!(upstream_origin = %hostport, "Connecting to upstream origin");
292
293 let mut peer = HttpPeer::new(&hostport, is_tls, host);
294 if let Some(opts) = peer.get_mut_peer_options() {
295 opts.set_http_version(2, 1);
297 if let Some(timeout) = &backend.destination.timeout {
298 opts.idle_timeout = timeout
299 .idle
300 .as_ref()
301 .map(|idle| Duration::from_millis(*idle));
302 opts.write_timeout = timeout
303 .write
304 .as_ref()
305 .map(|idle| Duration::from_millis(*idle));
306 opts.total_connection_timeout = timeout
307 .connect
308 .as_ref()
309 .map(|idle| Duration::from_millis(*idle));
310 opts.read_timeout = timeout
311 .read
312 .as_ref()
313 .map(|idle| Duration::from_millis(*idle));
314 }
315 }
316 let peer = Box::new(peer);
317 Ok(peer)
318 }
319
320 async fn connected_to_upstream(
321 &self,
322 _session: &mut Session,
323 reused: bool,
324 peer: &HttpPeer,
325 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
326 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
327 _digest: Option<&Digest>,
328 ctx: &mut Self::CTX,
329 ) -> Result<()> {
330 ctx.retry_state = None;
331 let backend_id = ctx.req_unsafe().backend.destination.name.to_string();
332
333 info!(backend_id, reused, peer = %peer, "Connected to upstream");
334 Ok(())
335 }
336
337 async fn response_filter(
338 &self,
339 session: &mut Session,
340 upstream_response: &mut ResponseHeader,
341 ctx: &mut Self::CTX,
342 ) -> Result<()> {
343 if let Some(resp_headers) = ctx.req_unsafe_mut().response_headers.take() {
344 for (key, val) in resp_headers {
345 let _ = upstream_response.insert_header(key, val);
346 }
347 }
348
349 {
350 {
352 let runner = {
353 let req = ctx.req_unsafe_mut();
354 req.plugin_runner.clone()
355 };
356
357 runner
358 .run_response_filters(
359 session,
360 {
361 let req = ctx.req_unsafe_mut();
362 req
363 },
364 upstream_response,
365 )
366 .await;
367 }
368
369 ctx.set("status", upstream_response.status.as_str());
370
371 let req = ctx.req_unsafe_mut();
373
374 if !req.cardinal_context.config.server.log_upstream_response {
375 return Ok(());
376 }
377
378 let status = upstream_response.status.as_u16();
379 let location = upstream_response
380 .headers
381 .get("location")
382 .and_then(|v| v.to_str().ok())
383 .map(str::to_string);
384 let backend_id = &req.backend.destination.name;
385
386 match location {
387 Some(loc) => info!(backend_id, status, location = %loc, "Upstream responded"),
388 None => info!(backend_id, status, "Upstream responded"),
389 }
390 }
391
392 Ok(())
393 }
394}