Skip to main content

brainwires_proxy/transport/
http.rs

1//! HTTP transport — listener (hyper server) and connector (hyper client).
2
3use crate::error::{ProxyError, ProxyResult};
4use crate::request_id::RequestId;
5use crate::transport::{InboundConnection, TransportConnector, TransportListener};
6use crate::types::{ProxyBody, ProxyRequest, ProxyResponse, TransportKind};
7
8use bytes::Bytes;
9use http::{StatusCode, Uri};
10use http_body_util::{BodyExt, Full};
11use hyper::body::Incoming;
12use hyper::service::service_fn;
13use hyper_util::rt::TokioIo;
14use hyper_util::server::conn::auto::Builder as ServerBuilder;
15use std::net::SocketAddr;
16use tokio::net::TcpListener;
17use tokio::sync::{mpsc, oneshot, watch};
18
19/// HTTP listener using hyper.
20pub struct HttpListener {
21    addr: SocketAddr,
22}
23
24impl HttpListener {
25    pub fn new(addr: SocketAddr) -> Self {
26        Self { addr }
27    }
28}
29
30#[async_trait::async_trait]
31impl TransportListener for HttpListener {
32    async fn listen(
33        &self,
34        tx: mpsc::Sender<InboundConnection>,
35        mut shutdown: watch::Receiver<bool>,
36    ) -> ProxyResult<()> {
37        let listener = TcpListener::bind(self.addr).await?;
38        tracing::info!(addr = %self.addr, "HTTP listener started");
39
40        loop {
41            tokio::select! {
42                accept = listener.accept() => {
43                    let (stream, peer) = accept?;
44                    let tx = tx.clone();
45                    let io = TokioIo::new(stream);
46
47                    tokio::spawn(async move {
48                        let service = service_fn(move |req: hyper::Request<Incoming>| {
49                            let tx = tx.clone();
50                            async move {
51                                handle_http_request(req, peer, tx).await
52                            }
53                        });
54
55                        if let Err(e) = ServerBuilder::new(hyper_util::rt::TokioExecutor::new())
56                            .serve_connection(io, service)
57                            .await
58                        {
59                            tracing::debug!(peer = %peer, error = %e, "connection error");
60                        }
61                    });
62                }
63                _ = shutdown.changed() => {
64                    tracing::info!("HTTP listener shutting down");
65                    break;
66                }
67            }
68        }
69
70        Ok(())
71    }
72
73    fn transport_name(&self) -> &str {
74        "http"
75    }
76}
77
78async fn handle_http_request(
79    req: hyper::Request<Incoming>,
80    _peer: SocketAddr,
81    tx: mpsc::Sender<InboundConnection>,
82) -> Result<hyper::Response<Full<Bytes>>, hyper::Error> {
83    let (parts, body) = req.into_parts();
84    let body_bytes = body
85        .collect()
86        .await
87        .map(|b| b.to_bytes())
88        .unwrap_or_default();
89
90    let proxy_req = ProxyRequest {
91        id: RequestId::new(),
92        method: parts.method,
93        uri: parts.uri,
94        headers: parts.headers,
95        body: ProxyBody::from(body_bytes),
96        transport: TransportKind::Http,
97        timestamp: chrono::Utc::now(),
98        extensions: crate::types::Extensions::new(),
99    };
100
101    let (resp_tx, resp_rx) = oneshot::channel();
102
103    if tx.send((proxy_req.clone(), resp_tx)).await.is_err() {
104        let mut resp = hyper::Response::new(Full::new(Bytes::from("Proxy unavailable")));
105        *resp.status_mut() = StatusCode::BAD_GATEWAY;
106        return Ok(resp);
107    }
108
109    match resp_rx.await {
110        Ok(proxy_resp) => Ok(proxy_response_to_hyper(proxy_resp)),
111        Err(_) => {
112            let mut resp = hyper::Response::new(Full::new(Bytes::from("Upstream timeout")));
113            *resp.status_mut() = StatusCode::GATEWAY_TIMEOUT;
114            Ok(resp)
115        }
116    }
117}
118
119fn proxy_response_to_hyper(resp: ProxyResponse) -> hyper::Response<Full<Bytes>> {
120    let mut builder = hyper::Response::builder().status(resp.status);
121    if let Some(headers) = builder.headers_mut() {
122        *headers = resp.headers;
123    }
124    builder
125        .body(Full::new(resp.body.into_bytes()))
126        .unwrap_or_else(|_| hyper::Response::new(Full::new(Bytes::from("Internal proxy error"))))
127}
128
129/// HTTP connector — forwards requests to an upstream URL using hyper client.
130pub struct HttpConnector {
131    upstream_url: url::Url,
132}
133
134impl HttpConnector {
135    pub fn new(upstream_url: url::Url) -> Self {
136        Self { upstream_url }
137    }
138}
139
140#[async_trait::async_trait]
141impl TransportConnector for HttpConnector {
142    async fn forward(&self, request: ProxyRequest) -> ProxyResult<ProxyResponse> {
143        use hyper_util::client::legacy::Client;
144        use hyper_util::rt::TokioExecutor;
145
146        let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
147
148        // Build upstream URI by combining upstream base URL with request path/query
149        let mut upstream_uri = self.upstream_url.clone();
150        upstream_uri.set_path(request.uri.path());
151        upstream_uri.set_query(request.uri.query());
152
153        let uri: Uri = upstream_uri
154            .as_str()
155            .parse()
156            .map_err(|e: http::uri::InvalidUri| ProxyError::Connection(e.to_string()))?;
157
158        let mut builder = hyper::Request::builder().method(request.method).uri(uri);
159
160        if let Some(headers) = builder.headers_mut() {
161            *headers = request.headers;
162        }
163
164        let hyper_req = builder
165            .body(Full::new(request.body.into_bytes()))
166            .map_err(|e| ProxyError::Connection(e.to_string()))?;
167
168        let hyper_resp = client
169            .request(hyper_req)
170            .await
171            .map_err(|e| ProxyError::UpstreamUnreachable(e.to_string()))?;
172
173        let status = hyper_resp.status();
174        let headers = hyper_resp.headers().clone();
175        let body_bytes = hyper_resp
176            .into_body()
177            .collect()
178            .await
179            .map(|b| b.to_bytes())
180            .map_err(|e| ProxyError::Transport(e.to_string()))?;
181
182        Ok(ProxyResponse {
183            id: request.id,
184            status,
185            headers,
186            body: ProxyBody::from(body_bytes),
187            timestamp: chrono::Utc::now(),
188            extensions: crate::types::Extensions::new(),
189        })
190    }
191
192    fn connector_name(&self) -> &str {
193        "http"
194    }
195}