rama_http_backend/client/proxy/layer/proxy_connector/
service.rs1use crate::client::proxy::layer::HttpProxyError;
2
3use super::InnerHttpProxyConnector;
4use rama_core::{
5 error::{BoxError, ErrorExt, OpaqueError},
6 Context, Service,
7};
8use rama_http_types::headers::ProxyAuthorization;
9use rama_net::{
10 address::ProxyAddress,
11 client::{ConnectorService, EstablishedClientConnection},
12 stream::Stream,
13 transport::TryRefIntoTransportContext,
14 user::ProxyCredential,
15};
16use rama_utils::macros::define_inner_service_accessors;
17use std::fmt;
18
19#[cfg(feature = "tls")]
20use rama_net::tls::TlsTunnel;
21
22pub struct HttpProxyConnector<S> {
27 inner: S,
28 required: bool,
29}
30
31impl<S: fmt::Debug> fmt::Debug for HttpProxyConnector<S> {
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 f.debug_struct("HttpProxyConnector")
34 .field("inner", &self.inner)
35 .field("required", &self.required)
36 .finish()
37 }
38}
39
40impl<S: Clone> Clone for HttpProxyConnector<S> {
41 fn clone(&self) -> Self {
42 Self {
43 inner: self.inner.clone(),
44 required: self.required,
45 }
46 }
47}
48
49impl<S> HttpProxyConnector<S> {
50 pub(super) fn new(inner: S, required: bool) -> Self {
52 Self { inner, required }
53 }
54
55 pub fn optional(inner: S) -> Self {
59 Self::new(inner, false)
60 }
61
62 pub fn required(inner: S) -> Self {
66 Self::new(inner, true)
67 }
68
69 define_inner_service_accessors!();
70}
71
72impl<S, State, Request> Service<State, Request> for HttpProxyConnector<S>
73where
74 S: ConnectorService<State, Request, Connection: Stream + Unpin, Error: Into<BoxError>>,
75 State: Clone + Send + Sync + 'static,
76 Request: TryRefIntoTransportContext<State, Error: Into<BoxError> + Send + Sync + 'static>
77 + Send
78 + 'static,
79{
80 type Response = EstablishedClientConnection<S::Connection, State, Request>;
81 type Error = BoxError;
82
83 async fn serve(
84 &self,
85 mut ctx: Context<State>,
86 req: Request,
87 ) -> Result<Self::Response, Self::Error> {
88 let address = ctx.get::<ProxyAddress>().cloned();
89
90 let transport_ctx = ctx
91 .get_or_try_insert_with_ctx(|ctx| req.try_ref_into_transport_ctx(ctx))
92 .map_err(|err| {
93 OpaqueError::from_boxed(err.into())
94 .context("http proxy connector: get transport context")
95 })?
96 .clone();
97
98 if let Some(address) = &address {
100 ctx.insert(address.clone());
101
102 #[cfg(feature = "tls")]
103 if address
104 .protocol
105 .as_ref()
106 .map(|p| p.is_secure())
107 .unwrap_or_default()
108 {
109 tracing::trace!(
110 authority = %transport_ctx.authority,
111 "http proxy connector: preparing proxy connection for tls tunnel"
112 );
113 ctx.insert(TlsTunnel {
114 server_host: address.authority.host().clone(),
115 });
116 }
117 }
118
119 let established_conn =
120 self.inner
121 .connect(ctx, req)
122 .await
123 .map_err(|err| match address.as_ref() {
124 Some(address) => OpaqueError::from_std(HttpProxyError::Transport(
125 OpaqueError::from_boxed(err.into())
126 .context(format!(
127 "establish connection to proxy {} (protocol: {:?})",
128 address.authority, address.protocol,
129 ))
130 .into_boxed(),
131 )),
132 None => {
133 OpaqueError::from_boxed(err.into()).context("establish connection target")
134 }
135 })?;
136
137 let address = match address {
139 Some(address) => address,
140 None => {
141 return if self.required {
142 Err("http proxy required but none is defined".into())
143 } else {
144 tracing::trace!("http proxy connector: no proxy required or set: proceed with direct connection");
145 Ok(established_conn)
146 };
147 }
148 };
149 let EstablishedClientConnection {
152 ctx,
153 req,
154 conn,
155 addr,
156 } = established_conn;
157
158 tracing::trace!(
159 authority = %transport_ctx.authority,
160 proxy_addr = %addr,
161 "http proxy connector: connected to proxy",
162 );
163
164 if !transport_ctx
165 .app_protocol
166 .map(|p| p.is_secure())
167 .unwrap_or_else(|| transport_ctx.authority.port() == 443)
169 {
170 return Ok(EstablishedClientConnection {
174 ctx,
175 req,
176 conn,
177 addr,
178 });
179 }
180
181 let mut connector = InnerHttpProxyConnector::new(transport_ctx.authority.clone());
182 if let Some(credential) = address.credential.clone() {
183 match credential {
184 ProxyCredential::Basic(basic) => {
185 connector.with_typed_header(ProxyAuthorization(basic));
186 }
187 ProxyCredential::Bearer(bearer) => {
188 connector.with_typed_header(ProxyAuthorization(bearer));
189 }
190 }
191 }
192
193 let conn = connector
194 .handshake(conn)
195 .await
196 .map_err(|err| OpaqueError::from_std(err).context("http proxy handshake"))?;
197
198 tracing::trace!(
199 authority = %transport_ctx.authority,
200 proxy_addr = %addr,
201 "http proxy connector: connected to proxy: ready secure request",
202 );
203 Ok(EstablishedClientConnection {
204 ctx,
205 req,
206 conn,
207 addr,
208 })
209 }
210}