1use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use indexmap::IndexMap;
15use tokio::sync::Semaphore;
16
17use http::Method;
18use reqwest::Client as ReqwestClient;
19use url::Url;
20
21use crate::auth::Auth;
22use crate::backend::{HttpBackend, HttpBody, HttpRequest, ReqwestBackend};
23use crate::cancel::execute_or_cancel;
24use crate::endpoint::{Endpoint, EndpointParams, EndpointParamsInitial, EndpointRequestBuilder};
25use crate::error::Error;
26use crate::hooks::{
27 ErrorContext, Hooks, RequestContext, ResponseContext, StreamingResponseContext,
28 StreamingSuccessContext, SuccessContext,
29};
30use crate::plugin::{PluginRegistry, PreparedRequest};
31use crate::request::RequestBuilder;
32use crate::response::Response;
33use crate::retry::{sleep_or_cancel, RetryPolicy};
34use crate::streaming::{
35 body_stream_from_bytes, drain_body_for_retry, wrap_cancellation, wrap_max_bytes,
36 StreamingResponse, RETRY_BODY_PEEK_DEFAULT,
37};
38use crate::url_build::build_url;
39use crate::Result;
40
41#[cfg(feature = "tower")]
42use crate::backend::HttpResponse;
43
44#[cfg(feature = "json")]
45use crate::json_parser::JsonParserFn;
46
47#[cfg(feature = "schema")]
48use crate::schema::SchemaRegistry;
49
50fn body_for_context(body: &HttpBody) -> Option<bytes::Bytes> {
51 match body {
52 HttpBody::Empty => None,
53 HttpBody::Bytes(b) => Some(b.clone()),
54 }
55}
56
/// Settings shared by every request issued through a [`Client`].
///
/// Assembled by `ClientBuilder::build` and stored behind an `Arc` inside the
/// client.
#[derive(Clone)]
pub struct ClientConfig {
    /// Base URL handed to `build_url` together with each request's path.
    pub base_url: Url,
    /// Timeout copied into every new `RequestBuilder`.
    pub timeout: Option<Duration>,
    /// Retry policy used when a request does not carry its own.
    pub retry: Option<RetryPolicy>,
    /// Auth applied to the request headers when a request does not carry its own.
    pub auth: Option<Auth>,
    /// Headers every new `RequestBuilder` starts with.
    pub default_headers: http::HeaderMap,
    /// Hooks exactly as supplied to the builder.
    pub hooks: Hooks,
    /// `hooks` merged with the hooks contributed by `plugins`; this is the set
    /// the execute paths actually run.
    pub(crate) merged_hooks: Hooks,
    /// Plugins whose `run_init_all` is invoked on each prepared request.
    pub plugins: Arc<PluginRegistry>,
    /// When set, a permit is acquired from this semaphore before a request is
    /// sent.
    pub max_in_flight: Option<Arc<Semaphore>>,
    /// When set, `ensure_route` is called with each request's path and method
    /// before it is sent.
    #[cfg(feature = "schema")]
    pub schema_registry: Option<Arc<SchemaRegistry>>,
    /// JSON parser used when a request does not carry its own.
    #[cfg(feature = "json")]
    pub json_parser: Option<JsonParserFn>,
    /// Byte limit applied to streamed response bodies when a request does not
    /// set its own.
    pub max_response_bytes: Option<u64>,
    /// Number of body bytes the streaming path may drain for a custom retry
    /// decision when a request does not set its own value.
    pub retry_body_peek_bytes: u64,
}
92
/// HTTP client handle.
///
/// Both fields are `Arc`s, so cloning a `Client` shares the same
/// configuration and backend.
#[derive(Clone)]
pub struct Client {
    /// Shared, immutable settings produced by `ClientBuilder::build`.
    config: Arc<ClientConfig>,
    /// Transport used to execute requests.
    backend: Arc<dyn HttpBackend>,
}
99
100impl Client {
101 pub fn new(base_url: impl AsRef<str>) -> Result<Self> {
115 ClientBuilder::new().base_url(base_url)?.build()
116 }
117
118 pub fn builder() -> ClientBuilder {
120 ClientBuilder::new()
121 }
122
123 pub fn with_http_client(
125 reqwest_client: ReqwestClient,
126 base_url: impl AsRef<str>,
127 ) -> Result<Self> {
128 ClientBuilder::new()
129 .reqwest_client(reqwest_client)
130 .base_url(base_url)?
131 .build()
132 }
133
134 pub fn call<E: Endpoint>(
173 &self,
174 ) -> EndpointRequestBuilder<'_, E, <E::Params as EndpointParams>::BuilderState>
175 where
176 E::Params: EndpointParamsInitial<E>,
177 {
178 E::Params::initial(self)
179 }
180
181 pub fn config(&self) -> &ClientConfig {
183 &self.config
184 }
185
186 pub fn get(&self, path: impl Into<String>) -> RequestBuilder<'_> {
188 self.request(Method::GET, path)
189 }
190
191 pub fn post(&self, path: impl Into<String>) -> RequestBuilder<'_> {
193 self.request(Method::POST, path)
194 }
195
196 pub fn put(&self, path: impl Into<String>) -> RequestBuilder<'_> {
198 self.request(Method::PUT, path)
199 }
200
201 pub fn patch(&self, path: impl Into<String>) -> RequestBuilder<'_> {
203 self.request(Method::PATCH, path)
204 }
205
206 pub fn delete(&self, path: impl Into<String>) -> RequestBuilder<'_> {
208 self.request(Method::DELETE, path)
209 }
210
211 pub fn head(&self, path: impl Into<String>) -> RequestBuilder<'_> {
213 self.request(Method::HEAD, path)
214 }
215
216 pub fn request(&self, method: Method, path: impl Into<String>) -> RequestBuilder<'_> {
218 RequestBuilder {
219 client: self,
220 method,
221 path: path.into(),
222 params: HashMap::new(),
223 query: IndexMap::new(),
224 headers: self.config.default_headers.clone(),
225 body: HttpBody::Empty,
226 #[cfg(feature = "multipart")]
227 multipart: None,
228 timeout: self.config.timeout,
229 retry: self.config.retry.clone(),
230 auth: self.config.auth.clone(),
231 cancellation: None,
232 throw_on_error: false,
233 #[cfg(feature = "json")]
234 json_parser: None,
235 #[cfg(feature = "validate")]
236 validate_response: true,
237 max_response_bytes: None,
238 retry_body_peek_bytes: None,
239 }
240 }
241
/// Sends the request described by `builder` and returns the response as a
/// stream, retrying according to the effective retry policy.
///
/// Steps, in order: build the URL, apply auth, run plugin initialisation, run
/// the `on_request` hook, acquire an in-flight permit (if configured), then
/// loop over attempts.
///
/// # Errors
///
/// Returns an error when URL building, route validation, auth, plugin
/// initialisation or a hook fails; when the request is cancelled; when the
/// transport fails (wrapped via `Error::retry_exhausted` if a retry policy was
/// in effect for a transport/timeout error); when a multipart request would
/// need a retry; or, with `throw_on_error`, when the status is not a success.
pub(crate) async fn execute_stream(
    &self,
    builder: RequestBuilder<'_>,
) -> Result<StreamingResponse> {
    // A parser set on the request takes precedence over the client-level one.
    #[cfg(feature = "json")]
    let json_parser = builder
        .json_parser
        .clone()
        .or_else(|| self.config.json_parser.clone());

    let built = build_url(
        &self.config.base_url,
        &builder.path,
        &builder.params,
        &builder.query,
    )?;

    // `build_url` may return a method that replaces the one on the builder.
    let mut method = builder.method;
    if let Some(override_method) = built.method_override {
        method = override_method;
    }

    #[cfg(feature = "schema")]
    if let Some(registry) = &self.config.schema_registry {
        registry.ensure_route(&builder.path, &method)?;
    }

    let mut url = built.url;
    let mut headers = builder.headers;
    let auth = builder.auth.or_else(|| self.config.auth.clone());
    if let Some(auth) = auth {
        auth.apply(&mut headers).await?;
    }

    // Plugins get a chance to rewrite URL, headers and method before hooks run.
    let mut prepared = PreparedRequest {
        url: url.clone(),
        path: builder.path.clone(),
        method: method.clone(),
        headers: headers.clone(),
    };
    self.config.plugins.run_init_all(&mut prepared).await?;
    url = prepared.url;
    headers = prepared.headers;
    method = prepared.method;

    let mut req_ctx = RequestContext {
        url: url.clone(),
        method: method.clone(),
        headers: headers.clone(),
        body: body_for_context(&builder.body),
        retry_attempt: 0,
    };

    // Only URL, headers and method are read back from the hook's context; the
    // context's `body` field is not copied back into the outgoing request.
    let merged_hooks = &self.config.merged_hooks;
    req_ctx = merged_hooks.run_on_request(req_ctx).await?;
    url = req_ctx.url.clone();
    headers = req_ctx.headers.clone();
    method = req_ctx.method.clone();

    let timeout = builder.timeout;
    let retry_policy = builder.retry.or_else(|| self.config.retry.clone());
    let throw_on_error = builder.throw_on_error;
    let cancel = builder.cancellation;
    let max_response_bytes = builder
        .max_response_bytes
        .or(self.config.max_response_bytes);
    let retry_body_peek_bytes = builder
        .retry_body_peek_bytes
        .unwrap_or(self.config.retry_body_peek_bytes);

    let backend = self.backend.clone();

    // The permit is held in a local for the rest of this function, across all
    // attempts and retry sleeps. It is not moved into the returned stream, so
    // it is released when this function returns.
    // NOTE(review): this wait is not wrapped in `execute_or_cancel`, so it does
    // not observe the cancellation token — confirm that is intended.
    let _in_flight_permit = match &self.config.max_in_flight {
        Some(sem) => Some(
            sem.acquire()
                .await
                .map_err(|_| Error::Other("max_in_flight semaphore closed".into()))?,
        ),
        None => None,
    };

    // `attempt` counts retries already performed; with no policy the limit is 0.
    let mut attempt = 0u32;
    let max_attempts = retry_policy.as_ref().map(|p| p.max_attempts()).unwrap_or(0);

    let request_body = builder.body;
    #[cfg(feature = "multipart")]
    let mut multipart_body = builder.multipart;
    #[cfg(feature = "multipart")]
    let had_multipart = multipart_body.is_some();

    let cancel_ref = cancel.as_ref();

    loop {
        req_ctx.retry_attempt = attempt;

        // The multipart body is moved into the first request (`take()` below),
        // so a second attempt has nothing to send. Note this check runs after
        // the retry hook and the back-off sleep of the previous iteration.
        #[cfg(feature = "multipart")]
        if attempt > 0 && had_multipart {
            return Err(Error::Other(
                "automatic retry is not supported with multipart request bodies".into(),
            ));
        }

        let http_req = HttpRequest {
            method: method.clone(),
            url: url.clone(),
            headers: headers.clone(),
            body: request_body.clone(),
            timeout,
            cancellation: cancel.clone(),
            #[cfg(feature = "multipart")]
            multipart: multipart_body.take(),
        };
        let request_url = http_req.url.clone();

        let result = execute_or_cancel(cancel_ref, backend.execute_stream(http_req)).await;

        match result {
            Ok(http_res) => {
                let status = http_res.status;
                // Shadows the request headers for the rest of this arm only.
                let headers = http_res.headers.clone();
                // Never peek more than the response size limit, when one is set.
                let peek_limit = max_response_bytes
                    .map(|m| m.min(retry_body_peek_bytes))
                    .unwrap_or(retry_body_peek_bytes);

                let mut body = http_res.body;
                if let Some(policy) = retry_policy.as_ref() {
                    if policy.has_custom_should_retry() {
                        // A custom predicate gets the drained body bytes to
                        // inspect.
                        // NOTE(review): if no retry happens, the stream handed
                        // to the caller is rebuilt from `peeked` alone —
                        // confirm how `drain_body_for_retry` treats bodies
                        // larger than `peek_limit`.
                        let peeked = drain_body_for_retry(body, peek_limit).await?;
                        let stub = Response::new(
                            status,
                            headers.clone(),
                            peeked.clone(),
                            Some(request_url.clone()),
                            #[cfg(feature = "json")]
                            None,
                        );
                        if policy.should_retry_response(&stub, false) && attempt < max_attempts
                        {
                            // The retry hook receives a response with an empty
                            // body, not the peeked bytes.
                            let stub = Response::new(
                                status,
                                headers.clone(),
                                bytes::Bytes::new(),
                                Some(request_url.clone()),
                                #[cfg(feature = "json")]
                                None,
                            );
                            merged_hooks
                                .run_on_retry(ResponseContext {
                                    request: req_ctx.clone(),
                                    response: stub,
                                })
                                .await;
                            let delay = policy.delay_after_response(attempt, &headers);
                            attempt += 1;
                            sleep_or_cancel(delay, cancel_ref).await?;
                            continue;
                        }
                        body = body_stream_from_bytes(peeked);
                    } else {
                        // Without a custom predicate the decision is made from
                        // status and headers; the body stream is left untouched.
                        let stub = Response::new(
                            status,
                            headers.clone(),
                            bytes::Bytes::new(),
                            Some(request_url.clone()),
                            #[cfg(feature = "json")]
                            None,
                        );
                        if policy.should_retry_response(&stub, false) && attempt < max_attempts
                        {
                            let stub = Response::new(
                                status,
                                headers.clone(),
                                bytes::Bytes::new(),
                                Some(request_url.clone()),
                                #[cfg(feature = "json")]
                                None,
                            );
                            merged_hooks
                                .run_on_retry(ResponseContext {
                                    request: req_ctx.clone(),
                                    response: stub,
                                })
                                .await;
                            let delay = policy.delay_after_response(attempt, &headers);
                            attempt += 1;
                            sleep_or_cancel(delay, cancel_ref).await?;
                            continue;
                        }
                    }
                }

                // The streaming response hook may replace status and headers.
                let meta = merged_hooks
                    .run_on_response_stream(StreamingResponseContext {
                        request: req_ctx.clone(),
                        status,
                        headers,
                    })
                    .await?;
                let status = meta.status;
                let stream_headers = meta.headers;

                // Unlike `execute`, the error hook fires here only when
                // `throw_on_error` is set, and the error carries no body.
                if throw_on_error && !status.is_success() {
                    let http_err = Error::http_with_status_text(
                        status,
                        status.canonical_reason().unwrap_or("request failed"),
                        status.canonical_reason().unwrap_or("request failed"),
                        None,
                    );
                    merged_hooks
                        .run_on_error(ErrorContext {
                            request: req_ctx.clone(),
                            response: None,
                            error: http_err.clone(),
                        })
                        .await;
                    return Err(http_err);
                }

                // Wrap the stream with the size limit and the cancellation
                // token, when present.
                if let Some(limit) = max_response_bytes {
                    body = wrap_max_bytes(body, limit);
                }
                if let Some(token) = cancel.clone() {
                    body = wrap_cancellation(body, token);
                }

                if status.is_success() {
                    merged_hooks
                        .run_on_success_stream(StreamingSuccessContext {
                            request: req_ctx.clone(),
                            status,
                            headers: stream_headers.clone(),
                        })
                        .await;
                }

                return Ok(StreamingResponse::new(
                    status,
                    stream_headers,
                    body,
                    Some(request_url),
                    #[cfg(feature = "json")]
                    json_parser,
                ));
            }
            Err(err) => {
                // Cancellation is reported to the error hook and never retried.
                if err.is_cancelled() {
                    merged_hooks
                        .run_on_error(ErrorContext {
                            request: req_ctx.clone(),
                            response: None,
                            error: err.clone(),
                        })
                        .await;
                    return Err(err);
                }

                // Only transport and timeout errors are eligible for a retry.
                let retry_transport = matches!(&err, Error::Transport { .. } | Error::Timeout);
                if retry_transport && retry_policy.is_some() && attempt < max_attempts {
                    // There is no real response, so the retry hook receives a
                    // synthetic 503 with empty headers and body.
                    merged_hooks
                        .run_on_retry(ResponseContext {
                            request: req_ctx.clone(),
                            response: Response::new(
                                http::StatusCode::SERVICE_UNAVAILABLE,
                                http::HeaderMap::new(),
                                bytes::Bytes::new(),
                                Some(request_url.clone()),
                                #[cfg(feature = "json")]
                                None,
                            ),
                        })
                        .await;
                    let delay = retry_policy
                        .as_ref()
                        .map(|p| p.delay_after_response(attempt, &http::HeaderMap::new()))
                        .unwrap_or(Duration::from_secs(1));
                    attempt += 1;
                    sleep_or_cancel(delay, cancel_ref).await?;
                    continue;
                }

                merged_hooks
                    .run_on_error(ErrorContext {
                        request: req_ctx.clone(),
                        response: None,
                        error: err.clone(),
                    })
                    .await;

                // `attempt` counts retries, so the total number of tries is
                // `attempt + 1`.
                if retry_transport && retry_policy.is_some() {
                    return Err(Error::retry_exhausted(attempt + 1, err));
                }

                return Err(err);
            }
        }
    }
}
539
/// Sends the request described by `builder` and returns the fully buffered
/// response, retrying according to the effective retry policy.
///
/// Steps, in order: build the URL, apply auth, run plugin initialisation, run
/// the `on_request` hook, acquire an in-flight permit (if configured), then
/// loop over attempts.
///
/// A non-success response is returned as `Ok` unless `throw_on_error` is set;
/// the error hook runs for it in both cases.
///
/// # Errors
///
/// Returns an error when URL building, route validation, auth, plugin
/// initialisation or a hook fails; when the request is cancelled; when the
/// transport fails (wrapped via `Error::retry_exhausted` if a retry policy was
/// in effect for a transport/timeout error); when a multipart request would
/// need a retry; or, with `throw_on_error`, when the status is not a success.
pub(crate) async fn execute(&self, builder: RequestBuilder<'_>) -> Result<Response> {
    // A parser set on the request takes precedence over the client-level one.
    #[cfg(feature = "json")]
    let json_parser = builder
        .json_parser
        .clone()
        .or_else(|| self.config.json_parser.clone());

    let built = build_url(
        &self.config.base_url,
        &builder.path,
        &builder.params,
        &builder.query,
    )?;

    // `build_url` may return a method that replaces the one on the builder.
    let mut method = builder.method;
    if let Some(override_method) = built.method_override {
        method = override_method;
    }

    #[cfg(feature = "schema")]
    if let Some(registry) = &self.config.schema_registry {
        registry.ensure_route(&builder.path, &method)?;
    }

    let mut url = built.url;
    let mut headers = builder.headers;
    let auth = builder.auth.or_else(|| self.config.auth.clone());
    if let Some(auth) = auth {
        auth.apply(&mut headers).await?;
    }

    // Plugins get a chance to rewrite URL, headers and method before hooks run.
    let mut prepared = PreparedRequest {
        url: url.clone(),
        path: builder.path.clone(),
        method: method.clone(),
        headers: headers.clone(),
    };
    self.config.plugins.run_init_all(&mut prepared).await?;
    url = prepared.url;
    headers = prepared.headers;
    method = prepared.method;

    let mut req_ctx = RequestContext {
        url: url.clone(),
        method: method.clone(),
        headers: headers.clone(),
        body: body_for_context(&builder.body),
        retry_attempt: 0,
    };

    // Only URL, headers and method are read back from the hook's context; the
    // context's `body` field is not copied back into the outgoing request.
    let merged_hooks = &self.config.merged_hooks;
    req_ctx = merged_hooks.run_on_request(req_ctx).await?;
    url = req_ctx.url.clone();
    headers = req_ctx.headers.clone();
    method = req_ctx.method.clone();

    let timeout = builder.timeout;
    let retry_policy = builder.retry.or_else(|| self.config.retry.clone());
    let throw_on_error = builder.throw_on_error;
    let cancel = builder.cancellation;

    let backend = self.backend.clone();

    // The permit is held in a local until this function returns, across all
    // attempts and retry sleeps.
    // NOTE(review): this wait is not wrapped in `execute_or_cancel`, so it does
    // not observe the cancellation token — confirm that is intended.
    let _in_flight_permit = match &self.config.max_in_flight {
        Some(sem) => Some(
            sem.acquire()
                .await
                .map_err(|_| Error::Other("max_in_flight semaphore closed".into()))?,
        ),
        None => None,
    };

    // `attempt` counts retries already performed; with no policy the limit is 0.
    let mut attempt = 0u32;
    let max_attempts = retry_policy.as_ref().map(|p| p.max_attempts()).unwrap_or(0);

    let request_body = builder.body;
    #[cfg(feature = "multipart")]
    let mut multipart_body = builder.multipart;
    #[cfg(feature = "multipart")]
    let had_multipart = multipart_body.is_some();

    let cancel_ref = cancel.as_ref();

    loop {
        req_ctx.retry_attempt = attempt;

        // The multipart body is moved into the first request (`take()` below),
        // so a second attempt has nothing to send. Note this check runs after
        // the retry hook and the back-off sleep of the previous iteration.
        #[cfg(feature = "multipart")]
        if attempt > 0 && had_multipart {
            return Err(Error::Other(
                "automatic retry is not supported with multipart request bodies".into(),
            ));
        }

        let http_req = HttpRequest {
            method: method.clone(),
            url: url.clone(),
            headers: headers.clone(),
            body: request_body.clone(),
            timeout,
            cancellation: cancel.clone(),
            #[cfg(feature = "multipart")]
            multipart: multipart_body.take(),
        };
        let request_url = http_req.url.clone();

        let result = execute_or_cancel(cancel_ref, backend.execute(http_req)).await;

        match result {
            Ok(http_res) => {
                let response = Response::new(
                    http_res.status,
                    http_res.headers.clone(),
                    http_res.body,
                    Some(request_url.clone()),
                    #[cfg(feature = "json")]
                    json_parser.clone(),
                );

                // The response hook runs on every attempt, before the retry
                // decision, and may replace the response.
                let response = merged_hooks
                    .run_on_response(ResponseContext {
                        request: req_ctx.clone(),
                        response,
                    })
                    .await?;

                let should_retry = retry_policy
                    .as_ref()
                    .map(|p| p.should_retry_response(&response, false))
                    .unwrap_or(false);

                if should_retry && attempt < max_attempts {
                    merged_hooks
                        .run_on_retry(ResponseContext {
                            request: req_ctx.clone(),
                            response: response.clone(),
                        })
                        .await;
                    let delay = retry_policy
                        .as_ref()
                        .map(|p| p.delay_after_response(attempt, response.headers()))
                        .unwrap_or(Duration::from_secs(1));
                    attempt += 1;
                    sleep_or_cancel(delay, cancel_ref).await?;
                    continue;
                }

                if response.is_success() {
                    merged_hooks
                        .run_on_success(SuccessContext {
                            request: req_ctx.clone(),
                            response: response.clone(),
                        })
                        .await;
                    return Ok(response);
                }

                // Non-success: the error hook always runs; whether the caller
                // sees `Err` or the response depends on `throw_on_error`.
                let status = response.status();
                let http_err = Error::http_with_status_text(
                    status,
                    status.canonical_reason().unwrap_or("request failed"),
                    status.canonical_reason().unwrap_or("request failed"),
                    Some(response.bytes().clone()),
                );
                merged_hooks
                    .run_on_error(ErrorContext {
                        request: req_ctx.clone(),
                        response: Some(response.clone()),
                        error: http_err.clone(),
                    })
                    .await;

                if throw_on_error {
                    return Err(http_err);
                }
                return Ok(response);
            }
            Err(err) => {
                // Cancellation is reported to the error hook and never retried.
                if err.is_cancelled() {
                    merged_hooks
                        .run_on_error(ErrorContext {
                            request: req_ctx.clone(),
                            response: None,
                            error: err.clone(),
                        })
                        .await;
                    return Err(err);
                }

                // Only transport and timeout errors are eligible for a retry.
                let retry_transport = matches!(&err, Error::Transport { .. } | Error::Timeout);
                if retry_transport && retry_policy.is_some() && attempt < max_attempts {
                    // There is no real response, so the retry hook receives a
                    // synthetic 503 with empty headers and body.
                    merged_hooks
                        .run_on_retry(ResponseContext {
                            request: req_ctx.clone(),
                            response: Response::new(
                                http::StatusCode::SERVICE_UNAVAILABLE,
                                http::HeaderMap::new(),
                                bytes::Bytes::new(),
                                Some(request_url.clone()),
                                #[cfg(feature = "json")]
                                None,
                            ),
                        })
                        .await;
                    let delay = retry_policy
                        .as_ref()
                        .map(|p| p.delay_after_response(attempt, &http::HeaderMap::new()))
                        .unwrap_or(Duration::from_secs(1));
                    attempt += 1;
                    sleep_or_cancel(delay, cancel_ref).await?;
                    continue;
                }

                merged_hooks
                    .run_on_error(ErrorContext {
                        request: req_ctx.clone(),
                        response: None,
                        error: err.clone(),
                    })
                    .await;

                // `attempt` counts retries, so the total number of tries is
                // `attempt + 1`.
                if retry_transport && retry_policy.is_some() {
                    return Err(Error::retry_exhausted(attempt + 1, err));
                }

                return Err(err);
            }
        }
    }
}
769}
770
/// Step-by-step configuration for a [`Client`].
///
/// Every field starts unset/empty (see `ClientBuilder::new`); `build` turns
/// the collected values into a `ClientConfig` and a backend.
pub struct ClientBuilder {
    /// Parsed base URL; `build` fails with `Error::MissingBaseUrl` when absent.
    base_url: Option<Url>,
    /// Default request timeout.
    timeout: Option<Duration>,
    /// Default retry policy.
    retry: Option<RetryPolicy>,
    /// Default auth.
    auth: Option<Auth>,
    /// Headers added to every request builder.
    default_headers: http::HeaderMap,
    /// User-supplied hooks; merged with plugin hooks in `build`.
    hooks: Hooks,
    /// Registered plugins.
    plugins: PluginRegistry,
    /// `reqwest` client used for the default backend when no custom backend is set.
    reqwest_client: Option<ReqwestClient>,
    /// Backend that replaces the default `ReqwestBackend` when set.
    custom_backend: Option<Arc<dyn HttpBackend>>,
    /// Permit count for the in-flight semaphore created in `build`.
    max_in_flight: Option<usize>,
    /// Default byte limit for response bodies.
    max_response_bytes: Option<u64>,
    /// Retry body peek size; `build` falls back to `RETRY_BODY_PEEK_DEFAULT`.
    retry_body_peek_bytes: Option<u64>,
    /// Optional schema registry forwarded to the client configuration.
    #[cfg(feature = "schema")]
    schema_registry: Option<Arc<SchemaRegistry>>,
    /// Optional client-level JSON parser.
    #[cfg(feature = "json")]
    json_parser: Option<JsonParserFn>,
}
790
791impl ClientBuilder {
792 pub fn new() -> Self {
794 Self {
795 base_url: None,
796 timeout: None,
797 retry: None,
798 auth: None,
799 default_headers: http::HeaderMap::new(),
800 hooks: Hooks::default(),
801 plugins: PluginRegistry::new(),
802 reqwest_client: None,
803 custom_backend: None,
804 max_in_flight: None,
805 max_response_bytes: None,
806 retry_body_peek_bytes: None,
807 #[cfg(feature = "schema")]
808 schema_registry: None,
809 #[cfg(feature = "json")]
810 json_parser: None,
811 }
812 }
813
814 pub fn base_url(mut self, base_url: impl AsRef<str>) -> Result<Self> {
816 self.base_url = Some(Url::parse(base_url.as_ref()).map_err(Error::InvalidBaseUrl)?);
817 Ok(self)
818 }
819
820 pub fn timeout(mut self, timeout: Duration) -> Self {
822 self.timeout = Some(timeout);
823 self
824 }
825
826 pub fn retry(mut self, policy: RetryPolicy) -> Self {
828 self.retry = Some(policy);
829 self
830 }
831
832 pub fn auth(mut self, auth: Auth) -> Self {
834 self.auth = Some(auth);
835 self
836 }
837
838 pub fn default_header(mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
840 let name = http::HeaderName::from_bytes(key.as_ref().as_bytes())
841 .map_err(|e| Error::Other(format!("invalid header name: {e}")))?;
842 let value = http::HeaderValue::from_str(value.as_ref())
843 .map_err(|e| Error::Other(format!("invalid header value: {e}")))?;
844 self.default_headers.insert(name, value);
845 Ok(self)
846 }
847
848 pub fn hooks(mut self, hooks: Hooks) -> Self {
850 self.hooks = hooks;
851 self
852 }
853
854 pub fn plugin<P: crate::plugin::Plugin + 'static>(mut self, plugin: P) -> Self {
856 self.plugins.push(Box::new(plugin));
857 self
858 }
859
860 pub fn reqwest_client(mut self, client: ReqwestClient) -> Self {
862 self.reqwest_client = Some(client);
863 self
864 }
865
866 pub fn backend(mut self, backend: Arc<dyn HttpBackend>) -> Self {
902 self.custom_backend = Some(backend);
903 self
904 }
905
906 pub fn max_in_flight(mut self, limit: usize) -> Self {
913 self.max_in_flight = Some(limit);
914 self
915 }
916
917 pub fn max_response_bytes(mut self, limit: u64) -> Self {
920 self.max_response_bytes = Some(limit);
921 self
922 }
923
924 pub fn retry_body_peek_bytes(mut self, limit: u64) -> Self {
928 self.retry_body_peek_bytes = Some(limit);
929 self
930 }
931
/// Attaches a schema registry; each request's path and method are checked
/// with `ensure_route` before it is sent.
#[cfg(feature = "schema")]
pub fn schema_registry(self, registry: Arc<SchemaRegistry>) -> Self {
    Self {
        schema_registry: Some(registry),
        ..self
    }
}
938
/// Installs a tower `service` as the backend.
///
/// A `ReqwestBackend` is created from the `reqwest` client configured so far
/// (or a default one) and passed to `ServiceBackend::new` alongside the
/// service. The client is captured at call time, so a later
/// `reqwest_client(..)` call does not affect it.
#[cfg(feature = "tower")]
pub fn http_service<S>(mut self, service: S) -> Self
where
    S: tower::Service<HttpRequest, Response = HttpResponse, Error = Error>
        + Clone
        + Send
        + 'static,
    S::Future: Send + 'static,
{
    use crate::backend::ReqwestBackend;
    use crate::tower::ServiceBackend;

    let streaming = ReqwestBackend::new(self.reqwest_client.clone().unwrap_or_default());
    let combined = ServiceBackend::new(service, streaming);
    self.custom_backend = Some(Arc::new(combined));
    self
}
957
/// Installs an already boxed tower `service` as the backend.
///
/// A `ReqwestBackend` is created from the `reqwest` client configured so far
/// (or a default one) and passed to `ServiceBackend::from_box` alongside the
/// service. The client is captured at call time.
#[cfg(feature = "tower")]
pub fn http_service_boxed(mut self, service: crate::tower::BoxHttpService) -> Self {
    use crate::backend::ReqwestBackend;
    use crate::tower::ServiceBackend;

    let streaming = ReqwestBackend::new(self.reqwest_client.clone().unwrap_or_default());
    let combined = ServiceBackend::from_box(service, streaming);
    self.custom_backend = Some(Arc::new(combined));
    self
}
969
/// Builds the backend from a tower stack layered over the reqwest transport.
///
/// `configure` receives a `ReqwestHttpService` wrapping the `reqwest` client
/// configured so far (or a default one) and returns the boxed, stacked
/// service. A `ReqwestBackend` over a clone of the same client is passed to
/// `ServiceBackend::from_box` alongside it.
#[cfg(feature = "tower")]
pub fn transport_stack<F>(mut self, configure: F) -> Self
where
    F: FnOnce(crate::tower::ReqwestHttpService) -> crate::tower::BoxHttpService,
{
    use crate::backend::ReqwestBackend;
    use crate::tower::ServiceBackend;

    let http_client = self.reqwest_client.clone().unwrap_or_default();
    // Create the streaming backend first, then hand the client to the stack.
    let streaming = ReqwestBackend::new(http_client.clone());
    let base_service = crate::tower::ReqwestHttpService::new(http_client);
    let stacked = configure(base_service);
    self.custom_backend = Some(Arc::new(ServiceBackend::from_box(stacked, streaming)));
    self
}
1005
/// Sets the client-level JSON parser from a closure.
///
/// The closure is wrapped with `crate::json_parser::json_parser` and is used
/// for requests that do not set their own parser.
#[cfg(feature = "json")]
pub fn json_parser<F>(self, f: F) -> Self
where
    F: Fn(&bytes::Bytes) -> std::result::Result<serde_json::Value, String>
        + Send
        + Sync
        + 'static,
{
    let wrapped = crate::json_parser::json_parser(f);
    Self {
        json_parser: Some(wrapped),
        ..self
    }
}
1037
/// Sets the client-level JSON parser from an already constructed
/// `JsonParserFn`.
#[cfg(feature = "json")]
pub fn json_parser_fn(self, parser: JsonParserFn) -> Self {
    Self {
        json_parser: Some(parser),
        ..self
    }
}
1044
1045 pub fn build(self) -> Result<Client> {
1057 let base_url = self.base_url.ok_or(Error::MissingBaseUrl)?;
1058
1059 let backend: Arc<dyn HttpBackend> = if let Some(b) = self.custom_backend {
1060 b
1061 } else {
1062 let reqwest_client = self.reqwest_client.unwrap_or_default();
1063 Arc::new(ReqwestBackend::new(reqwest_client))
1064 };
1065
1066 let plugins = Arc::new(self.plugins);
1067 let merged_hooks = self.hooks.clone().merge(plugins.merged_hooks());
1068
1069 Ok(Client {
1070 config: Arc::new(ClientConfig {
1071 base_url,
1072 timeout: self.timeout,
1073 retry: self.retry,
1074 auth: self.auth,
1075 default_headers: self.default_headers,
1076 hooks: self.hooks,
1077 merged_hooks,
1078 plugins,
1079 max_in_flight: self.max_in_flight.map(|n| Arc::new(Semaphore::new(n))),
1080 #[cfg(feature = "schema")]
1081 schema_registry: self.schema_registry,
1082 #[cfg(feature = "json")]
1083 json_parser: self.json_parser,
1084 max_response_bytes: self.max_response_bytes,
1085 retry_body_peek_bytes: self
1086 .retry_body_peek_bytes
1087 .unwrap_or(RETRY_BODY_PEEK_DEFAULT),
1088 }),
1089 backend,
1090 })
1091 }
1092}
1093
1094impl Default for ClientBuilder {
1095 fn default() -> Self {
1096 Self::new()
1097 }
1098}