1use std::sync::Arc;
4use std::time::Duration;
5
6use reqwest::Client as ReqwestClient;
7use tokio::sync::Semaphore;
8use url::Url;
9
10use crate::auth::Auth;
11use crate::backend::{HttpBackend, ReqwestBackend};
12use crate::client::{Client, ClientConfig};
13use crate::error::Error;
14use crate::hooks::Hooks;
15use crate::plugin::PluginRegistry;
16use crate::request::parse_request_header;
17use crate::retry::RetryPolicy;
18use crate::streaming::RETRY_BODY_PEEK_DEFAULT;
19use crate::Result;
20
21#[cfg(feature = "json")]
22use crate::json_parser::JsonParserFn;
23
24#[cfg(feature = "schema")]
25use crate::schema::SchemaRegistry;
26
27#[must_use = "call `.build()` to create a `Client`"]
29pub struct ClientBuilder {
30 base_url: Option<Url>,
31 timeout: Option<Duration>,
32 retry: Option<RetryPolicy>,
33 auth: Option<Auth>,
34 default_headers: http::HeaderMap,
35 hooks: Hooks,
36 plugins: PluginRegistry,
37 reqwest_client: Option<ReqwestClient>,
38 custom_backend: Option<Arc<dyn HttpBackend>>,
39 max_in_flight: Option<usize>,
40 wire_concurrency_limit: Option<usize>,
42 max_response_bytes: Option<u64>,
43 retry_body_peek_bytes: Option<u64>,
44 #[cfg(feature = "schema")]
45 schema_registry: Option<Arc<SchemaRegistry>>,
46 #[cfg(feature = "json")]
47 json_parser: Option<JsonParserFn>,
48}
49
50impl ClientBuilder {
51 pub fn new() -> Self {
53 Self {
54 base_url: None,
55 timeout: None,
56 retry: None,
57 auth: None,
58 default_headers: http::HeaderMap::new(),
59 hooks: Hooks::default(),
60 plugins: PluginRegistry::new(),
61 reqwest_client: None,
62 custom_backend: None,
63 max_in_flight: None,
64 wire_concurrency_limit: None,
65 max_response_bytes: None,
66 retry_body_peek_bytes: None,
67 #[cfg(feature = "schema")]
68 schema_registry: None,
69 #[cfg(feature = "json")]
70 json_parser: None,
71 }
72 }
73
74 pub fn base_url(mut self, base_url: impl AsRef<str>) -> Result<Self> {
76 self.base_url = Some(Url::parse(base_url.as_ref()).map_err(Error::InvalidBaseUrl)?);
77 Ok(self)
78 }
79
80 pub fn timeout(mut self, timeout: Duration) -> Self {
82 self.timeout = Some(timeout);
83 self
84 }
85
86 pub fn retry(mut self, policy: RetryPolicy) -> Self {
88 self.retry = Some(policy);
89 self
90 }
91
92 pub fn auth(mut self, auth: Auth) -> Self {
94 self.auth = Some(auth);
95 self
96 }
97
98 pub fn default_header(mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
100 let (name, value) = parse_request_header(key, value)?;
101 self.default_headers.insert(name, value);
102 Ok(self)
103 }
104
105 pub fn hooks(mut self, hooks: Hooks) -> Self {
107 self.hooks = hooks;
108 self
109 }
110
111 pub fn plugin<P: crate::plugin::Plugin + 'static>(mut self, plugin: P) -> Self {
113 self.plugins.push(Box::new(plugin));
114 self
115 }
116
117 pub fn reqwest_client(mut self, client: ReqwestClient) -> Self {
119 self.reqwest_client = Some(client);
120 self
121 }
122
123 pub fn backend(mut self, backend: Arc<dyn HttpBackend>) -> Self {
159 self.custom_backend = Some(backend);
160 warn_stacked_concurrency_limits(self.max_in_flight, true, self.wire_concurrency_limit);
161 self
162 }
163
164 pub fn wire_concurrency_limit(mut self, limit: usize) -> Self {
169 self.wire_concurrency_limit = Some(limit);
170 warn_stacked_concurrency_limits(
171 self.max_in_flight,
172 self.custom_backend.is_some(),
173 self.wire_concurrency_limit,
174 );
175 self
176 }
177
178 pub fn max_in_flight(mut self, limit: usize) -> Self {
185 self.max_in_flight = Some(limit);
186 warn_stacked_concurrency_limits(
187 self.max_in_flight,
188 self.custom_backend.is_some(),
189 self.wire_concurrency_limit,
190 );
191 self
192 }
193
194 pub fn max_response_bytes(mut self, limit: u64) -> Self {
198 self.max_response_bytes = Some(limit);
199 self
200 }
201
202 pub fn retry_body_peek_bytes(mut self, limit: u64) -> Self {
206 self.retry_body_peek_bytes = Some(limit);
207 self
208 }
209
210 #[cfg(feature = "schema")]
212 pub fn schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
213 self.schema_registry = Some(registry);
214 self
215 }
216
217 #[cfg(feature = "tower")]
222 pub fn http_service<S>(mut self, service: S) -> Self
223 where
224 S: tower::Service<
225 crate::backend::HttpRequest,
226 Response = crate::backend::HttpResponse,
227 Error = Error,
228 > + Clone
229 + Send
230 + 'static,
231 S::Future: Send + 'static,
232 {
233 use crate::tower::ServiceBackend;
234
235 let client = self.reqwest_client.clone().unwrap_or_default();
236 self.custom_backend = Some(Arc::new(ServiceBackend::buffered_with_reqwest_streaming(
237 service, client,
238 )));
239 warn_stacked_concurrency_limits(self.max_in_flight, true, self.wire_concurrency_limit);
240 self
241 }
242
243 #[cfg(feature = "tower")]
247 pub fn http_service_boxed(mut self, service: crate::tower::BoxHttpService) -> Self {
248 use crate::tower::ServiceBackend;
249
250 let client = self.reqwest_client.clone().unwrap_or_default();
251 self.custom_backend = Some(Arc::new(ServiceBackend::new(
252 service,
253 crate::tower::ReqwestStreamingHttpService::new(client),
254 )));
255 warn_stacked_concurrency_limits(self.max_in_flight, true, self.wire_concurrency_limit);
256 self
257 }
258
259 #[cfg(feature = "tower")]
287 pub fn transport_stack<F>(mut self, configure: F) -> Self
288 where
289 F: FnOnce(
290 crate::tower::ReqwestHttpService,
291 crate::tower::ReqwestStreamingHttpService,
292 ) -> (
293 crate::tower::BoxHttpService,
294 crate::tower::BoxStreamingHttpService,
295 ),
296 {
297 use crate::tower::ServiceBackend;
298
299 let client = self.reqwest_client.clone().unwrap_or_default();
300 let (buffered, streaming) = crate::tower::stack::build_dual(client, configure);
301 self.custom_backend = Some(Arc::new(ServiceBackend::from_boxes(buffered, streaming)));
302 warn_stacked_concurrency_limits(self.max_in_flight, true, self.wire_concurrency_limit);
303 self
304 }
305
306 #[cfg(feature = "json")]
327 pub fn json_parser<F>(mut self, f: F) -> Self
328 where
329 F: Fn(&bytes::Bytes) -> std::result::Result<serde_json::Value, String>
330 + Send
331 + Sync
332 + 'static,
333 {
334 self.json_parser = Some(crate::json_parser::json_parser(f));
335 self
336 }
337
338 #[cfg(feature = "json")]
340 pub fn json_parser_fn(mut self, parser: JsonParserFn) -> Self {
341 self.json_parser = Some(parser);
342 self
343 }
344
345 pub fn build(self) -> Result<Client> {
357 let base_url = self.base_url.ok_or(Error::MissingBaseUrl)?;
358
359 warn_stacked_concurrency_limits(
360 self.max_in_flight,
361 self.custom_backend.is_some(),
362 self.wire_concurrency_limit,
363 );
364
365 let backend: Arc<dyn HttpBackend> = if let Some(b) = self.custom_backend {
366 b
367 } else {
368 let reqwest_client = self.reqwest_client.unwrap_or_default();
369 Arc::new(ReqwestBackend::new(reqwest_client))
370 };
371
372 let plugins = Arc::new(self.plugins);
373 let merged_hooks = self.hooks.clone().merge(plugins.merged_hooks());
374
375 Ok(Client {
376 config: Arc::new(ClientConfig {
377 base_url,
378 timeout: self.timeout,
379 retry: self.retry,
380 auth: self.auth,
381 default_headers: self.default_headers,
382 hooks: self.hooks,
383 merged_hooks,
384 plugins,
385 max_in_flight: self.max_in_flight.map(|n| Arc::new(Semaphore::new(n))),
386 #[cfg(feature = "schema")]
387 schema_registry: self.schema_registry,
388 #[cfg(feature = "json")]
389 json_parser: self.json_parser,
390 max_response_bytes: self.max_response_bytes,
391 retry_body_peek_bytes: self
392 .retry_body_peek_bytes
393 .unwrap_or(RETRY_BODY_PEEK_DEFAULT),
394 }),
395 backend,
396 })
397 }
398}
399
400impl Default for ClientBuilder {
401 fn default() -> Self {
402 Self::new()
403 }
404}
405
406fn warn_stacked_concurrency_limits(
407 max_in_flight: Option<usize>,
408 has_custom_backend: bool,
409 wire_concurrency_limit: Option<usize>,
410) {
411 let Some(client_limit) = max_in_flight else {
412 return;
413 };
414 if !has_custom_backend {
415 return;
416 }
417 if wire_concurrency_limit == Some(client_limit) {
418 tracing::warn!(
419 client_max_in_flight = client_limit,
420 wire_concurrency_limit = client_limit,
421 "max_in_flight and Tower ConcurrencyLimitLayer use the same limit ({client_limit}); \
422 effective throughput is roughly halved because the client semaphore counts hooks and \
423 retries while Tower limits only transport hops. Use one cap or different values intentionally."
424 );
425 } else {
426 tracing::warn!(
427 client_max_in_flight = client_limit,
428 wire_concurrency_limit = ?wire_concurrency_limit,
429 "custom transport stack combined with max_in_flight: the client semaphore applies to the \
430 full request lifecycle (hooks and retries) while Tower ConcurrencyLimitLayer limits only \
431 transport concurrency. Call ClientBuilder::wire_concurrency_limit when your stack uses \
432 ConcurrencyLimitLayer so build() can detect matching numeric limits."
433 );
434 }
435}