1use serde::Serialize;
4use url::Url;
5
6use crate::call::CallEndpoint;
7use crate::config::ClientConfig;
8use crate::error::ClientError;
9use crate::request_builder::{CallOverrides, RequestBuilder, ResponseMeta};
10use crate::retry::RetryPolicy;
11use crate::typed_response::TypedResponse;
12
13pub struct Client {
33 pub(crate) base_url: Url,
34 pub(crate) inner: reqwest::Client,
35 pub(crate) config: ClientConfig,
36}
37
38impl Client {
39 pub fn new(base_url: &str) -> Result<Self, ClientError> {
41 Self::with_config(base_url, ClientConfig::default())
42 }
43
44 pub fn with_config(base_url: &str, config: ClientConfig) -> Result<Self, ClientError> {
46 let base_url = Url::parse(base_url)?;
47 let mut builder = reqwest::Client::builder();
48 if let Some(timeout) = config.timeout {
49 builder = builder.timeout(timeout);
50 }
51 if let Some(connect_timeout) = config.connect_timeout {
52 builder = builder.connect_timeout(connect_timeout);
53 }
54 if !config.default_headers.is_empty() {
55 builder = builder.default_headers(config.default_headers.clone());
56 }
57 if config.cookie_store {
58 builder = builder.cookie_store(true);
59 }
60 let inner = builder.build().map_err(ClientError::Request)?;
61 Ok(Client {
62 base_url,
63 inner,
64 config,
65 })
66 }
67
68 pub fn with_reqwest(base_url: &str, client: reqwest::Client) -> Result<Self, ClientError> {
73 let base_url = Url::parse(base_url)?;
74 Ok(Client {
75 base_url,
76 inner: client,
77 config: ClientConfig::default(),
78 })
79 }
80
81 pub fn with_reqwest_and_config(
86 base_url: &str,
87 client: reqwest::Client,
88 config: ClientConfig,
89 ) -> Result<Self, ClientError> {
90 let base_url = Url::parse(base_url)?;
91 Ok(Client {
92 base_url,
93 inner: client,
94 config,
95 })
96 }
97
98 pub fn config(&self) -> &ClientConfig {
100 &self.config
101 }
102
103 pub async fn call<E: CallEndpoint>(&self, args: E::Args) -> Result<E::Response, ClientError> {
111 let policy = &self.config.retry_policy;
112
113 if policy.max_retries == 0 {
114 return self.call_once::<E>(&args).await;
115 }
116
117 self.call_with_retry::<E>(&args, policy).await
118 }
119
120 pub async fn call_with_query<E: CallEndpoint, Q: Serialize>(
136 &self,
137 args: E::Args,
138 query: &Q,
139 ) -> Result<E::Response, ClientError> {
140 let query_string = serde_urlencoded::to_string(query)
141 .map_err(|e| ClientError::Serialize(e.to_string()))?;
142 let overrides = CallOverrides {
143 extra_headers: None,
144 query_string: Some(query_string),
145 query_params: None,
146 timeout: None,
147 };
148 let policy = &self.config.retry_policy;
149
150 if policy.max_retries == 0 {
151 return self
152 .call_inner::<E>(&args, Some(&overrides))
153 .await
154 .map(|(_meta, body)| body);
155 }
156
157 self.call_with_retry_query::<E>(&args, &overrides, policy)
158 .await
159 }
160
161 pub async fn call_full<E: CallEndpoint>(
168 &self,
169 args: E::Args,
170 ) -> Result<TypedResponse<E::Response>, ClientError> {
171 let policy = &self.config.retry_policy;
172
173 if policy.max_retries == 0 {
174 let (meta, body) = self.call_inner::<E>(&args, None).await?;
175 return Ok(TypedResponse {
176 body,
177 status: meta.status,
178 headers: meta.headers,
179 });
180 }
181
182 self.call_with_retry_full::<E>(&args, policy).await
185 }
186
187 pub fn request<E: CallEndpoint>(&self, args: E::Args) -> RequestBuilder<'_, E> {
204 RequestBuilder::new(self, args)
205 }
206
207 async fn call_once<E: CallEndpoint>(&self, args: &E::Args) -> Result<E::Response, ClientError> {
209 self.call_inner::<E>(args, None)
210 .await
211 .map(|(_meta, body)| body)
212 }
213
214 pub(crate) async fn call_inner<E: CallEndpoint>(
220 &self,
221 args: &E::Args,
222 overrides: Option<&CallOverrides>,
223 ) -> Result<(ResponseMeta, E::Response), ClientError> {
224 let path = E::build_path(args);
225 let mut url = self.base_url.join(&path)?;
226 let method = E::method();
227
228 if let Some(ovr) = overrides {
230 if let Some(qs) = &ovr.query_string {
231 if !qs.is_empty() {
232 url.set_query(Some(qs));
233 }
234 }
235 }
236
237 if let Some(ovr) = overrides {
239 if let Some(params) = &ovr.query_params {
240 let mut pairs = url.query_pairs_mut();
241 for (key, value) in params {
242 pairs.append_pair(key, value);
243 }
244 }
245 }
246
247 let tracing_enabled = self.config.enable_tracing;
248 let start = if tracing_enabled {
249 Some(std::time::Instant::now())
250 } else {
251 None
252 };
253
254 let (trace_method, trace_url) = if tracing_enabled {
256 (Some(method.clone()), Some(url.clone()))
257 } else {
258 (None, None)
259 };
260
261 if tracing_enabled {
262 tracing::debug!(
263 http.method = %method,
264 http.url = %url,
265 "sending request"
266 );
267 }
268
269 let mut request = self.inner.request(method, url);
270
271 if let Some(body_result) = E::request_body(args) {
272 let body = body_result?;
273 request = request
274 .header(http::header::CONTENT_TYPE, "application/json")
275 .body(body);
276 }
277
278 if let Some(ovr) = overrides {
280 if let Some(headers) = &ovr.extra_headers {
281 for (name, value) in headers {
282 request = request.header(name, value);
283 }
284 }
285 if let Some(timeout) = ovr.timeout {
286 request = request.timeout(timeout);
287 }
288 }
289
290 for interceptor in &self.config.request_interceptors {
292 request = interceptor(request);
293 }
294
295 let response = match request.send().await {
296 Ok(resp) => resp,
297 Err(e) if e.is_timeout() => {
298 if let (Some(start), Some(m), Some(u)) = (start, &trace_method, &trace_url) {
299 tracing::debug!(
300 http.method = %m,
301 http.url = %u,
302 duration_ms = start.elapsed().as_millis() as u64,
303 "request timed out"
304 );
305 }
306 return Err(ClientError::Timeout);
307 }
308 Err(e) => {
309 if let (Some(start), Some(m), Some(u)) = (start, &trace_method, &trace_url) {
310 tracing::debug!(
311 http.method = %m,
312 http.url = %u,
313 duration_ms = start.elapsed().as_millis() as u64,
314 "request failed"
315 );
316 }
317 return Err(ClientError::Request(e));
318 }
319 };
320
321 for interceptor in &self.config.response_interceptors {
323 interceptor(&response);
324 }
325
326 let status = response.status();
327 let headers = response.headers().clone();
328
329 if let (Some(start), Some(m), Some(u)) = (start, &trace_method, &trace_url) {
330 tracing::debug!(
331 http.method = %m,
332 http.url = %u,
333 http.status = status.as_u16(),
334 duration_ms = start.elapsed().as_millis() as u64,
335 "received response"
336 );
337 }
338
339 if !status.is_success() {
340 let body = response.text().await.unwrap_or_default();
341 return Err(ClientError::Status { status, body });
342 }
343
344 let bytes = response.bytes().await?;
345 let body = E::parse_response(&bytes)?;
346
347 Ok((ResponseMeta { status, headers }, body))
348 }
349
350 async fn call_with_retry<E: CallEndpoint>(
352 &self,
353 args: &E::Args,
354 policy: &RetryPolicy,
355 ) -> Result<E::Response, ClientError> {
356 self.call_with_retry_full::<E>(args, policy)
357 .await
358 .map(|typed| typed.body)
359 }
360
361 async fn call_with_retry_full<E: CallEndpoint>(
363 &self,
364 args: &E::Args,
365 policy: &RetryPolicy,
366 ) -> Result<TypedResponse<E::Response>, ClientError> {
367 let mut last_error: ClientError;
368
369 match self.call_inner::<E>(args, None).await {
371 Ok((meta, body)) => {
372 return Ok(TypedResponse {
373 body,
374 status: meta.status,
375 headers: meta.headers,
376 });
377 }
378 Err(e) => {
379 if !Self::is_retryable(&e, policy) {
380 return Err(e);
381 }
382 last_error = e;
383 }
384 }
385
386 for attempt in 0..policy.max_retries {
388 let backoff = policy.backoff_for_attempt(attempt);
389 tokio::time::sleep(backoff).await;
390
391 match self.call_inner::<E>(args, None).await {
392 Ok((meta, body)) => {
393 return Ok(TypedResponse {
394 body,
395 status: meta.status,
396 headers: meta.headers,
397 });
398 }
399 Err(e) => {
400 if !Self::is_retryable(&e, policy) {
401 return Err(e);
402 }
403 last_error = e;
404 }
405 }
406 }
407
408 Err(ClientError::RetryExhausted {
409 last_error: Box::new(last_error),
410 attempts: policy.max_retries + 1,
411 })
412 }
413
414 async fn call_with_retry_query<E: CallEndpoint>(
416 &self,
417 args: &E::Args,
418 overrides: &CallOverrides,
419 policy: &RetryPolicy,
420 ) -> Result<E::Response, ClientError> {
421 let mut last_error: ClientError;
422
423 match self.call_inner::<E>(args, Some(overrides)).await {
425 Ok((_meta, body)) => return Ok(body),
426 Err(e) => {
427 if !Self::is_retryable(&e, policy) {
428 return Err(e);
429 }
430 last_error = e;
431 }
432 }
433
434 for attempt in 0..policy.max_retries {
436 let backoff = policy.backoff_for_attempt(attempt);
437 tokio::time::sleep(backoff).await;
438
439 match self.call_inner::<E>(args, Some(overrides)).await {
440 Ok((_meta, body)) => return Ok(body),
441 Err(e) => {
442 if !Self::is_retryable(&e, policy) {
443 return Err(e);
444 }
445 last_error = e;
446 }
447 }
448 }
449
450 Err(ClientError::RetryExhausted {
451 last_error: Box::new(last_error),
452 attempts: policy.max_retries + 1,
453 })
454 }
455
456 fn is_retryable(error: &ClientError, policy: &RetryPolicy) -> bool {
458 match error {
459 ClientError::Status { status, .. } => policy.should_retry_status(*status),
460 ClientError::Timeout | ClientError::Request(_) => {
461 if error.is_timeout() {
462 policy.retry_on_timeout
463 } else {
464 policy.retry_on_timeout
467 }
468 }
469 _ => false,
470 }
471 }
472}