Skip to main content

qubit_http/request/
http_request_builder.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! Builder for [`super::http_request::HttpRequest`].
11
12use std::time::Duration;
13use std::{
14    future::Future,
15    pin::Pin,
16};
17
18use bytes::Bytes;
19use http::header::CONTENT_TYPE;
20use http::{
21    HeaderMap,
22    HeaderValue,
23    Method,
24};
25use serde::Serialize;
26use tokio_util::sync::CancellationToken;
27use url::form_urlencoded;
28use url::Url;
29
30use crate::{
31    AsyncHttpHeaderInjector,
32    HttpClient,
33    HttpError,
34    HttpHeaderInjector,
35    HttpRequestBodyByteStream,
36    HttpRequestStreamingBody,
37    HttpResult,
38    HttpRetryMethodPolicy,
39};
40
41use super::http_request::HttpRequest;
42use super::http_request_body::HttpRequestBody;
43use super::http_request_retry_override::HttpRequestRetryOverride;
44use super::parse_header;
45
46/// Builder for [`HttpRequest`](super::http_request::HttpRequest).
47#[derive(Debug, Clone)]
48pub struct HttpRequestBuilder {
49    /// HTTP method (e.g. GET, POST).
50    pub(super) method: Method,
51    /// Request path without the query string.
52    pub(super) path: String,
53    /// Query parameters as `(key, value)` pairs, appended to the URL when built.
54    pub(super) query: Vec<(String, String)>,
55    /// Request headers.
56    pub(super) headers: HeaderMap,
57    /// Request body; empty if not set.
58    pub(super) body: HttpRequestBody,
59    /// Deferred streaming upload body factory for per-attempt stream creation.
60    pub(super) streaming_body: Option<HttpRequestStreamingBody>,
61    /// Per-request timeout; if unset, the client default applies.
62    pub(super) request_timeout: Option<Duration>,
63    /// Per-request write timeout used by the send phase.
64    pub(super) write_timeout: Duration,
65    /// Per-request read timeout used by buffered/stream response reading.
66    pub(super) read_timeout: Duration,
67    /// Base URL copied from client options and used by [`HttpRequest::resolve_url`].
68    pub(super) base_url: Option<Url>,
69    /// Whether IPv6 literal hosts are rejected during URL resolution.
70    pub(super) ipv4_only: bool,
71    /// Optional cancellation token for this request.
72    pub(super) cancellation_token: Option<CancellationToken>,
73    /// Per-request retry override for one-off retry behavior customization.
74    pub(super) retry_override: HttpRequestRetryOverride,
75    /// Default headers snapshot from the originating client.
76    pub(super) default_headers: HeaderMap,
77    /// Sync header injectors snapshot from the originating client.
78    pub(super) injectors: Vec<HttpHeaderInjector>,
79    /// Async header injectors snapshot from the originating client.
80    pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
81}
82
83impl HttpRequestBuilder {
84    /// Starts a builder with method/path and copies supported defaults from client options.
85    ///
86    /// # Parameters
87    /// - `method`: HTTP verb.
88    /// - `path`: URL or relative path string.
89    /// - `client`: Source client whose relevant defaults are copied into this builder.
90    ///
91    /// # Returns
92    /// New [`HttpRequestBuilder`].
93    pub(crate) fn new(method: Method, path: &str, client: &HttpClient) -> Self {
94        let options = client.options();
95        Self {
96            method,
97            path: path.to_string(),
98            query: Vec::new(),
99            headers: HeaderMap::new(),
100            body: HttpRequestBody::Empty,
101            streaming_body: None,
102            request_timeout: options.timeouts.request_timeout,
103            write_timeout: options.timeouts.write_timeout,
104            read_timeout: options.timeouts.read_timeout,
105            base_url: options.base_url.clone(),
106            ipv4_only: options.ipv4_only,
107            cancellation_token: None,
108            retry_override: HttpRequestRetryOverride::default(),
109            default_headers: client.headers_snapshot(),
110            injectors: client.injectors_snapshot(),
111            async_injectors: client.async_injectors_snapshot(),
112        }
113    }
114
115    /// Appends a single `key=value` query pair (order preserved).
116    ///
117    /// # Parameters
118    /// - `key`: Query parameter name.
119    /// - `value`: Query parameter value.
120    ///
121    /// # Returns
122    /// `self` for chaining.
123    pub fn query_param(mut self, key: &str, value: &str) -> Self {
124        self.query.push((key.to_string(), value.to_string()));
125        self
126    }
127
128    /// Appends many query pairs via [`HttpRequestBuilder::query_param`].
129    ///
130    /// # Parameters
131    /// - `params`: Iterator of `(key, value)` pairs.
132    ///
133    /// # Returns
134    /// `self` for chaining.
135    pub fn query_params<'a, I>(mut self, params: I) -> Self
136    where
137        I: IntoIterator<Item = (&'a str, &'a str)>,
138    {
139        for (key, value) in params {
140            self = self.query_param(key, value);
141        }
142        self
143    }
144
145    /// Validates and inserts one header.
146    ///
147    /// # Parameters
148    /// - `name`: Header name (must be valid [`http::header::HeaderName`] bytes).
149    /// - `value`: Header value (must be valid [`http::header::HeaderValue`]).
150    ///
151    /// # Returns
152    /// `Ok(self)` or [`HttpError`] if name/value are invalid.
153    pub fn header(mut self, name: &str, value: &str) -> HttpResult<Self> {
154        let (header_name, header_value) = parse_header(name, value)?;
155        self.headers.insert(header_name, header_value);
156        Ok(self)
157    }
158
159    /// Merges all entries from `headers` into this builder (existing names may get extra values).
160    ///
161    /// # Parameters
162    /// - `headers`: Map to append.
163    ///
164    /// # Returns
165    /// `self` for chaining.
166    pub fn headers(mut self, headers: HeaderMap) -> Self {
167        self.headers.extend(headers);
168        self
169    }
170
171    /// Sets the body to raw bytes without changing `Content-Type` unless already set elsewhere.
172    ///
173    /// # Parameters
174    /// - `body`: Payload.
175    ///
176    /// # Returns
177    /// `self` for chaining.
178    pub fn bytes_body(mut self, body: impl Into<Bytes>) -> Self {
179        self.body = HttpRequestBody::Bytes(body.into());
180        self.streaming_body = None;
181        self
182    }
183
184    /// Sets the body to an ordered chunk stream for incremental upload.
185    ///
186    /// # Parameters
187    /// - `chunks`: Iterator of chunks in send order.
188    ///
189    /// # Returns
190    /// `self` for chaining.
191    pub fn stream_body<I, B>(mut self, chunks: I) -> Self
192    where
193        I: IntoIterator<Item = B>,
194        B: Into<Bytes>,
195    {
196        self.body = HttpRequestBody::Stream(chunks.into_iter().map(Into::into).collect());
197        self.streaming_body = None;
198        self
199    }
200
201    /// Sets a deferred streaming upload body factory.
202    ///
203    /// The factory runs once per send attempt and returns a fresh async byte
204    /// stream, which allows retries to rebuild the outbound stream body.
205    ///
206    /// # Parameters
207    /// - `factory`: Async stream factory for per-attempt body generation.
208    ///
209    /// # Returns
210    /// `self` for chaining.
211    pub fn streaming_body<F>(mut self, factory: F) -> Self
212    where
213        F: Fn() -> Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>
214            + Send
215            + Sync
216            + 'static,
217    {
218        self.streaming_body = Some(HttpRequestStreamingBody::new(factory));
219        self.body = HttpRequestBody::Empty;
220        self
221    }
222
223    /// Sets a UTF-8 text body and adds `text/plain; charset=utf-8` if `Content-Type` is absent.
224    ///
225    /// # Parameters
226    /// - `body`: Text payload.
227    ///
228    /// # Returns
229    /// `self` for chaining.
230    pub fn text_body(mut self, body: impl Into<String>) -> Self {
231        if !self.headers.contains_key(CONTENT_TYPE) {
232            self.headers.insert(
233                CONTENT_TYPE,
234                HeaderValue::from_static("text/plain; charset=utf-8"),
235            );
236        }
237        self.body = HttpRequestBody::Text(body.into());
238        self.streaming_body = None;
239        self
240    }
241
242    /// Serializes `value` to JSON, sets body to those bytes, and adds `application/json` if needed.
243    ///
244    /// # Parameters
245    /// - `value`: Serializable value.
246    ///
247    /// # Returns
248    /// `Ok(self)` or [`HttpError`] if JSON encoding fails.
249    pub fn json_body<T>(mut self, value: &T) -> HttpResult<Self>
250    where
251        T: Serialize,
252    {
253        let bytes = serde_json::to_vec(value)
254            .map_err(|error| HttpError::decode(format!("Failed to encode JSON body: {}", error)))?;
255        if !self.headers.contains_key(CONTENT_TYPE) {
256            self.headers
257                .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
258        }
259        self.body = HttpRequestBody::Json(Bytes::from(bytes));
260        self.streaming_body = None;
261        Ok(self)
262    }
263
264    /// Serializes key-value pairs as `application/x-www-form-urlencoded`.
265    ///
266    /// # Parameters
267    /// - `fields`: Iterable of `(key, value)` string pairs.
268    ///
269    /// # Returns
270    /// `self` for chaining.
271    pub fn form_body<'a, I>(mut self, fields: I) -> Self
272    where
273        I: IntoIterator<Item = (&'a str, &'a str)>,
274    {
275        let mut serializer = form_urlencoded::Serializer::new(String::new());
276        for (key, value) in fields {
277            serializer.append_pair(key, value);
278        }
279        let body = serializer.finish();
280        if !self.headers.contains_key(CONTENT_TYPE) {
281            self.headers.insert(
282                CONTENT_TYPE,
283                HeaderValue::from_static("application/x-www-form-urlencoded"),
284            );
285        }
286        self.body = HttpRequestBody::Form(Bytes::from(body));
287        self.streaming_body = None;
288        self
289    }
290
291    /// Sets multipart body bytes and optional auto content-type by boundary.
292    ///
293    /// # Parameters
294    /// - `body`: Multipart payload bytes.
295    /// - `boundary`: Multipart boundary used in payload framing.
296    ///
297    /// # Returns
298    /// `Ok(self)` for chaining.
299    ///
300    /// # Errors
301    /// Returns [`HttpError`] when `boundary` is empty or content-type cannot be built.
302    pub fn multipart_body(mut self, body: impl Into<Bytes>, boundary: &str) -> HttpResult<Self> {
303        if boundary.trim().is_empty() {
304            return Err(HttpError::other(
305                "Multipart boundary cannot be empty for multipart_body",
306            ));
307        }
308        if !self.headers.contains_key(CONTENT_TYPE) {
309            let value = HeaderValue::from_str(&format!("multipart/form-data; boundary={boundary}"))
310                .map_err(|error| {
311                    HttpError::other(format!(
312                        "Invalid multipart Content-Type header value: {error}"
313                    ))
314                })?;
315            self.headers.insert(CONTENT_TYPE, value);
316        }
317        self.body = HttpRequestBody::Multipart(body.into());
318        self.streaming_body = None;
319        Ok(self)
320    }
321
322    /// Serializes records as NDJSON (`one JSON object per line`).
323    ///
324    /// # Parameters
325    /// - `records`: Serializable records to encode as NDJSON lines.
326    ///
327    /// # Returns
328    /// `Ok(self)` for chaining.
329    ///
330    /// # Errors
331    /// Returns [`HttpError`] when any record fails JSON serialization.
332    pub fn ndjson_body<T>(mut self, records: &[T]) -> HttpResult<Self>
333    where
334        T: Serialize,
335    {
336        let mut payload = String::new();
337        for record in records {
338            let line = serde_json::to_string(record).map_err(|error| {
339                HttpError::decode(format!("Failed to encode NDJSON record: {error}"))
340            })?;
341            payload.push_str(&line);
342            payload.push('\n');
343        }
344        if !self.headers.contains_key(CONTENT_TYPE) {
345            self.headers.insert(
346                CONTENT_TYPE,
347                HeaderValue::from_static("application/x-ndjson"),
348            );
349        }
350        self.body = HttpRequestBody::Ndjson(Bytes::from(payload));
351        self.streaming_body = None;
352        Ok(self)
353    }
354
355    /// Overrides the client-wide request timeout for this request only.
356    ///
357    /// This sets reqwest's per-request [`reqwest::RequestBuilder::timeout`], i.e. a
358    /// whole-request deadline for that HTTP call (see reqwest docs for exact semantics).
359    ///
360    /// # Parameters
361    /// - `timeout`: Maximum time for the whole request (reqwest `timeout`).
362    ///
363    /// # Returns
364    /// `self` for chaining.
365    pub fn request_timeout(mut self, timeout: Duration) -> Self {
366        self.request_timeout = Some(timeout);
367        self
368    }
369
370    /// Overrides the write-phase timeout for this request only.
371    ///
372    /// # Parameters
373    /// - `timeout`: Maximum time allowed for sending the request bytes.
374    ///
375    /// # Returns
376    /// `self` for chaining.
377    pub fn write_timeout(mut self, timeout: Duration) -> Self {
378        self.write_timeout = timeout;
379        self
380    }
381
382    /// Overrides the read-phase timeout for this request only.
383    ///
384    /// # Parameters
385    /// - `timeout`: Maximum time allowed for one read wait on response body.
386    ///
387    /// # Returns
388    /// `self` for chaining.
389    pub fn read_timeout(mut self, timeout: Duration) -> Self {
390        self.read_timeout = timeout;
391        self
392    }
393
394    /// Overrides the client default base URL for this request.
395    ///
396    /// # Parameters
397    /// - `base_url`: Base URL used when resolving relative request paths.
398    ///
399    /// # Returns
400    /// `self` for chaining.
401    pub fn base_url(mut self, base_url: Url) -> Self {
402        self.base_url = Some(base_url);
403        self
404    }
405
406    /// Clears the base URL for this request.
407    ///
408    /// # Returns
409    /// `self` for chaining.
410    pub fn clear_base_url(mut self) -> Self {
411        self.base_url = None;
412        self
413    }
414
415    /// Overrides whether this request enforces IPv4-only literal-host validation.
416    ///
417    /// # Parameters
418    /// - `enabled`: `true` to reject IPv6 literal hosts, `false` to allow them.
419    ///
420    /// # Returns
421    /// `self` for chaining.
422    pub fn ipv4_only(mut self, enabled: bool) -> Self {
423        self.ipv4_only = enabled;
424        self
425    }
426
427    /// Binds a [`CancellationToken`] to this request.
428    ///
429    /// # Parameters
430    /// - `token`: Cancellation token checked before send and during request/stream I/O.
431    ///
432    /// # Returns
433    /// `self` for chaining.
434    pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
435        self.cancellation_token = Some(token);
436        self
437    }
438
439    /// Forces retry enabled for this request even if client-level retry is disabled.
440    ///
441    /// # Returns
442    /// `self` for chaining.
443    pub fn force_retry(mut self) -> Self {
444        self.retry_override = self.retry_override.force_enable();
445        self
446    }
447
448    /// Disables retry for this request even if client-level retry is enabled.
449    ///
450    /// # Returns
451    /// `self` for chaining.
452    pub fn disable_retry(mut self) -> Self {
453        self.retry_override = self.retry_override.force_disable();
454        self
455    }
456
457    /// Overrides retryable-method policy for this request.
458    ///
459    /// # Parameters
460    /// - `policy`: Method policy to apply on this request only.
461    ///
462    /// # Returns
463    /// `self` for chaining.
464    pub fn retry_method_policy(mut self, policy: HttpRetryMethodPolicy) -> Self {
465        self.retry_override = self.retry_override.with_method_policy(policy);
466        self
467    }
468
469    /// Enables or disables honoring `Retry-After` for this request.
470    ///
471    /// # Parameters
472    /// - `enabled`: `true` to honor `Retry-After` on retryable status
473    ///   responses (`429` and `5xx`).
474    ///
475    /// # Returns
476    /// `self` for chaining.
477    pub fn honor_retry_after(mut self, enabled: bool) -> Self {
478        self.retry_override = self.retry_override.with_honor_retry_after(enabled);
479        self
480    }
481
482    /// Consumes the builder into a frozen [`HttpRequest`].
483    ///
484    /// # Returns
485    /// Built [`HttpRequest`].
486    pub fn build(self) -> HttpRequest {
487        HttpRequest::new(self)
488    }
489}