brainwires_proxy/transport/
http.rs1use crate::error::{ProxyError, ProxyResult};
4use crate::request_id::RequestId;
5use crate::transport::{InboundConnection, TransportConnector, TransportListener};
6use crate::types::{ProxyBody, ProxyRequest, ProxyResponse, TransportKind};
7
8use bytes::Bytes;
9use http::{StatusCode, Uri};
10use http_body_util::{BodyExt, Full};
11use hyper::body::Incoming;
12use hyper::service::service_fn;
13use hyper_util::rt::TokioIo;
14use hyper_util::server::conn::auto::Builder as ServerBuilder;
15use std::net::SocketAddr;
16use tokio::net::TcpListener;
17use tokio::sync::{mpsc, oneshot, watch};
18
19pub struct HttpListener {
21 addr: SocketAddr,
22}
23
24impl HttpListener {
25 pub fn new(addr: SocketAddr) -> Self {
26 Self { addr }
27 }
28}
29
30#[async_trait::async_trait]
31impl TransportListener for HttpListener {
32 async fn listen(
33 &self,
34 tx: mpsc::Sender<InboundConnection>,
35 mut shutdown: watch::Receiver<bool>,
36 ) -> ProxyResult<()> {
37 let listener = TcpListener::bind(self.addr).await?;
38 tracing::info!(addr = %self.addr, "HTTP listener started");
39
40 loop {
41 tokio::select! {
42 accept = listener.accept() => {
43 let (stream, peer) = accept?;
44 let tx = tx.clone();
45 let io = TokioIo::new(stream);
46
47 tokio::spawn(async move {
48 let service = service_fn(move |req: hyper::Request<Incoming>| {
49 let tx = tx.clone();
50 async move {
51 handle_http_request(req, peer, tx).await
52 }
53 });
54
55 if let Err(e) = ServerBuilder::new(hyper_util::rt::TokioExecutor::new())
56 .serve_connection(io, service)
57 .await
58 {
59 tracing::debug!(peer = %peer, error = %e, "connection error");
60 }
61 });
62 }
63 _ = shutdown.changed() => {
64 tracing::info!("HTTP listener shutting down");
65 break;
66 }
67 }
68 }
69
70 Ok(())
71 }
72
73 fn transport_name(&self) -> &str {
74 "http"
75 }
76}
77
78async fn handle_http_request(
79 req: hyper::Request<Incoming>,
80 _peer: SocketAddr,
81 tx: mpsc::Sender<InboundConnection>,
82) -> Result<hyper::Response<Full<Bytes>>, hyper::Error> {
83 let (parts, body) = req.into_parts();
84 let body_bytes = body
85 .collect()
86 .await
87 .map(|b| b.to_bytes())
88 .unwrap_or_default();
89
90 let proxy_req = ProxyRequest {
91 id: RequestId::new(),
92 method: parts.method,
93 uri: parts.uri,
94 headers: parts.headers,
95 body: ProxyBody::from(body_bytes),
96 transport: TransportKind::Http,
97 timestamp: chrono::Utc::now(),
98 extensions: crate::types::Extensions::new(),
99 };
100
101 let (resp_tx, resp_rx) = oneshot::channel();
102
103 if tx.send((proxy_req.clone(), resp_tx)).await.is_err() {
104 let mut resp = hyper::Response::new(Full::new(Bytes::from("Proxy unavailable")));
105 *resp.status_mut() = StatusCode::BAD_GATEWAY;
106 return Ok(resp);
107 }
108
109 match resp_rx.await {
110 Ok(proxy_resp) => Ok(proxy_response_to_hyper(proxy_resp)),
111 Err(_) => {
112 let mut resp = hyper::Response::new(Full::new(Bytes::from("Upstream timeout")));
113 *resp.status_mut() = StatusCode::GATEWAY_TIMEOUT;
114 Ok(resp)
115 }
116 }
117}
118
119fn proxy_response_to_hyper(resp: ProxyResponse) -> hyper::Response<Full<Bytes>> {
120 let mut builder = hyper::Response::builder().status(resp.status);
121 if let Some(headers) = builder.headers_mut() {
122 *headers = resp.headers;
123 }
124 builder
125 .body(Full::new(resp.body.into_bytes()))
126 .unwrap_or_else(|_| hyper::Response::new(Full::new(Bytes::from("Internal proxy error"))))
127}
128
129pub struct HttpConnector {
131 upstream_url: url::Url,
132}
133
134impl HttpConnector {
135 pub fn new(upstream_url: url::Url) -> Self {
136 Self { upstream_url }
137 }
138}
139
140#[async_trait::async_trait]
141impl TransportConnector for HttpConnector {
142 async fn forward(&self, request: ProxyRequest) -> ProxyResult<ProxyResponse> {
143 use hyper_util::client::legacy::Client;
144 use hyper_util::rt::TokioExecutor;
145
146 let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
147
148 let mut upstream_uri = self.upstream_url.clone();
150 upstream_uri.set_path(request.uri.path());
151 upstream_uri.set_query(request.uri.query());
152
153 let uri: Uri = upstream_uri
154 .as_str()
155 .parse()
156 .map_err(|e: http::uri::InvalidUri| ProxyError::Connection(e.to_string()))?;
157
158 let mut builder = hyper::Request::builder().method(request.method).uri(uri);
159
160 if let Some(headers) = builder.headers_mut() {
161 *headers = request.headers;
162 }
163
164 let hyper_req = builder
165 .body(Full::new(request.body.into_bytes()))
166 .map_err(|e| ProxyError::Connection(e.to_string()))?;
167
168 let hyper_resp = client
169 .request(hyper_req)
170 .await
171 .map_err(|e| ProxyError::UpstreamUnreachable(e.to_string()))?;
172
173 let status = hyper_resp.status();
174 let headers = hyper_resp.headers().clone();
175 let body_bytes = hyper_resp
176 .into_body()
177 .collect()
178 .await
179 .map(|b| b.to_bytes())
180 .map_err(|e| ProxyError::Transport(e.to_string()))?;
181
182 Ok(ProxyResponse {
183 id: request.id,
184 status,
185 headers,
186 body: ProxyBody::from(body_bytes),
187 timestamp: chrono::Utc::now(),
188 extensions: crate::types::Extensions::new(),
189 })
190 }
191
192 fn connector_name(&self) -> &str {
193 "http"
194 }
195}