pub use hyper::Uri;
pub const API_KEY_HEADER: &str = "X-API-Key";
pub type HttpClient = hyper_util::client::legacy::Client<
hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
http_body_util::Empty<bytes::Bytes>,
>;
pub type HttpClientWithBody = hyper_util::client::legacy::Client<
hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
http_body_util::Full<bytes::Bytes>,
>;
pub const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
fn build_client_inner<B>() -> hyper_util::client::legacy::Client<
hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
B,
>
where
B: hyper::body::Body + Send + 'static,
B::Data: Send,
{
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_http1()
.build();
Client::builder(TokioExecutor::new()).build(https)
}
#[must_use]
pub fn build_client() -> HttpClient {
build_client_inner::<http_body_util::Empty<bytes::Bytes>>()
}
#[must_use]
pub fn build_client_with_body() -> HttpClientWithBody {
build_client_inner::<http_body_util::Full<bytes::Bytes>>()
}
pub fn redact_endpoint(uri: &Uri) -> String {
let scheme = uri.scheme_str().unwrap_or("http");
let host = uri.host().unwrap_or("?");
let path_and_query = uri.path_and_query().map_or("/", |p| p.as_str());
if let Some(port) = uri.port_u16() {
format!("{scheme}://{host}:{port}{path_and_query}")
} else {
format!("{scheme}://{host}{path_and_query}")
}
}
#[must_use]
pub fn redact_endpoint_str(raw: &str) -> String {
let Some((scheme, rest)) = raw.split_once("://") else {
return raw.to_string();
};
let first_slash = rest.find('/').unwrap_or(rest.len());
let authority = &rest[..first_slash];
let tail = &rest[first_slash..];
let host_port = authority.rsplit_once('@').map_or(authority, |(_, h)| h);
format!("{scheme}://{host_port}{tail}")
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum FetchError {
#[error("failed to build HTTP request")]
RequestBuild(#[source] hyper::http::Error),
#[error("HTTP transport error")]
Transport(#[source] hyper_util::client::legacy::Error),
#[error("body read failed: {0}")]
BodyRead(String),
#[error("endpoint returned HTTP {0}")]
HttpStatus(u16),
#[error("request timed out")]
Timeout,
}
pub async fn fetch_get(
client: &HttpClient,
uri: &Uri,
user_agent: &str,
timeout: std::time::Duration,
auth: Option<&crate::ingest::auth_header::AuthHeader>,
) -> Result<bytes::Bytes, FetchError> {
use http_body_util::{BodyExt, Empty, Limited};
let mut builder = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(uri.clone())
.header(hyper::header::USER_AGENT, user_agent);
if let Some(auth) = auth {
builder = builder.header(&auth.name, &auth.value);
}
let req = builder
.body(Empty::<bytes::Bytes>::new())
.map_err(FetchError::RequestBuild)?;
let response = tokio::time::timeout(timeout, client.request(req))
.await
.map_err(|_| FetchError::Timeout)?
.map_err(FetchError::Transport)?;
if !response.status().is_success() {
return Err(FetchError::HttpStatus(response.status().as_u16()));
}
let limited = Limited::new(response.into_body(), MAX_BODY_BYTES);
let collected = limited
.collect()
.await
.map_err(|e| FetchError::BodyRead(format!("{e}")))?;
Ok(collected.to_bytes())
}
pub async fn fetch_with_body(
client: &HttpClientWithBody,
method: hyper::Method,
uri: &Uri,
user_agent: &str,
timeout: std::time::Duration,
api_key: Option<&str>,
body: bytes::Bytes,
) -> Result<(hyper::StatusCode, bytes::Bytes), FetchError> {
use http_body_util::{BodyExt, Full, Limited};
let mut builder = hyper::Request::builder()
.method(method)
.uri(uri.clone())
.header(hyper::header::USER_AGENT, user_agent)
.header(hyper::header::CONTENT_TYPE, "application/json");
if let Some(key) = api_key {
let mut value = hyper::header::HeaderValue::from_str(key)
.map_err(|e| FetchError::RequestBuild(e.into()))?;
value.set_sensitive(true);
builder = builder.header(API_KEY_HEADER, value);
}
let req = builder
.body(Full::new(body))
.map_err(FetchError::RequestBuild)?;
let response = tokio::time::timeout(timeout, client.request(req))
.await
.map_err(|_| FetchError::Timeout)?
.map_err(FetchError::Transport)?;
let status = response.status();
let limited = Limited::new(response.into_body(), MAX_BODY_BYTES);
let collected = limited
.collect()
.await
.map_err(|e| FetchError::BodyRead(format!("{e}")))?;
Ok((status, collected.to_bytes()))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn build_client_constructs_without_panic() {
let _client: HttpClient = build_client();
}
#[test]
fn redact_endpoint_strips_credentials_with_default_port() {
let uri: Uri = "http://user:pass@example.com/metrics".parse().unwrap();
assert_eq!(redact_endpoint(&uri), "http://example.com/metrics");
}
#[test]
fn redact_endpoint_preserves_explicit_port() {
let uri: Uri = "http://metrics.local:9090/metrics".parse().unwrap();
assert_eq!(redact_endpoint(&uri), "http://metrics.local:9090/metrics");
}
#[test]
fn redact_endpoint_preserves_https_scheme() {
let uri: Uri = "https://api.electricitymap.org/v3/carbon-intensity/latest?zone=FR"
.parse()
.unwrap();
let redacted = redact_endpoint(&uri);
assert!(redacted.starts_with("https://api.electricitymap.org"));
assert!(redacted.contains("zone=FR"));
}
#[test]
fn redact_endpoint_strips_credentials_with_explicit_port() {
let uri: Uri = "http://admin:secret@localhost:8080/scrape".parse().unwrap();
let redacted = redact_endpoint(&uri);
assert_eq!(redacted, "http://localhost:8080/scrape");
assert!(!redacted.contains("admin"));
assert!(!redacted.contains("secret"));
}
#[test]
fn redact_endpoint_handles_root_path() {
let uri: Uri = "http://host/".parse().unwrap();
assert_eq!(redact_endpoint(&uri), "http://host/");
}
#[tokio::test]
async fn build_client_can_perform_plain_http_round_trip() {
use http_body_util::{BodyExt, Empty, Limited};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}/");
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0u8; 1024];
let _ = socket.read(&mut buf).await;
let response = "HTTP/1.1 200 OK\r\n\
Content-Type: text/plain\r\n\
Content-Length: 5\r\n\
Connection: close\r\n\
\r\n\
hello";
let _ = socket.write_all(response.as_bytes()).await;
let _ = socket.shutdown().await;
});
let client = build_client();
let uri: Uri = endpoint.parse().unwrap();
let req = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(&uri)
.header(hyper::header::USER_AGENT, "perf-sentinel-test")
.body(Empty::<bytes::Bytes>::new())
.unwrap();
let resp = client
.request(req)
.await
.expect("round-trip should succeed");
assert_eq!(resp.status().as_u16(), 200);
let body = Limited::new(resp.into_body(), MAX_BODY_BYTES)
.collect()
.await
.unwrap()
.to_bytes();
assert_eq!(&body[..], b"hello");
server.await.unwrap();
}
#[tokio::test]
async fn fetch_get_attaches_auth_header() {
use crate::ingest::auth_header::AuthHeader;
let response = b"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain\r\n\
Content-Length: 2\r\n\
Connection: close\r\n\
\r\n\
ok"
.to_vec();
let (endpoint, mut rx, server) = crate::test_helpers::spawn_capture_server(response).await;
let client = build_client();
let uri: Uri = format!("{endpoint}/").parse().expect("uri");
let auth = AuthHeader::parse("Authorization: Bearer topsecret").expect("valid");
let bytes = fetch_get(
&client,
&uri,
"perf-sentinel-test",
std::time::Duration::from_secs(5),
Some(&auth),
)
.await
.expect("fetch_get must succeed");
assert_eq!(&bytes[..], b"ok");
let captured = rx.recv().await.expect("captured request");
let text = std::str::from_utf8(&captured).expect("utf8");
assert!(
text.contains("authorization: Bearer topsecret")
|| text.contains("Authorization: Bearer topsecret"),
"auth header missing from request, got:\n{text}"
);
server.await.expect("server join");
}
#[test]
fn build_client_with_body_constructs_without_panic() {
let _client: HttpClientWithBody = build_client_with_body();
}
#[tokio::test]
async fn fetch_with_body_returns_status_and_body_on_201() {
let response = crate::test_helpers::http_status(201, "Created");
let (endpoint, _rx, server) = crate::test_helpers::spawn_capture_server(response).await;
let client = build_client_with_body();
let uri: Uri = format!("{endpoint}/api/findings/sig/ack").parse().unwrap();
let (status, body) = fetch_with_body(
&client,
hyper::Method::POST,
&uri,
"perf-sentinel-test",
std::time::Duration::from_secs(5),
None,
bytes::Bytes::from_static(b"{}"),
)
.await
.expect("call must succeed");
assert_eq!(status.as_u16(), 201);
assert!(body.is_empty());
server.await.unwrap();
}
#[tokio::test]
async fn fetch_with_body_surfaces_409_without_erroring() {
let response = crate::test_helpers::http_status(409, "Conflict");
let (endpoint, _rx, server) = crate::test_helpers::spawn_capture_server(response).await;
let client = build_client_with_body();
let uri: Uri = format!("{endpoint}/api/findings/sig/ack").parse().unwrap();
let (status, _) = fetch_with_body(
&client,
hyper::Method::POST,
&uri,
"perf-sentinel-test",
std::time::Duration::from_secs(5),
None,
bytes::Bytes::from_static(b"{}"),
)
.await
.expect("non-2xx must not produce an error");
assert_eq!(status.as_u16(), 409);
server.await.unwrap();
}
#[tokio::test]
async fn fetch_with_body_attaches_x_api_key_header() {
let response = crate::test_helpers::http_status(204, "No Content");
let (endpoint, mut rx, server) = crate::test_helpers::spawn_capture_server(response).await;
let client = build_client_with_body();
let uri: Uri = format!("{endpoint}/api/findings/sig/ack").parse().unwrap();
let (status, _) = fetch_with_body(
&client,
hyper::Method::DELETE,
&uri,
"perf-sentinel-test",
std::time::Duration::from_secs(5),
Some("secret123"),
bytes::Bytes::new(),
)
.await
.expect("call must succeed");
assert_eq!(status.as_u16(), 204);
let captured = rx.recv().await.expect("captured request");
let text = std::str::from_utf8(&captured).unwrap();
assert!(
text.contains("x-api-key: secret123") || text.contains("X-API-Key: secret123"),
"X-API-Key header missing, got:\n{text}"
);
server.await.unwrap();
}
#[tokio::test]
async fn fetch_with_body_sends_content_type_json() {
let response = crate::test_helpers::http_status(201, "Created");
let (endpoint, mut rx, server) = crate::test_helpers::spawn_capture_server(response).await;
let client = build_client_with_body();
let uri: Uri = format!("{endpoint}/api/findings/sig/ack").parse().unwrap();
let _ = fetch_with_body(
&client,
hyper::Method::POST,
&uri,
"perf-sentinel-test",
std::time::Duration::from_secs(5),
None,
bytes::Bytes::from_static(br#"{"reason":"x"}"#),
)
.await
.expect("call must succeed");
let captured = rx.recv().await.expect("captured request");
let text = std::str::from_utf8(&captured).unwrap();
assert!(
text.to_ascii_lowercase()
.contains("content-type: application/json"),
"Content-Type header missing, got:\n{text}"
);
server.await.unwrap();
}
#[tokio::test]
async fn fetch_with_body_sends_request_body() {
let response = crate::test_helpers::http_status(201, "Created");
let (endpoint, mut rx, server) = crate::test_helpers::spawn_capture_server(response).await;
let client = build_client_with_body();
let uri: Uri = format!("{endpoint}/api/findings/sig/ack").parse().unwrap();
let payload = br#"{"by":"alice","reason":"deferred"}"#;
let _ = fetch_with_body(
&client,
hyper::Method::POST,
&uri,
"perf-sentinel-test",
std::time::Duration::from_secs(5),
None,
bytes::Bytes::from_static(payload),
)
.await
.expect("call must succeed");
let captured = rx.recv().await.expect("captured request");
let text = std::str::from_utf8(&captured).unwrap();
assert!(
text.contains(r#"{"by":"alice","reason":"deferred"}"#),
"request body missing, got:\n{text}"
);
server.await.unwrap();
}
#[test]
fn redact_endpoint_str_strips_userinfo() {
assert_eq!(
redact_endpoint_str("http://admin:secret@localhost:8080/scrape"),
"http://localhost:8080/scrape"
);
}
#[test]
fn redact_endpoint_str_returns_input_when_no_scheme() {
assert_eq!(
redact_endpoint_str("user:pass@example.com/foo"),
"user:pass@example.com/foo"
);
}
#[test]
fn redact_endpoint_str_keeps_at_in_path() {
assert_eq!(redact_endpoint_str("http://host/u@v"), "http://host/u@v");
}
#[test]
fn redact_endpoint_str_with_multiple_at_in_userinfo() {
assert_eq!(
redact_endpoint_str("http://a@b:c@host/path"),
"http://host/path"
);
}
#[test]
fn redact_endpoint_str_handles_empty_input() {
assert_eq!(redact_endpoint_str(""), "");
}
#[test]
fn redact_endpoint_str_strips_only_userinfo_when_path_also_has_at() {
assert_eq!(
redact_endpoint_str("http://user:pass@host:8080/x@y"),
"http://host:8080/x@y"
);
}
}