Skip to main content

better_fetch/
client_builder.rs

1//! [`ClientBuilder`] — configure and build a [`Client`](crate::Client).
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use reqwest::Client as ReqwestClient;
7use tokio::sync::Semaphore;
8use url::Url;
9
10use crate::auth::Auth;
11use crate::backend::{HttpBackend, ReqwestBackend};
12use crate::client::{Client, ClientConfig};
13use crate::error::Error;
14use crate::hooks::Hooks;
15use crate::plugin::PluginRegistry;
16use crate::request::parse_request_header;
17use crate::retry::RetryPolicy;
18use crate::streaming::RETRY_BODY_PEEK_DEFAULT;
19use crate::Result;
20
21#[cfg(feature = "json")]
22use crate::json_parser::JsonParserFn;
23
24#[cfg(feature = "schema")]
25use crate::schema::SchemaRegistry;
26
27/// Builder for [`Client`].
28#[must_use = "call `.build()` to create a `Client`"]
29pub struct ClientBuilder {
30    base_url: Option<Url>,
31    timeout: Option<Duration>,
32    retry: Option<RetryPolicy>,
33    auth: Option<Auth>,
34    default_headers: http::HeaderMap,
35    hooks: Hooks,
36    plugins: PluginRegistry,
37    reqwest_client: Option<ReqwestClient>,
38    custom_backend: Option<Arc<dyn HttpBackend>>,
39    max_in_flight: Option<usize>,
40    /// Declared Tower [`ConcurrencyLimitLayer`](crate::tower::stack::ConcurrencyLimitLayer) limit for build-time diagnostics.
41    wire_concurrency_limit: Option<usize>,
42    max_response_bytes: Option<u64>,
43    retry_body_peek_bytes: Option<u64>,
44    #[cfg(feature = "schema")]
45    schema_registry: Option<Arc<SchemaRegistry>>,
46    #[cfg(feature = "json")]
47    json_parser: Option<JsonParserFn>,
48}
49
50impl ClientBuilder {
51    /// Creates an empty builder; [`Self::base_url`] is required before [`Self::build`].
52    pub fn new() -> Self {
53        Self {
54            base_url: None,
55            timeout: None,
56            retry: None,
57            auth: None,
58            default_headers: http::HeaderMap::new(),
59            hooks: Hooks::default(),
60            plugins: PluginRegistry::new(),
61            reqwest_client: None,
62            custom_backend: None,
63            max_in_flight: None,
64            wire_concurrency_limit: None,
65            max_response_bytes: None,
66            retry_body_peek_bytes: None,
67            #[cfg(feature = "schema")]
68            schema_registry: None,
69            #[cfg(feature = "json")]
70            json_parser: None,
71        }
72    }
73
74    /// Sets the base URL (required).
75    pub fn base_url(mut self, base_url: impl AsRef<str>) -> Result<Self> {
76        self.base_url = Some(Url::parse(base_url.as_ref()).map_err(Error::InvalidBaseUrl)?);
77        Ok(self)
78    }
79
80    /// Sets the default request timeout.
81    pub fn timeout(mut self, timeout: Duration) -> Self {
82        self.timeout = Some(timeout);
83        self
84    }
85
86    /// Sets the default [`RetryPolicy`] for all requests.
87    pub fn retry(mut self, policy: RetryPolicy) -> Self {
88        self.retry = Some(policy);
89        self
90    }
91
92    /// Sets default authentication for all requests.
93    pub fn auth(mut self, auth: Auth) -> Self {
94        self.auth = Some(auth);
95        self
96    }
97
98    /// Adds a default header applied to every request.
99    pub fn default_header(mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
100        let (name, value) = parse_request_header(key, value)?;
101        self.default_headers.insert(name, value);
102        Ok(self)
103    }
104
105    /// Sets client-level lifecycle hooks.
106    pub fn hooks(mut self, hooks: Hooks) -> Self {
107        self.hooks = hooks;
108        self
109    }
110
111    /// Registers a [`Plugin`] on this client.
112    pub fn plugin<P: crate::plugin::Plugin + 'static>(mut self, plugin: P) -> Self {
113        self.plugins.push(Box::new(plugin));
114        self
115    }
116
117    /// Uses a custom reqwest client for the default [`ReqwestBackend`].
118    pub fn reqwest_client(mut self, client: ReqwestClient) -> Self {
119        self.reqwest_client = Some(client);
120        self
121    }
122
123    /// Use a custom HTTP backend (for testing or alternate transports).
124    ///
125    /// # Examples
126    ///
127    /// ```no_run
128    /// # use better_fetch::{ClientBuilder, Error, HttpBackend, HttpRequest, HttpResponse, HttpStreamingResponse, Result};
129    /// # use async_trait::async_trait;
130    /// # use bytes::Bytes;
131    /// # use http::StatusCode;
132    /// # use std::sync::Arc;
133    /// # struct MockBackend;
134    /// # #[async_trait]
135    /// # impl HttpBackend for MockBackend {
136    /// #     async fn execute(&self, _req: HttpRequest) -> Result<HttpResponse> {
137    /// #         Ok(HttpResponse {
138    /// #             status: StatusCode::OK,
139    /// #             headers: Default::default(),
140    /// #             body: Bytes::from_static(b"{}"),
141    /// #         })
142    /// #     }
143    /// #     async fn execute_stream(
144    /// #         &self,
145    /// #         _req: HttpRequest,
146    /// #     ) -> Result<HttpStreamingResponse> {
147    /// #         Err(Error::Other("streaming not supported".into()))
148    /// #     }
149    /// # }
150    /// # fn example() -> Result<()> {
151    /// let client = ClientBuilder::new()
152    ///     .base_url("https://api.example.com")?
153    ///     .backend(Arc::new(MockBackend))
154    ///     .build()?;
155    /// # Ok(())
156    /// # }
157    /// ```
158    pub fn backend(mut self, backend: Arc<dyn HttpBackend>) -> Self {
159        self.custom_backend = Some(backend);
160        warn_stacked_concurrency_limits(self.max_in_flight, true, self.wire_concurrency_limit);
161        self
162    }
163
164    /// Declares the limit used by [`ConcurrencyLimitLayer`](crate::tower::stack::ConcurrencyLimitLayer)
165    /// on your transport stack so [`Self::build`] can warn when it matches [`Self::max_in_flight`].
166    ///
167    /// This does not configure Tower; it is only for diagnostics when stacking client and transport caps.
168    pub fn wire_concurrency_limit(mut self, limit: usize) -> Self {
169        self.wire_concurrency_limit = Some(limit);
170        warn_stacked_concurrency_limits(
171            self.max_in_flight,
172            self.custom_backend.is_some(),
173            self.wire_concurrency_limit,
174        );
175        self
176    }
177
178    /// Limits how many requests this client may have in flight at once (including retries).
179    ///
180    /// Implemented with a tokio semaphore in the core client. This counts the full request
181    /// lifecycle (hooks and retries), not just the transport hop. For wire-level limits only,
182    /// use [`Self::transport_stack`] with Tower's [`ConcurrencyLimitLayer`](crate::tower::stack::ConcurrencyLimitLayer)
183    /// (feature `tower`) instead of—or deliberately alongside—this setting.
184    pub fn max_in_flight(mut self, limit: usize) -> Self {
185        self.max_in_flight = Some(limit);
186        warn_stacked_concurrency_limits(
187            self.max_in_flight,
188            self.custom_backend.is_some(),
189            self.wire_concurrency_limit,
190        );
191        self
192    }
193
194    /// Maximum response body size (in bytes) for [`RequestBuilder::send`](crate::RequestBuilder::send),
195    /// [`send_json`](crate::RequestBuilder::send_json), and [`send_stream`](crate::RequestBuilder::send_stream)
196    /// when the request does not set its own limit.
197    pub fn max_response_bytes(mut self, limit: u64) -> Self {
198        self.max_response_bytes = Some(limit);
199        self
200    }
201
202    /// Maximum bytes read from a streaming body when a custom retry predicate is configured.
203    ///
204    /// Defaults to 64 KiB. Capped by [`Self::max_response_bytes`] when that is also set.
205    pub fn retry_body_peek_bytes(mut self, limit: u64) -> Self {
206        self.retry_body_peek_bytes = Some(limit);
207        self
208    }
209
210    /// Attach a [`SchemaRegistry`] for strict route validation (feature `schema`).
211    #[cfg(feature = "schema")]
212    pub fn schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
213        self.schema_registry = Some(registry);
214        self
215    }
216
217    /// Use a Tower [`Service`](tower::Service) as the HTTP transport for **buffered** `send()` only.
218    ///
219    /// `send_stream()` uses the default reqwest streaming transport without your Tower layers.
220    /// For middleware on both paths, use [`Self::transport_stack`].
221    #[cfg(feature = "tower")]
222    pub fn http_service<S>(mut self, service: S) -> Self
223    where
224        S: tower::Service<
225                crate::backend::HttpRequest,
226                Response = crate::backend::HttpResponse,
227                Error = Error,
228            > + Clone
229            + Send
230            + 'static,
231        S::Future: Send + 'static,
232    {
233        use crate::tower::ServiceBackend;
234
235        let client = self.reqwest_client.clone().unwrap_or_default();
236        self.custom_backend = Some(Arc::new(ServiceBackend::buffered_with_reqwest_streaming(
237            service, client,
238        )));
239        warn_stacked_concurrency_limits(self.max_in_flight, true, self.wire_concurrency_limit);
240        self
241    }
242
243    /// Use a boxed Tower transport stack for **buffered** `send()` only (streaming uses plain reqwest).
244    ///
245    /// Prefer [`Self::transport_stack`] when `send_stream()` must see the same middleware.
246    #[cfg(feature = "tower")]
247    pub fn http_service_boxed(mut self, service: crate::tower::BoxHttpService) -> Self {
248        use crate::tower::ServiceBackend;
249
250        let client = self.reqwest_client.clone().unwrap_or_default();
251        self.custom_backend = Some(Arc::new(ServiceBackend::new(
252            service,
253            crate::tower::ReqwestStreamingHttpService::new(client),
254        )));
255        warn_stacked_concurrency_limits(self.max_in_flight, true, self.wire_concurrency_limit);
256        self
257    }
258
259    /// Build a Tower transport stack on top of the configured (or default) reqwest client.
260    ///
261    /// Application hooks and [`RetryPolicy`](crate::RetryPolicy) remain in the core client;
262    /// only wire-level behavior is configured here.
263    ///
264    /// # Examples
265    ///
266    /// ```no_run
267    /// # use better_fetch::{ClientBuilder, Result};
268    /// # use better_fetch::tower::stack::{ConcurrencyLimitLayer, IntoBoxHttpService, IntoBoxStreamingHttpService, ServiceBuilder};
269    /// let client = ClientBuilder::new()
270    ///     .base_url("https://api.example.com")?
271    ///     .transport_stack(|buffered, streaming| {
272    ///         (
273    ///             ServiceBuilder::new()
274    ///                 .layer(ConcurrencyLimitLayer::new(32))
275    ///                 .service(buffered)
276    ///                 .into_box(),
277    ///             ServiceBuilder::new()
278    ///                 .layer(ConcurrencyLimitLayer::new(32))
279    ///                 .service(streaming)
280    ///                 .into_streaming_box(),
281    ///         )
282    ///     })
283    ///     .build()?;
284    /// # Ok::<(), better_fetch::Error>(())
285    /// ```
286    #[cfg(feature = "tower")]
287    pub fn transport_stack<F>(mut self, configure: F) -> Self
288    where
289        F: FnOnce(
290            crate::tower::ReqwestHttpService,
291            crate::tower::ReqwestStreamingHttpService,
292        ) -> (
293            crate::tower::BoxHttpService,
294            crate::tower::BoxStreamingHttpService,
295        ),
296    {
297        use crate::tower::ServiceBackend;
298
299        let client = self.reqwest_client.clone().unwrap_or_default();
300        let (buffered, streaming) = crate::tower::stack::build_dual(client, configure);
301        self.custom_backend = Some(Arc::new(ServiceBackend::from_boxes(buffered, streaming)));
302        warn_stacked_concurrency_limits(self.max_in_flight, true, self.wire_concurrency_limit);
303        self
304    }
305
306    /// Sets a custom JSON parser for all responses from this client.
307    ///
308    /// See [`crate::json_parser`] for the two-step `Bytes` → `Value` → `T` pipeline vs the
309    /// default single-step fast path, and [`Response::into_json_with`](crate::response::Response::into_json_with)
310    /// for per-response `Bytes` → `T` without a global parser.
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// # use better_fetch::{ClientBuilder, Result};
316    /// # use bytes::Bytes;
317    /// let client = ClientBuilder::new()
318    ///     .base_url("https://api.example.com")?
319    ///     .json_parser(|body: &Bytes| {
320    ///         let slice = body.strip_prefix(b"\xef\xbb\xbf").unwrap_or(body);
321    ///         serde_json::from_slice(slice).map_err(|e| e.to_string())
322    ///     })
323    ///     .build()?;
324    /// # Ok::<(), better_fetch::Error>(())
325    /// ```
326    #[cfg(feature = "json")]
327    pub fn json_parser<F>(mut self, f: F) -> Self
328    where
329        F: Fn(&bytes::Bytes) -> std::result::Result<serde_json::Value, String>
330            + Send
331            + Sync
332            + 'static,
333    {
334        self.json_parser = Some(crate::json_parser::json_parser(f));
335        self
336    }
337
338    /// Sets a custom JSON parser from an existing [`JsonParserFn`].
339    #[cfg(feature = "json")]
340    pub fn json_parser_fn(mut self, parser: JsonParserFn) -> Self {
341        self.json_parser = Some(parser);
342        self
343    }
344
345    /// Builds the [`Client`]. Requires [`Self::base_url`].
346    ///
347    /// # Examples
348    ///
349    /// ```no_run
350    /// # use better_fetch::{ClientBuilder, Result};
351    /// let client = ClientBuilder::new()
352    ///     .base_url("https://api.example.com")?
353    ///     .build()?;
354    /// # Ok::<(), better_fetch::Error>(())
355    /// ```
356    pub fn build(self) -> Result<Client> {
357        let base_url = self.base_url.ok_or(Error::MissingBaseUrl)?;
358
359        warn_stacked_concurrency_limits(
360            self.max_in_flight,
361            self.custom_backend.is_some(),
362            self.wire_concurrency_limit,
363        );
364
365        let backend: Arc<dyn HttpBackend> = if let Some(b) = self.custom_backend {
366            b
367        } else {
368            let reqwest_client = self.reqwest_client.unwrap_or_default();
369            Arc::new(ReqwestBackend::new(reqwest_client))
370        };
371
372        let plugins = Arc::new(self.plugins);
373        let merged_hooks = self.hooks.clone().merge(plugins.merged_hooks());
374
375        Ok(Client {
376            config: Arc::new(ClientConfig {
377                base_url,
378                timeout: self.timeout,
379                retry: self.retry,
380                auth: self.auth,
381                default_headers: self.default_headers,
382                hooks: self.hooks,
383                merged_hooks,
384                plugins,
385                max_in_flight: self.max_in_flight.map(|n| Arc::new(Semaphore::new(n))),
386                #[cfg(feature = "schema")]
387                schema_registry: self.schema_registry,
388                #[cfg(feature = "json")]
389                json_parser: self.json_parser,
390                max_response_bytes: self.max_response_bytes,
391                retry_body_peek_bytes: self
392                    .retry_body_peek_bytes
393                    .unwrap_or(RETRY_BODY_PEEK_DEFAULT),
394            }),
395            backend,
396        })
397    }
398}
399
400impl Default for ClientBuilder {
401    fn default() -> Self {
402        Self::new()
403    }
404}
405
406fn warn_stacked_concurrency_limits(
407    max_in_flight: Option<usize>,
408    has_custom_backend: bool,
409    wire_concurrency_limit: Option<usize>,
410) {
411    let Some(client_limit) = max_in_flight else {
412        return;
413    };
414    if !has_custom_backend {
415        return;
416    }
417    if wire_concurrency_limit == Some(client_limit) {
418        tracing::warn!(
419            client_max_in_flight = client_limit,
420            wire_concurrency_limit = client_limit,
421            "max_in_flight and Tower ConcurrencyLimitLayer use the same limit ({client_limit}); \
422             effective throughput is roughly halved because the client semaphore counts hooks and \
423             retries while Tower limits only transport hops. Use one cap or different values intentionally."
424        );
425    } else {
426        tracing::warn!(
427            client_max_in_flight = client_limit,
428            wire_concurrency_limit = ?wire_concurrency_limit,
429            "custom transport stack combined with max_in_flight: the client semaphore applies to the \
430             full request lifecycle (hooks and retries) while Tower ConcurrencyLimitLayer limits only \
431             transport concurrency. Call ClientBuilder::wire_concurrency_limit when your stack uses \
432             ConcurrencyLimitLayer so build() can detect matching numeric limits."
433        );
434    }
435}