qubit_http/request/http_request_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! Builder for [`super::http_request::HttpRequest`].
10
11use std::time::Duration;
12use std::{future::Future, pin::Pin};
13
14use bytes::Bytes;
15use http::header::CONTENT_TYPE;
16use http::{HeaderMap, HeaderValue, Method};
17use serde::Serialize;
18use tokio_util::sync::CancellationToken;
19use url::form_urlencoded;
20use url::Url;
21
22use crate::{
23 AsyncHttpHeaderInjector, HttpClient, HttpError, HttpHeaderInjector, HttpRequestBodyByteStream,
24 HttpRequestStreamingBody, HttpResult, HttpRetryMethodPolicy,
25};
26
27use super::http_request::HttpRequest;
28use super::http_request_body::HttpRequestBody;
29use super::http_request_retry_override::HttpRequestRetryOverride;
30use super::parse_header;
31
32/// Builder for [`HttpRequest`](super::http_request::HttpRequest).
33#[derive(Debug, Clone)]
34pub struct HttpRequestBuilder {
35 /// HTTP method (e.g. GET, POST).
36 pub(super) method: Method,
37 /// Request path without the query string.
38 pub(super) path: String,
39 /// Query parameters as `(key, value)` pairs, appended to the URL when built.
40 pub(super) query: Vec<(String, String)>,
41 /// Request headers.
42 pub(super) headers: HeaderMap,
43 /// Request body; empty if not set.
44 pub(super) body: HttpRequestBody,
45 /// Deferred streaming upload body factory for per-attempt stream creation.
46 pub(super) streaming_body: Option<HttpRequestStreamingBody>,
47 /// Per-request timeout; if unset, the client default applies.
48 pub(super) request_timeout: Option<Duration>,
49 /// Per-request write timeout used by the send phase.
50 pub(super) write_timeout: Duration,
51 /// Per-request read timeout used by buffered/stream response reading.
52 pub(super) read_timeout: Duration,
53 /// Base URL copied from client options and used by [`HttpRequest::resolve_url`].
54 pub(super) base_url: Option<Url>,
55 /// Whether IPv6 literal hosts are rejected during URL resolution.
56 pub(super) ipv4_only: bool,
57 /// Optional cancellation token for this request.
58 pub(super) cancellation_token: Option<CancellationToken>,
59 /// Per-request retry override for one-off retry behavior customization.
60 pub(super) retry_override: HttpRequestRetryOverride,
61 /// Default headers snapshot from the originating client.
62 pub(super) default_headers: HeaderMap,
63 /// Sync header injectors snapshot from the originating client.
64 pub(super) injectors: Vec<HttpHeaderInjector>,
65 /// Async header injectors snapshot from the originating client.
66 pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
67}
68
69impl HttpRequestBuilder {
70 /// Starts a builder with method/path and copies supported defaults from client options.
71 ///
72 /// # Parameters
73 /// - `method`: HTTP verb.
74 /// - `path`: URL or relative path string.
75 /// - `client`: Source client whose relevant defaults are copied into this builder.
76 ///
77 /// # Returns
78 /// New [`HttpRequestBuilder`].
79 pub(crate) fn new(method: Method, path: &str, client: &HttpClient) -> Self {
80 let options = client.options();
81 Self {
82 method,
83 path: path.to_string(),
84 query: Vec::new(),
85 headers: HeaderMap::new(),
86 body: HttpRequestBody::Empty,
87 streaming_body: None,
88 request_timeout: options.timeouts.request_timeout,
89 write_timeout: options.timeouts.write_timeout,
90 read_timeout: options.timeouts.read_timeout,
91 base_url: options.base_url.clone(),
92 ipv4_only: options.ipv4_only,
93 cancellation_token: None,
94 retry_override: HttpRequestRetryOverride::default(),
95 default_headers: client.headers_snapshot(),
96 injectors: client.injectors_snapshot(),
97 async_injectors: client.async_injectors_snapshot(),
98 }
99 }
100
101 /// Appends a single `key=value` query pair (order preserved).
102 ///
103 /// # Parameters
104 /// - `key`: Query parameter name.
105 /// - `value`: Query parameter value.
106 ///
107 /// # Returns
108 /// `self` for chaining.
109 pub fn query_param(mut self, key: &str, value: &str) -> Self {
110 self.query.push((key.to_string(), value.to_string()));
111 self
112 }
113
114 /// Appends many query pairs via [`HttpRequestBuilder::query_param`].
115 ///
116 /// # Parameters
117 /// - `params`: Iterator of `(key, value)` pairs.
118 ///
119 /// # Returns
120 /// `self` for chaining.
121 pub fn query_params<'a, I>(mut self, params: I) -> Self
122 where
123 I: IntoIterator<Item = (&'a str, &'a str)>,
124 {
125 for (key, value) in params {
126 self = self.query_param(key, value);
127 }
128 self
129 }
130
131 /// Validates and inserts one header.
132 ///
133 /// # Parameters
134 /// - `name`: Header name (must be valid [`http::header::HeaderName`] bytes).
135 /// - `value`: Header value (must be valid [`http::header::HeaderValue`]).
136 ///
137 /// # Returns
138 /// `Ok(self)` or [`HttpError`] if name/value are invalid.
139 pub fn header(mut self, name: &str, value: &str) -> HttpResult<Self> {
140 let (header_name, header_value) = parse_header(name, value)?;
141 self.headers.insert(header_name, header_value);
142 Ok(self)
143 }
144
145 /// Merges all entries from `headers` into this builder (existing names may get extra values).
146 ///
147 /// # Parameters
148 /// - `headers`: Map to append.
149 ///
150 /// # Returns
151 /// `self` for chaining.
152 pub fn headers(mut self, headers: HeaderMap) -> Self {
153 self.headers.extend(headers);
154 self
155 }
156
157 /// Sets the body to raw bytes without changing `Content-Type` unless already set elsewhere.
158 ///
159 /// # Parameters
160 /// - `body`: Payload.
161 ///
162 /// # Returns
163 /// `self` for chaining.
164 pub fn bytes_body(mut self, body: impl Into<Bytes>) -> Self {
165 self.body = HttpRequestBody::Bytes(body.into());
166 self.streaming_body = None;
167 self
168 }
169
170 /// Sets the body to an ordered chunk stream for incremental upload.
171 ///
172 /// # Parameters
173 /// - `chunks`: Iterator of chunks in send order.
174 ///
175 /// # Returns
176 /// `self` for chaining.
177 pub fn stream_body<I, B>(mut self, chunks: I) -> Self
178 where
179 I: IntoIterator<Item = B>,
180 B: Into<Bytes>,
181 {
182 self.body = HttpRequestBody::Stream(chunks.into_iter().map(Into::into).collect());
183 self.streaming_body = None;
184 self
185 }
186
187 /// Sets a deferred streaming upload body factory.
188 ///
189 /// The factory runs once per send attempt and returns a fresh async byte
190 /// stream, which allows retries to rebuild the outbound stream body.
191 ///
192 /// # Parameters
193 /// - `factory`: Async stream factory for per-attempt body generation.
194 ///
195 /// # Returns
196 /// `self` for chaining.
197 pub fn streaming_body<F>(mut self, factory: F) -> Self
198 where
199 F: Fn() -> Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>
200 + Send
201 + Sync
202 + 'static,
203 {
204 self.streaming_body = Some(HttpRequestStreamingBody::new(factory));
205 self.body = HttpRequestBody::Empty;
206 self
207 }
208
209 /// Sets a UTF-8 text body and adds `text/plain; charset=utf-8` if `Content-Type` is absent.
210 ///
211 /// # Parameters
212 /// - `body`: Text payload.
213 ///
214 /// # Returns
215 /// `self` for chaining.
216 pub fn text_body(mut self, body: impl Into<String>) -> Self {
217 if !self.headers.contains_key(CONTENT_TYPE) {
218 self.headers.insert(
219 CONTENT_TYPE,
220 HeaderValue::from_static("text/plain; charset=utf-8"),
221 );
222 }
223 self.body = HttpRequestBody::Text(body.into());
224 self.streaming_body = None;
225 self
226 }
227
228 /// Serializes `value` to JSON, sets body to those bytes, and adds `application/json` if needed.
229 ///
230 /// # Parameters
231 /// - `value`: Serializable value.
232 ///
233 /// # Returns
234 /// `Ok(self)` or [`HttpError`] if JSON encoding fails.
235 pub fn json_body<T>(mut self, value: &T) -> HttpResult<Self>
236 where
237 T: Serialize,
238 {
239 let bytes = serde_json::to_vec(value)
240 .map_err(|error| HttpError::decode(format!("Failed to encode JSON body: {}", error)))?;
241 if !self.headers.contains_key(CONTENT_TYPE) {
242 self.headers
243 .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
244 }
245 self.body = HttpRequestBody::Json(Bytes::from(bytes));
246 self.streaming_body = None;
247 Ok(self)
248 }
249
250 /// Serializes key-value pairs as `application/x-www-form-urlencoded`.
251 ///
252 /// # Parameters
253 /// - `fields`: Iterable of `(key, value)` string pairs.
254 ///
255 /// # Returns
256 /// `self` for chaining.
257 pub fn form_body<'a, I>(mut self, fields: I) -> Self
258 where
259 I: IntoIterator<Item = (&'a str, &'a str)>,
260 {
261 let mut serializer = form_urlencoded::Serializer::new(String::new());
262 for (key, value) in fields {
263 serializer.append_pair(key, value);
264 }
265 let body = serializer.finish();
266 if !self.headers.contains_key(CONTENT_TYPE) {
267 self.headers.insert(
268 CONTENT_TYPE,
269 HeaderValue::from_static("application/x-www-form-urlencoded"),
270 );
271 }
272 self.body = HttpRequestBody::Form(Bytes::from(body));
273 self.streaming_body = None;
274 self
275 }
276
277 /// Sets multipart body bytes and optional auto content-type by boundary.
278 ///
279 /// # Parameters
280 /// - `body`: Multipart payload bytes.
281 /// - `boundary`: Multipart boundary used in payload framing.
282 ///
283 /// # Returns
284 /// `Ok(self)` for chaining.
285 ///
286 /// # Errors
287 /// Returns [`HttpError`] when `boundary` is empty or content-type cannot be built.
288 pub fn multipart_body(mut self, body: impl Into<Bytes>, boundary: &str) -> HttpResult<Self> {
289 if boundary.trim().is_empty() {
290 return Err(HttpError::other(
291 "Multipart boundary cannot be empty for multipart_body",
292 ));
293 }
294 if !self.headers.contains_key(CONTENT_TYPE) {
295 let value = HeaderValue::from_str(&format!("multipart/form-data; boundary={boundary}"))
296 .map_err(|error| {
297 HttpError::other(format!(
298 "Invalid multipart Content-Type header value: {error}"
299 ))
300 })?;
301 self.headers.insert(CONTENT_TYPE, value);
302 }
303 self.body = HttpRequestBody::Multipart(body.into());
304 self.streaming_body = None;
305 Ok(self)
306 }
307
308 /// Serializes records as NDJSON (`one JSON object per line`).
309 ///
310 /// # Parameters
311 /// - `records`: Serializable records to encode as NDJSON lines.
312 ///
313 /// # Returns
314 /// `Ok(self)` for chaining.
315 ///
316 /// # Errors
317 /// Returns [`HttpError`] when any record fails JSON serialization.
318 pub fn ndjson_body<T>(mut self, records: &[T]) -> HttpResult<Self>
319 where
320 T: Serialize,
321 {
322 let mut payload = String::new();
323 for record in records {
324 let line = serde_json::to_string(record).map_err(|error| {
325 HttpError::decode(format!("Failed to encode NDJSON record: {error}"))
326 })?;
327 payload.push_str(&line);
328 payload.push('\n');
329 }
330 if !self.headers.contains_key(CONTENT_TYPE) {
331 self.headers.insert(
332 CONTENT_TYPE,
333 HeaderValue::from_static("application/x-ndjson"),
334 );
335 }
336 self.body = HttpRequestBody::Ndjson(Bytes::from(payload));
337 self.streaming_body = None;
338 Ok(self)
339 }
340
341 /// Overrides the client-wide request timeout for this request only.
342 ///
343 /// This sets reqwest's per-request [`reqwest::RequestBuilder::timeout`], i.e. a
344 /// whole-request deadline for that HTTP call (see reqwest docs for exact semantics).
345 ///
346 /// # Parameters
347 /// - `timeout`: Maximum time for the whole request (reqwest `timeout`).
348 ///
349 /// # Returns
350 /// `self` for chaining.
351 pub fn request_timeout(mut self, timeout: Duration) -> Self {
352 self.request_timeout = Some(timeout);
353 self
354 }
355
356 /// Overrides the write-phase timeout for this request only.
357 ///
358 /// # Parameters
359 /// - `timeout`: Maximum time allowed for sending the request bytes.
360 ///
361 /// # Returns
362 /// `self` for chaining.
363 pub fn write_timeout(mut self, timeout: Duration) -> Self {
364 self.write_timeout = timeout;
365 self
366 }
367
368 /// Overrides the read-phase timeout for this request only.
369 ///
370 /// # Parameters
371 /// - `timeout`: Maximum time allowed for one read wait on response body.
372 ///
373 /// # Returns
374 /// `self` for chaining.
375 pub fn read_timeout(mut self, timeout: Duration) -> Self {
376 self.read_timeout = timeout;
377 self
378 }
379
380 /// Overrides the client default base URL for this request.
381 ///
382 /// # Parameters
383 /// - `base_url`: Base URL used when resolving relative request paths.
384 ///
385 /// # Returns
386 /// `self` for chaining.
387 pub fn base_url(mut self, base_url: Url) -> Self {
388 self.base_url = Some(base_url);
389 self
390 }
391
392 /// Clears the base URL for this request.
393 ///
394 /// # Returns
395 /// `self` for chaining.
396 pub fn clear_base_url(mut self) -> Self {
397 self.base_url = None;
398 self
399 }
400
401 /// Overrides whether this request enforces IPv4-only literal-host validation.
402 ///
403 /// # Parameters
404 /// - `enabled`: `true` to reject IPv6 literal hosts, `false` to allow them.
405 ///
406 /// # Returns
407 /// `self` for chaining.
408 pub fn ipv4_only(mut self, enabled: bool) -> Self {
409 self.ipv4_only = enabled;
410 self
411 }
412
413 /// Binds a [`CancellationToken`] to this request.
414 ///
415 /// # Parameters
416 /// - `token`: Cancellation token checked before send and during request/stream I/O.
417 ///
418 /// # Returns
419 /// `self` for chaining.
420 pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
421 self.cancellation_token = Some(token);
422 self
423 }
424
425 /// Forces retry enabled for this request even if client-level retry is disabled.
426 ///
427 /// # Returns
428 /// `self` for chaining.
429 pub fn force_retry(mut self) -> Self {
430 self.retry_override = self.retry_override.force_enable();
431 self
432 }
433
434 /// Disables retry for this request even if client-level retry is enabled.
435 ///
436 /// # Returns
437 /// `self` for chaining.
438 pub fn disable_retry(mut self) -> Self {
439 self.retry_override = self.retry_override.force_disable();
440 self
441 }
442
443 /// Overrides retryable-method policy for this request.
444 ///
445 /// # Parameters
446 /// - `policy`: Method policy to apply on this request only.
447 ///
448 /// # Returns
449 /// `self` for chaining.
450 pub fn retry_method_policy(mut self, policy: HttpRetryMethodPolicy) -> Self {
451 self.retry_override = self.retry_override.with_method_policy(policy);
452 self
453 }
454
455 /// Enables or disables honoring `Retry-After` for this request.
456 ///
457 /// # Parameters
458 /// - `enabled`: `true` to honor `Retry-After` on retryable status
459 /// responses (`429` and `5xx`).
460 ///
461 /// # Returns
462 /// `self` for chaining.
463 pub fn honor_retry_after(mut self, enabled: bool) -> Self {
464 self.retry_override = self.retry_override.with_honor_retry_after(enabled);
465 self
466 }
467
468 /// Consumes the builder into a frozen [`HttpRequest`].
469 ///
470 /// # Returns
471 /// Built [`HttpRequest`].
472 pub fn build(self) -> HttpRequest {
473 HttpRequest::new(self)
474 }
475}