Skip to main content

qubit_http/request/
http_request_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! Builder for [`super::http_request::HttpRequest`].
10
11use std::time::Duration;
12use std::{future::Future, pin::Pin};
13
14use bytes::Bytes;
15use http::header::CONTENT_TYPE;
16use http::{HeaderMap, HeaderValue, Method};
17use serde::Serialize;
18use tokio_util::sync::CancellationToken;
19use url::form_urlencoded;
20use url::Url;
21
22use crate::{
23    AsyncHttpHeaderInjector, HttpClient, HttpError, HttpHeaderInjector, HttpRequestBodyByteStream,
24    HttpRequestStreamingBody, HttpResult, HttpRetryMethodPolicy,
25};
26
27use super::http_request::HttpRequest;
28use super::http_request_body::HttpRequestBody;
29use super::http_request_retry_override::HttpRequestRetryOverride;
30use super::parse_header;
31
32/// Builder for [`HttpRequest`](super::http_request::HttpRequest).
33#[derive(Debug, Clone)]
34pub struct HttpRequestBuilder {
35    /// HTTP method (e.g. GET, POST).
36    pub(super) method: Method,
37    /// Request path without the query string.
38    pub(super) path: String,
39    /// Query parameters as `(key, value)` pairs, appended to the URL when built.
40    pub(super) query: Vec<(String, String)>,
41    /// Request headers.
42    pub(super) headers: HeaderMap,
43    /// Request body; empty if not set.
44    pub(super) body: HttpRequestBody,
45    /// Deferred streaming upload body factory for per-attempt stream creation.
46    pub(super) streaming_body: Option<HttpRequestStreamingBody>,
47    /// Per-request timeout; if unset, the client default applies.
48    pub(super) request_timeout: Option<Duration>,
49    /// Per-request write timeout used by the send phase.
50    pub(super) write_timeout: Duration,
51    /// Per-request read timeout used by buffered/stream response reading.
52    pub(super) read_timeout: Duration,
53    /// Base URL copied from client options and used by [`HttpRequest::resolve_url`].
54    pub(super) base_url: Option<Url>,
55    /// Whether IPv6 literal hosts are rejected during URL resolution.
56    pub(super) ipv4_only: bool,
57    /// Optional cancellation token for this request.
58    pub(super) cancellation_token: Option<CancellationToken>,
59    /// Per-request retry override for one-off retry behavior customization.
60    pub(super) retry_override: HttpRequestRetryOverride,
61    /// Default headers snapshot from the originating client.
62    pub(super) default_headers: HeaderMap,
63    /// Sync header injectors snapshot from the originating client.
64    pub(super) injectors: Vec<HttpHeaderInjector>,
65    /// Async header injectors snapshot from the originating client.
66    pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
67}
68
69impl HttpRequestBuilder {
70    /// Starts a builder with method/path and copies supported defaults from client options.
71    ///
72    /// # Parameters
73    /// - `method`: HTTP verb.
74    /// - `path`: URL or relative path string.
75    /// - `client`: Source client whose relevant defaults are copied into this builder.
76    ///
77    /// # Returns
78    /// New [`HttpRequestBuilder`].
79    pub(crate) fn new(method: Method, path: &str, client: &HttpClient) -> Self {
80        let options = client.options();
81        Self {
82            method,
83            path: path.to_string(),
84            query: Vec::new(),
85            headers: HeaderMap::new(),
86            body: HttpRequestBody::Empty,
87            streaming_body: None,
88            request_timeout: options.timeouts.request_timeout,
89            write_timeout: options.timeouts.write_timeout,
90            read_timeout: options.timeouts.read_timeout,
91            base_url: options.base_url.clone(),
92            ipv4_only: options.ipv4_only,
93            cancellation_token: None,
94            retry_override: HttpRequestRetryOverride::default(),
95            default_headers: client.headers_snapshot(),
96            injectors: client.injectors_snapshot(),
97            async_injectors: client.async_injectors_snapshot(),
98        }
99    }
100
101    /// Appends a single `key=value` query pair (order preserved).
102    ///
103    /// # Parameters
104    /// - `key`: Query parameter name.
105    /// - `value`: Query parameter value.
106    ///
107    /// # Returns
108    /// `self` for chaining.
109    pub fn query_param(mut self, key: &str, value: &str) -> Self {
110        self.query.push((key.to_string(), value.to_string()));
111        self
112    }
113
114    /// Appends many query pairs via [`HttpRequestBuilder::query_param`].
115    ///
116    /// # Parameters
117    /// - `params`: Iterator of `(key, value)` pairs.
118    ///
119    /// # Returns
120    /// `self` for chaining.
121    pub fn query_params<'a, I>(mut self, params: I) -> Self
122    where
123        I: IntoIterator<Item = (&'a str, &'a str)>,
124    {
125        for (key, value) in params {
126            self = self.query_param(key, value);
127        }
128        self
129    }
130
131    /// Validates and inserts one header.
132    ///
133    /// # Parameters
134    /// - `name`: Header name (must be valid [`http::header::HeaderName`] bytes).
135    /// - `value`: Header value (must be valid [`http::header::HeaderValue`]).
136    ///
137    /// # Returns
138    /// `Ok(self)` or [`HttpError`] if name/value are invalid.
139    pub fn header(mut self, name: &str, value: &str) -> HttpResult<Self> {
140        let (header_name, header_value) = parse_header(name, value)?;
141        self.headers.insert(header_name, header_value);
142        Ok(self)
143    }
144
145    /// Merges all entries from `headers` into this builder (existing names may get extra values).
146    ///
147    /// # Parameters
148    /// - `headers`: Map to append.
149    ///
150    /// # Returns
151    /// `self` for chaining.
152    pub fn headers(mut self, headers: HeaderMap) -> Self {
153        self.headers.extend(headers);
154        self
155    }
156
157    /// Sets the body to raw bytes without changing `Content-Type` unless already set elsewhere.
158    ///
159    /// # Parameters
160    /// - `body`: Payload.
161    ///
162    /// # Returns
163    /// `self` for chaining.
164    pub fn bytes_body(mut self, body: impl Into<Bytes>) -> Self {
165        self.body = HttpRequestBody::Bytes(body.into());
166        self.streaming_body = None;
167        self
168    }
169
170    /// Sets the body to an ordered chunk stream for incremental upload.
171    ///
172    /// # Parameters
173    /// - `chunks`: Iterator of chunks in send order.
174    ///
175    /// # Returns
176    /// `self` for chaining.
177    pub fn stream_body<I, B>(mut self, chunks: I) -> Self
178    where
179        I: IntoIterator<Item = B>,
180        B: Into<Bytes>,
181    {
182        self.body = HttpRequestBody::Stream(chunks.into_iter().map(Into::into).collect());
183        self.streaming_body = None;
184        self
185    }
186
187    /// Sets a deferred streaming upload body factory.
188    ///
189    /// The factory runs once per send attempt and returns a fresh async byte
190    /// stream, which allows retries to rebuild the outbound stream body.
191    ///
192    /// # Parameters
193    /// - `factory`: Async stream factory for per-attempt body generation.
194    ///
195    /// # Returns
196    /// `self` for chaining.
197    pub fn streaming_body<F>(mut self, factory: F) -> Self
198    where
199        F: Fn() -> Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>
200            + Send
201            + Sync
202            + 'static,
203    {
204        self.streaming_body = Some(HttpRequestStreamingBody::new(factory));
205        self.body = HttpRequestBody::Empty;
206        self
207    }
208
209    /// Sets a UTF-8 text body and adds `text/plain; charset=utf-8` if `Content-Type` is absent.
210    ///
211    /// # Parameters
212    /// - `body`: Text payload.
213    ///
214    /// # Returns
215    /// `self` for chaining.
216    pub fn text_body(mut self, body: impl Into<String>) -> Self {
217        if !self.headers.contains_key(CONTENT_TYPE) {
218            self.headers.insert(
219                CONTENT_TYPE,
220                HeaderValue::from_static("text/plain; charset=utf-8"),
221            );
222        }
223        self.body = HttpRequestBody::Text(body.into());
224        self.streaming_body = None;
225        self
226    }
227
228    /// Serializes `value` to JSON, sets body to those bytes, and adds `application/json` if needed.
229    ///
230    /// # Parameters
231    /// - `value`: Serializable value.
232    ///
233    /// # Returns
234    /// `Ok(self)` or [`HttpError`] if JSON encoding fails.
235    pub fn json_body<T>(mut self, value: &T) -> HttpResult<Self>
236    where
237        T: Serialize,
238    {
239        let bytes = serde_json::to_vec(value)
240            .map_err(|error| HttpError::decode(format!("Failed to encode JSON body: {}", error)))?;
241        if !self.headers.contains_key(CONTENT_TYPE) {
242            self.headers
243                .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
244        }
245        self.body = HttpRequestBody::Json(Bytes::from(bytes));
246        self.streaming_body = None;
247        Ok(self)
248    }
249
250    /// Serializes key-value pairs as `application/x-www-form-urlencoded`.
251    ///
252    /// # Parameters
253    /// - `fields`: Iterable of `(key, value)` string pairs.
254    ///
255    /// # Returns
256    /// `self` for chaining.
257    pub fn form_body<'a, I>(mut self, fields: I) -> Self
258    where
259        I: IntoIterator<Item = (&'a str, &'a str)>,
260    {
261        let mut serializer = form_urlencoded::Serializer::new(String::new());
262        for (key, value) in fields {
263            serializer.append_pair(key, value);
264        }
265        let body = serializer.finish();
266        if !self.headers.contains_key(CONTENT_TYPE) {
267            self.headers.insert(
268                CONTENT_TYPE,
269                HeaderValue::from_static("application/x-www-form-urlencoded"),
270            );
271        }
272        self.body = HttpRequestBody::Form(Bytes::from(body));
273        self.streaming_body = None;
274        self
275    }
276
277    /// Sets multipart body bytes and optional auto content-type by boundary.
278    ///
279    /// # Parameters
280    /// - `body`: Multipart payload bytes.
281    /// - `boundary`: Multipart boundary used in payload framing.
282    ///
283    /// # Returns
284    /// `Ok(self)` for chaining.
285    ///
286    /// # Errors
287    /// Returns [`HttpError`] when `boundary` is empty or content-type cannot be built.
288    pub fn multipart_body(mut self, body: impl Into<Bytes>, boundary: &str) -> HttpResult<Self> {
289        if boundary.trim().is_empty() {
290            return Err(HttpError::other(
291                "Multipart boundary cannot be empty for multipart_body",
292            ));
293        }
294        if !self.headers.contains_key(CONTENT_TYPE) {
295            let value = HeaderValue::from_str(&format!("multipart/form-data; boundary={boundary}"))
296                .map_err(|error| {
297                    HttpError::other(format!(
298                        "Invalid multipart Content-Type header value: {error}"
299                    ))
300                })?;
301            self.headers.insert(CONTENT_TYPE, value);
302        }
303        self.body = HttpRequestBody::Multipart(body.into());
304        self.streaming_body = None;
305        Ok(self)
306    }
307
308    /// Serializes records as NDJSON (`one JSON object per line`).
309    ///
310    /// # Parameters
311    /// - `records`: Serializable records to encode as NDJSON lines.
312    ///
313    /// # Returns
314    /// `Ok(self)` for chaining.
315    ///
316    /// # Errors
317    /// Returns [`HttpError`] when any record fails JSON serialization.
318    pub fn ndjson_body<T>(mut self, records: &[T]) -> HttpResult<Self>
319    where
320        T: Serialize,
321    {
322        let mut payload = String::new();
323        for record in records {
324            let line = serde_json::to_string(record).map_err(|error| {
325                HttpError::decode(format!("Failed to encode NDJSON record: {error}"))
326            })?;
327            payload.push_str(&line);
328            payload.push('\n');
329        }
330        if !self.headers.contains_key(CONTENT_TYPE) {
331            self.headers.insert(
332                CONTENT_TYPE,
333                HeaderValue::from_static("application/x-ndjson"),
334            );
335        }
336        self.body = HttpRequestBody::Ndjson(Bytes::from(payload));
337        self.streaming_body = None;
338        Ok(self)
339    }
340
341    /// Overrides the client-wide request timeout for this request only.
342    ///
343    /// This sets reqwest's per-request [`reqwest::RequestBuilder::timeout`], i.e. a
344    /// whole-request deadline for that HTTP call (see reqwest docs for exact semantics).
345    ///
346    /// # Parameters
347    /// - `timeout`: Maximum time for the whole request (reqwest `timeout`).
348    ///
349    /// # Returns
350    /// `self` for chaining.
351    pub fn request_timeout(mut self, timeout: Duration) -> Self {
352        self.request_timeout = Some(timeout);
353        self
354    }
355
356    /// Overrides the write-phase timeout for this request only.
357    ///
358    /// # Parameters
359    /// - `timeout`: Maximum time allowed for sending the request bytes.
360    ///
361    /// # Returns
362    /// `self` for chaining.
363    pub fn write_timeout(mut self, timeout: Duration) -> Self {
364        self.write_timeout = timeout;
365        self
366    }
367
368    /// Overrides the read-phase timeout for this request only.
369    ///
370    /// # Parameters
371    /// - `timeout`: Maximum time allowed for one read wait on response body.
372    ///
373    /// # Returns
374    /// `self` for chaining.
375    pub fn read_timeout(mut self, timeout: Duration) -> Self {
376        self.read_timeout = timeout;
377        self
378    }
379
380    /// Overrides the client default base URL for this request.
381    ///
382    /// # Parameters
383    /// - `base_url`: Base URL used when resolving relative request paths.
384    ///
385    /// # Returns
386    /// `self` for chaining.
387    pub fn base_url(mut self, base_url: Url) -> Self {
388        self.base_url = Some(base_url);
389        self
390    }
391
392    /// Clears the base URL for this request.
393    ///
394    /// # Returns
395    /// `self` for chaining.
396    pub fn clear_base_url(mut self) -> Self {
397        self.base_url = None;
398        self
399    }
400
401    /// Overrides whether this request enforces IPv4-only literal-host validation.
402    ///
403    /// # Parameters
404    /// - `enabled`: `true` to reject IPv6 literal hosts, `false` to allow them.
405    ///
406    /// # Returns
407    /// `self` for chaining.
408    pub fn ipv4_only(mut self, enabled: bool) -> Self {
409        self.ipv4_only = enabled;
410        self
411    }
412
413    /// Binds a [`CancellationToken`] to this request.
414    ///
415    /// # Parameters
416    /// - `token`: Cancellation token checked before send and during request/stream I/O.
417    ///
418    /// # Returns
419    /// `self` for chaining.
420    pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
421        self.cancellation_token = Some(token);
422        self
423    }
424
425    /// Forces retry enabled for this request even if client-level retry is disabled.
426    ///
427    /// # Returns
428    /// `self` for chaining.
429    pub fn force_retry(mut self) -> Self {
430        self.retry_override = self.retry_override.force_enable();
431        self
432    }
433
434    /// Disables retry for this request even if client-level retry is enabled.
435    ///
436    /// # Returns
437    /// `self` for chaining.
438    pub fn disable_retry(mut self) -> Self {
439        self.retry_override = self.retry_override.force_disable();
440        self
441    }
442
443    /// Overrides retryable-method policy for this request.
444    ///
445    /// # Parameters
446    /// - `policy`: Method policy to apply on this request only.
447    ///
448    /// # Returns
449    /// `self` for chaining.
450    pub fn retry_method_policy(mut self, policy: HttpRetryMethodPolicy) -> Self {
451        self.retry_override = self.retry_override.with_method_policy(policy);
452        self
453    }
454
455    /// Enables or disables honoring `Retry-After` for this request.
456    ///
457    /// # Parameters
458    /// - `enabled`: `true` to honor `Retry-After` on retryable status
459    ///   responses (`429` and `5xx`).
460    ///
461    /// # Returns
462    /// `self` for chaining.
463    pub fn honor_retry_after(mut self, enabled: bool) -> Self {
464        self.retry_override = self.retry_override.with_honor_retry_after(enabled);
465        self
466    }
467
468    /// Consumes the builder into a frozen [`HttpRequest`].
469    ///
470    /// # Returns
471    /// Built [`HttpRequest`].
472    pub fn build(self) -> HttpRequest {
473        HttpRequest::new(self)
474    }
475}