qubit_http/request/http_request_builder.rs
/*******************************************************************************
 *
 * Copyright (c) 2025 - 2026 Haixing Hu.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! Builder for [`super::http_request::HttpRequest`].
11
12use std::time::Duration;
13use std::{future::Future, pin::Pin};
14
15use bytes::Bytes;
16use http::header::CONTENT_TYPE;
17use http::{HeaderMap, HeaderValue, Method};
18use serde::Serialize;
19use tokio_util::sync::CancellationToken;
20use url::form_urlencoded;
21use url::Url;
22
23use crate::{
24 AsyncHttpHeaderInjector, HttpClient, HttpError, HttpHeaderInjector, HttpRequestBodyByteStream,
25 HttpRequestStreamingBody, HttpResult, HttpRetryMethodPolicy,
26};
27
28use super::http_request::HttpRequest;
29use super::http_request_body::HttpRequestBody;
30use super::http_request_retry_override::HttpRequestRetryOverride;
31use super::parse_header;
32
33/// Builder for [`HttpRequest`](super::http_request::HttpRequest).
34#[derive(Debug, Clone)]
35pub struct HttpRequestBuilder {
36 /// HTTP method (e.g. GET, POST).
37 pub(super) method: Method,
38 /// Request path without the query string.
39 pub(super) path: String,
40 /// Query parameters as `(key, value)` pairs, appended to the URL when built.
41 pub(super) query: Vec<(String, String)>,
42 /// Request headers.
43 pub(super) headers: HeaderMap,
44 /// Request body; empty if not set.
45 pub(super) body: HttpRequestBody,
46 /// Deferred streaming upload body factory for per-attempt stream creation.
47 pub(super) streaming_body: Option<HttpRequestStreamingBody>,
48 /// Per-request timeout; if unset, the client default applies.
49 pub(super) request_timeout: Option<Duration>,
50 /// Per-request write timeout used by the send phase.
51 pub(super) write_timeout: Duration,
52 /// Per-request read timeout used by buffered/stream response reading.
53 pub(super) read_timeout: Duration,
54 /// Base URL copied from client options and used by [`HttpRequest::resolve_url`].
55 pub(super) base_url: Option<Url>,
56 /// Whether IPv6 literal hosts are rejected during URL resolution.
57 pub(super) ipv4_only: bool,
58 /// Optional cancellation token for this request.
59 pub(super) cancellation_token: Option<CancellationToken>,
60 /// Per-request retry override for one-off retry behavior customization.
61 pub(super) retry_override: HttpRequestRetryOverride,
62 /// Default headers snapshot from the originating client.
63 pub(super) default_headers: HeaderMap,
64 /// Sync header injectors snapshot from the originating client.
65 pub(super) injectors: Vec<HttpHeaderInjector>,
66 /// Async header injectors snapshot from the originating client.
67 pub(super) async_injectors: Vec<AsyncHttpHeaderInjector>,
68}
69
70impl HttpRequestBuilder {
71 /// Starts a builder with method/path and copies supported defaults from client options.
72 ///
73 /// # Parameters
74 /// - `method`: HTTP verb.
75 /// - `path`: URL or relative path string.
76 /// - `client`: Source client whose relevant defaults are copied into this builder.
77 ///
78 /// # Returns
79 /// New [`HttpRequestBuilder`].
80 pub(crate) fn new(method: Method, path: &str, client: &HttpClient) -> Self {
81 let options = client.options();
82 Self {
83 method,
84 path: path.to_string(),
85 query: Vec::new(),
86 headers: HeaderMap::new(),
87 body: HttpRequestBody::Empty,
88 streaming_body: None,
89 request_timeout: options.timeouts.request_timeout,
90 write_timeout: options.timeouts.write_timeout,
91 read_timeout: options.timeouts.read_timeout,
92 base_url: options.base_url.clone(),
93 ipv4_only: options.ipv4_only,
94 cancellation_token: None,
95 retry_override: HttpRequestRetryOverride::default(),
96 default_headers: client.headers_snapshot(),
97 injectors: client.injectors_snapshot(),
98 async_injectors: client.async_injectors_snapshot(),
99 }
100 }
101
102 /// Appends a single `key=value` query pair (order preserved).
103 ///
104 /// # Parameters
105 /// - `key`: Query parameter name.
106 /// - `value`: Query parameter value.
107 ///
108 /// # Returns
109 /// `self` for chaining.
110 pub fn query_param(mut self, key: &str, value: &str) -> Self {
111 self.query.push((key.to_string(), value.to_string()));
112 self
113 }
114
115 /// Appends many query pairs via [`HttpRequestBuilder::query_param`].
116 ///
117 /// # Parameters
118 /// - `params`: Iterator of `(key, value)` pairs.
119 ///
120 /// # Returns
121 /// `self` for chaining.
122 pub fn query_params<'a, I>(mut self, params: I) -> Self
123 where
124 I: IntoIterator<Item = (&'a str, &'a str)>,
125 {
126 for (key, value) in params {
127 self = self.query_param(key, value);
128 }
129 self
130 }
131
132 /// Validates and inserts one header.
133 ///
134 /// # Parameters
135 /// - `name`: Header name (must be valid [`http::header::HeaderName`] bytes).
136 /// - `value`: Header value (must be valid [`http::header::HeaderValue`]).
137 ///
138 /// # Returns
139 /// `Ok(self)` or [`HttpError`] if name/value are invalid.
140 pub fn header(mut self, name: &str, value: &str) -> HttpResult<Self> {
141 let (header_name, header_value) = parse_header(name, value)?;
142 self.headers.insert(header_name, header_value);
143 Ok(self)
144 }
145
146 /// Merges all entries from `headers` into this builder (existing names may get extra values).
147 ///
148 /// # Parameters
149 /// - `headers`: Map to append.
150 ///
151 /// # Returns
152 /// `self` for chaining.
153 pub fn headers(mut self, headers: HeaderMap) -> Self {
154 self.headers.extend(headers);
155 self
156 }
157
158 /// Sets the body to raw bytes without changing `Content-Type` unless already set elsewhere.
159 ///
160 /// # Parameters
161 /// - `body`: Payload.
162 ///
163 /// # Returns
164 /// `self` for chaining.
165 pub fn bytes_body(mut self, body: impl Into<Bytes>) -> Self {
166 self.body = HttpRequestBody::Bytes(body.into());
167 self.streaming_body = None;
168 self
169 }
170
171 /// Sets the body to an ordered chunk stream for incremental upload.
172 ///
173 /// # Parameters
174 /// - `chunks`: Iterator of chunks in send order.
175 ///
176 /// # Returns
177 /// `self` for chaining.
178 pub fn stream_body<I, B>(mut self, chunks: I) -> Self
179 where
180 I: IntoIterator<Item = B>,
181 B: Into<Bytes>,
182 {
183 self.body = HttpRequestBody::Stream(chunks.into_iter().map(Into::into).collect());
184 self.streaming_body = None;
185 self
186 }
187
188 /// Sets a deferred streaming upload body factory.
189 ///
190 /// The factory runs once per send attempt and returns a fresh async byte
191 /// stream, which allows retries to rebuild the outbound stream body.
192 ///
193 /// # Parameters
194 /// - `factory`: Async stream factory for per-attempt body generation.
195 ///
196 /// # Returns
197 /// `self` for chaining.
198 pub fn streaming_body<F>(mut self, factory: F) -> Self
199 where
200 F: Fn() -> Pin<Box<dyn Future<Output = HttpRequestBodyByteStream> + Send + 'static>>
201 + Send
202 + Sync
203 + 'static,
204 {
205 self.streaming_body = Some(HttpRequestStreamingBody::new(factory));
206 self.body = HttpRequestBody::Empty;
207 self
208 }
209
210 /// Sets a UTF-8 text body and adds `text/plain; charset=utf-8` if `Content-Type` is absent.
211 ///
212 /// # Parameters
213 /// - `body`: Text payload.
214 ///
215 /// # Returns
216 /// `self` for chaining.
217 pub fn text_body(mut self, body: impl Into<String>) -> Self {
218 if !self.headers.contains_key(CONTENT_TYPE) {
219 self.headers.insert(
220 CONTENT_TYPE,
221 HeaderValue::from_static("text/plain; charset=utf-8"),
222 );
223 }
224 self.body = HttpRequestBody::Text(body.into());
225 self.streaming_body = None;
226 self
227 }
228
229 /// Serializes `value` to JSON, sets body to those bytes, and adds `application/json` if needed.
230 ///
231 /// # Parameters
232 /// - `value`: Serializable value.
233 ///
234 /// # Returns
235 /// `Ok(self)` or [`HttpError`] if JSON encoding fails.
236 pub fn json_body<T>(mut self, value: &T) -> HttpResult<Self>
237 where
238 T: Serialize,
239 {
240 let bytes = serde_json::to_vec(value)
241 .map_err(|error| HttpError::decode(format!("Failed to encode JSON body: {}", error)))?;
242 if !self.headers.contains_key(CONTENT_TYPE) {
243 self.headers
244 .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
245 }
246 self.body = HttpRequestBody::Json(Bytes::from(bytes));
247 self.streaming_body = None;
248 Ok(self)
249 }
250
251 /// Serializes key-value pairs as `application/x-www-form-urlencoded`.
252 ///
253 /// # Parameters
254 /// - `fields`: Iterable of `(key, value)` string pairs.
255 ///
256 /// # Returns
257 /// `self` for chaining.
258 pub fn form_body<'a, I>(mut self, fields: I) -> Self
259 where
260 I: IntoIterator<Item = (&'a str, &'a str)>,
261 {
262 let mut serializer = form_urlencoded::Serializer::new(String::new());
263 for (key, value) in fields {
264 serializer.append_pair(key, value);
265 }
266 let body = serializer.finish();
267 if !self.headers.contains_key(CONTENT_TYPE) {
268 self.headers.insert(
269 CONTENT_TYPE,
270 HeaderValue::from_static("application/x-www-form-urlencoded"),
271 );
272 }
273 self.body = HttpRequestBody::Form(Bytes::from(body));
274 self.streaming_body = None;
275 self
276 }
277
278 /// Sets multipart body bytes and optional auto content-type by boundary.
279 ///
280 /// # Parameters
281 /// - `body`: Multipart payload bytes.
282 /// - `boundary`: Multipart boundary used in payload framing.
283 ///
284 /// # Returns
285 /// `Ok(self)` for chaining.
286 ///
287 /// # Errors
288 /// Returns [`HttpError`] when `boundary` is empty or content-type cannot be built.
289 pub fn multipart_body(mut self, body: impl Into<Bytes>, boundary: &str) -> HttpResult<Self> {
290 if boundary.trim().is_empty() {
291 return Err(HttpError::other(
292 "Multipart boundary cannot be empty for multipart_body",
293 ));
294 }
295 if !self.headers.contains_key(CONTENT_TYPE) {
296 let value = HeaderValue::from_str(&format!("multipart/form-data; boundary={boundary}"))
297 .map_err(|error| {
298 HttpError::other(format!(
299 "Invalid multipart Content-Type header value: {error}"
300 ))
301 })?;
302 self.headers.insert(CONTENT_TYPE, value);
303 }
304 self.body = HttpRequestBody::Multipart(body.into());
305 self.streaming_body = None;
306 Ok(self)
307 }
308
309 /// Serializes records as NDJSON (`one JSON object per line`).
310 ///
311 /// # Parameters
312 /// - `records`: Serializable records to encode as NDJSON lines.
313 ///
314 /// # Returns
315 /// `Ok(self)` for chaining.
316 ///
317 /// # Errors
318 /// Returns [`HttpError`] when any record fails JSON serialization.
319 pub fn ndjson_body<T>(mut self, records: &[T]) -> HttpResult<Self>
320 where
321 T: Serialize,
322 {
323 let mut payload = String::new();
324 for record in records {
325 let line = serde_json::to_string(record).map_err(|error| {
326 HttpError::decode(format!("Failed to encode NDJSON record: {error}"))
327 })?;
328 payload.push_str(&line);
329 payload.push('\n');
330 }
331 if !self.headers.contains_key(CONTENT_TYPE) {
332 self.headers.insert(
333 CONTENT_TYPE,
334 HeaderValue::from_static("application/x-ndjson"),
335 );
336 }
337 self.body = HttpRequestBody::Ndjson(Bytes::from(payload));
338 self.streaming_body = None;
339 Ok(self)
340 }
341
342 /// Overrides the client-wide request timeout for this request only.
343 ///
344 /// This sets reqwest's per-request [`reqwest::RequestBuilder::timeout`], i.e. a
345 /// whole-request deadline for that HTTP call (see reqwest docs for exact semantics).
346 ///
347 /// # Parameters
348 /// - `timeout`: Maximum time for the whole request (reqwest `timeout`).
349 ///
350 /// # Returns
351 /// `self` for chaining.
352 pub fn request_timeout(mut self, timeout: Duration) -> Self {
353 self.request_timeout = Some(timeout);
354 self
355 }
356
357 /// Overrides the write-phase timeout for this request only.
358 ///
359 /// # Parameters
360 /// - `timeout`: Maximum time allowed for sending the request bytes.
361 ///
362 /// # Returns
363 /// `self` for chaining.
364 pub fn write_timeout(mut self, timeout: Duration) -> Self {
365 self.write_timeout = timeout;
366 self
367 }
368
369 /// Overrides the read-phase timeout for this request only.
370 ///
371 /// # Parameters
372 /// - `timeout`: Maximum time allowed for one read wait on response body.
373 ///
374 /// # Returns
375 /// `self` for chaining.
376 pub fn read_timeout(mut self, timeout: Duration) -> Self {
377 self.read_timeout = timeout;
378 self
379 }
380
381 /// Overrides the client default base URL for this request.
382 ///
383 /// # Parameters
384 /// - `base_url`: Base URL used when resolving relative request paths.
385 ///
386 /// # Returns
387 /// `self` for chaining.
388 pub fn base_url(mut self, base_url: Url) -> Self {
389 self.base_url = Some(base_url);
390 self
391 }
392
393 /// Clears the base URL for this request.
394 ///
395 /// # Returns
396 /// `self` for chaining.
397 pub fn clear_base_url(mut self) -> Self {
398 self.base_url = None;
399 self
400 }
401
402 /// Overrides whether this request enforces IPv4-only literal-host validation.
403 ///
404 /// # Parameters
405 /// - `enabled`: `true` to reject IPv6 literal hosts, `false` to allow them.
406 ///
407 /// # Returns
408 /// `self` for chaining.
409 pub fn ipv4_only(mut self, enabled: bool) -> Self {
410 self.ipv4_only = enabled;
411 self
412 }
413
414 /// Binds a [`CancellationToken`] to this request.
415 ///
416 /// # Parameters
417 /// - `token`: Cancellation token checked before send and during request/stream I/O.
418 ///
419 /// # Returns
420 /// `self` for chaining.
421 pub fn cancellation_token(mut self, token: CancellationToken) -> Self {
422 self.cancellation_token = Some(token);
423 self
424 }
425
426 /// Forces retry enabled for this request even if client-level retry is disabled.
427 ///
428 /// # Returns
429 /// `self` for chaining.
430 pub fn force_retry(mut self) -> Self {
431 self.retry_override = self.retry_override.force_enable();
432 self
433 }
434
435 /// Disables retry for this request even if client-level retry is enabled.
436 ///
437 /// # Returns
438 /// `self` for chaining.
439 pub fn disable_retry(mut self) -> Self {
440 self.retry_override = self.retry_override.force_disable();
441 self
442 }
443
444 /// Overrides retryable-method policy for this request.
445 ///
446 /// # Parameters
447 /// - `policy`: Method policy to apply on this request only.
448 ///
449 /// # Returns
450 /// `self` for chaining.
451 pub fn retry_method_policy(mut self, policy: HttpRetryMethodPolicy) -> Self {
452 self.retry_override = self.retry_override.with_method_policy(policy);
453 self
454 }
455
456 /// Enables or disables honoring `Retry-After` for this request.
457 ///
458 /// # Parameters
459 /// - `enabled`: `true` to honor `Retry-After` on retryable status
460 /// responses (`429` and `5xx`).
461 ///
462 /// # Returns
463 /// `self` for chaining.
464 pub fn honor_retry_after(mut self, enabled: bool) -> Self {
465 self.retry_override = self.retry_override.with_honor_retry_after(enabled);
466 self
467 }
468
469 /// Consumes the builder into a frozen [`HttpRequest`].
470 ///
471 /// # Returns
472 /// Built [`HttpRequest`].
473 pub fn build(self) -> HttpRequest {
474 HttpRequest::new(self)
475 }
476}