edc_dataplane_proxy/web/proxy/
public.rs1use std::str;
2
3use axum::http::{uri::InvalidUri, Uri};
4use edc_dataplane_core::core::model::transfer::types::HttpData;
5use edc_dataplane_core::core::model::transfer::{Transfer, TransferStatus};
6use futures::TryFutureExt;
7use pingora::http::RequestHeader;
8use pingora::{upstreams::peer::HttpPeer, Result};
9use pingora_proxy::{ProxyHttp, Session};
10use tracing::debug;
11
12use crate::model::edr::EdrEntry;
13use crate::{
14 web::state::Context,
15 {
16 model::edr::EdrClaims,
17 service::token::{TokenError, TokenManager},
18 },
19};
20
/// Path prefix under which this proxy accepts requests.
///
/// Requests outside this prefix are answered with a 404, and the prefix is
/// removed from the request path before the request is forwarded upstream.
const PUBLIC_PATH: &str = "/api/v1/public";
22
23pub struct PublicProxy<T: TokenManager + Clone> {
24 ctx: Context<T>,
25}
26
27impl<T: TokenManager + Clone> PublicProxy<T> {
28 pub fn new(ctx: Context<T>) -> Self {
29 Self { ctx }
30 }
31}
32
33#[derive(Default)]
34pub struct PublicCtx {
35 transfer: Option<TransferRequest>,
36}
37impl PublicCtx {
38 pub fn transfer(&self) -> Result<&TransferRequest> {
39 self.transfer.as_ref().ok_or_else(|| {
40 pingora::Error::new(pingora::ErrorType::Custom("Transfer not found in context"))
41 })
42 }
43}
44
45#[async_trait::async_trait]
46impl<T: TokenManager + Send + Sync + Clone + 'static> ProxyHttp for PublicProxy<T> {
47 type CTX = PublicCtx;
48 fn new_ctx(&self) -> Self::CTX {
49 PublicCtx::default()
50 }
51
52 async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
53 if !self.can_handle(session) {
54 session.respond_error(404).await?;
55 return Ok(true);
56 }
57
58 match self.parse_upstream_request(session).await {
59 Ok(req) => self.handle_upstream_request(session, req, ctx).await,
60 Err(err) => {
61 debug!("Failed to handle proxy request error: {:#}", err);
62 session.respond_error(err.to_response_code()).await?;
63 Ok(true)
64 }
65 }
66 }
67
68 async fn upstream_peer(
69 &self,
70 _session: &mut Session,
71 ctx: &mut Self::CTX,
72 ) -> Result<Box<HttpPeer>> {
73 let host = ctx.transfer()?.upstream_host();
74 let tls = ctx.transfer()?.is_tls();
75 let port = ctx.transfer()?.upstream_port();
76
77 Ok(Box::new(HttpPeer::new(
78 (host.to_string(), port),
79 tls,
80 host.to_string(),
81 )))
82 }
83
84 async fn upstream_request_filter(
85 &self,
86 _session: &mut Session,
87 upstream_request: &mut pingora::http::RequestHeader,
88 ctx: &mut Self::CTX,
89 ) -> Result<()> {
90 upstream_request.remove_header("Authorization");
91 upstream_request
92 .insert_header("Host", ctx.transfer()?.upstream_host())
93 .unwrap();
94 Ok(())
95 }
96}
97
98impl<T: TokenManager + Send + Sync + Clone + 'static> PublicProxy<T> {
99 async fn parse_upstream_request(
100 &self,
101 session: &Session,
102 ) -> std::result::Result<TransferRequest, ProxyError> {
103 self.validate_token(session.req_header())
104 .and_then(|claims| self.fetch_edr(claims))
105 .and_then(|edr| self.fetch_transfer(edr))
106 .and_then(|transfer| self.parse_transfer(transfer))
107 .await
108 }
109
110 async fn handle_upstream_request(
111 &self,
112 session: &mut Session,
113 req: TransferRequest,
114 ctx: &mut PublicCtx,
115 ) -> Result<bool> {
116 let upstream_uri = req.to_upstream_uri(session);
117 session.req_header_mut().set_uri(upstream_uri);
118 ctx.transfer = Some(req);
119
120 Ok(false)
121 }
122
123 async fn validate_token(
124 &self,
125 req: &RequestHeader,
126 ) -> std::result::Result<EdrClaims, ProxyError> {
127 req.headers
128 .get("Authorization")
129 .ok_or(ProxyError::MissingToken)
130 .and_then(|token| str::from_utf8(token.as_bytes()).map_err(ProxyError::Utf8Error))
131 .and_then(|mut token| {
132 if token.starts_with("Bearer ") {
133 token = &token[7..];
134 }
135
136 self.ctx
137 .tokens()
138 .validate::<EdrClaims>(token)
139 .map_err(ProxyError::TokenError)
140 })
141 .map(|data| data.claims)
142 }
143
144 async fn fetch_transfer(&self, edr: EdrEntry) -> std::result::Result<Transfer, ProxyError> {
145 self.ctx
146 .transfers()
147 .get(&edr.transfer_id)
148 .await
149 .map_err(ProxyError::Generic)?
150 .filter(|transfer| transfer.status == TransferStatus::Started)
151 .ok_or_else(|| ProxyError::InvalidTransfer)
152 }
153
154 async fn fetch_edr(&self, claims: EdrClaims) -> std::result::Result<EdrEntry, ProxyError> {
155 self.ctx
156 .edrs()
157 .get_by_transfer_id(&claims.transfer_id)
158 .await
159 .map_err(ProxyError::Generic)?
160 .filter(|edr| edr.token_id == claims.jti.into())
161 .ok_or_else(|| ProxyError::InvalidTransfer)
162 }
163
164 async fn parse_transfer(
165 &self,
166 transfer: Transfer,
167 ) -> std::result::Result<TransferRequest, ProxyError> {
168 let data = HttpData::try_from(transfer.source.as_ref())?;
169
170 Ok(TransferRequest { data })
171 }
172
173 fn can_handle(&self, session: &Session) -> bool {
174 session.req_header().uri.path().starts_with(PUBLIC_PATH)
175 }
176}
177
178#[derive(thiserror::Error, Debug)]
179pub enum ProxyError {
180 #[error(transparent)]
181 TokenError(TokenError),
182 #[error("Missing Token")]
183 MissingToken,
184 #[error("Invalid Transfer")]
185 InvalidTransfer,
186 #[error(transparent)]
187 Utf8Error(str::Utf8Error),
188 #[error(transparent)]
189 Generic(#[from] anyhow::Error),
190 #[error(transparent)]
191 InvalidUri(#[from] InvalidUri),
192}
193
194impl ProxyError {
195 pub fn to_response_code(&self) -> u16 {
196 match self {
197 ProxyError::TokenError(_) => 403,
198 ProxyError::MissingToken => 403,
199 ProxyError::InvalidTransfer => 403,
200 ProxyError::Utf8Error(_) => 502,
201 ProxyError::Generic(_) => 502,
202 ProxyError::InvalidUri(_) => 502,
203 }
204 }
205}
206
207pub struct TransferRequest {
208 data: HttpData,
209}
210
211impl TransferRequest {
212 pub fn upstream_host(&self) -> &str {
213 self.data.base_url.host().unwrap()
214 }
215
216 pub fn is_tls(&self) -> bool {
217 self.data
218 .base_url
219 .scheme()
220 .map(|f| f.as_str() == "https")
221 .unwrap_or_default()
222 }
223
224 pub fn upstream_port(&self) -> u16 {
225 self.data
226 .base_url
227 .port_u16()
228 .unwrap_or_else(|| if self.is_tls() { 443 } else { 80 })
229 }
230
231 pub fn to_upstream_uri(&self, session: &Session) -> Uri {
232 let req_path = session.req_header().uri.path().replace(PUBLIC_PATH, "");
233
234 Uri::builder()
235 .path_and_query(&(self.data.base_url.path().to_string() + &req_path))
236 .build()
237 .unwrap()
238 }
239}