qubit_http/request/http_request_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Builder for [`super::http_request::HttpRequest`].
11
12use std::time::Duration;
13use std::{
14 future::Future,
15 pin::Pin,
16};
17
18use bytes::Bytes;
19use http::header::CONTENT_TYPE;
20use http::{
21 HeaderMap,
22 HeaderValue,
23 Method,
24};
25use serde::Serialize;
26use tokio_util::sync::CancellationToken;
27use url::form_urlencoded;
28use url::Url;
29
30use crate::{
31 AsyncHttpHeaderInjector,
32 HttpClient,
33 HttpError,
34 HttpHeaderInjector,
35 HttpRequestBodyByteStream,
36 HttpRequestStreamingBody,
37 HttpResult,
38 HttpRetryMethodPolicy,
39};
40
41use super::http_request::HttpRequest;
42use super::http_request_body::HttpRequestBody;
43use super::http_request_retry_override::HttpRequestRetryOverride;
44use super::parse_header;
45
46/// Builder for [`HttpRequest`](super::http_request::HttpRequest).
47#[derive(Debug, Clone)]
48pub struct HttpRequestBuilder {
49 /// HTTP method (e.g. GET, POST).
50 pub(super) method: Method,
51 /// Request path without the query string.
52 pub(super) path: String,
53 /// Query parameters as `(key, value)` pairs, appended to the URL when built.
54 pub(super) query: Vec<(String, String)>,
55 /// Request headers.
56 pub(super) headers: HeaderMap,
57 /// Request body; empty if not set.
58 pub(super) body: HttpRequestBody,
59 /// Deferred streaming upload body factory for per-attempt stream creation.
60 pub(super) streaming_body: Option<HttpRequestStreamingBody>,
61 /// Per-request timeout; if unset, the client default applies.
62 pub(super) request_timeout: Option<Duration>,
63 /// Per-request write timeout used by the send phase.
64 pub(super) write_timeout: Duration,
65 /// Per-request read timeout used by buffered/stream response reading.
66 pub(super) read_timeout: Duration,
67 /// Base URL copied from client options and used by [`HttpRequest::resolve_url`].
68 pub(super) base_url: Option<Url>,
69 /// Whether IPv6 literal hosts are rejected during URL resolution.
70 pub(super) ipv4_only: bool,
71 /// Optional cancellation token for this request.
72 pub(super) cancellation_token: Option<CancellationToken>,
73 /// Per-request retry override for one-off retry behavior customization.
74 pub(super) retry_override: HttpRequestRetryOverride,
75 /// Default headers snapshot from the originating client.
76 pub(super) default_headers: HeaderMap,
77 /// Sync header injectors snapshot from the originating client.
78 pub(super) injectors: Vec<HttpHeaderInjector>,
79 /// Async header injectors snapshot from the originating client.
80 pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
81}
82
83impl HttpRequestBuilder {
84 /// Starts a builder with method/path and copies supported defaults from client options.
85 ///
86 /// # Parameters
87 /// - `method`: HTTP verb.
88 /// - `path`: URL or relative path string.
89 /// - `client`: Source client whose relevant defaults are copied into this builder.
90 ///
91 /// # Returns
92 /// New [`HttpRequestBuilder`].
93 pub(crate) fn new(method: Method, path: &str, client: &HttpClient) -> Self {
94 let options = client.options();
95 Self {
96 method,
97 path: path.to_string(),
98 query: Vec::new(),
99 headers: HeaderMap::new(),
100 body: HttpRequestBody::Empty,
101 streaming_body: None,
102 request_timeout: options.timeouts.request_timeout,
103 write_timeout: options.timeouts.write_timeout,
104 read_timeout: options.timeouts.read_timeout,
105 base_url: options.base_url.clone(),
106 ipv4_only: options.ipv4_only,
107 cancellation_token: None,
108 retry_override: HttpRequestRetryOverride::default(),
109 default_headers: client.headers_snapshot(),
110 injectors: client.injectors_snapshot(),
111 async_injectors: client.async_injectors_snapshot(),
112 }
113 }
114
115 /// Appends a single `key=value` query pair (order preserved).
116 ///
117 /// # Parameters
118 /// - `key`: Query parameter name.
119 /// - `value`: Query parameter value.
120 ///
121 /// # Returns
122 /// `self` for chaining.
123 pub fn query_param(mut self, key: &str, value: &str) -> Self {
124 self.query.push((key.to_string(), value.to_string()));
125 self
126 }
127
128 /// Appends many query pairs via [`HttpRequestBuilder::query_param`].
129 ///
130 /// # Parameters
131 /// - `params`: Iterator of `(key, value)` pairs.
132 ///
133 /// # Returns
134 /// `self` for chaining.
135 pub fn query_params<'a, I>(mut self, params: I) -> Self
136 where
137 I: IntoIterator<Item = (&'a str, &'a str)>,
138 {
139 for (key, value) in params {
140 self = self.query_param(key, value);
141 }
142 self
143 }
144
145 /// Validates and inserts one header.
146 ///
147 /// # Parameters
148 /// - `name`: Header name (must be valid [`http::header::HeaderName`] bytes).
149 /// - `value`: Header value (must be valid [`http::header::HeaderValue`]).
150 ///
151 /// # Returns
152 /// `Ok(self)` or [`HttpError`] if name/value are invalid.
153 pub fn header(mut self, name: &str, value: &str) -> HttpResult<Self> {
154 let (header_name, header_value) = parse_header(name, value)?;
155 self.headers.insert(header_name, header_value);
156 Ok(self)
157 }
158
159 /// Merges all entries from `headers` into this builder (existing names may get extra values).
160 ///
161 /// # Parameters
162 /// - `headers`: Map to append.
163 ///
164 /// # Returns
165 /// `self` for chaining.
166 pub fn headers(mut self, headers: HeaderMap) -> Self {
167 self.headers.extend(headers);
168 self
169 }
170
171 /// Sets the body to raw bytes without changing `Content-Type` unless already set elsewhere.
172 ///
173 /// # Parameters
174 /// - `body`: Payload.
175 ///
176 /// # Returns
177 /// `self` for chaining.
178 pub fn bytes_body(mut self, body: impl Into<Bytes>) -> Self {
179 self.body = HttpRequestBody::Bytes(body.into());
180 self.streaming_body = None;
181 self
182 }
183
184 /// Sets the body to an ordered chunk stream for incremental upload.
185 ///
186 /// # Parameters
187 /// - `chunks`: Iterator of chunks in send order.
188 ///
189 /// # Returns
190 /// `self` for chaining.
191 pub fn stream_body<I, B>(mut self, chunks: I) -> Self
192 where
193 I: IntoIterator<Item = B>,
194 B: Into<Bytes>,
195 {
196 self.body = HttpRequestBody::Stream(chunks.into_iter().map(Into::into).collect());
197 self.streaming_body = None;
198 self
199 }
200
201 /// Sets a deferred streaming upload body factory.
202 ///
203 /// The factory runs once per send attempt and returns a fresh async byte
204 /// stream, which allows retries to rebuild the outbound stream body.
205 ///
206 /// # Parameters
207 /// - `factory`: Async stream factory for per-attempt body generation.
208 ///
209 /// # Returns
210 /// `self` for chaining.
211 pub fn streaming_body<F>(mut self, factory: F) -> Self
212 where
213 F: Fn() -> Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>
214 + Send
215 + Sync
216 + 'static,
217 {
218 self.streaming_body = Some(HttpRequestStreamingBody::new(factory));
219 self.body = HttpRequestBody::Empty;
220 self
221 }
222
223 /// Sets a UTF-8 text body and adds `text/plain; charset=utf-8` if `Content-Type` is absent.
224 ///
225 /// # Parameters
226 /// - `body`: Text payload.
227 ///
228 /// # Returns
229 /// `self` for chaining.
230 pub fn text_body(mut self, body: impl Into<String>) -> Self {
231 if !self.headers.contains_key(CONTENT_TYPE) {
232 self.headers.insert(
233 CONTENT_TYPE,
234 HeaderValue::from_static("text/plain; charset=utf-8"),
235 );
236 }
237 self.body = HttpRequestBody::Text(body.into());
238 self.streaming_body = None;
239 self
240 }
241
242 /// Serializes `value` to JSON, sets body to those bytes, and adds `application/json` if needed.
243 ///
244 /// # Parameters
245 /// - `value`: Serializable value.
246 ///
247 /// # Returns
248 /// `Ok(self)` or [`HttpError`] if JSON encoding fails.
249 pub fn json_body<T>(mut self, value: &T) -> HttpResult<Self>
250 where
251 T: Serialize,
252 {
253 let bytes = serde_json::to_vec(value)
254 .map_err(|error| HttpError::decode(format!("Failed to encode JSON body: {}", error)))?;
255 if !self.headers.contains_key(CONTENT_TYPE) {
256 self.headers
257 .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
258 }
259 self.body = HttpRequestBody::Json(Bytes::from(bytes));
260 self.streaming_body = None;
261 Ok(self)
262 }
263
264 /// Serializes key-value pairs as `application/x-www-form-urlencoded`.
265 ///
266 /// # Parameters
267 /// - `fields`: Iterable of `(key, value)` string pairs.
268 ///
269 /// # Returns
270 /// `self` for chaining.
271 pub fn form_body<'a, I>(mut self, fields: I) -> Self
272 where
273 I: IntoIterator<Item = (&'a str, &'a str)>,
274 {
275 let mut serializer = form_urlencoded::Serializer::new(String::new());
276 for (key, value) in fields {
277 serializer.append_pair(key, value);
278 }
279 let body = serializer.finish();
280 if !self.headers.contains_key(CONTENT_TYPE) {
281 self.headers.insert(
282 CONTENT_TYPE,
283 HeaderValue::from_static("application/x-www-form-urlencoded"),
284 );
285 }
286 self.body = HttpRequestBody::Form(Bytes::from(body));
287 self.streaming_body = None;
288 self
289 }
290
291 /// Sets multipart body bytes and optional auto content-type by boundary.
292 ///
293 /// # Parameters
294 /// - `body`: Multipart payload bytes.
295 /// - `boundary`: Multipart boundary used in payload framing.
296 ///
297 /// # Returns
298 /// `Ok(self)` for chaining.
299 ///
300 /// # Errors
301 /// Returns [`HttpError`] when `boundary` is empty or content-type cannot be built.
302 pub fn multipart_body(mut self, body: impl Into<Bytes>, boundary: &str) -> HttpResult<Self> {
303 if boundary.trim().is_empty() {
304 return Err(HttpError::other(
305 "Multipart boundary cannot be empty for multipart_body",
306 ));
307 }
308 if !self.headers.contains_key(CONTENT_TYPE) {
309 let value = HeaderValue::from_str(&format!("multipart/form-data; boundary={boundary}"))
310 .map_err(|error| {
311 HttpError::other(format!(
312 "Invalid multipart Content-Type header value: {error}"
313 ))
314 })?;
315 self.headers.insert(CONTENT_TYPE, value);
316 }
317 self.body = HttpRequestBody::Multipart(body.into());
318 self.streaming_body = None;
319 Ok(self)
320 }
321
322 /// Serializes records as NDJSON (`one JSON object per line`).
323 ///
324 /// # Parameters
325 /// - `records`: Serializable records to encode as NDJSON lines.
326 ///
327 /// # Returns
328 /// `Ok(self)` for chaining.
329 ///
330 /// # Errors
331 /// Returns [`HttpError`] when any record fails JSON serialization.
332 pub fn ndjson_body<T>(mut self, records: &[T]) -> HttpResult<Self>
333 where
334 T: Serialize,
335 {
336 let mut payload = String::new();
337 for record in records {
338 let line = serde_json::to_string(record).map_err(|error| {
339 HttpError::decode(format!("Failed to encode NDJSON record: {error}"))
340 })?;
341 payload.push_str(&line);
342 payload.push('\n');
343 }
344 if !self.headers.contains_key(CONTENT_TYPE) {
345 self.headers.insert(
346 CONTENT_TYPE,
347 HeaderValue::from_static("application/x-ndjson"),
348 );
349 }
350 self.body = HttpRequestBody::Ndjson(Bytes::from(payload));
351 self.streaming_body = None;
352 Ok(self)
353 }
354
355 /// Overrides the client-wide request timeout for this request only.
356 ///
357 /// This sets reqwest's per-request [`reqwest::RequestBuilder::timeout`], i.e. a
358 /// whole-request deadline for that HTTP call (see reqwest docs for exact semantics).
359 ///
360 /// # Parameters
361 /// - `timeout`: Maximum time for the whole request (reqwest `timeout`).
362 ///
363 /// # Returns
364 /// `self` for chaining.
365 pub fn request_timeout(mut self, timeout: Duration) -> Self {
366 self.request_timeout = Some(timeout);
367 self
368 }
369
370 /// Overrides the write-phase timeout for this request only.
371 ///
372 /// # Parameters
373 /// - `timeout`: Maximum time allowed for sending the request bytes.
374 ///
375 /// # Returns
376 /// `self` for chaining.
377 pub fn write_timeout(mut self, timeout: Duration) -> Self {
378 self.write_timeout = timeout;
379 self
380 }
381
382 /// Overrides the read-phase timeout for this request only.
383 ///
384 /// # Parameters
385 /// - `timeout`: Maximum time allowed for one read wait on response body.
386 ///
387 /// # Returns
388 /// `self` for chaining.
389 pub fn read_timeout(mut self, timeout: Duration) -> Self {
390 self.read_timeout = timeout;
391 self
392 }
393
394 /// Overrides the client default base URL for this request.
395 ///
396 /// # Parameters
397 /// - `base_url`: Base URL used when resolving relative request paths.
398 ///
399 /// # Returns
400 /// `self` for chaining.
401 pub fn base_url(mut self, base_url: Url) -> Self {
402 self.base_url = Some(base_url);
403 self
404 }
405
406 /// Clears the base URL for this request.
407 ///
408 /// # Returns
409 /// `self` for chaining.
410 pub fn clear_base_url(mut self) -> Self {
411 self.base_url = None;
412 self
413 }
414
415 /// Overrides whether this request enforces IPv4-only literal-host validation.
416 ///
417 /// # Parameters
418 /// - `enabled`: `true` to reject IPv6 literal hosts, `false` to allow them.
419 ///
420 /// # Returns
421 /// `self` for chaining.
422 pub fn ipv4_only(mut self, enabled: bool) -> Self {
423 self.ipv4_only = enabled;
424 self
425 }
426
427 /// Binds a [`CancellationToken`] to this request.
428 ///
429 /// # Parameters
430 /// - `token`: Cancellation token checked before send and during request/stream I/O.
431 ///
432 /// # Returns
433 /// `self` for chaining.
434 pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
435 self.cancellation_token = Some(token);
436 self
437 }
438
439 /// Forces retry enabled for this request even if client-level retry is disabled.
440 ///
441 /// # Returns
442 /// `self` for chaining.
443 pub fn force_retry(mut self) -> Self {
444 self.retry_override = self.retry_override.force_enable();
445 self
446 }
447
448 /// Disables retry for this request even if client-level retry is enabled.
449 ///
450 /// # Returns
451 /// `self` for chaining.
452 pub fn disable_retry(mut self) -> Self {
453 self.retry_override = self.retry_override.force_disable();
454 self
455 }
456
457 /// Overrides retryable-method policy for this request.
458 ///
459 /// # Parameters
460 /// - `policy`: Method policy to apply on this request only.
461 ///
462 /// # Returns
463 /// `self` for chaining.
464 pub fn retry_method_policy(mut self, policy: HttpRetryMethodPolicy) -> Self {
465 self.retry_override = self.retry_override.with_method_policy(policy);
466 self
467 }
468
469 /// Enables or disables honoring `Retry-After` for this request.
470 ///
471 /// # Parameters
472 /// - `enabled`: `true` to honor `Retry-After` on retryable status
473 /// responses (`429` and `5xx`).
474 ///
475 /// # Returns
476 /// `self` for chaining.
477 pub fn honor_retry_after(mut self, enabled: bool) -> Self {
478 self.retry_override = self.retry_override.with_honor_retry_after(enabled);
479 self
480 }
481
482 /// Consumes the builder into a frozen [`HttpRequest`].
483 ///
484 /// # Returns
485 /// Built [`HttpRequest`].
486 pub fn build(self) -> HttpRequest {
487 HttpRequest::new(self)
488 }
489}