cardinal_proxy/
lib.rs

1mod utils;
2
3use crate::utils::requests::{
4    compose_upstream_url, execution_context_from_request, parse_origin, rewrite_request_path,
5    set_upstream_host_headers,
6};
7use cardinal_base::context::CardinalContext;
8use cardinal_base::destinations::container::DestinationContainer;
9use cardinal_plugins::request_context::RequestContext;
10use cardinal_plugins::runner::MiddlewareResult;
11use pingora::http::ResponseHeader;
12use pingora::prelude::*;
13use pingora::protocols::Digest;
14use pingora::upstreams::peer::Peer;
15use std::sync::Arc;
16use tracing::{debug, error, info, warn};
17
18pub mod pingora {
19    pub use pingora::*;
20}
21
22pub trait CardinalContextProvider: Send + Sync {
23    fn resolve(&self, session: &Session) -> Option<Arc<CardinalContext>>;
24}
25
26#[derive(Clone)]
27pub struct StaticContextProvider {
28    context: Arc<CardinalContext>,
29}
30
31impl StaticContextProvider {
32    pub fn new(context: Arc<CardinalContext>) -> Self {
33        Self { context }
34    }
35}
36
37impl CardinalContextProvider for StaticContextProvider {
38    fn resolve(&self, _session: &Session) -> Option<Arc<CardinalContext>> {
39        Some(self.context.clone())
40    }
41}
42
43pub struct CardinalProxy {
44    provider: Arc<dyn CardinalContextProvider>,
45}
46
47impl CardinalProxy {
48    pub fn new(context: Arc<CardinalContext>) -> Self {
49        Self::builder(context).build()
50    }
51
52    pub fn with_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
53        Self { provider }
54    }
55
56    pub fn builder(context: Arc<CardinalContext>) -> CardinalProxyBuilder {
57        CardinalProxyBuilder::new(context)
58    }
59}
60
61pub struct CardinalProxyBuilder {
62    provider: Arc<dyn CardinalContextProvider>,
63}
64
65impl CardinalProxyBuilder {
66    pub fn new(context: Arc<CardinalContext>) -> Self {
67        Self {
68            provider: Arc::new(StaticContextProvider::new(context)),
69        }
70    }
71
72    pub fn from_context_provider(provider: Arc<dyn CardinalContextProvider>) -> Self {
73        Self { provider }
74    }
75
76    pub fn with_context_provider(mut self, provider: Arc<dyn CardinalContextProvider>) -> Self {
77        self.provider = provider;
78        self
79    }
80
81    pub fn build(self) -> CardinalProxy {
82        CardinalProxy::with_provider(self.provider)
83    }
84}
85
86#[async_trait::async_trait]
87impl ProxyHttp for CardinalProxy {
88    type CTX = Option<RequestContext>;
89
90    fn new_ctx(&self) -> Self::CTX {
91        None
92    }
93
94    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
95        let path = session.req_header().uri.path().to_string();
96        info!(%path, "Request received");
97
98        let context = match self.provider.resolve(session) {
99            Some(ctx) => ctx,
100            None => {
101                warn!(%path, "No context found for request host");
102                let _ = session.respond_error(421).await;
103                return Ok(true);
104            }
105        };
106
107        let destination_container = context
108            .get::<DestinationContainer>()
109            .await
110            .map_err(|_| Error::new_str("Destination Container is not present"))?;
111
112        let force_path = context.config.server.force_path_parameter;
113        let backend =
114            match destination_container.get_backend_for_request(session.req_header(), force_path) {
115                Some(b) => b.clone(),
116                None => {
117                    warn!(%path, "No matching backend, returning 404");
118                    let _ = session.respond_error(404).await;
119                    return Ok(true);
120                }
121            };
122
123        let destination_name = backend.destination.name.clone();
124        let _ = set_upstream_host_headers(session, &backend);
125        info!(backend_id = %destination_name, "Routing to backend");
126
127        rewrite_request_path(session.req_header_mut(), &destination_name, force_path);
128
129        let mut request_state = RequestContext::new(
130            context.clone(),
131            backend,
132            execution_context_from_request(session),
133        );
134
135        let plugin_runner = request_state.plugin_runner.clone();
136
137        let run_filters = plugin_runner
138            .run_request_filters(session, &mut request_state)
139            .await;
140
141        let res = match run_filters {
142            Ok(filter_result) => filter_result,
143            Err(err) => {
144                error!(%err, "Error running request filters");
145                let _ = session.respond_error(500).await;
146                return Ok(true);
147            }
148        };
149
150        *ctx = Some(request_state);
151
152        match res {
153            MiddlewareResult::Continue(resp_headers) => {
154                ctx.as_mut().unwrap().response_headers = Some(resp_headers);
155
156                Ok(false)
157            }
158            MiddlewareResult::Responded => Ok(true),
159        }
160    }
161
162    async fn upstream_peer(
163        &self,
164        _session: &mut Session,
165        ctx: &mut Self::CTX,
166    ) -> Result<Box<HttpPeer>> {
167        if let Some(state) = ctx.as_ref() {
168            // Determine origin parts for TLS and SNI
169            let (host, port, is_tls) = parse_origin(&state.backend.destination.url)
170                .map_err(|_| Error::new_str("Origin could not be parsed "))?;
171            let hostport = format!("{host}:{port}");
172
173            // Compose full upstream URL for logging with normalized scheme
174            let path_and_query = _session
175                .req_header()
176                .uri
177                .path_and_query()
178                .map(|pq| pq.as_str())
179                .unwrap_or("/");
180            let upstream_url = compose_upstream_url(is_tls, &host, port, path_and_query);
181
182            info!(%upstream_url, backend_id = %state.backend.destination.name, is_tls, sni = %host, "Forwarding to upstream");
183            debug!(upstream_origin = %hostport, "Connecting to upstream origin");
184
185            let mut peer = HttpPeer::new(&hostport, is_tls, host);
186            if let Some(opts) = peer.get_mut_peer_options() {
187                // Allow both HTTP/1.1 and HTTP/2 so plain HTTP backends keep working.
188                opts.set_http_version(2, 1);
189            }
190            let peer = Box::new(peer);
191            Ok(peer)
192        } else {
193            Err(Error::new(ErrorType::InternalError))
194        }
195    }
196
197    async fn connected_to_upstream(
198        &self,
199        _session: &mut Session,
200        reused: bool,
201        peer: &HttpPeer,
202        #[cfg(unix)] _fd: std::os::unix::io::RawFd,
203        #[cfg(windows)] _sock: std::os::windows::io::RawSocket,
204        _digest: Option<&Digest>,
205        ctx: &mut Self::CTX,
206    ) -> Result<()> {
207        let backend_id = ctx
208            .as_ref()
209            .map(|b| b.backend.destination.name.as_str())
210            .unwrap_or("<unknown>");
211        info!(backend_id, reused, peer = %peer, "Connected to upstream");
212        Ok(())
213    }
214
215    async fn response_filter(
216        &self,
217        session: &mut Session,
218        upstream_response: &mut ResponseHeader,
219        ctx: &mut Self::CTX,
220    ) -> Result<()> {
221        if let Some(state) = ctx.as_mut() {
222            if let Some(resp_headers) = state.response_headers.take() {
223                for (key, val) in resp_headers {
224                    let _ = upstream_response.insert_header(key, val);
225                }
226            }
227
228            let runner = state.plugin_runner.clone();
229
230            runner
231                .run_response_filters(session, state, upstream_response)
232                .await;
233
234            if !state.cardinal_context.config.server.log_upstream_response {
235                return Ok(());
236            }
237
238            let status = upstream_response.status.as_u16();
239            let location = upstream_response
240                .headers
241                .get("location")
242                .and_then(|v| v.to_str().ok())
243                .map(|s| s.to_string());
244            let backend_id = &state.backend.destination.name;
245            if let Some(loc) = location {
246                info!(backend_id, status, location = %loc, "Upstream responded");
247            } else {
248                info!(backend_id, status, "Upstream responded");
249            }
250        }
251
252        Ok(())
253    }
254}