1mod utils;
2
3use crate::utils::requests::{
4 compose_upstream_url, execution_context_from_request, parse_origin, rewrite_request_path,
5 set_upstream_host_headers,
6};
7use cardinal_base::context::CardinalContext;
8use cardinal_base::destinations::container::DestinationContainer;
9use cardinal_plugins::request_context::RequestContext;
10use cardinal_plugins::runner::MiddlewareResult;
11use pingora::http::ResponseHeader;
12use pingora::prelude::*;
13use pingora::protocols::Digest;
14use pingora::upstreams::peer::Peer;
15use std::sync::Arc;
16use tracing::{debug, error, info, warn};
17
/// Re-exports every public item of the `pingora` crate under this crate's
/// `pingora` module.
pub mod pingora {
    pub use pingora::*;
}
21
/// Outcome of a [`CardinalContextProvider::health_check`] call.
///
/// Statuses compare by value, so callers and tests can use `==` / `assert_eq!`
/// instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthCheckStatus {
    /// The request may be processed normally.
    Ready,
    /// The request must be rejected.
    Unavailable {
        /// HTTP status code sent back to the client.
        status_code: u16,
        /// Optional explanation; when present it is included in the warning log.
        reason: Option<String>,
    },
}
30
/// Source of the [`CardinalContext`] used to serve an incoming request.
///
/// `CardinalProxy` calls `health_check` first and `resolve` second at the
/// start of every request.
pub trait CardinalContextProvider: Send + Sync {
    /// Returns the context that should serve `session`, or `None` when no
    /// context applies (the proxy then answers with status 421).
    fn resolve(&self, session: &Session) -> Option<Arc<CardinalContext>>;

    /// Reports whether the request may be served.
    ///
    /// The default implementation always returns [`HealthCheckStatus::Ready`].
    fn health_check(&self, _session: &Session) -> HealthCheckStatus {
        HealthCheckStatus::Ready
    }
}
37
/// [`CardinalContextProvider`] backed by a single, fixed context.
#[derive(Clone)]
pub struct StaticContextProvider {
    /// Shared context returned for every request.
    context: Arc<CardinalContext>,
}
42
43impl StaticContextProvider {
44 pub fn new(context: Arc<CardinalContext>) -> Self {
45 Self { context }
46 }
47}
48
49impl CardinalContextProvider for StaticContextProvider {
50 fn resolve(&self, _session: &Session) -> Option<Arc<CardinalContext>> {
51 Some(self.context.clone())
52 }
53}
54
/// Proxy service implementing pingora's `ProxyHttp`; it obtains the context
/// for each request from a [`CardinalContextProvider`].
pub struct CardinalProxy {
    /// Provider consulted on every request for health status and context.
    provider: Arc<dyn CardinalContextProvider>,
}
58
59impl CardinalProxy {
60 pub fn new(context: Arc<CardinalContext>) -> Self {
61 Self::builder(context).build()
62 }
63
64 pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
65 Self { provider }
66 }
67
68 pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
69 CardinalProxyBuilder::new(context)
70 }
71}
72
/// Builder for [`CardinalProxy`].
pub struct CardinalProxyBuilder {
    /// Provider that the built proxy will use.
    provider: Arc<dyn CardinalContextProvider>,
}
76
77impl CardinalProxyBuilder {
78 pub fn new(context: Arc<CardinalContext>) -> Self {
79 Self {
80 provider: Arc::new(StaticContextProvider::new(context)),
81 }
82 }
83
84 pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
85 Self { provider }
86 }
87
88 pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
89 self.provider = provider;
90 self
91 }
92
93 pub fn build(self) -> CardinalProxy {
94 CardinalProxy::with_provider(self.provider)
95 }
96}
97
98#[async_trait::async_trait]
99impl ProxyHttp for CardinalProxy {
100 type CTX = Option<RequestContext>;
101
102 fn new_ctx(&self) -> Self::CTX {
103 None
104 }
105
106 async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
107 let path = session.req_header().uri.path().to_string();
108 info!(%path, "Request received");
109
110 match self.provider.health_check(session) {
111 HealthCheckStatus::Ready => {}
112 HealthCheckStatus::Unavailable {
113 status_code,
114 reason,
115 } => {
116 if let Some(reason) = reason {
117 warn!(%path, status = status_code, reason = %reason, "Health check failed");
118 } else {
119 warn!(%path, status = status_code, "Health check failed");
120 }
121 let _ = session.respond_error(status_code).await;
122 return Ok(true);
123 }
124 }
125
126 let context = match self.provider.resolve(session) {
127 Some(ctx) => ctx,
128 None => {
129 warn!(%path, "No context found for request host");
130 let _ = session.respond_error(421).await;
131 return Ok(true);
132 }
133 };
134
135 let destination_container = context
136 .get::<DestinationContainer>()
137 .await
138 .map_err(|_| Error::new_str("Destination Container is not present"))?;
139
140 let force_path = context.config.server.force_path_parameter;
141 let backend =
142 match destination_container.get_backend_for_request(session.req_header(), force_path) {
143 Some(b) => b,
144 None => {
145 warn!(%path, "No matching backend, returning 404");
146 let _ = session.respond_error(404).await;
147 return Ok(true);
148 }
149 };
150
151 let destination_name = backend.destination.name.clone();
152 let _ = set_upstream_host_headers(session, &backend);
153 info!(backend_id = %destination_name, "Routing to backend");
154
155 rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
156
157 let mut request_state = RequestContext::new(
158 context.clone(),
159 backend,
160 execution_context_from_request(session),
161 );
162
163 let plugin_runner = request_state.plugin_runner.clone();
164
165 let run_filters = plugin_runner
166 .run_request_filters(session, &mut request_state)
167 .await;
168
169 let res = match run_filters {
170 Ok(filter_result) => filter_result,
171 Err(err) => {
172 error!(%err, "Error running request filters");
173 let _ = session.respond_error(500).await;
174 return Ok(true);
175 }
176 };
177
178 *ctx = Some(request_state);
179
180 match res {
181 MiddlewareResult::Continue(resp_headers) => {
182 ctx.as_mut().unwrap().response_headers = Some(resp_headers);
183
184 Ok(false)
185 }
186 MiddlewareResult::Responded => Ok(true),
187 }
188 }
189
190 async fn upstream_peer(
191 &self,
192 _session: &mut Session,
193 ctx: &mut Self::CTX,
194 ) -> Result<Box<HttpPeer>> {
195 if let Some(state) = ctx.as_ref() {
196 let (host, port, is_tls) = parse_origin(&state.backend.destination.url)
198 .map_err(|_| Error::new_str("Origin could not be parsed "))?;
199 let hostport = format!("{host}:{port}");
200
201 let path_and_query = _session
203 .req_header()
204 .uri
205 .path_and_query()
206 .map(|pq| pq.as_str())
207 .unwrap_or("/");
208 let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
209
210 info!(%upstream_url, backend_id = %state.backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
211 debug!(upstream_origin = %hostport, "Connecting to upstream origin");
212
213 let mut peer = HttpPeer::new(&hostport, is_tls, host);
214 if let Some(opts) = peer.get_mut_peer_options() {
215 opts.set_http_version(2, 1);
217 }
218 let peer = Box::new(peer);
219 Ok(peer)
220 } else {
221 Err(Error::new(ErrorType::InternalError))
222 }
223 }
224
225 async fn connected_to_upstream(
226 &self,
227 _session: &mut Session,
228 reused: bool,
229 peer: &HttpPeer,
230 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
231 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
232 _digest: Option<&Digest>,
233 ctx: &mut Self::CTX,
234 ) -> Result<()> {
235 let backend_id = ctx
236 .as_ref()
237 .map(|b| b.backend.destination.name.as_str())
238 .unwrap_or("<unknown>");
239 info!(backend_id, reused, peer = %peer, "Connected to upstream");
240 Ok(())
241 }
242
243 async fn response_filter(
244 &self,
245 session: &mut Session,
246 upstream_response: &mut ResponseHeader,
247 ctx: &mut Self::CTX,
248 ) -> Result<()> {
249 if let Some(state) = ctx.as_mut() {
250 if let Some(resp_headers) = state.response_headers.take() {
251 for (key, val) in resp_headers {
252 let _ = upstream_response.insert_header(key, val);
253 }
254 }
255
256 let runner = state.plugin_runner.clone();
257
258 runner
259 .run_response_filters(session, state, upstream_response)
260 .await;
261
262 if !state.cardinal_context.config.server.log_upstream_response {
263 return Ok(());
264 }
265
266 let status = upstream_response.status.as_u16();
267 let location = upstream_response
268 .headers
269 .get("location")
270 .and_then(|v| v.to_str().ok())
271 .map(|s| s.to_string());
272 let backend_id = &state.backend.destination.name;
273 if let Some(loc) = location {
274 info!(backend_id, status, location = %loc, "Upstream responded");
275 } else {
276 info!(backend_id, status, "Upstream responded");
277 }
278 }
279
280 Ok(())
281 }
282}