rama_http_backend/client/proxy/layer/proxy_connector/
service.rs

1use crate::client::proxy::layer::HttpProxyError;
2
3use super::InnerHttpProxyConnector;
4use rama_core::{
5    error::{BoxError, ErrorExt, OpaqueError},
6    Context, Service,
7};
8use rama_http_types::headers::ProxyAuthorization;
9use rama_net::{
10    address::ProxyAddress,
11    client::{ConnectorService, EstablishedClientConnection},
12    stream::Stream,
13    transport::TryRefIntoTransportContext,
14    user::ProxyCredential,
15};
16use rama_utils::macros::define_inner_service_accessors;
17use std::fmt;
18
19#[cfg(feature = "tls")]
20use rama_net::tls::TlsTunnel;
21
22/// A connector which can be used to establish a connection over an HTTP Proxy.
23///
24/// This behaviour is optional and only triggered in case there
25/// is a [`ProxyAddress`] found in the [`Context`].
26pub struct HttpProxyConnector<S> {
27    inner: S,
28    required: bool,
29}
30
31impl<S: fmt::Debug> fmt::Debug for HttpProxyConnector<S> {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        f.debug_struct("HttpProxyConnector")
34            .field("inner", &self.inner)
35            .field("required", &self.required)
36            .finish()
37    }
38}
39
40impl<S: Clone> Clone for HttpProxyConnector<S> {
41    fn clone(&self) -> Self {
42        Self {
43            inner: self.inner.clone(),
44            required: self.required,
45        }
46    }
47}
48
49impl<S> HttpProxyConnector<S> {
50    /// Creates a new [`HttpProxyConnector`].
51    pub(super) fn new(inner: S, required: bool) -> Self {
52        Self { inner, required }
53    }
54
55    /// Create a new [`HttpProxyConnector`]
56    /// which will only connect via an http proxy in case the [`ProxyAddress`] is available
57    /// in the [`Context`].
58    pub fn optional(inner: S) -> Self {
59        Self::new(inner, false)
60    }
61
62    /// Create a new [`HttpProxyConnector`]
63    /// which will always connect via an http proxy, but fail in case the [`ProxyAddress`] is
64    /// not available in the [`Context`].
65    pub fn required(inner: S) -> Self {
66        Self::new(inner, true)
67    }
68
69    define_inner_service_accessors!();
70}
71
72impl<S, State, Request> Service<State, Request> for HttpProxyConnector<S>
73where
74    S: ConnectorService<State, Request, Connection: Stream + Unpin, Error: Into<BoxError>>,
75    State: Clone + Send + Sync + 'static,
76    Request: TryRefIntoTransportContext<State, Error: Into<BoxError> + Send + Sync + 'static>
77        + Send
78        + 'static,
79{
80    type Response = EstablishedClientConnection<S::Connection, State, Request>;
81    type Error = BoxError;
82
83    async fn serve(
84        &self,
85        mut ctx: Context<State>,
86        req: Request,
87    ) -> Result<Self::Response, Self::Error> {
88        let address = ctx.get::<ProxyAddress>().cloned();
89
90        let transport_ctx = ctx
91            .get_or_try_insert_with_ctx(|ctx| req.try_ref_into_transport_ctx(ctx))
92            .map_err(|err| {
93                OpaqueError::from_boxed(err.into())
94                    .context("http proxy connector: get transport context")
95            })?
96            .clone();
97
98        // in case the provider gave us a proxy info, we insert it into the context
99        if let Some(address) = &address {
100            ctx.insert(address.clone());
101
102            #[cfg(feature = "tls")]
103            if address
104                .protocol
105                .as_ref()
106                .map(|p| p.is_secure())
107                .unwrap_or_default()
108            {
109                tracing::trace!(
110                    authority = %transport_ctx.authority,
111                    "http proxy connector: preparing proxy connection for tls tunnel"
112                );
113                ctx.insert(TlsTunnel {
114                    server_host: address.authority.host().clone(),
115                });
116            }
117        }
118
119        let established_conn =
120            self.inner
121                .connect(ctx, req)
122                .await
123                .map_err(|err| match address.as_ref() {
124                    Some(address) => OpaqueError::from_std(HttpProxyError::Transport(
125                        OpaqueError::from_boxed(err.into())
126                            .context(format!(
127                                "establish connection to proxy {} (protocol: {:?})",
128                                address.authority, address.protocol,
129                            ))
130                            .into_boxed(),
131                    )),
132                    None => {
133                        OpaqueError::from_boxed(err.into()).context("establish connection target")
134                    }
135                })?;
136
137        // return early in case we did not use a proxy
138        let address = match address {
139            Some(address) => address,
140            None => {
141                return if self.required {
142                    Err("http proxy required but none is defined".into())
143                } else {
144                    tracing::trace!("http proxy connector: no proxy required or set: proceed with direct connection");
145                    Ok(established_conn)
146                };
147            }
148        };
149        // and do the handshake otherwise...
150
151        let EstablishedClientConnection {
152            ctx,
153            req,
154            conn,
155            addr,
156        } = established_conn;
157
158        tracing::trace!(
159            authority = %transport_ctx.authority,
160            proxy_addr = %addr,
161            "http proxy connector: connected to proxy",
162        );
163
164        if !transport_ctx
165            .app_protocol
166            .map(|p| p.is_secure())
167            // TODO: re-evaluate this fallback at some point... seems pretty flawed to me
168            .unwrap_or_else(|| transport_ctx.authority.port() == 443)
169        {
170            // unless the scheme is not secure, in such a case no handshake is required...
171            // we do however need to add authorization headers if credentials are present
172            // => for this the user has to use another middleware as we do not have access to that here
173            return Ok(EstablishedClientConnection {
174                ctx,
175                req,
176                conn,
177                addr,
178            });
179        }
180
181        let mut connector = InnerHttpProxyConnector::new(transport_ctx.authority.clone());
182        if let Some(credential) = address.credential.clone() {
183            match credential {
184                ProxyCredential::Basic(basic) => {
185                    connector.with_typed_header(ProxyAuthorization(basic));
186                }
187                ProxyCredential::Bearer(bearer) => {
188                    connector.with_typed_header(ProxyAuthorization(bearer));
189                }
190            }
191        }
192
193        let conn = connector
194            .handshake(conn)
195            .await
196            .map_err(|err| OpaqueError::from_std(err).context("http proxy handshake"))?;
197
198        tracing::trace!(
199            authority = %transport_ctx.authority,
200            proxy_addr = %addr,
201            "http proxy connector: connected to proxy: ready secure request",
202        );
203        Ok(EstablishedClientConnection {
204            ctx,
205            req,
206            conn,
207            addr,
208        })
209    }
210}