Skip to main content

better_fetch/
client.rs

1//! HTTP client, builder, and shared configuration.
2//!
3//! Start with [`Client::new`] or [`ClientBuilder`], then:
4//!
5//! - [`Client::get`] / [`Client::post`] — flexible [`RequestBuilder`] (string paths, `.param("id", 1)`).
6//! - [`Client::call`] — typed [`Endpoint`] routes ([`.params()`](EndpointRequestBuilder::params) with structs).
7//!
8//! See [`crate::request`] for per-request options on [`RequestBuilder`].
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use indexmap::IndexMap;
15use tokio::sync::Semaphore;
16
17use http::Method;
18use reqwest::Client as ReqwestClient;
19use url::Url;
20
21use crate::auth::Auth;
22use crate::backend::{HttpBackend, HttpBody, HttpRequest, ReqwestBackend};
23use crate::cancel::execute_or_cancel;
24use crate::endpoint::{Endpoint, EndpointParams, EndpointParamsInitial, EndpointRequestBuilder};
25use crate::error::Error;
26use crate::hooks::{
27    ErrorContext, Hooks, RequestContext, ResponseContext, StreamingResponseContext,
28    StreamingSuccessContext, SuccessContext,
29};
30use crate::plugin::{PluginRegistry, PreparedRequest};
31use crate::request::RequestBuilder;
32use crate::response::Response;
33use crate::retry::{sleep_or_cancel, RetryPolicy};
34use crate::streaming::{
35    body_stream_from_bytes, drain_body_for_retry, wrap_cancellation, wrap_max_bytes,
36    StreamingResponse, RETRY_BODY_PEEK_DEFAULT,
37};
38use crate::url_build::build_url;
39use crate::Result;
40
41#[cfg(feature = "tower")]
42use crate::backend::HttpResponse;
43
44#[cfg(feature = "json")]
45use crate::json_parser::JsonParserFn;
46
47#[cfg(feature = "schema")]
48use crate::schema::SchemaRegistry;
49
50fn body_for_context(body: &HttpBody) -> Option<bytes::Bytes> {
51    match body {
52        HttpBody::Empty => None,
53        HttpBody::Bytes(b) => Some(b.clone()),
54    }
55}
56
57/// Shared client configuration (returned by [`Client::config`]).
58#[derive(Clone)]
59pub struct ClientConfig {
60    /// Base URL joined with request paths.
61    pub base_url: Url,
62    /// Default per-request timeout when the builder does not override it.
63    pub timeout: Option<Duration>,
64    /// Default retry policy for requests that do not set their own.
65    pub retry: Option<RetryPolicy>,
66    /// Default authentication applied when a request has no per-request auth.
67    pub auth: Option<Auth>,
68    /// Headers merged into every request unless overridden.
69    pub default_headers: http::HeaderMap,
70    /// Client-level lifecycle hooks (merged with plugin hooks at build time).
71    pub hooks: Hooks,
72    pub(crate) merged_hooks: Hooks,
73    /// Registered plugins (init hooks + merged hook chains).
74    pub plugins: Arc<PluginRegistry>,
75    /// Limits concurrent in-flight requests for this client (including retries).
76    ///
77    /// This is separate from Tower's [`ConcurrencyLimitLayer`](crate::tower::stack::ConcurrencyLimitLayer):
78    /// the client semaphore applies to the full request lifecycle (hooks + retries), while Tower
79    /// limits only transport-layer concurrency. Avoid stacking both without accounting for that.
80    pub max_in_flight: Option<Arc<Semaphore>>,
81    #[cfg(feature = "schema")]
82    /// Optional strict route registry (feature `schema`).
83    pub schema_registry: Option<Arc<SchemaRegistry>>,
84    #[cfg(feature = "json")]
85    /// Client-wide custom JSON parser (feature `json`).
86    pub json_parser: Option<JsonParserFn>,
87    /// Default maximum response body size for [`RequestBuilder::send_stream`](crate::RequestBuilder::send_stream).
88    pub max_response_bytes: Option<u64>,
89    /// Maximum bytes read from a streaming body when evaluating a custom retry predicate.
90    pub retry_body_peek_bytes: u64,
91}
92
93/// Typed HTTP client built on reqwest.
94#[derive(Clone)]
95pub struct Client {
96    config: Arc<ClientConfig>,
97    backend: Arc<dyn HttpBackend>,
98}
99
100impl Client {
101    /// Creates a client with default reqwest settings and the given base URL.
102    ///
103    /// # Examples
104    ///
105    /// ```no_run
106    /// # use better_fetch::{Client, Result};
107    /// # #[tokio::main]
108    /// # async fn main() -> Result<()> {
109    /// let client = Client::new("https://api.example.com")?;
110    /// let _ = client.get("/health").send().await?;
111    /// # Ok(())
112    /// # }
113    /// ```
114    pub fn new(base_url: impl AsRef<str>) -> Result<Self> {
115        ClientBuilder::new().base_url(base_url)?.build()
116    }
117
118    /// Returns a [`ClientBuilder`] for advanced configuration.
119    pub fn builder() -> ClientBuilder {
120        ClientBuilder::new()
121    }
122
123    /// Builds a client with a custom reqwest instance. [`ClientBuilder::base_url`] is required.
124    pub fn with_http_client(
125        reqwest_client: ReqwestClient,
126        base_url: impl AsRef<str>,
127    ) -> Result<Self> {
128        ClientBuilder::new()
129            .reqwest_client(reqwest_client)
130            .base_url(base_url)?
131            .build()
132    }
133
134    /// Starts a typed request for [`Endpoint`] `E`.
135    ///
136    /// When `E::Params` is not unit, returns a builder in [`NeedsParams`](crate::NeedsParams) state
137    /// that requires [`.params()`](EndpointRequestBuilder::params) before
138    /// [`.send_json()`](EndpointRequestBuilder::send_json).
139    ///
140    /// For ad-hoc requests with string paths, use [`Self::get`] / [`Self::post`] instead.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// # use better_fetch::{Client, Endpoint, Result, define_params};
146    /// # use http::Method;
147    /// # use serde::Deserialize;
148    /// define_params!(GetTodoParams for "/todos/:id" { id: u64 });
149    ///
150    /// struct GetTodo;
151    /// impl Endpoint for GetTodo {
152    ///     const METHOD: http::Method = http::Method::GET;
153    ///     const PATH: &'static str = "/todos/:id";
154    ///     type Response = Todo;
155    ///     type Params = GetTodoParams;
156    ///     type Query = ();
157    /// }
158    ///
159    /// # #[derive(Deserialize)]
160    /// # struct Todo { id: u64, title: String }
161    /// # #[tokio::main]
162    /// # async fn main() -> Result<()> {
163    /// let client = Client::new("https://api.example.com")?;
164    /// let todo = client
165    ///     .call::<GetTodo>()
166    ///     .params(GetTodoParams { id: 1 })
167    ///     .send_json()
168    ///     .await?;
169    /// # Ok(())
170    /// # }
171    /// ```
172    pub fn call<E: Endpoint>(
173        &self,
174    ) -> EndpointRequestBuilder<'_, E, <E::Params as EndpointParams>::BuilderState>
175    where
176        E::Params: EndpointParamsInitial<E>,
177    {
178        E::Params::initial(self)
179    }
180
181    /// Returns a snapshot of this client's configuration.
182    pub fn config(&self) -> &ClientConfig {
183        &self.config
184    }
185
186    /// Starts a `GET` request for `path` (supports `:param` templates).
187    pub fn get(&self, path: impl Into<String>) -> RequestBuilder<'_> {
188        self.request(Method::GET, path)
189    }
190
191    /// Starts a `POST` request for `path`.
192    pub fn post(&self, path: impl Into<String>) -> RequestBuilder<'_> {
193        self.request(Method::POST, path)
194    }
195
196    /// Starts a `PUT` request for `path`.
197    pub fn put(&self, path: impl Into<String>) -> RequestBuilder<'_> {
198        self.request(Method::PUT, path)
199    }
200
201    /// Starts a `PATCH` request for `path`.
202    pub fn patch(&self, path: impl Into<String>) -> RequestBuilder<'_> {
203        self.request(Method::PATCH, path)
204    }
205
206    /// Starts a `DELETE` request for `path`.
207    pub fn delete(&self, path: impl Into<String>) -> RequestBuilder<'_> {
208        self.request(Method::DELETE, path)
209    }
210
211    /// Starts a `HEAD` request for `path`.
212    pub fn head(&self, path: impl Into<String>) -> RequestBuilder<'_> {
213        self.request(Method::HEAD, path)
214    }
215
216    /// Starts a request with an explicit HTTP method and path.
217    pub fn request(&self, method: Method, path: impl Into<String>) -> RequestBuilder<'_> {
218        RequestBuilder {
219            client: self,
220            method,
221            path: path.into(),
222            params: HashMap::new(),
223            query: IndexMap::new(),
224            headers: self.config.default_headers.clone(),
225            body: HttpBody::Empty,
226            #[cfg(feature = "multipart")]
227            multipart: None,
228            timeout: self.config.timeout,
229            retry: self.config.retry.clone(),
230            auth: self.config.auth.clone(),
231            cancellation: None,
232            throw_on_error: false,
233            #[cfg(feature = "json")]
234            json_parser: None,
235            #[cfg(feature = "validate")]
236            validate_response: true,
237            max_response_bytes: None,
238            retry_body_peek_bytes: None,
239        }
240    }
241
242    pub(crate) async fn execute_stream(
243        &self,
244        builder: RequestBuilder<'_>,
245    ) -> Result<StreamingResponse> {
246        #[cfg(feature = "json")]
247        let json_parser = builder
248            .json_parser
249            .clone()
250            .or_else(|| self.config.json_parser.clone());
251
252        let built = build_url(
253            &self.config.base_url,
254            &builder.path,
255            &builder.params,
256            &builder.query,
257        )?;
258
259        let mut method = builder.method;
260        if let Some(override_method) = built.method_override {
261            method = override_method;
262        }
263
264        #[cfg(feature = "schema")]
265        if let Some(registry) = &self.config.schema_registry {
266            registry.ensure_route(&builder.path, &method)?;
267        }
268
269        let mut url = built.url;
270        let mut headers = builder.headers;
271        let auth = builder.auth.or_else(|| self.config.auth.clone());
272        if let Some(auth) = auth {
273            auth.apply(&mut headers).await?;
274        }
275
276        let mut prepared = PreparedRequest {
277            url: url.clone(),
278            path: builder.path.clone(),
279            method: method.clone(),
280            headers: headers.clone(),
281        };
282        self.config.plugins.run_init_all(&mut prepared).await?;
283        url = prepared.url;
284        headers = prepared.headers;
285        method = prepared.method;
286
287        let mut req_ctx = RequestContext {
288            url: url.clone(),
289            method: method.clone(),
290            headers: headers.clone(),
291            body: body_for_context(&builder.body),
292            retry_attempt: 0,
293        };
294
295        let merged_hooks = &self.config.merged_hooks;
296        req_ctx = merged_hooks.run_on_request(req_ctx).await?;
297        url = req_ctx.url.clone();
298        headers = req_ctx.headers.clone();
299        method = req_ctx.method.clone();
300
301        let timeout = builder.timeout;
302        let retry_policy = builder.retry.or_else(|| self.config.retry.clone());
303        let throw_on_error = builder.throw_on_error;
304        let cancel = builder.cancellation;
305        let max_response_bytes = builder
306            .max_response_bytes
307            .or(self.config.max_response_bytes);
308        let retry_body_peek_bytes = builder
309            .retry_body_peek_bytes
310            .unwrap_or(self.config.retry_body_peek_bytes);
311
312        let backend = self.backend.clone();
313
314        let _in_flight_permit = match &self.config.max_in_flight {
315            Some(sem) => Some(
316                sem.acquire()
317                    .await
318                    .map_err(|_| Error::Other("max_in_flight semaphore closed".into()))?,
319            ),
320            None => None,
321        };
322
323        let mut attempt = 0u32;
324        let max_attempts = retry_policy.as_ref().map(|p| p.max_attempts()).unwrap_or(0);
325
326        let request_body = builder.body;
327        #[cfg(feature = "multipart")]
328        let mut multipart_body = builder.multipart;
329        #[cfg(feature = "multipart")]
330        let had_multipart = multipart_body.is_some();
331
332        let cancel_ref = cancel.as_ref();
333
334        loop {
335            req_ctx.retry_attempt = attempt;
336
337            #[cfg(feature = "multipart")]
338            if attempt > 0 && had_multipart {
339                return Err(Error::Other(
340                    "automatic retry is not supported with multipart request bodies".into(),
341                ));
342            }
343
344            let http_req = HttpRequest {
345                method: method.clone(),
346                url: url.clone(),
347                headers: headers.clone(),
348                body: request_body.clone(),
349                timeout,
350                cancellation: cancel.clone(),
351                #[cfg(feature = "multipart")]
352                multipart: multipart_body.take(),
353            };
354            let request_url = http_req.url.clone();
355
356            let result = execute_or_cancel(cancel_ref, backend.execute_stream(http_req)).await;
357
358            match result {
359                Ok(http_res) => {
360                    let status = http_res.status;
361                    let headers = http_res.headers.clone();
362                    let peek_limit = max_response_bytes
363                        .map(|m| m.min(retry_body_peek_bytes))
364                        .unwrap_or(retry_body_peek_bytes);
365
366                    let mut body = http_res.body;
367                    if let Some(policy) = retry_policy.as_ref() {
368                        if policy.has_custom_should_retry() {
369                            let peeked = drain_body_for_retry(body, peek_limit).await?;
370                            let stub = Response::new(
371                                status,
372                                headers.clone(),
373                                peeked.clone(),
374                                Some(request_url.clone()),
375                                #[cfg(feature = "json")]
376                                None,
377                            );
378                            if policy.should_retry_response(&stub, false) && attempt < max_attempts
379                            {
380                                let stub = Response::new(
381                                    status,
382                                    headers.clone(),
383                                    bytes::Bytes::new(),
384                                    Some(request_url.clone()),
385                                    #[cfg(feature = "json")]
386                                    None,
387                                );
388                                merged_hooks
389                                    .run_on_retry(ResponseContext {
390                                        request: req_ctx.clone(),
391                                        response: stub,
392                                    })
393                                    .await;
394                                let delay = policy.delay_after_response(attempt, &headers);
395                                attempt += 1;
396                                sleep_or_cancel(delay, cancel_ref).await?;
397                                continue;
398                            }
399                            body = body_stream_from_bytes(peeked);
400                        } else {
401                            let stub = Response::new(
402                                status,
403                                headers.clone(),
404                                bytes::Bytes::new(),
405                                Some(request_url.clone()),
406                                #[cfg(feature = "json")]
407                                None,
408                            );
409                            if policy.should_retry_response(&stub, false) && attempt < max_attempts
410                            {
411                                let stub = Response::new(
412                                    status,
413                                    headers.clone(),
414                                    bytes::Bytes::new(),
415                                    Some(request_url.clone()),
416                                    #[cfg(feature = "json")]
417                                    None,
418                                );
419                                merged_hooks
420                                    .run_on_retry(ResponseContext {
421                                        request: req_ctx.clone(),
422                                        response: stub,
423                                    })
424                                    .await;
425                                let delay = policy.delay_after_response(attempt, &headers);
426                                attempt += 1;
427                                sleep_or_cancel(delay, cancel_ref).await?;
428                                continue;
429                            }
430                        }
431                    }
432
433                    let meta = merged_hooks
434                        .run_on_response_stream(StreamingResponseContext {
435                            request: req_ctx.clone(),
436                            status,
437                            headers,
438                        })
439                        .await?;
440                    let status = meta.status;
441                    let stream_headers = meta.headers;
442
443                    if throw_on_error && !status.is_success() {
444                        let http_err = Error::http_with_status_text(
445                            status,
446                            status.canonical_reason().unwrap_or("request failed"),
447                            status.canonical_reason().unwrap_or("request failed"),
448                            None,
449                        );
450                        merged_hooks
451                            .run_on_error(ErrorContext {
452                                request: req_ctx.clone(),
453                                response: None,
454                                error: http_err.clone(),
455                            })
456                            .await;
457                        return Err(http_err);
458                    }
459
460                    if let Some(limit) = max_response_bytes {
461                        body = wrap_max_bytes(body, limit);
462                    }
463                    if let Some(token) = cancel.clone() {
464                        body = wrap_cancellation(body, token);
465                    }
466
467                    if status.is_success() {
468                        merged_hooks
469                            .run_on_success_stream(StreamingSuccessContext {
470                                request: req_ctx.clone(),
471                                status,
472                                headers: stream_headers.clone(),
473                            })
474                            .await;
475                    }
476
477                    return Ok(StreamingResponse::new(
478                        status,
479                        stream_headers,
480                        body,
481                        Some(request_url),
482                        #[cfg(feature = "json")]
483                        json_parser,
484                    ));
485                }
486                Err(err) => {
487                    if err.is_cancelled() {
488                        merged_hooks
489                            .run_on_error(ErrorContext {
490                                request: req_ctx.clone(),
491                                response: None,
492                                error: err.clone(),
493                            })
494                            .await;
495                        return Err(err);
496                    }
497
498                    let retry_transport = matches!(&err, Error::Transport { .. } | Error::Timeout);
499                    if retry_transport && retry_policy.is_some() && attempt < max_attempts {
500                        merged_hooks
501                            .run_on_retry(ResponseContext {
502                                request: req_ctx.clone(),
503                                response: Response::new(
504                                    http::StatusCode::SERVICE_UNAVAILABLE,
505                                    http::HeaderMap::new(),
506                                    bytes::Bytes::new(),
507                                    Some(request_url.clone()),
508                                    #[cfg(feature = "json")]
509                                    None,
510                                ),
511                            })
512                            .await;
513                        let delay = retry_policy
514                            .as_ref()
515                            .map(|p| p.delay_after_response(attempt, &http::HeaderMap::new()))
516                            .unwrap_or(Duration::from_secs(1));
517                        attempt += 1;
518                        sleep_or_cancel(delay, cancel_ref).await?;
519                        continue;
520                    }
521
522                    merged_hooks
523                        .run_on_error(ErrorContext {
524                            request: req_ctx.clone(),
525                            response: None,
526                            error: err.clone(),
527                        })
528                        .await;
529
530                    if retry_transport && retry_policy.is_some() {
531                        return Err(Error::retry_exhausted(attempt + 1, err));
532                    }
533
534                    return Err(err);
535                }
536            }
537        }
538    }
539
540    pub(crate) async fn execute(&self, builder: RequestBuilder<'_>) -> Result<Response> {
541        #[cfg(feature = "json")]
542        let json_parser = builder
543            .json_parser
544            .clone()
545            .or_else(|| self.config.json_parser.clone());
546
547        let built = build_url(
548            &self.config.base_url,
549            &builder.path,
550            &builder.params,
551            &builder.query,
552        )?;
553
554        let mut method = builder.method;
555        if let Some(override_method) = built.method_override {
556            method = override_method;
557        }
558
559        #[cfg(feature = "schema")]
560        if let Some(registry) = &self.config.schema_registry {
561            registry.ensure_route(&builder.path, &method)?;
562        }
563
564        let mut url = built.url;
565        let mut headers = builder.headers;
566        let auth = builder.auth.or_else(|| self.config.auth.clone());
567        if let Some(auth) = auth {
568            auth.apply(&mut headers).await?;
569        }
570
571        let mut prepared = PreparedRequest {
572            url: url.clone(),
573            path: builder.path.clone(),
574            method: method.clone(),
575            headers: headers.clone(),
576        };
577        self.config.plugins.run_init_all(&mut prepared).await?;
578        url = prepared.url;
579        headers = prepared.headers;
580        method = prepared.method;
581
582        let mut req_ctx = RequestContext {
583            url: url.clone(),
584            method: method.clone(),
585            headers: headers.clone(),
586            body: body_for_context(&builder.body),
587            retry_attempt: 0,
588        };
589
590        let merged_hooks = &self.config.merged_hooks;
591        req_ctx = merged_hooks.run_on_request(req_ctx).await?;
592        url = req_ctx.url.clone();
593        headers = req_ctx.headers.clone();
594        method = req_ctx.method.clone();
595
596        let timeout = builder.timeout;
597        let retry_policy = builder.retry.or_else(|| self.config.retry.clone());
598        let throw_on_error = builder.throw_on_error;
599        let cancel = builder.cancellation;
600
601        let backend = self.backend.clone();
602
603        let _in_flight_permit = match &self.config.max_in_flight {
604            Some(sem) => Some(
605                sem.acquire()
606                    .await
607                    .map_err(|_| Error::Other("max_in_flight semaphore closed".into()))?,
608            ),
609            None => None,
610        };
611
612        let mut attempt = 0u32;
613        let max_attempts = retry_policy.as_ref().map(|p| p.max_attempts()).unwrap_or(0);
614
615        let request_body = builder.body;
616        #[cfg(feature = "multipart")]
617        let mut multipart_body = builder.multipart;
618        #[cfg(feature = "multipart")]
619        let had_multipart = multipart_body.is_some();
620
621        let cancel_ref = cancel.as_ref();
622
623        loop {
624            req_ctx.retry_attempt = attempt;
625
626            #[cfg(feature = "multipart")]
627            if attempt > 0 && had_multipart {
628                return Err(Error::Other(
629                    "automatic retry is not supported with multipart request bodies".into(),
630                ));
631            }
632
633            let http_req = HttpRequest {
634                method: method.clone(),
635                url: url.clone(),
636                headers: headers.clone(),
637                body: request_body.clone(),
638                timeout,
639                cancellation: cancel.clone(),
640                #[cfg(feature = "multipart")]
641                multipart: multipart_body.take(),
642            };
643            let request_url = http_req.url.clone();
644
645            let result = execute_or_cancel(cancel_ref, backend.execute(http_req)).await;
646
647            match result {
648                Ok(http_res) => {
649                    let response = Response::new(
650                        http_res.status,
651                        http_res.headers.clone(),
652                        http_res.body,
653                        Some(request_url.clone()),
654                        #[cfg(feature = "json")]
655                        json_parser.clone(),
656                    );
657
658                    let response = merged_hooks
659                        .run_on_response(ResponseContext {
660                            request: req_ctx.clone(),
661                            response,
662                        })
663                        .await?;
664
665                    let should_retry = retry_policy
666                        .as_ref()
667                        .map(|p| p.should_retry_response(&response, false))
668                        .unwrap_or(false);
669
670                    if should_retry && attempt < max_attempts {
671                        merged_hooks
672                            .run_on_retry(ResponseContext {
673                                request: req_ctx.clone(),
674                                response: response.clone(),
675                            })
676                            .await;
677                        let delay = retry_policy
678                            .as_ref()
679                            .map(|p| p.delay_after_response(attempt, response.headers()))
680                            .unwrap_or(Duration::from_secs(1));
681                        attempt += 1;
682                        sleep_or_cancel(delay, cancel_ref).await?;
683                        continue;
684                    }
685
686                    if response.is_success() {
687                        merged_hooks
688                            .run_on_success(SuccessContext {
689                                request: req_ctx.clone(),
690                                response: response.clone(),
691                            })
692                            .await;
693                        return Ok(response);
694                    }
695
696                    let status = response.status();
697                    let http_err = Error::http_with_status_text(
698                        status,
699                        status.canonical_reason().unwrap_or("request failed"),
700                        status.canonical_reason().unwrap_or("request failed"),
701                        Some(response.bytes().clone()),
702                    );
703                    merged_hooks
704                        .run_on_error(ErrorContext {
705                            request: req_ctx.clone(),
706                            response: Some(response.clone()),
707                            error: http_err.clone(),
708                        })
709                        .await;
710
711                    if throw_on_error {
712                        return Err(http_err);
713                    }
714                    return Ok(response);
715                }
716                Err(err) => {
717                    if err.is_cancelled() {
718                        merged_hooks
719                            .run_on_error(ErrorContext {
720                                request: req_ctx.clone(),
721                                response: None,
722                                error: err.clone(),
723                            })
724                            .await;
725                        return Err(err);
726                    }
727
728                    let retry_transport = matches!(&err, Error::Transport { .. } | Error::Timeout);
729                    if retry_transport && retry_policy.is_some() && attempt < max_attempts {
730                        merged_hooks
731                            .run_on_retry(ResponseContext {
732                                request: req_ctx.clone(),
733                                response: Response::new(
734                                    http::StatusCode::SERVICE_UNAVAILABLE,
735                                    http::HeaderMap::new(),
736                                    bytes::Bytes::new(),
737                                    Some(request_url.clone()),
738                                    #[cfg(feature = "json")]
739                                    None,
740                                ),
741                            })
742                            .await;
743                        let delay = retry_policy
744                            .as_ref()
745                            .map(|p| p.delay_after_response(attempt, &http::HeaderMap::new()))
746                            .unwrap_or(Duration::from_secs(1));
747                        attempt += 1;
748                        sleep_or_cancel(delay, cancel_ref).await?;
749                        continue;
750                    }
751
752                    merged_hooks
753                        .run_on_error(ErrorContext {
754                            request: req_ctx.clone(),
755                            response: None,
756                            error: err.clone(),
757                        })
758                        .await;
759
760                    if retry_transport && retry_policy.is_some() {
761                        return Err(Error::retry_exhausted(attempt + 1, err));
762                    }
763
764                    return Err(err);
765                }
766            }
767        }
768    }
769}
770
771/// Builder for [`Client`].
772pub struct ClientBuilder {
773    base_url: Option<Url>,
774    timeout: Option<Duration>,
775    retry: Option<RetryPolicy>,
776    auth: Option<Auth>,
777    default_headers: http::HeaderMap,
778    hooks: Hooks,
779    plugins: PluginRegistry,
780    reqwest_client: Option<ReqwestClient>,
781    custom_backend: Option<Arc<dyn HttpBackend>>,
782    max_in_flight: Option<usize>,
783    max_response_bytes: Option<u64>,
784    retry_body_peek_bytes: Option<u64>,
785    #[cfg(feature = "schema")]
786    schema_registry: Option<Arc<SchemaRegistry>>,
787    #[cfg(feature = "json")]
788    json_parser: Option<JsonParserFn>,
789}
790
791impl ClientBuilder {
792    /// Creates an empty builder; [`Self::base_url`] is required before [`Self::build`].
793    pub fn new() -> Self {
794        Self {
795            base_url: None,
796            timeout: None,
797            retry: None,
798            auth: None,
799            default_headers: http::HeaderMap::new(),
800            hooks: Hooks::default(),
801            plugins: PluginRegistry::new(),
802            reqwest_client: None,
803            custom_backend: None,
804            max_in_flight: None,
805            max_response_bytes: None,
806            retry_body_peek_bytes: None,
807            #[cfg(feature = "schema")]
808            schema_registry: None,
809            #[cfg(feature = "json")]
810            json_parser: None,
811        }
812    }
813
814    /// Sets the base URL (required).
815    pub fn base_url(mut self, base_url: impl AsRef<str>) -> Result<Self> {
816        self.base_url = Some(Url::parse(base_url.as_ref()).map_err(Error::InvalidBaseUrl)?);
817        Ok(self)
818    }
819
820    /// Sets the default request timeout.
821    pub fn timeout(mut self, timeout: Duration) -> Self {
822        self.timeout = Some(timeout);
823        self
824    }
825
826    /// Sets the default [`RetryPolicy`] for all requests.
827    pub fn retry(mut self, policy: RetryPolicy) -> Self {
828        self.retry = Some(policy);
829        self
830    }
831
832    /// Sets default authentication for all requests.
833    pub fn auth(mut self, auth: Auth) -> Self {
834        self.auth = Some(auth);
835        self
836    }
837
838    /// Adds a default header applied to every request.
839    pub fn default_header(mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
840        let name = http::HeaderName::from_bytes(key.as_ref().as_bytes())
841            .map_err(|e| Error::Other(format!("invalid header name: {e}")))?;
842        let value = http::HeaderValue::from_str(value.as_ref())
843            .map_err(|e| Error::Other(format!("invalid header value: {e}")))?;
844        self.default_headers.insert(name, value);
845        Ok(self)
846    }
847
848    /// Sets client-level lifecycle hooks.
849    pub fn hooks(mut self, hooks: Hooks) -> Self {
850        self.hooks = hooks;
851        self
852    }
853
854    /// Registers a [`Plugin`] on this client.
855    pub fn plugin<P: crate::plugin::Plugin + 'static>(mut self, plugin: P) -> Self {
856        self.plugins.push(Box::new(plugin));
857        self
858    }
859
860    /// Uses a custom reqwest client for the default [`ReqwestBackend`].
861    pub fn reqwest_client(mut self, client: ReqwestClient) -> Self {
862        self.reqwest_client = Some(client);
863        self
864    }
865
866    /// Use a custom HTTP backend (for testing or alternate transports).
867    ///
868    /// # Examples
869    ///
870    /// ```no_run
871    /// # use better_fetch::{ClientBuilder, Error, HttpBackend, HttpRequest, HttpResponse, HttpStreamingResponse, Result};
872    /// # use async_trait::async_trait;
873    /// # use bytes::Bytes;
874    /// # use http::StatusCode;
875    /// # use std::sync::Arc;
876    /// # struct MockBackend;
877    /// # #[async_trait]
878    /// # impl HttpBackend for MockBackend {
879    /// #     async fn execute(&self, _req: HttpRequest) -> Result<HttpResponse> {
880    /// #         Ok(HttpResponse {
881    /// #             status: StatusCode::OK,
882    /// #             headers: Default::default(),
883    /// #             body: Bytes::from_static(b"{}"),
884    /// #         })
885    /// #     }
886    /// #     async fn execute_stream(
887    /// #         &self,
888    /// #         _req: HttpRequest,
889    /// #     ) -> Result<HttpStreamingResponse> {
890    /// #         Err(Error::Other("streaming not supported".into()))
891    /// #     }
892    /// # }
893    /// # fn example() -> Result<()> {
894    /// let client = ClientBuilder::new()
895    ///     .base_url("https://api.example.com")?
896    ///     .backend(Arc::new(MockBackend))
897    ///     .build()?;
898    /// # Ok(())
899    /// # }
900    /// ```
901    pub fn backend(mut self, backend: Arc<dyn HttpBackend>) -> Self {
902        self.custom_backend = Some(backend);
903        self
904    }
905
906    /// Limits how many requests this client may have in flight at once (including retries).
907    ///
908    /// Implemented with a tokio semaphore in the core client. This counts the full request
909    /// lifecycle (hooks and retries), not just the transport hop. For wire-level limits only,
910    /// use [`Self::transport_stack`] with Tower's [`ConcurrencyLimitLayer`](crate::tower::stack::ConcurrencyLimitLayer)
911    /// (feature `tower`) instead of—or deliberately alongside—this setting.
912    pub fn max_in_flight(mut self, limit: usize) -> Self {
913        self.max_in_flight = Some(limit);
914        self
915    }
916
917    /// Maximum response body size (in bytes) for [`RequestBuilder::send_stream`](crate::RequestBuilder::send_stream)
918    /// when the request does not set its own limit.
919    pub fn max_response_bytes(mut self, limit: u64) -> Self {
920        self.max_response_bytes = Some(limit);
921        self
922    }
923
924    /// Maximum bytes read from a streaming body when a custom retry predicate is configured.
925    ///
926    /// Defaults to 64 KiB. Capped by [`Self::max_response_bytes`] when that is also set.
927    pub fn retry_body_peek_bytes(mut self, limit: u64) -> Self {
928        self.retry_body_peek_bytes = Some(limit);
929        self
930    }
931
932    /// Attach a [`SchemaRegistry`] for strict route validation (feature `schema`).
933    #[cfg(feature = "schema")]
934    pub fn schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
935        self.schema_registry = Some(registry);
936        self
937    }
938
939    /// Use a Tower [`Service`](tower::Service) as the HTTP transport (feature `tower`).
940    #[cfg(feature = "tower")]
941    pub fn http_service<S>(mut self, service: S) -> Self
942    where
943        S: tower::Service<HttpRequest, Response = HttpResponse, Error = Error>
944            + Clone
945            + Send
946            + 'static,
947        S::Future: Send + 'static,
948    {
949        use crate::backend::ReqwestBackend;
950        use crate::tower::ServiceBackend;
951
952        let client = self.reqwest_client.clone().unwrap_or_default();
953        let streaming = ReqwestBackend::new(client);
954        self.custom_backend = Some(Arc::new(ServiceBackend::new(service, streaming)));
955        self
956    }
957
958    /// Use a boxed Tower transport stack (feature `tower`).
959    #[cfg(feature = "tower")]
960    pub fn http_service_boxed(mut self, service: crate::tower::BoxHttpService) -> Self {
961        use crate::backend::ReqwestBackend;
962        use crate::tower::ServiceBackend;
963
964        let client = self.reqwest_client.clone().unwrap_or_default();
965        let streaming = ReqwestBackend::new(client);
966        self.custom_backend = Some(Arc::new(ServiceBackend::from_box(service, streaming)));
967        self
968    }
969
970    /// Build a Tower transport stack on top of the configured (or default) reqwest client.
971    ///
972    /// Application hooks and [`RetryPolicy`](crate::RetryPolicy) remain in the core client;
973    /// only wire-level behavior is configured here.
974    ///
975    /// # Examples
976    ///
977    /// ```no_run
978    /// # use better_fetch::{ClientBuilder, Result};
979    /// # use better_fetch::tower::stack::{ConcurrencyLimitLayer, IntoBoxHttpService, ServiceBuilder};
980    /// let client = ClientBuilder::new()
981    ///     .base_url("https://api.example.com")?
982    ///     .transport_stack(|inner| {
983    ///         ServiceBuilder::new()
984    ///             .layer(ConcurrencyLimitLayer::new(32))
985    ///             .service(inner)
986    ///             .into_box()
987    ///     })
988    ///     .build()?;
989    /// # Ok::<(), better_fetch::Error>(())
990    /// ```
991    #[cfg(feature = "tower")]
992    pub fn transport_stack<F>(mut self, configure: F) -> Self
993    where
994        F: FnOnce(crate::tower::ReqwestHttpService) -> crate::tower::BoxHttpService,
995    {
996        use crate::backend::ReqwestBackend;
997        use crate::tower::ServiceBackend;
998
999        let client = self.reqwest_client.clone().unwrap_or_default();
1000        let streaming = ReqwestBackend::new(client.clone());
1001        let stacked = configure(crate::tower::ReqwestHttpService::new(client));
1002        self.custom_backend = Some(Arc::new(ServiceBackend::from_box(stacked, streaming)));
1003        self
1004    }
1005
1006    /// Sets a custom JSON parser for all responses from this client.
1007    ///
1008    /// See [`crate::json_parser`] for the two-step `Bytes` → `Value` → `T` pipeline vs the
1009    /// default single-step fast path, and [`Response::into_json_with`](crate::response::Response::into_json_with)
1010    /// for per-response `Bytes` → `T` without a global parser.
1011    ///
1012    /// # Examples
1013    ///
1014    /// ```no_run
1015    /// # use better_fetch::{ClientBuilder, Result};
1016    /// # use bytes::Bytes;
1017    /// let client = ClientBuilder::new()
1018    ///     .base_url("https://api.example.com")?
1019    ///     .json_parser(|body: &Bytes| {
1020    ///         let slice = body.strip_prefix(b"\xef\xbb\xbf").unwrap_or(body);
1021    ///         serde_json::from_slice(slice).map_err(|e| e.to_string())
1022    ///     })
1023    ///     .build()?;
1024    /// # Ok::<(), better_fetch::Error>(())
1025    /// ```
1026    #[cfg(feature = "json")]
1027    pub fn json_parser<F>(mut self, f: F) -> Self
1028    where
1029        F: Fn(&bytes::Bytes) -> std::result::Result<serde_json::Value, String>
1030            + Send
1031            + Sync
1032            + 'static,
1033    {
1034        self.json_parser = Some(crate::json_parser::json_parser(f));
1035        self
1036    }
1037
1038    /// Sets a custom JSON parser from an existing [`JsonParserFn`].
1039    #[cfg(feature = "json")]
1040    pub fn json_parser_fn(mut self, parser: JsonParserFn) -> Self {
1041        self.json_parser = Some(parser);
1042        self
1043    }
1044
1045    /// Builds the [`Client`]. Requires [`Self::base_url`].
1046    ///
1047    /// # Examples
1048    ///
1049    /// ```no_run
1050    /// # use better_fetch::{ClientBuilder, Result};
1051    /// let client = ClientBuilder::new()
1052    ///     .base_url("https://api.example.com")?
1053    ///     .build()?;
1054    /// # Ok::<(), better_fetch::Error>(())
1055    /// ```
1056    pub fn build(self) -> Result<Client> {
1057        let base_url = self.base_url.ok_or(Error::MissingBaseUrl)?;
1058
1059        let backend: Arc<dyn HttpBackend> = if let Some(b) = self.custom_backend {
1060            b
1061        } else {
1062            let reqwest_client = self.reqwest_client.unwrap_or_default();
1063            Arc::new(ReqwestBackend::new(reqwest_client))
1064        };
1065
1066        let plugins = Arc::new(self.plugins);
1067        let merged_hooks = self.hooks.clone().merge(plugins.merged_hooks());
1068
1069        Ok(Client {
1070            config: Arc::new(ClientConfig {
1071                base_url,
1072                timeout: self.timeout,
1073                retry: self.retry,
1074                auth: self.auth,
1075                default_headers: self.default_headers,
1076                hooks: self.hooks,
1077                merged_hooks,
1078                plugins,
1079                max_in_flight: self.max_in_flight.map(|n| Arc::new(Semaphore::new(n))),
1080                #[cfg(feature = "schema")]
1081                schema_registry: self.schema_registry,
1082                #[cfg(feature = "json")]
1083                json_parser: self.json_parser,
1084                max_response_bytes: self.max_response_bytes,
1085                retry_body_peek_bytes: self
1086                    .retry_body_peek_bytes
1087                    .unwrap_or(RETRY_BODY_PEEK_DEFAULT),
1088            }),
1089            backend,
1090        })
1091    }
1092}
1093
1094impl Default for ClientBuilder {
1095    fn default() -> Self {
1096        Self::new()
1097    }
1098}