sentinel_proxy/http_helpers.rs
1//! HTTP request and response helpers for Sentinel proxy
2//!
3//! This module provides utilities for:
4//! - Extracting request information from Pingora sessions
5//! - Writing HTTP responses to Pingora sessions
6//! - Trace ID extraction from headers
7//!
8//! These helpers reduce boilerplate in the main proxy logic and ensure
9//! consistent handling of HTTP operations.
10
11use bytes::Bytes;
12use http::Response;
13use http_body_util::{BodyExt, Full};
14use pingora::http::ResponseHeader;
15use pingora::prelude::*;
16use pingora::proxy::Session;
17use std::collections::HashMap;
18
19use crate::routing::RequestInfo;
20use crate::trace_id::{generate_for_format, TraceIdFormat};
21
22// ============================================================================
23// Request Helpers
24// ============================================================================
25
/// Self-contained snapshot of a request (intended for non-hot-path use)
///
/// Every field owns its storage, which makes this type suitable wherever
/// the borrowed `RequestInfo<'a>` is awkward to use because of lifetimes,
/// for example when the data must be kept beyond the request scope.
#[derive(Clone, Debug)]
pub struct OwnedRequestInfo {
    /// HTTP method as text (e.g. `GET`).
    pub method: String,
    /// Path component of the request URI.
    pub path: String,
    /// Host the request was addressed to.
    pub host: String,
    /// Request headers as name/value pairs.
    pub headers: HashMap<String, String>,
    /// Query-string parameters as key/value pairs.
    pub query_params: HashMap<String, String>,
}
38
39/// Extract request info from a Pingora session
40///
41/// Builds an `OwnedRequestInfo` struct from the session's request headers.
42/// This function allocates all fields.
43///
44/// For the hot path, use `RequestInfo::new()` with
45/// `with_headers()`/`with_query_params()` only when needed.
46///
47/// # Example
48///
49/// ```ignore
50/// let request_info = extract_request_info(session);
51/// ```
52pub fn extract_request_info(session: &Session) -> OwnedRequestInfo {
53 let req_header = session.req_header();
54
55 let headers = RequestInfo::build_headers(req_header.headers.iter());
56 let host = headers.get("host").cloned().unwrap_or_default();
57 let path = req_header.uri.path().to_string();
58 let method = req_header.method.as_str().to_string();
59
60 OwnedRequestInfo {
61 method,
62 path: path.clone(),
63 host,
64 headers,
65 query_params: RequestInfo::parse_query_params(&path),
66 }
67}
68
69/// Extract or generate a trace ID from request headers
70///
71/// Looks for existing trace ID headers in order of preference:
72/// 1. `X-Trace-Id`
73/// 2. `X-Correlation-Id`
74/// 3. `X-Request-Id`
75///
76/// If none are found, generates a new TinyFlake trace ID (11 chars).
77/// See [`crate::trace_id`] module for TinyFlake format details.
78///
79/// # Example
80///
81/// ```ignore
82/// let trace_id = get_or_create_trace_id(session, TraceIdFormat::TinyFlake);
83/// tracing::info!(trace_id = %trace_id, "Processing request");
84/// ```
85pub fn get_or_create_trace_id(session: &Session, format: TraceIdFormat) -> String {
86 let req_header = session.req_header();
87
88 // Check for existing trace ID headers (in order of preference)
89 const TRACE_HEADERS: [&str; 3] = ["x-trace-id", "x-correlation-id", "x-request-id"];
90
91 for header_name in &TRACE_HEADERS {
92 if let Some(value) = req_header.headers.get(*header_name) {
93 if let Ok(id) = value.to_str() {
94 if !id.is_empty() {
95 return id.to_string();
96 }
97 }
98 }
99 }
100
101 // Generate new trace ID using configured format
102 generate_for_format(format)
103}
104
105/// Extract or generate a trace ID (convenience function using TinyFlake default)
106///
107/// This is a convenience wrapper around [`get_or_create_trace_id`] that uses
108/// the default TinyFlake format.
109#[inline]
110pub fn get_or_create_trace_id_default(session: &Session) -> String {
111 get_or_create_trace_id(session, TraceIdFormat::default())
112}
113
114// ============================================================================
115// Response Helpers
116// ============================================================================
117
118/// Write an HTTP response to a Pingora session
119///
120/// Handles the conversion from `http::Response<Full<Bytes>>` to Pingora's
121/// format and writes it to the session.
122///
123/// # Arguments
124///
125/// * `session` - The Pingora session to write to
126/// * `response` - The HTTP response to write
127/// * `keepalive_secs` - Keepalive timeout in seconds (None = disable keepalive)
128///
129/// # Returns
130///
131/// Returns `Ok(())` on success or an error if writing fails.
132///
133/// # Example
134///
135/// ```ignore
136/// let response = Response::builder()
137/// .status(200)
138/// .body(Full::new(Bytes::from("OK")))?;
139/// write_response(session, response, Some(60)).await?;
140/// ```
141pub async fn write_response(
142 session: &mut Session,
143 response: Response<Full<Bytes>>,
144 keepalive_secs: Option<u64>,
145) -> Result<(), Box<Error>> {
146 let status = response.status().as_u16();
147
148 // Collect headers to owned strings to avoid lifetime issues
149 let headers_owned: Vec<(String, String)> = response
150 .headers()
151 .iter()
152 .map(|(k, v)| (k.as_str().to_string(), v.to_str().unwrap_or("").to_string()))
153 .collect();
154
155 // Extract body bytes
156 let full_body = response.into_body();
157 let body_bytes: Bytes = BodyExt::collect(full_body)
158 .await
159 .map(|collected| collected.to_bytes())
160 .unwrap_or_default();
161
162 // Build Pingora response header
163 let mut resp_header = ResponseHeader::build(status, None)?;
164 for (key, value) in headers_owned {
165 resp_header.insert_header(key, &value)?;
166 }
167
168 // Write response to session
169 session.set_keepalive(keepalive_secs);
170 session
171 .write_response_header(Box::new(resp_header), false)
172 .await?;
173 session.write_response_body(Some(body_bytes), true).await?;
174
175 Ok(())
176}
177
178/// Write an error response to a Pingora session
179///
180/// Convenience wrapper for error responses with status code, body, and content type.
181///
182/// # Arguments
183///
184/// * `session` - The Pingora session to write to
185/// * `status` - HTTP status code
186/// * `body` - Response body as string
187/// * `content_type` - Content-Type header value
188pub async fn write_error(
189 session: &mut Session,
190 status: u16,
191 body: &str,
192 content_type: &str,
193) -> Result<(), Box<Error>> {
194 let mut resp_header = ResponseHeader::build(status, None)?;
195 resp_header.insert_header("Content-Type", content_type)?;
196 resp_header.insert_header("Content-Length", body.len().to_string())?;
197
198 session.set_keepalive(None);
199 session
200 .write_response_header(Box::new(resp_header), false)
201 .await?;
202 session
203 .write_response_body(Some(Bytes::copy_from_slice(body.as_bytes())), true)
204 .await?;
205
206 Ok(())
207}
208
209/// Write a plain text error response
210///
211/// Shorthand for `write_error` with `text/plain; charset=utf-8` content type.
212pub async fn write_text_error(
213 session: &mut Session,
214 status: u16,
215 message: &str,
216) -> Result<(), Box<Error>> {
217 write_error(session, status, message, "text/plain; charset=utf-8").await
218}
219
220/// Write a JSON error response
221///
222/// Creates a JSON object with `error` and optional `message` fields.
223///
224/// # Example
225///
226/// ```ignore
227/// // Produces: {"error":"not_found","message":"Resource does not exist"}
228/// write_json_error(session, 404, "not_found", Some("Resource does not exist")).await?;
229/// ```
230pub async fn write_json_error(
231 session: &mut Session,
232 status: u16,
233 error: &str,
234 message: Option<&str>,
235) -> Result<(), Box<Error>> {
236 let body = match message {
237 Some(msg) => format!(r#"{{"error":"{}","message":"{}"}}"#, error, msg),
238 None => format!(r#"{{"error":"{}"}}"#, error),
239 };
240 write_error(session, status, &body, "application/json").await
241}
242
243/// Write a rate limit error response with standard rate limit headers
244///
245/// Includes the following headers:
246/// - `X-RateLimit-Limit`: Maximum requests per window
247/// - `X-RateLimit-Remaining`: Remaining requests in current window
248/// - `X-RateLimit-Reset`: Unix timestamp when the window resets
249/// - `Retry-After`: Seconds until the client should retry (for 429 responses)
250///
251/// # Arguments
252///
253/// * `session` - The Pingora session to write to
254/// * `status` - HTTP status code (typically 429)
255/// * `body` - Response body as string
256/// * `limit` - Maximum requests allowed per window
257/// * `remaining` - Remaining requests in current window
258/// * `reset_at` - Unix timestamp when the window resets
259/// * `retry_after` - Seconds until client should retry
260pub async fn write_rate_limit_error(
261 session: &mut Session,
262 status: u16,
263 body: &str,
264 limit: u32,
265 remaining: u32,
266 reset_at: u64,
267 retry_after: u64,
268) -> Result<(), Box<Error>> {
269 let mut resp_header = ResponseHeader::build(status, None)?;
270 resp_header.insert_header("Content-Type", "text/plain; charset=utf-8")?;
271 resp_header.insert_header("Content-Length", body.len().to_string())?;
272
273 // Add standard rate limit headers
274 resp_header.insert_header("X-RateLimit-Limit", limit.to_string())?;
275 resp_header.insert_header("X-RateLimit-Remaining", remaining.to_string())?;
276 resp_header.insert_header("X-RateLimit-Reset", reset_at.to_string())?;
277
278 // Add Retry-After header (seconds until reset)
279 if retry_after > 0 {
280 resp_header.insert_header("Retry-After", retry_after.to_string())?;
281 }
282
283 session.set_keepalive(None);
284 session
285 .write_response_header(Box::new(resp_header), false)
286 .await?;
287 session
288 .write_response_body(Some(Bytes::copy_from_slice(body.as_bytes())), true)
289 .await?;
290
291 Ok(())
292}
293
294// ============================================================================
295// Tests
296// ============================================================================
297
#[cfg(test)]
mod tests {
    // This module is intentionally empty.
    //
    // - Trace ID generation is tested in the `crate::trace_id` module.
    // - Testing `get_or_create_trace_id` requires mocking a Pingora session,
    //   so it is covered by integration tests instead.
    // - See `crates/proxy/tests/` for integration test examples.
}