better_fetch/client_builder.rs
1//! [`ClientBuilder`] — configure and build a [`Client`](crate::Client).
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use reqwest::Client as ReqwestClient;
7use tokio::sync::Semaphore;
8use url::Url;
9
10use crate::auth::Auth;
11use crate::backend::{HttpBackend, ReqwestBackend};
12use crate::client::{Client, ClientConfig};
13use crate::error::Error;
14use crate::hooks::Hooks;
15use crate::plugin::PluginRegistry;
16use crate::request::parse_request_header;
17use crate::retry::RetryPolicy;
18use crate::streaming::RETRY_BODY_PEEK_DEFAULT;
19use crate::Result;
20
21#[cfg(feature = "json")]
22use crate::json_parser::JsonParserFn;
23
24#[cfg(feature = "schema")]
25use crate::schema::SchemaRegistry;
26
27/// Builder for [`Client`].
28#[must_use = "call `.build()` to create a `Client`"]
29pub struct ClientBuilder {
30 base_url: Option<Url>,
31 timeout: Option<Duration>,
32 retry: Option<RetryPolicy>,
33 auth: Option<Auth>,
34 default_headers: http::HeaderMap,
35 hooks: Hooks,
36 plugins: PluginRegistry,
37 reqwest_client: Option<ReqwestClient>,
38 custom_backend: Option<Arc<dyn HttpBackend>>,
39 max_in_flight: Option<usize>,
40 max_response_bytes: Option<u64>,
41 retry_body_peek_bytes: Option<u64>,
42 #[cfg(feature = "schema")]
43 schema_registry: Option<Arc<SchemaRegistry>>,
44 #[cfg(feature = "json")]
45 json_parser: Option<JsonParserFn>,
46}
47
48impl ClientBuilder {
49 /// Creates an empty builder; [`Self::base_url`] is required before [`Self::build`].
50 pub fn new() -> Self {
51 Self {
52 base_url: None,
53 timeout: None,
54 retry: None,
55 auth: None,
56 default_headers: http::HeaderMap::new(),
57 hooks: Hooks::default(),
58 plugins: PluginRegistry::new(),
59 reqwest_client: None,
60 custom_backend: None,
61 max_in_flight: None,
62 max_response_bytes: None,
63 retry_body_peek_bytes: None,
64 #[cfg(feature = "schema")]
65 schema_registry: None,
66 #[cfg(feature = "json")]
67 json_parser: None,
68 }
69 }
70
71 /// Sets the base URL (required).
72 pub fn base_url(mut self, base_url: impl AsRef<str>) -> Result<Self> {
73 self.base_url = Some(Url::parse(base_url.as_ref()).map_err(Error::InvalidBaseUrl)?);
74 Ok(self)
75 }
76
77 /// Sets the default request timeout.
78 pub fn timeout(mut self, timeout: Duration) -> Self {
79 self.timeout = Some(timeout);
80 self
81 }
82
83 /// Sets the default [`RetryPolicy`] for all requests.
84 pub fn retry(mut self, policy: RetryPolicy) -> Self {
85 self.retry = Some(policy);
86 self
87 }
88
89 /// Sets default authentication for all requests.
90 pub fn auth(mut self, auth: Auth) -> Self {
91 self.auth = Some(auth);
92 self
93 }
94
95 /// Adds a default header applied to every request.
96 pub fn default_header(mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
97 let (name, value) = parse_request_header(key, value)?;
98 self.default_headers.insert(name, value);
99 Ok(self)
100 }
101
102 /// Sets client-level lifecycle hooks.
103 pub fn hooks(mut self, hooks: Hooks) -> Self {
104 self.hooks = hooks;
105 self
106 }
107
108 /// Registers a [`Plugin`] on this client.
109 pub fn plugin<P: crate::plugin::Plugin + 'static>(mut self, plugin: P) -> Self {
110 self.plugins.push(Box::new(plugin));
111 self
112 }
113
114 /// Uses a custom reqwest client for the default [`ReqwestBackend`].
115 pub fn reqwest_client(mut self, client: ReqwestClient) -> Self {
116 self.reqwest_client = Some(client);
117 self
118 }
119
120 /// Use a custom HTTP backend (for testing or alternate transports).
121 ///
122 /// # Examples
123 ///
124 /// ```no_run
125 /// # use better_fetch::{ClientBuilder, Error, HttpBackend, HttpRequest, HttpResponse, HttpStreamingResponse, Result};
126 /// # use async_trait::async_trait;
127 /// # use bytes::Bytes;
128 /// # use http::StatusCode;
129 /// # use std::sync::Arc;
130 /// # struct MockBackend;
131 /// # #[async_trait]
132 /// # impl HttpBackend for MockBackend {
133 /// # async fn execute(&self, _req: HttpRequest) -> Result<HttpResponse> {
134 /// # Ok(HttpResponse {
135 /// # status: StatusCode::OK,
136 /// # headers: Default::default(),
137 /// # body: Bytes::from_static(b"{}"),
138 /// # })
139 /// # }
140 /// # async fn execute_stream(
141 /// # &self,
142 /// # _req: HttpRequest,
143 /// # ) -> Result<HttpStreamingResponse> {
144 /// # Err(Error::Other("streaming not supported".into()))
145 /// # }
146 /// # }
147 /// # fn example() -> Result<()> {
148 /// let client = ClientBuilder::new()
149 /// .base_url("https://api.example.com")?
150 /// .backend(Arc::new(MockBackend))
151 /// .build()?;
152 /// # Ok(())
153 /// # }
154 /// ```
155 pub fn backend(mut self, backend: Arc<dyn HttpBackend>) -> Self {
156 self.custom_backend = Some(backend);
157 self
158 }
159
160 /// Limits how many requests this client may have in flight at once (including retries).
161 ///
162 /// Implemented with a tokio semaphore in the core client. This counts the full request
163 /// lifecycle (hooks and retries), not just the transport hop. For wire-level limits only,
164 /// use [`Self::transport_stack`] with Tower's [`ConcurrencyLimitLayer`](crate::tower::stack::ConcurrencyLimitLayer)
165 /// (feature `tower`) instead of—or deliberately alongside—this setting.
166 pub fn max_in_flight(mut self, limit: usize) -> Self {
167 self.max_in_flight = Some(limit);
168 self
169 }
170
171 /// Maximum response body size (in bytes) for [`RequestBuilder::send_stream`](crate::RequestBuilder::send_stream)
172 /// when the request does not set its own limit.
173 pub fn max_response_bytes(mut self, limit: u64) -> Self {
174 self.max_response_bytes = Some(limit);
175 self
176 }
177
178 /// Maximum bytes read from a streaming body when a custom retry predicate is configured.
179 ///
180 /// Defaults to 64 KiB. Capped by [`Self::max_response_bytes`] when that is also set.
181 pub fn retry_body_peek_bytes(mut self, limit: u64) -> Self {
182 self.retry_body_peek_bytes = Some(limit);
183 self
184 }
185
186 /// Attach a [`SchemaRegistry`] for strict route validation (feature `schema`).
187 #[cfg(feature = "schema")]
188 pub fn schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
189 self.schema_registry = Some(registry);
190 self
191 }
192
193 /// Use a Tower [`Service`](tower::Service) as the HTTP transport for **buffered** `send()` only.
194 ///
195 /// `send_stream()` uses the default reqwest streaming transport without your Tower layers.
196 /// For middleware on both paths, use [`Self::transport_stack`].
197 #[cfg(feature = "tower")]
198 pub fn http_service<S>(mut self, service: S) -> Self
199 where
200 S: tower::Service<
201 crate::backend::HttpRequest,
202 Response = crate::backend::HttpResponse,
203 Error = Error,
204 > + Clone
205 + Send
206 + 'static,
207 S::Future: Send + 'static,
208 {
209 use crate::tower::ServiceBackend;
210
211 let client = self.reqwest_client.clone().unwrap_or_default();
212 self.custom_backend = Some(Arc::new(ServiceBackend::buffered_with_reqwest_streaming(
213 service, client,
214 )));
215 self
216 }
217
218 /// Use a boxed Tower transport stack for **buffered** `send()` only (streaming uses plain reqwest).
219 ///
220 /// Prefer [`Self::transport_stack`] when `send_stream()` must see the same middleware.
221 #[cfg(feature = "tower")]
222 pub fn http_service_boxed(mut self, service: crate::tower::BoxHttpService) -> Self {
223 use crate::tower::ServiceBackend;
224
225 let client = self.reqwest_client.clone().unwrap_or_default();
226 self.custom_backend = Some(Arc::new(ServiceBackend::new(
227 service,
228 crate::tower::ReqwestStreamingHttpService::new(client),
229 )));
230 self
231 }
232
233 /// Build a Tower transport stack on top of the configured (or default) reqwest client.
234 ///
235 /// Application hooks and [`RetryPolicy`](crate::RetryPolicy) remain in the core client;
236 /// only wire-level behavior is configured here.
237 ///
238 /// # Examples
239 ///
240 /// ```no_run
241 /// # use better_fetch::{ClientBuilder, Result};
242 /// # use better_fetch::tower::stack::{ConcurrencyLimitLayer, IntoBoxHttpService, IntoBoxStreamingHttpService, ServiceBuilder};
243 /// let client = ClientBuilder::new()
244 /// .base_url("https://api.example.com")?
245 /// .transport_stack(|buffered, streaming| {
246 /// (
247 /// ServiceBuilder::new()
248 /// .layer(ConcurrencyLimitLayer::new(32))
249 /// .service(buffered)
250 /// .into_box(),
251 /// ServiceBuilder::new()
252 /// .layer(ConcurrencyLimitLayer::new(32))
253 /// .service(streaming)
254 /// .into_streaming_box(),
255 /// )
256 /// })
257 /// .build()?;
258 /// # Ok::<(), better_fetch::Error>(())
259 /// ```
260 #[cfg(feature = "tower")]
261 pub fn transport_stack<F>(mut self, configure: F) -> Self
262 where
263 F: FnOnce(
264 crate::tower::ReqwestHttpService,
265 crate::tower::ReqwestStreamingHttpService,
266 ) -> (
267 crate::tower::BoxHttpService,
268 crate::tower::BoxStreamingHttpService,
269 ),
270 {
271 use crate::tower::ServiceBackend;
272
273 let client = self.reqwest_client.clone().unwrap_or_default();
274 let (buffered, streaming) = crate::tower::stack::build_dual(client, configure);
275 self.custom_backend = Some(Arc::new(ServiceBackend::from_boxes(buffered, streaming)));
276 self
277 }
278
279 /// Sets a custom JSON parser for all responses from this client.
280 ///
281 /// See [`crate::json_parser`] for the two-step `Bytes` → `Value` → `T` pipeline vs the
282 /// default single-step fast path, and [`Response::into_json_with`](crate::response::Response::into_json_with)
283 /// for per-response `Bytes` → `T` without a global parser.
284 ///
285 /// # Examples
286 ///
287 /// ```no_run
288 /// # use better_fetch::{ClientBuilder, Result};
289 /// # use bytes::Bytes;
290 /// let client = ClientBuilder::new()
291 /// .base_url("https://api.example.com")?
292 /// .json_parser(|body: &Bytes| {
293 /// let slice = body.strip_prefix(b"\xef\xbb\xbf").unwrap_or(body);
294 /// serde_json::from_slice(slice).map_err(|e| e.to_string())
295 /// })
296 /// .build()?;
297 /// # Ok::<(), better_fetch::Error>(())
298 /// ```
299 #[cfg(feature = "json")]
300 pub fn json_parser<F>(mut self, f: F) -> Self
301 where
302 F: Fn(&bytes::Bytes) -> std::result::Result<serde_json::Value, String>
303 + Send
304 + Sync
305 + 'static,
306 {
307 self.json_parser = Some(crate::json_parser::json_parser(f));
308 self
309 }
310
311 /// Sets a custom JSON parser from an existing [`JsonParserFn`].
312 #[cfg(feature = "json")]
313 pub fn json_parser_fn(mut self, parser: JsonParserFn) -> Self {
314 self.json_parser = Some(parser);
315 self
316 }
317
318 /// Builds the [`Client`]. Requires [`Self::base_url`].
319 ///
320 /// # Examples
321 ///
322 /// ```no_run
323 /// # use better_fetch::{ClientBuilder, Result};
324 /// let client = ClientBuilder::new()
325 /// .base_url("https://api.example.com")?
326 /// .build()?;
327 /// # Ok::<(), better_fetch::Error>(())
328 /// ```
329 pub fn build(self) -> Result<Client> {
330 let base_url = self.base_url.ok_or(Error::MissingBaseUrl)?;
331
332 let backend: Arc<dyn HttpBackend> = if let Some(b) = self.custom_backend {
333 b
334 } else {
335 let reqwest_client = self.reqwest_client.unwrap_or_default();
336 Arc::new(ReqwestBackend::new(reqwest_client))
337 };
338
339 let plugins = Arc::new(self.plugins);
340 let merged_hooks = self.hooks.clone().merge(plugins.merged_hooks());
341
342 Ok(Client {
343 config: Arc::new(ClientConfig {
344 base_url,
345 timeout: self.timeout,
346 retry: self.retry,
347 auth: self.auth,
348 default_headers: self.default_headers,
349 hooks: self.hooks,
350 merged_hooks,
351 plugins,
352 max_in_flight: self.max_in_flight.map(|n| Arc::new(Semaphore::new(n))),
353 #[cfg(feature = "schema")]
354 schema_registry: self.schema_registry,
355 #[cfg(feature = "json")]
356 json_parser: self.json_parser,
357 max_response_bytes: self.max_response_bytes,
358 retry_body_peek_bytes: self
359 .retry_body_peek_bytes
360 .unwrap_or(RETRY_BODY_PEEK_DEFAULT),
361 }),
362 backend,
363 })
364 }
365}
366
367impl Default for ClientBuilder {
368 fn default() -> Self {
369 Self::new()
370 }
371}