1mod utils;
2
3use crate::utils::requests::{
4 compose_upstream_url, execution_context_from_request, parse_origin, rewrite_request_path,
5 set_upstream_host_headers,
6};
7use cardinal_base::context::CardinalContext;
8use cardinal_base::destinations::container::DestinationContainer;
9use cardinal_plugins::request_context::RequestContext;
10use cardinal_plugins::runner::MiddlewareResult;
11use pingora::http::ResponseHeader;
12use pingora::prelude::*;
13use pingora::protocols::Digest;
14use pingora::upstreams::peer::Peer;
15use std::sync::Arc;
16use tracing::{debug, error, info, warn};
17
18pub mod pingora {
19 pub use pingora::*;
20}
21
22pub trait CardinalContextProvider: Send + Sync {
23 fn resolve(&self, session: &Session) -> Option<Arc<CardinalContext>>;
24}
25
26#[derive(Clone)]
27pub struct StaticContextProvider {
28 context: Arc<CardinalContext>,
29}
30
31impl StaticContextProvider {
32 pub fn new(context: Arc<CardinalContext>) -> Self {
33 Self { context }
34 }
35}
36
37impl CardinalContextProvider for StaticContextProvider {
38 fn resolve(&self, _session: &Session) -> Option<Arc<CardinalContext>> {
39 Some(self.context.clone())
40 }
41}
42
43pub struct CardinalProxy {
44 provider: Arc<dyn CardinalContextProvider>,
45}
46
47impl CardinalProxy {
48 pub fn new(context: Arc<CardinalContext>) -> Self {
49 Self::builder(context).build()
50 }
51
52 pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
53 Self { provider }
54 }
55
56 pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
57 CardinalProxyBuilder::new(context)
58 }
59}
60
61pub struct CardinalProxyBuilder {
62 provider: Arc<dyn CardinalContextProvider>,
63}
64
65impl CardinalProxyBuilder {
66 pub fn new(context: Arc<CardinalContext>) -> Self {
67 Self {
68 provider: Arc::new(StaticContextProvider::new(context)),
69 }
70 }
71
72 pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
73 Self { provider }
74 }
75
76 pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
77 self.provider = provider;
78 self
79 }
80
81 pub fn build(self) -> CardinalProxy {
82 CardinalProxy::with_provider(self.provider)
83 }
84}
85
86#[async_trait::async_trait]
87impl ProxyHttp for CardinalProxy {
88 type CTX = Option<RequestContext>;
89
90 fn new_ctx(&self) -> Self::CTX {
91 None
92 }
93
94 async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
95 let path = session.req_header().uri.path().to_string();
96 info!(%path, "Request received");
97
98 let context = match self.provider.resolve(session) {
99 Some(ctx) => ctx,
100 None => {
101 warn!(%path, "No context found for request host");
102 let _ = session.respond_error(421).await;
103 return Ok(true);
104 }
105 };
106
107 let destination_container = context
108 .get::<DestinationContainer>()
109 .await
110 .map_err(|_| Error::new_str("Destination Container is not present"))?;
111
112 let force_path = context.config.server.force_path_parameter;
113 let backend =
114 match destination_container.get_backend_for_request(session.req_header(), force_path) {
115 Some(b) => b.clone(),
116 None => {
117 warn!(%path, "No matching backend, returning 404");
118 let _ = session.respond_error(404).await;
119 return Ok(true);
120 }
121 };
122
123 let destination_name = backend.destination.name.clone();
124 let _ = set_upstream_host_headers(session, &backend);
125 info!(backend_id = %destination_name, "Routing to backend");
126
127 rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
128
129 let mut request_state = RequestContext::new(
130 context.clone(),
131 backend,
132 execution_context_from_request(session),
133 );
134
135 let plugin_runner = request_state.plugin_runner.clone();
136
137 let run_filters = plugin_runner
138 .run_request_filters(session, &mut request_state)
139 .await;
140
141 let res = match run_filters {
142 Ok(filter_result) => filter_result,
143 Err(err) => {
144 error!(%err, "Error running request filters");
145 let _ = session.respond_error(500).await;
146 return Ok(true);
147 }
148 };
149
150 *ctx = Some(request_state);
151
152 match res {
153 MiddlewareResult::Continue(resp_headers) => {
154 ctx.as_mut().unwrap().response_headers = Some(resp_headers);
155
156 Ok(false)
157 }
158 MiddlewareResult::Responded => Ok(true),
159 }
160 }
161
162 async fn upstream_peer(
163 &self,
164 _session: &mut Session,
165 ctx: &mut Self::CTX,
166 ) -> Result<Box<HttpPeer>> {
167 if let Some(state) = ctx.as_ref() {
168 let (host, port, is_tls) = parse_origin(&state.backend.destination.url)
170 .map_err(|_| Error::new_str("Origin could not be parsed "))?;
171 let hostport = format!("{host}:{port}");
172
173 let path_and_query = _session
175 .req_header()
176 .uri
177 .path_and_query()
178 .map(|pq| pq.as_str())
179 .unwrap_or("/");
180 let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
181
182 info!(%upstream_url, backend_id = %state.backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
183 debug!(upstream_origin = %hostport, "Connecting to upstream origin");
184
185 let mut peer = HttpPeer::new(&hostport, is_tls, host);
186 if let Some(opts) = peer.get_mut_peer_options() {
187 opts.set_http_version(2, 1);
189 }
190 let peer = Box::new(peer);
191 Ok(peer)
192 } else {
193 Err(Error::new(ErrorType::InternalError))
194 }
195 }
196
197 async fn connected_to_upstream(
198 &self,
199 _session: &mut Session,
200 reused: bool,
201 peer: &HttpPeer,
202 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
203 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
204 _digest: Option<&Digest>,
205 ctx: &mut Self::CTX,
206 ) -> Result<()> {
207 let backend_id = ctx
208 .as_ref()
209 .map(|b| b.backend.destination.name.as_str())
210 .unwrap_or("<unknown>");
211 info!(backend_id, reused, peer = %peer, "Connected to upstream");
212 Ok(())
213 }
214
215 async fn response_filter(
216 &self,
217 session: &mut Session,
218 upstream_response: &mut ResponseHeader,
219 ctx: &mut Self::CTX,
220 ) -> Result<()> {
221 if let Some(state) = ctx.as_mut() {
222 if let Some(resp_headers) = state.response_headers.take() {
223 for (key, val) in resp_headers {
224 let _ = upstream_response.insert_header(key, val);
225 }
226 }
227
228 let runner = state.plugin_runner.clone();
229
230 runner
231 .run_response_filters(session, state, upstream_response)
232 .await;
233
234 if !state.cardinal_context.config.server.log_upstream_response {
235 return Ok(());
236 }
237
238 let status = upstream_response.status.as_u16();
239 let location = upstream_response
240 .headers
241 .get("location")
242 .and_then(|v| v.to_str().ok())
243 .map(|s| s.to_string());
244 let backend_id = &state.backend.destination.name;
245 if let Some(loc) = location {
246 info!(backend_id, status, location = %loc, "Upstream responded");
247 } else {
248 info!(backend_id, status, "Upstream responded");
249 }
250 }
251
252 Ok(())
253 }
254}