1mod utils;
2
3use crate::utils::requests::{
4 compose_upstream_url, parse_origin, rewrite_request_path, set_upstream_host_headers,
5};
6use cardinal_base::context::CardinalContext;
7use cardinal_base::destinations::container::{DestinationContainer, DestinationWrapper};
8use cardinal_plugins::runner::{MiddlewareResult, PluginRunner};
9use pingora::http::ResponseHeader;
10use pingora::prelude::*;
11use pingora::protocols::Digest;
12use pingora::upstreams::peer::Peer;
13use std::sync::Arc;
14use tracing::{debug, error, info, warn};
15
16pub trait CardinalContextProvider: Send + Sync {
17 fn resolve(&self, session: &Session) -> Option<Arc<CardinalContext>>;
18}
19
20#[derive(Clone)]
21pub struct StaticContextProvider {
22 context: Arc<CardinalContext>,
23}
24
25impl StaticContextProvider {
26 pub fn new(context: Arc<CardinalContext>) -> Self {
27 Self { context }
28 }
29}
30
31impl CardinalContextProvider for StaticContextProvider {
32 fn resolve(&self, _session: &Session) -> Option<Arc<CardinalContext>> {
33 Some(self.context.clone())
34 }
35}
36
37pub struct CardinalProxy {
38 provider: Arc<dyn CardinalContextProvider>,
39}
40
41impl CardinalProxy {
42 pub fn new(context: Arc<CardinalContext>) -> Self {
43 Self::builder(context).build()
44 }
45
46 pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
47 Self { provider }
48 }
49
50 pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
51 CardinalProxyBuilder::new(context)
52 }
53}
54
55pub struct CardinalProxyBuilder {
56 provider: Arc<dyn CardinalContextProvider>,
57}
58
59impl CardinalProxyBuilder {
60 pub fn new(context: Arc<CardinalContext>) -> Self {
61 Self {
62 provider: Arc::new(StaticContextProvider::new(context)),
63 }
64 }
65
66 pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
67 Self { provider }
68 }
69
70 pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
71 self.provider = provider;
72 self
73 }
74
75 pub fn build(self) -> CardinalProxy {
76 CardinalProxy::with_provider(self.provider)
77 }
78}
79
80#[async_trait::async_trait]
81impl ProxyHttp for CardinalProxy {
82 type CTX = Option<RequestContext>;
83
84 fn new_ctx(&self) -> Self::CTX {
85 None
86 }
87
88 async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
89 let path = session.req_header().uri.path().to_string();
90 info!(%path, "Request received");
91
92 let context = match self.provider.resolve(session) {
93 Some(ctx) => ctx,
94 None => {
95 warn!(%path, "No context found for request host");
96 let _ = session.respond_error(421).await;
97 return Ok(true);
98 }
99 };
100
101 let destination_container = context
102 .get::<DestinationContainer>()
103 .await
104 .map_err(|_| Error::new_str("Destination Container is not present"))?;
105
106 let force_path = context.config.server.force_path_parameter;
107 let backend =
108 match destination_container.get_backend_for_request(session.req_header(), force_path) {
109 Some(b) => b.clone(),
110 None => {
111 warn!(%path, "No matching backend, returning 404");
112 let _ = session.respond_error(404).await;
113 return Ok(true);
114 }
115 };
116
117 let destination_name = backend.destination.name.clone();
118 let _ = set_upstream_host_headers(session, &backend);
119 info!(backend_id = %destination_name, "Routing to backend");
120
121 rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
122
123 let mut request_state = RequestContext::new(context.clone());
124 request_state.backend = Some(backend.clone());
125
126 let run_filters = request_state
127 .runner
128 .run_request_filters(session, backend)
129 .await;
130
131 let res = match run_filters {
132 Ok(filter_result) => filter_result,
133 Err(err) => {
134 error!(%err, "Error running request filters");
135 let _ = session.respond_error(500).await;
136 return Ok(true);
137 }
138 };
139
140 *ctx = Some(request_state);
141
142 match res {
143 MiddlewareResult::Continue => Ok(false),
144 MiddlewareResult::Responded => Ok(true),
145 }
146 }
147
148 async fn upstream_peer(
149 &self,
150 _session: &mut Session,
151 ctx: &mut Self::CTX,
152 ) -> Result<Box<HttpPeer>> {
153 if let Some(state) = ctx.as_ref() {
154 if let Some(backend) = state.backend.as_ref() {
155 let (host, port, is_tls) = parse_origin(&backend.destination.url)
157 .map_err(|_| Error::new_str("Origin could not be parsed "))?;
158 let hostport = format!("{}:{}", host, port);
159
160 let path_and_query = _session
162 .req_header()
163 .uri
164 .path_and_query()
165 .map(|pq| pq.as_str())
166 .unwrap_or("/");
167 let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
168
169 info!(%upstream_url, backend_id = %backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
170 debug!(upstream_origin = %hostport, "Connecting to upstream origin");
171
172 let mut peer = HttpPeer::new(&hostport, is_tls, host);
173 if let Some(opts) = peer.get_mut_peer_options() {
174 opts.set_http_version(2, 1);
176 }
177 let peer = Box::new(peer);
178 Ok(peer)
179 } else {
180 Err(Error::new(ErrorType::InternalError))
181 }
182 } else {
183 Err(Error::new(ErrorType::InternalError))
184 }
185 }
186
187 async fn connected_to_upstream(
188 &self,
189 _session: &mut Session,
190 reused: bool,
191 peer: &HttpPeer,
192 #[cfg(unix)] _fd: std::os::unix::io::RawFd,
193 #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
194 _digest: Option<&Digest>,
195 ctx: &mut Self::CTX,
196 ) -> Result<()> {
197 let backend_id = ctx
198 .as_ref()
199 .and_then(|state| state.backend.as_ref())
200 .map(|b| b.destination.name.as_str())
201 .unwrap_or("<unknown>");
202 info!(backend_id, reused, peer = %peer, "Connected to upstream");
203 Ok(())
204 }
205
206 async fn response_filter(
207 &self,
208 session: &mut Session,
209 upstream_response: &mut ResponseHeader,
210 ctx: &mut Self::CTX,
211 ) -> Result<()> {
212 if let Some(state) = ctx.as_mut() {
213 if let Some(backend) = state.backend.clone() {
214 state
215 .runner
216 .run_response_filters(session, backend, upstream_response)
217 .await;
218 }
219
220 if !state.context.config.server.log_upstream_response {
221 return Ok(());
222 }
223
224 let status = upstream_response.status.as_u16();
225 let location = upstream_response
226 .headers
227 .get("location")
228 .and_then(|v| v.to_str().ok())
229 .map(|s| s.to_string());
230 let backend_id = state
231 .backend
232 .as_ref()
233 .map(|b| b.destination.name.as_str())
234 .unwrap_or("<unknown>");
235 if let Some(loc) = location {
236 info!(backend_id, status, location = %loc, "Upstream responded");
237 } else {
238 info!(backend_id, status, "Upstream responded");
239 }
240 }
241
242 Ok(())
243 }
244}
245
246pub struct RequestContext {
247 pub context: Arc<CardinalContext>,
248 pub backend: Option<Arc<DestinationWrapper>>,
249 pub runner: PluginRunner,
250}
251
252impl RequestContext {
253 fn new(context: Arc<CardinalContext>) -> Self {
254 let runner = PluginRunner::new(context.clone());
255 Self {
256 context,
257 backend: None,
258 runner,
259 }
260 }
261}