cardinal_proxy/
lib.rs

1mod utils;
2
3use crate::utils::requests::{
4    compose_upstream_url, parse_origin, rewrite_request_path, set_upstream_host_headers,
5};
6use cardinal_base::context::CardinalContext;
7use cardinal_base::destinations::container::{DestinationContainer, DestinationWrapper};
8use cardinal_plugins::runner::{MiddlewareResult, PluginRunner};
9use pingora::http::ResponseHeader;
10use pingora::prelude::*;
11use pingora::protocols::Digest;
12use pingora::upstreams::peer::Peer;
13use std::sync::Arc;
14use tracing::{debug, error, info, warn};
15
16pub trait CardinalContextProvider: Send + Sync {
17    fn resolve(&self, session: &Session) -> Option<Arc<CardinalContext>>;
18}
19
20#[derive(Clone)]
21pub struct StaticContextProvider {
22    context: Arc<CardinalContext>,
23}
24
25impl StaticContextProvider {
26    pub fn new(context: Arc<CardinalContext>) -> Self {
27        Self { context }
28    }
29}
30
31impl CardinalContextProvider for StaticContextProvider {
32    fn resolve(&self, _session: &Session) -> Option<Arc<CardinalContext>> {
33        Some(self.context.clone())
34    }
35}
36
37pub struct CardinalProxy {
38    provider: Arc<dyn CardinalContextProvider>,
39}
40
41impl CardinalProxy {
42    pub fn new(context: Arc<CardinalContext>) -> Self {
43        Self::builder(context).build()
44    }
45
46    pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
47        Self { provider }
48    }
49
50    pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
51        CardinalProxyBuilder::new(context)
52    }
53}
54
55pub struct CardinalProxyBuilder {
56    provider: Arc<dyn CardinalContextProvider>,
57}
58
59impl CardinalProxyBuilder {
60    pub fn new(context: Arc<CardinalContext>) -> Self {
61        Self {
62            provider: Arc::new(StaticContextProvider::new(context)),
63        }
64    }
65
66    pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
67        Self { provider }
68    }
69
70    pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
71        self.provider = provider;
72        self
73    }
74
75    pub fn build(self) -> CardinalProxy {
76        CardinalProxy::with_provider(self.provider)
77    }
78}
79
80#[async_trait::async_trait]
81impl ProxyHttp for CardinalProxy {
82    type CTX = Option<RequestContext>;
83
84    fn new_ctx(&self) -> Self::CTX {
85        None
86    }
87
88    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
89        let path = session.req_header().uri.path().to_string();
90        info!(%path, "Request received");
91
92        let context = match self.provider.resolve(session) {
93            Some(ctx) => ctx,
94            None => {
95                warn!(%path, "No context found for request host");
96                let _ = session.respond_error(421).await;
97                return Ok(true);
98            }
99        };
100
101        let destination_container = context
102            .get::<DestinationContainer>()
103            .await
104            .map_err(|_| Error::new_str("Destination Container is not present"))?;
105
106        let force_path = context.config.server.force_path_parameter;
107        let backend =
108            match destination_container.get_backend_for_request(session.req_header(), force_path) {
109                Some(b) => b.clone(),
110                None => {
111                    warn!(%path, "No matching backend, returning 404");
112                    let _ = session.respond_error(404).await;
113                    return Ok(true);
114                }
115            };
116
117        let destination_name = backend.destination.name.clone();
118        let _ = set_upstream_host_headers(session, &backend);
119        info!(backend_id = %destination_name, "Routing to backend");
120
121        rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
122
123        let mut request_state = RequestContext::new(context.clone());
124        request_state.backend = Some(backend.clone());
125
126        let run_filters = request_state
127            .runner
128            .run_request_filters(session, backend)
129            .await;
130
131        let res = match run_filters {
132            Ok(filter_result) => filter_result,
133            Err(err) => {
134                error!(%err, "Error running request filters");
135                let _ = session.respond_error(500).await;
136                return Ok(true);
137            }
138        };
139
140        *ctx = Some(request_state);
141
142        match res {
143            MiddlewareResult::Continue => Ok(false),
144            MiddlewareResult::Responded => Ok(true),
145        }
146    }
147
148    async fn upstream_peer(
149        &self,
150        _session: &mut Session,
151        ctx: &mut Self::CTX,
152    ) -> Result<Box<HttpPeer>> {
153        if let Some(state) = ctx.as_ref() {
154            if let Some(backend) = state.backend.as_ref() {
155                // Determine origin parts for TLS and SNI
156                let (host, port, is_tls) = parse_origin(&backend.destination.url)
157                    .map_err(|_| Error::new_str("Origin could not be parsed "))?;
158                let hostport = format!("{}:{}", host, port);
159
160                // Compose full upstream URL for logging with normalized scheme
161                let path_and_query = _session
162                    .req_header()
163                    .uri
164                    .path_and_query()
165                    .map(|pq| pq.as_str())
166                    .unwrap_or("/");
167                let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
168
169                info!(%upstream_url, backend_id = %backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
170                debug!(upstream_origin = %hostport, "Connecting to upstream origin");
171
172                let mut peer = HttpPeer::new(&hostport, is_tls, host);
173                if let Some(opts) = peer.get_mut_peer_options() {
174                    // Allow both HTTP/1.1 and HTTP/2 so plain HTTP backends keep working.
175                    opts.set_http_version(2, 1);
176                }
177                let peer = Box::new(peer);
178                Ok(peer)
179            } else {
180                Err(Error::new(ErrorType::InternalError))
181            }
182        } else {
183            Err(Error::new(ErrorType::InternalError))
184        }
185    }
186
187    async fn connected_to_upstream(
188        &self,
189        _session: &mut Session,
190        reused: bool,
191        peer: &HttpPeer,
192        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
193        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
194        _digest: Option<&Digest>,
195        ctx: &mut Self::CTX,
196    ) -> Result<()> {
197        let backend_id = ctx
198            .as_ref()
199            .and_then(|state| state.backend.as_ref())
200            .map(|b| b.destination.name.as_str())
201            .unwrap_or("<unknown>");
202        info!(backend_id, reused, peer = %peer, "Connected to upstream");
203        Ok(())
204    }
205
206    async fn response_filter(
207        &self,
208        session: &mut Session,
209        upstream_response: &mut ResponseHeader,
210        ctx: &mut Self::CTX,
211    ) -> Result<()> {
212        if let Some(state) = ctx.as_mut() {
213            if let Some(backend) = state.backend.clone() {
214                state
215                    .runner
216                    .run_response_filters(session, backend, upstream_response)
217                    .await;
218            }
219
220            if !state.context.config.server.log_upstream_response {
221                return Ok(());
222            }
223
224            let status = upstream_response.status.as_u16();
225            let location = upstream_response
226                .headers
227                .get("location")
228                .and_then(|v| v.to_str().ok())
229                .map(|s| s.to_string());
230            let backend_id = state
231                .backend
232                .as_ref()
233                .map(|b| b.destination.name.as_str())
234                .unwrap_or("<unknown>");
235            if let Some(loc) = location {
236                info!(backend_id, status, location = %loc, "Upstream responded");
237            } else {
238                info!(backend_id, status, "Upstream responded");
239            }
240        }
241
242        Ok(())
243    }
244}
245
246pub struct RequestContext {
247    pub context: Arc<CardinalContext>,
248    pub backend: Option<Arc<DestinationWrapper>>,
249    pub runner: PluginRunner,
250}
251
252impl RequestContext {
253    fn new(context: Arc<CardinalContext>) -> Self {
254        let runner = PluginRunner::new(context.clone());
255        Self {
256            context,
257            backend: None,
258            runner,
259        }
260    }
261}