edc_dataplane_proxy/web/proxy/
public.rs

1use std::str;
2
3use axum::http::{uri::InvalidUri, Uri};
4use edc_dataplane_core::core::model::transfer::types::HttpData;
5use edc_dataplane_core::core::model::transfer::{Transfer, TransferStatus};
6use futures::TryFutureExt;
7use pingora::http::RequestHeader;
8use pingora::{upstreams::peer::HttpPeer, Result};
9use pingora_proxy::{ProxyHttp, Session};
10use tracing::debug;
11
12use crate::model::edr::EdrEntry;
13use crate::{
14    web::state::Context,
15    {
16        model::edr::EdrClaims,
17        service::token::{TokenError, TokenManager},
18    },
19};
20
/// Path prefix a request must carry to be handled by the public proxy; it is
/// removed again when the upstream URI is built.
const PUBLIC_PATH: &str = "/api/v1/public";
22
23pub struct PublicProxy<T: TokenManager + Clone> {
24    ctx: Context<T>,
25}
26
27impl<T: TokenManager + Clone> PublicProxy<T> {
28    pub fn new(ctx: Context<T>) -> Self {
29        Self { ctx }
30    }
31}
32
33#[derive(Default)]
34pub struct PublicCtx {
35    transfer: Option<TransferRequest>,
36}
37impl PublicCtx {
38    pub fn transfer(&self) -> Result<&TransferRequest> {
39        self.transfer.as_ref().ok_or_else(|| {
40            pingora::Error::new(pingora::ErrorType::Custom("Transfer not found in context"))
41        })
42    }
43}
44
45#[async_trait::async_trait]
46impl<T: TokenManager + Send + Sync + Clone + 'static> ProxyHttp for PublicProxy<T> {
47    type CTX = PublicCtx;
48    fn new_ctx(&self) -> Self::CTX {
49        PublicCtx::default()
50    }
51
52    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
53        if !self.can_handle(session) {
54            session.respond_error(404).await?;
55            return Ok(true);
56        }
57
58        match self.parse_upstream_request(session).await {
59            Ok(req) => self.handle_upstream_request(session, req, ctx).await,
60            Err(err) => {
61                debug!("Failed to handle proxy request error: {:#}", err);
62                session.respond_error(err.to_response_code()).await?;
63                Ok(true)
64            }
65        }
66    }
67
68    async fn upstream_peer(
69        &self,
70        _session: &mut Session,
71        ctx: &mut Self::CTX,
72    ) -> Result<Box<HttpPeer>> {
73        let host = ctx.transfer()?.upstream_host();
74        let tls = ctx.transfer()?.is_tls();
75        let port = ctx.transfer()?.upstream_port();
76
77        Ok(Box::new(HttpPeer::new(
78            (host.to_string(), port),
79            tls,
80            host.to_string(),
81        )))
82    }
83
84    async fn upstream_request_filter(
85        &self,
86        _session: &mut Session,
87        upstream_request: &mut pingora::http::RequestHeader,
88        ctx: &mut Self::CTX,
89    ) -> Result<()> {
90        upstream_request.remove_header("Authorization");
91        upstream_request
92            .insert_header("Host", ctx.transfer()?.upstream_host())
93            .unwrap();
94        Ok(())
95    }
96}
97
98impl<T: TokenManager + Send + Sync + Clone + 'static> PublicProxy<T> {
99    async fn parse_upstream_request(
100        &self,
101        session: &Session,
102    ) -> std::result::Result<TransferRequest, ProxyError> {
103        self.validate_token(session.req_header())
104            .and_then(|claims| self.fetch_edr(claims))
105            .and_then(|edr| self.fetch_transfer(edr))
106            .and_then(|transfer| self.parse_transfer(transfer))
107            .await
108    }
109
110    async fn handle_upstream_request(
111        &self,
112        session: &mut Session,
113        req: TransferRequest,
114        ctx: &mut PublicCtx,
115    ) -> Result<bool> {
116        let upstream_uri = req.to_upstream_uri(session);
117        session.req_header_mut().set_uri(upstream_uri);
118        ctx.transfer = Some(req);
119
120        Ok(false)
121    }
122
123    async fn validate_token(
124        &self,
125        req: &RequestHeader,
126    ) -> std::result::Result<EdrClaims, ProxyError> {
127        req.headers
128            .get("Authorization")
129            .ok_or(ProxyError::MissingToken)
130            .and_then(|token| str::from_utf8(token.as_bytes()).map_err(ProxyError::Utf8Error))
131            .and_then(|mut token| {
132                if token.starts_with("Bearer ") {
133                    token = &token[7..];
134                }
135
136                self.ctx
137                    .tokens()
138                    .validate::<EdrClaims>(token)
139                    .map_err(ProxyError::TokenError)
140            })
141            .map(|data| data.claims)
142    }
143
144    async fn fetch_transfer(&self, edr: EdrEntry) -> std::result::Result<Transfer, ProxyError> {
145        self.ctx
146            .transfers()
147            .get(&edr.transfer_id)
148            .await
149            .map_err(ProxyError::Generic)?
150            .filter(|transfer| transfer.status == TransferStatus::Started)
151            .ok_or_else(|| ProxyError::InvalidTransfer)
152    }
153
154    async fn fetch_edr(&self, claims: EdrClaims) -> std::result::Result<EdrEntry, ProxyError> {
155        self.ctx
156            .edrs()
157            .get_by_transfer_id(&claims.transfer_id)
158            .await
159            .map_err(ProxyError::Generic)?
160            .filter(|edr| edr.token_id == claims.jti.into())
161            .ok_or_else(|| ProxyError::InvalidTransfer)
162    }
163
164    async fn parse_transfer(
165        &self,
166        transfer: Transfer,
167    ) -> std::result::Result<TransferRequest, ProxyError> {
168        let data = HttpData::try_from(transfer.source.as_ref())?;
169
170        Ok(TransferRequest { data })
171    }
172
173    fn can_handle(&self, session: &Session) -> bool {
174        session.req_header().uri.path().starts_with(PUBLIC_PATH)
175    }
176}
177
178#[derive(thiserror::Error, Debug)]
179pub enum ProxyError {
180    #[error(transparent)]
181    TokenError(TokenError),
182    #[error("Missing Token")]
183    MissingToken,
184    #[error("Invalid Transfer")]
185    InvalidTransfer,
186    #[error(transparent)]
187    Utf8Error(str::Utf8Error),
188    #[error(transparent)]
189    Generic(#[from] anyhow::Error),
190    #[error(transparent)]
191    InvalidUri(#[from] InvalidUri),
192}
193
194impl ProxyError {
195    pub fn to_response_code(&self) -> u16 {
196        match self {
197            ProxyError::TokenError(_) => 403,
198            ProxyError::MissingToken => 403,
199            ProxyError::InvalidTransfer => 403,
200            ProxyError::Utf8Error(_) => 502,
201            ProxyError::Generic(_) => 502,
202            ProxyError::InvalidUri(_) => 502,
203        }
204    }
205}
206
207pub struct TransferRequest {
208    data: HttpData,
209}
210
211impl TransferRequest {
212    pub fn upstream_host(&self) -> &str {
213        self.data.base_url.host().unwrap()
214    }
215
216    pub fn is_tls(&self) -> bool {
217        self.data
218            .base_url
219            .scheme()
220            .map(|f| f.as_str() == "https")
221            .unwrap_or_default()
222    }
223
224    pub fn upstream_port(&self) -> u16 {
225        self.data
226            .base_url
227            .port_u16()
228            .unwrap_or_else(|| if self.is_tls() { 443 } else { 80 })
229    }
230
231    pub fn to_upstream_uri(&self, session: &Session) -> Uri {
232        let req_path = session.req_header().uri.path().replace(PUBLIC_PATH, "");
233
234        Uri::builder()
235            .path_and_query(&(self.data.base_url.path().to_string() + &req_path))
236            .build()
237            .unwrap()
238    }
239}