iroh_proxy_utils/downstream/
opts.rs1use std::{sync::Arc, time::Duration};
2
3use dynosaur::dynosaur;
4use http::{HeaderValue, Method, StatusCode, header::InvalidHeaderValue};
5use http_body_util::BodyExt;
6use iroh::EndpointId;
7use iroh_blobs::util::connection_pool;
8use n0_error::{AnyError, Result};
9
10use crate::{
11 downstream::{EndpointAuthority, HyperBody, SrcAddr},
12 parse::HttpRequest,
13};
14
#[derive(Debug, Clone)]
/// Timeouts applied to pooled connections.
///
/// The values are handed over unchanged to `connection_pool::Options` by the
/// `From<PoolOpts>` conversion in this module.
pub struct PoolOpts {
    /// Forwarded to `connection_pool::Options::connect_timeout`. Defaults to 10 seconds.
    pub connect_timeout: Duration,
    /// Forwarded to `connection_pool::Options::idle_timeout`. Defaults to 5 seconds.
    pub idle_timeout: Duration,
}

impl Default for PoolOpts {
    /// Returns the default timeouts: 10 seconds to connect, 5 seconds idle.
    fn default() -> Self {
        let connect_timeout = Duration::from_secs(10);
        let idle_timeout = Duration::from_secs(5);
        Self {
            connect_timeout,
            idle_timeout,
        }
    }
}
35
36impl From<PoolOpts> for connection_pool::Options {
37 fn from(opts: PoolOpts) -> Self {
38 connection_pool::Options {
39 connect_timeout: opts.connect_timeout,
40 idle_timeout: opts.idle_timeout,
41 ..Default::default()
42 }
43 }
44}
45
46#[derive(derive_more::Debug, Clone)]
48pub enum ProxyMode {
49 Tcp(EndpointAuthority),
55 Http(HttpProxyOpts),
60}
61
62#[derive(derive_more::Debug, Clone)]
66pub struct HttpProxyOpts {
67 #[debug("DynRequestHandler")]
68 pub(crate) request_handler: Arc<DynRequestHandler<'static>>,
69 #[debug("{:?}", response_writer.as_ref().map(|_| "DynWriteErrorResponse"))]
70 response_writer: Option<Arc<DynErrorResponder<'static>>>,
71}
72
73impl HttpProxyOpts {
74 pub fn new(request_handler: impl RequestHandler + 'static) -> Self {
76 Self {
77 request_handler: DynRequestHandler::new_arc(request_handler),
78 response_writer: None,
79 }
80 }
81
82 pub fn error_responder(mut self, writer: impl ErrorResponder + 'static) -> Self {
87 self.response_writer = Some(DynErrorResponder::new_arc(writer));
88 self
89 }
90
91 pub(crate) async fn error_response<'a>(
92 &'a self,
93 status: StatusCode,
94 ) -> hyper::Response<HyperBody> {
95 let response_writer: &DynErrorResponder = match self.response_writer.as_ref() {
96 Some(writer) => writer.as_ref(),
97 None => DynErrorResponder::from_ref(&DefaultResponseWriter),
98 };
99 response_writer.error_response(status).await
100 }
101}
102
103#[dynosaur(DynErrorResponder = dyn(box) ErrorResponder)]
104pub trait ErrorResponder: Send + Sync {
109 fn error_response<'a>(
111 &'a self,
112 status: StatusCode,
113 ) -> impl Future<Output = hyper::Response<HyperBody>> + Send + 'a;
114}
115
116pub(crate) struct DefaultResponseWriter;
117impl ErrorResponder for DefaultResponseWriter {
118 async fn error_response<'a>(&'a self, status: StatusCode) -> hyper::Response<HyperBody> {
119 let body = http_body_util::Empty::new().map_err(|_| unreachable!("infallible"));
120 let mut res = hyper::Response::builder().status(status);
121 res.headers_mut().unwrap().insert(
122 http::header::CONTENT_LENGTH,
123 HeaderValue::from_str("0").unwrap(),
124 );
125 res.body(body.boxed()).unwrap()
126 }
127}
128
129#[dynosaur(DynRequestHandler = dyn(box) RequestHandler)]
130pub trait RequestHandler: Send + Sync {
136 fn handle_request(
141 &self,
142 src_addr: SrcAddr,
143 req: &mut HttpRequest,
144 ) -> impl Future<Output = Result<EndpointId, Deny>> + Send;
145}
146
147pub struct StaticForwardProxy(pub EndpointId);
155
156impl RequestHandler for StaticForwardProxy {
157 async fn handle_request(
158 &self,
159 src_addr: SrcAddr,
160 req: &mut HttpRequest,
161 ) -> Result<EndpointId, Deny> {
162 if req.method == Method::CONNECT {
163 if req.uri.authority().is_none()
164 || req.uri.scheme().is_some()
165 || req.uri.path_and_query().is_some()
166 {
167 return Err(Deny::bad_request(
168 "invalid request target for CONNECT request",
169 ));
170 }
171 } else {
172 if req.uri.authority().is_none() || req.uri.scheme().is_none() {
173 return Err(Deny::bad_request("missing absolute-form request target"));
174 }
175 }
176 req.set_forwarded_for_if_tcp(src_addr)
177 .set_via("iroh-proxy")?;
178 Ok(self.0)
179 }
180}
181
182pub struct StaticReverseProxy(pub EndpointAuthority);
191
192impl RequestHandler for StaticReverseProxy {
193 async fn handle_request(
194 &self,
195 src_addr: SrcAddr,
196 req: &mut HttpRequest,
197 ) -> Result<EndpointId, Deny> {
198 if req.method == Method::CONNECT {
199 return Err(Deny::new(
200 StatusCode::BAD_REQUEST,
201 "CONNECT requests are not supported",
202 ));
203 }
204 if req.version < http::Version::HTTP_2 && req.uri.scheme().is_some() {
205 return Err(Deny::new(
206 StatusCode::BAD_REQUEST,
207 "Absolute-form request targets are not supported",
208 ));
209 }
210 req.set_forwarded_for_if_tcp(src_addr)
211 .set_via("iroh-proxy")?
212 .set_absolute_http_authority(self.0.authority.clone())
213 .map_err(|err| Deny::new(StatusCode::INTERNAL_SERVER_ERROR, err))?;
214 Ok(self.0.endpoint_id)
215 }
216}
217
218#[derive(Default)]
231pub struct RequestHandlerChain(Vec<Box<DynRequestHandler<'static>>>);
232
233impl RequestHandlerChain {
234 pub fn push(mut self, handler: impl RequestHandler + 'static) -> Self {
236 self.0.push(DynRequestHandler::new_box(handler));
237 self
238 }
239}
240
241impl RequestHandler for RequestHandlerChain {
242 async fn handle_request(
243 &self,
244 src_addr: SrcAddr,
245 req: &mut HttpRequest,
246 ) -> Result<EndpointId, Deny> {
247 let mut last_err = None;
248 for handler in self.0.iter() {
249 match handler.handle_request(src_addr.clone(), req).await {
250 Ok(destination) => return Ok(destination),
251 Err(err) => {
252 last_err = Some(err);
253 }
254 }
255 }
256 Err(last_err.expect("err is set"))
257 }
258}
259
260pub struct Deny {
265 pub reason: AnyError,
267 pub code: StatusCode,
269}
270
271impl From<AnyError> for Deny {
272 fn from(value: AnyError) -> Self {
273 Self::bad_request(value)
274 }
275}
276
277impl Deny {
278 pub fn bad_request(reason: impl Into<AnyError>) -> Self {
280 Self::new(StatusCode::BAD_REQUEST, reason)
281 }
282
283 pub fn new(code: StatusCode, reason: impl Into<AnyError>) -> Self {
285 Self {
286 code,
287 reason: reason.into(),
288 }
289 }
290}
291
292impl From<InvalidHeaderValue> for Deny {
293 fn from(_value: InvalidHeaderValue) -> Self {
294 Self::new(StatusCode::BAD_REQUEST, "invalid header value")
295 }
296}