Skip to main content

qubit_http/request/
http_request_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Builder for [`super::http_request::HttpRequest`].
11
12use std::time::Duration;
13use std::{future::Future, pin::Pin};
14
15use bytes::Bytes;
16use http::header::CONTENT_TYPE;
17use http::{HeaderMap, HeaderValue, Method};
18use serde::Serialize;
19use tokio_util::sync::CancellationToken;
20use url::form_urlencoded;
21use url::Url;
22
23use crate::{
24    AsyncHttpHeaderInjector, HttpClient, HttpError, HttpHeaderInjector, HttpRequestBodyByteStream,
25    HttpRequestStreamingBody, HttpResult, HttpRetryMethodPolicy,
26};
27
28use super::http_request::HttpRequest;
29use super::http_request_body::HttpRequestBody;
30use super::http_request_retry_override::HttpRequestRetryOverride;
31use super::parse_header;
32
33/// Builder for [`HttpRequest`](super::http_request::HttpRequest).
34#[derive(Debug, Clone)]
35pub struct HttpRequestBuilder {
36    /// HTTP method (e.g. GET, POST).
37    pub(super) method: Method,
38    /// Request path without the query string.
39    pub(super) path: String,
40    /// Query parameters as `(key, value)` pairs, appended to the URL when built.
41    pub(super) query: Vec<(String, String)>,
42    /// Request headers.
43    pub(super) headers: HeaderMap,
44    /// Request body; empty if not set.
45    pub(super) body: HttpRequestBody,
46    /// Deferred streaming upload body factory for per-attempt stream creation.
47    pub(super) streaming_body: Option<HttpRequestStreamingBody>,
48    /// Per-request timeout; if unset, the client default applies.
49    pub(super) request_timeout: Option<Duration>,
50    /// Per-request write timeout used by the send phase.
51    pub(super) write_timeout: Duration,
52    /// Per-request read timeout used by buffered/stream response reading.
53    pub(super) read_timeout: Duration,
54    /// Base URL copied from client options and used by [`HttpRequest::resolve_url`].
55    pub(super) base_url: Option<Url>,
56    /// Whether IPv6 literal hosts are rejected during URL resolution.
57    pub(super) ipv4_only: bool,
58    /// Optional cancellation token for this request.
59    pub(super) cancellation_token: Option<CancellationToken>,
60    /// Per-request retry override for one-off retry behavior customization.
61    pub(super) retry_override: HttpRequestRetryOverride,
62    /// Default headers snapshot from the originating client.
63    pub(super) default_headers: HeaderMap,
64    /// Sync header injectors snapshot from the originating client.
65    pub(super) injectors: Vec<HttpHeaderInjector>,
66    /// Async header injectors snapshot from the originating client.
67    pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
68}
69
70impl HttpRequestBuilder {
71    /// Starts a builder with method/path and copies supported defaults from client options.
72    ///
73    /// # Parameters
74    /// - `method`: HTTP verb.
75    /// - `path`: URL or relative path string.
76    /// - `client`: Source client whose relevant defaults are copied into this builder.
77    ///
78    /// # Returns
79    /// New [`HttpRequestBuilder`].
80    pub(crate) fn new(method: Method, path: &str, client: &HttpClient) -> Self {
81        let options = client.options();
82        Self {
83            method,
84            path: path.to_string(),
85            query: Vec::new(),
86            headers: HeaderMap::new(),
87            body: HttpRequestBody::Empty,
88            streaming_body: None,
89            request_timeout: options.timeouts.request_timeout,
90            write_timeout: options.timeouts.write_timeout,
91            read_timeout: options.timeouts.read_timeout,
92            base_url: options.base_url.clone(),
93            ipv4_only: options.ipv4_only,
94            cancellation_token: None,
95            retry_override: HttpRequestRetryOverride::default(),
96            default_headers: client.headers_snapshot(),
97            injectors: client.injectors_snapshot(),
98            async_injectors: client.async_injectors_snapshot(),
99        }
100    }
101
102    /// Appends a single `key=value` query pair (order preserved).
103    ///
104    /// # Parameters
105    /// - `key`: Query parameter name.
106    /// - `value`: Query parameter value.
107    ///
108    /// # Returns
109    /// `self` for chaining.
110    pub fn query_param(mut self, key: &str, value: &str) -> Self {
111        self.query.push((key.to_string(), value.to_string()));
112        self
113    }
114
115    /// Appends many query pairs via [`HttpRequestBuilder::query_param`].
116    ///
117    /// # Parameters
118    /// - `params`: Iterator of `(key, value)` pairs.
119    ///
120    /// # Returns
121    /// `self` for chaining.
122    pub fn query_params<'a, I>(mut self, params: I) -> Self
123    where
124        I: IntoIterator<Item = (&'a str, &'a str)>,
125    {
126        for (key, value) in params {
127            self = self.query_param(key, value);
128        }
129        self
130    }
131
132    /// Validates and inserts one header.
133    ///
134    /// # Parameters
135    /// - `name`: Header name (must be valid [`http::header::HeaderName`] bytes).
136    /// - `value`: Header value (must be valid [`http::header::HeaderValue`]).
137    ///
138    /// # Returns
139    /// `Ok(self)` or [`HttpError`] if name/value are invalid.
140    pub fn header(mut self, name: &str, value: &str) -> HttpResult<Self> {
141        let (header_name, header_value) = parse_header(name, value)?;
142        self.headers.insert(header_name, header_value);
143        Ok(self)
144    }
145
146    /// Merges all entries from `headers` into this builder (existing names may get extra values).
147    ///
148    /// # Parameters
149    /// - `headers`: Map to append.
150    ///
151    /// # Returns
152    /// `self` for chaining.
153    pub fn headers(mut self, headers: HeaderMap) -> Self {
154        self.headers.extend(headers);
155        self
156    }
157
158    /// Sets the body to raw bytes without changing `Content-Type` unless already set elsewhere.
159    ///
160    /// # Parameters
161    /// - `body`: Payload.
162    ///
163    /// # Returns
164    /// `self` for chaining.
165    pub fn bytes_body(mut self, body: impl Into<Bytes>) -> Self {
166        self.body = HttpRequestBody::Bytes(body.into());
167        self.streaming_body = None;
168        self
169    }
170
171    /// Sets the body to an ordered chunk stream for incremental upload.
172    ///
173    /// # Parameters
174    /// - `chunks`: Iterator of chunks in send order.
175    ///
176    /// # Returns
177    /// `self` for chaining.
178    pub fn stream_body<I, B>(mut self, chunks: I) -> Self
179    where
180        I: IntoIterator<Item = B>,
181        B: Into<Bytes>,
182    {
183        self.body = HttpRequestBody::Stream(chunks.into_iter().map(Into::into).collect());
184        self.streaming_body = None;
185        self
186    }
187
188    /// Sets a deferred streaming upload body factory.
189    ///
190    /// The factory runs once per send attempt and returns a fresh async byte
191    /// stream, which allows retries to rebuild the outbound stream body.
192    ///
193    /// # Parameters
194    /// - `factory`: Async stream factory for per-attempt body generation.
195    ///
196    /// # Returns
197    /// `self` for chaining.
198    pub fn streaming_body<F>(mut self, factory: F) -> Self
199    where
200        F: Fn() -> Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>
201            + Send
202            + Sync
203            + 'static,
204    {
205        self.streaming_body = Some(HttpRequestStreamingBody::new(factory));
206        self.body = HttpRequestBody::Empty;
207        self
208    }
209
210    /// Sets a UTF-8 text body and adds `text/plain; charset=utf-8` if `Content-Type` is absent.
211    ///
212    /// # Parameters
213    /// - `body`: Text payload.
214    ///
215    /// # Returns
216    /// `self` for chaining.
217    pub fn text_body(mut self, body: impl Into<String>) -> Self {
218        if !self.headers.contains_key(CONTENT_TYPE) {
219            self.headers.insert(
220                CONTENT_TYPE,
221                HeaderValue::from_static("text/plain; charset=utf-8"),
222            );
223        }
224        self.body = HttpRequestBody::Text(body.into());
225        self.streaming_body = None;
226        self
227    }
228
229    /// Serializes `value` to JSON, sets body to those bytes, and adds `application/json` if needed.
230    ///
231    /// # Parameters
232    /// - `value`: Serializable value.
233    ///
234    /// # Returns
235    /// `Ok(self)` or [`HttpError`] if JSON encoding fails.
236    pub fn json_body<T>(mut self, value: &T) -> HttpResult<Self>
237    where
238        T: Serialize,
239    {
240        let bytes = serde_json::to_vec(value)
241            .map_err(|error| HttpError::decode(format!("Failed to encode JSON body: {}", error)))?;
242        if !self.headers.contains_key(CONTENT_TYPE) {
243            self.headers
244                .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
245        }
246        self.body = HttpRequestBody::Json(Bytes::from(bytes));
247        self.streaming_body = None;
248        Ok(self)
249    }
250
251    /// Serializes key-value pairs as `application/x-www-form-urlencoded`.
252    ///
253    /// # Parameters
254    /// - `fields`: Iterable of `(key, value)` string pairs.
255    ///
256    /// # Returns
257    /// `self` for chaining.
258    pub fn form_body<'a, I>(mut self, fields: I) -> Self
259    where
260        I: IntoIterator<Item = (&'a str, &'a str)>,
261    {
262        let mut serializer = form_urlencoded::Serializer::new(String::new());
263        for (key, value) in fields {
264            serializer.append_pair(key, value);
265        }
266        let body = serializer.finish();
267        if !self.headers.contains_key(CONTENT_TYPE) {
268            self.headers.insert(
269                CONTENT_TYPE,
270                HeaderValue::from_static("application/x-www-form-urlencoded"),
271            );
272        }
273        self.body = HttpRequestBody::Form(Bytes::from(body));
274        self.streaming_body = None;
275        self
276    }
277
278    /// Sets multipart body bytes and optional auto content-type by boundary.
279    ///
280    /// # Parameters
281    /// - `body`: Multipart payload bytes.
282    /// - `boundary`: Multipart boundary used in payload framing.
283    ///
284    /// # Returns
285    /// `Ok(self)` for chaining.
286    ///
287    /// # Errors
288    /// Returns [`HttpError`] when `boundary` is empty or content-type cannot be built.
289    pub fn multipart_body(mut self, body: impl Into<Bytes>, boundary: &str) -> HttpResult<Self> {
290        if boundary.trim().is_empty() {
291            return Err(HttpError::other(
292                "Multipart boundary cannot be empty for multipart_body",
293            ));
294        }
295        if !self.headers.contains_key(CONTENT_TYPE) {
296            let value = HeaderValue::from_str(&format!("multipart/form-data; boundary={boundary}"))
297                .map_err(|error| {
298                    HttpError::other(format!(
299                        "Invalid multipart Content-Type header value: {error}"
300                    ))
301                })?;
302            self.headers.insert(CONTENT_TYPE, value);
303        }
304        self.body = HttpRequestBody::Multipart(body.into());
305        self.streaming_body = None;
306        Ok(self)
307    }
308
309    /// Serializes records as NDJSON (`one JSON object per line`).
310    ///
311    /// # Parameters
312    /// - `records`: Serializable records to encode as NDJSON lines.
313    ///
314    /// # Returns
315    /// `Ok(self)` for chaining.
316    ///
317    /// # Errors
318    /// Returns [`HttpError`] when any record fails JSON serialization.
319    pub fn ndjson_body<T>(mut self, records: &[T]) -> HttpResult<Self>
320    where
321        T: Serialize,
322    {
323        let mut payload = String::new();
324        for record in records {
325            let line = serde_json::to_string(record).map_err(|error| {
326                HttpError::decode(format!("Failed to encode NDJSON record: {error}"))
327            })?;
328            payload.push_str(&line);
329            payload.push('\n');
330        }
331        if !self.headers.contains_key(CONTENT_TYPE) {
332            self.headers.insert(
333                CONTENT_TYPE,
334                HeaderValue::from_static("application/x-ndjson"),
335            );
336        }
337        self.body = HttpRequestBody::Ndjson(Bytes::from(payload));
338        self.streaming_body = None;
339        Ok(self)
340    }
341
342    /// Overrides the client-wide request timeout for this request only.
343    ///
344    /// This sets reqwest's per-request [`reqwest::RequestBuilder::timeout`], i.e. a
345    /// whole-request deadline for that HTTP call (see reqwest docs for exact semantics).
346    ///
347    /// # Parameters
348    /// - `timeout`: Maximum time for the whole request (reqwest `timeout`).
349    ///
350    /// # Returns
351    /// `self` for chaining.
352    pub fn request_timeout(mut self, timeout: Duration) -> Self {
353        self.request_timeout = Some(timeout);
354        self
355    }
356
357    /// Overrides the write-phase timeout for this request only.
358    ///
359    /// # Parameters
360    /// - `timeout`: Maximum time allowed for sending the request bytes.
361    ///
362    /// # Returns
363    /// `self` for chaining.
364    pub fn write_timeout(mut self, timeout: Duration) -> Self {
365        self.write_timeout = timeout;
366        self
367    }
368
369    /// Overrides the read-phase timeout for this request only.
370    ///
371    /// # Parameters
372    /// - `timeout`: Maximum time allowed for one read wait on response body.
373    ///
374    /// # Returns
375    /// `self` for chaining.
376    pub fn read_timeout(mut self, timeout: Duration) -> Self {
377        self.read_timeout = timeout;
378        self
379    }
380
381    /// Overrides the client default base URL for this request.
382    ///
383    /// # Parameters
384    /// - `base_url`: Base URL used when resolving relative request paths.
385    ///
386    /// # Returns
387    /// `self` for chaining.
388    pub fn base_url(mut self, base_url: Url) -> Self {
389        self.base_url = Some(base_url);
390        self
391    }
392
393    /// Clears the base URL for this request.
394    ///
395    /// # Returns
396    /// `self` for chaining.
397    pub fn clear_base_url(mut self) -> Self {
398        self.base_url = None;
399        self
400    }
401
402    /// Overrides whether this request enforces IPv4-only literal-host validation.
403    ///
404    /// # Parameters
405    /// - `enabled`: `true` to reject IPv6 literal hosts, `false` to allow them.
406    ///
407    /// # Returns
408    /// `self` for chaining.
409    pub fn ipv4_only(mut self, enabled: bool) -> Self {
410        self.ipv4_only = enabled;
411        self
412    }
413
414    /// Binds a [`CancellationToken`] to this request.
415    ///
416    /// # Parameters
417    /// - `token`: Cancellation token checked before send and during request/stream I/O.
418    ///
419    /// # Returns
420    /// `self` for chaining.
421    pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
422        self.cancellation_token = Some(token);
423        self
424    }
425
426    /// Forces retry enabled for this request even if client-level retry is disabled.
427    ///
428    /// # Returns
429    /// `self` for chaining.
430    pub fn force_retry(mut self) -> Self {
431        self.retry_override = self.retry_override.force_enable();
432        self
433    }
434
435    /// Disables retry for this request even if client-level retry is enabled.
436    ///
437    /// # Returns
438    /// `self` for chaining.
439    pub fn disable_retry(mut self) -> Self {
440        self.retry_override = self.retry_override.force_disable();
441        self
442    }
443
444    /// Overrides retryable-method policy for this request.
445    ///
446    /// # Parameters
447    /// - `policy`: Method policy to apply on this request only.
448    ///
449    /// # Returns
450    /// `self` for chaining.
451    pub fn retry_method_policy(mut self, policy: HttpRetryMethodPolicy) -> Self {
452        self.retry_override = self.retry_override.with_method_policy(policy);
453        self
454    }
455
456    /// Enables or disables honoring `Retry-After` for this request.
457    ///
458    /// # Parameters
459    /// - `enabled`: `true` to honor `Retry-After` on retryable status
460    ///   responses (`429` and `5xx`).
461    ///
462    /// # Returns
463    /// `self` for chaining.
464    pub fn honor_retry_after(mut self, enabled: bool) -> Self {
465        self.retry_override = self.retry_override.with_honor_retry_after(enabled);
466        self
467    }
468
469    /// Consumes the builder into a frozen [`HttpRequest`].
470    ///
471    /// # Returns
472    /// Built [`HttpRequest`].
473    pub fn build(self) -> HttpRequest {
474        HttpRequest::new(self)
475    }
476}