1use std::{
2 sync::Arc,
3 time::{Duration, Instant},
4};
5
6use reqwest::{
7 header::{HeaderName, HeaderValue, ACCEPT},
8 Method, StatusCode,
9};
10use serde::de::DeserializeOwned;
11use tokio::time::sleep;
12
13#[cfg(all(feature = "client", feature = "streaming"))]
14use crate::chat::ChatStreamAdapter;
15use crate::chat::{CustomerChatRequestBuilder, CustomerProxyRequestBody};
16use crate::{
17 customers::CustomersClient,
18 errors::{Error, Result, RetryMetadata, TransportError, TransportErrorKind, ValidationError},
19 http::{
20 parse_api_error_parts, request_id_from_headers, HeaderList, ProxyOptions, RetryConfig,
21 StreamFormat,
22 },
23 telemetry::{HttpRequestMetrics, RequestContext, Telemetry, TokenUsageMetrics},
24 tiers::TiersClient,
25 types::{
26 APIKey, FrontendToken, FrontendTokenAutoProvisionRequest, FrontendTokenRequest, Model,
27 ProxyRequest, ProxyResponse,
28 },
29 API_KEY_HEADER, DEFAULT_BASE_URL, DEFAULT_CLIENT_HEADER, DEFAULT_CONNECT_TIMEOUT,
30 DEFAULT_REQUEST_TIMEOUT, REQUEST_ID_HEADER,
31};
32
33#[cfg(all(feature = "client", feature = "streaming"))]
34use crate::sse::StreamHandle;
35
36#[derive(Clone, Debug, Default)]
37pub struct Config {
38 pub base_url: Option<String>,
39 pub api_key: Option<String>,
40 pub access_token: Option<String>,
41 pub client_header: Option<String>,
42 pub http_client: Option<reqwest::Client>,
43 pub connect_timeout: Option<Duration>,
45 pub timeout: Option<Duration>,
47 pub retry: Option<RetryConfig>,
49 pub default_headers: Option<crate::http::HeaderList>,
51 pub default_metadata: Option<crate::http::HeaderList>,
53 pub metrics: Option<crate::telemetry::MetricsCallbacks>,
55}
56
57#[derive(Clone)]
58pub struct Client {
59 inner: Arc<ClientInner>,
60}
61
62pub(crate) struct ClientInner {
63 pub(crate) base_url: reqwest::Url,
64 pub(crate) api_key: Option<String>,
65 pub(crate) access_token: Option<String>,
66 pub(crate) client_header: Option<String>,
67 pub(crate) http: reqwest::Client,
68 pub(crate) request_timeout: Duration,
69 pub(crate) retry: RetryConfig,
70 pub(crate) default_headers: Option<crate::http::HeaderList>,
71 pub(crate) default_metadata: Option<crate::http::HeaderList>,
72 pub(crate) telemetry: Telemetry,
73}
74
75impl Client {
76 pub fn new(cfg: Config) -> Result<Self> {
84 let has_key = cfg
86 .api_key
87 .as_ref()
88 .map(|v| !v.trim().is_empty())
89 .unwrap_or(false);
90 let has_token = cfg
91 .access_token
92 .as_ref()
93 .map(|v| !v.trim().is_empty())
94 .unwrap_or(false);
95 if !has_key && !has_token {
96 return Err(Error::Validation(crate::errors::ValidationError::new(
97 "api key or access token is required",
98 )));
99 }
100
101 let base_source = cfg
102 .base_url
103 .clone()
104 .unwrap_or_else(|| DEFAULT_BASE_URL.to_string());
105 let base = format!("{}/", base_source.trim_end_matches('/'));
107 let base_url = reqwest::Url::parse(&base)
108 .map_err(|err| Error::Validation(format!("invalid base url: {err}").into()))?;
109
110 let connect_timeout = cfg.connect_timeout.unwrap_or(DEFAULT_CONNECT_TIMEOUT);
111 let request_timeout = cfg.timeout.unwrap_or(DEFAULT_REQUEST_TIMEOUT);
112 let retry = cfg.retry.unwrap_or_default();
113
114 let http = match cfg.http_client {
115 Some(client) => client,
116 None => reqwest::Client::builder()
117 .connect_timeout(connect_timeout)
118 .build()
119 .map_err(|err| TransportError {
120 kind: TransportErrorKind::Connect,
121 message: "failed to build http client".to_string(),
122 source: Some(err),
123 retries: None,
124 })?,
125 };
126
127 let client_header = cfg
128 .client_header
129 .filter(|s| !s.trim().is_empty())
130 .or_else(|| Some(DEFAULT_CLIENT_HEADER.to_string()));
131
132 Ok(Self {
133 inner: Arc::new(ClientInner {
134 base_url,
135 api_key: cfg.api_key.filter(|s| !s.trim().is_empty()),
136 access_token: cfg.access_token.filter(|s| !s.trim().is_empty()),
137 client_header,
138 http,
139 request_timeout,
140 retry,
141 default_headers: cfg.default_headers,
142 default_metadata: cfg.default_metadata,
143 telemetry: Telemetry::new(cfg.metrics),
144 }),
145 })
146 }
147
148 pub fn with_key(key: impl Into<String>) -> ClientBuilder {
162 ClientBuilder::new().api_key(key)
163 }
164
165 pub fn with_token(token: impl Into<String>) -> ClientBuilder {
179 ClientBuilder::new().access_token(token)
180 }
181
182 pub fn builder() -> ClientBuilder {
184 ClientBuilder::new()
185 }
186
187 pub fn llm(&self) -> LLMClient {
188 LLMClient {
189 inner: self.inner.clone(),
190 }
191 }
192
193 pub fn auth(&self) -> AuthClient {
194 AuthClient {
195 inner: self.inner.clone(),
196 }
197 }
198
199 pub fn customers(&self) -> CustomersClient {
200 CustomersClient {
201 inner: self.inner.clone(),
202 }
203 }
204
205 pub fn tiers(&self) -> TiersClient {
206 TiersClient {
207 inner: self.inner.clone(),
208 }
209 }
210}
211
212fn apply_header_list(
213 mut builder: reqwest::RequestBuilder,
214 headers: &HeaderList,
215) -> Result<reqwest::RequestBuilder> {
216 for entry in headers.iter() {
217 if !entry.is_valid() {
218 continue;
219 }
220 let name = HeaderName::from_bytes(entry.key.trim().as_bytes())
221 .map_err(|err| Error::Validation(format!("invalid header name: {err}").into()))?;
222 let val = HeaderValue::from_str(entry.value.trim())
223 .map_err(|err| Error::Validation(format!("invalid header value: {err}").into()))?;
224 builder = builder.header(name, val);
225 }
226 Ok(builder)
227}
228
229#[derive(Clone)]
230pub struct LLMClient {
231 inner: Arc<ClientInner>,
232}
233
234impl LLMClient {
235 pub async fn proxy(&self, req: ProxyRequest, options: ProxyOptions) -> Result<ProxyResponse> {
236 self.inner.ensure_auth()?;
237 let req = self.inner.apply_metadata(req, &options.metadata);
238 req.validate()?;
239 let mut builder = self.inner.request(Method::POST, "/llm/proxy")?.json(&req);
240 builder = self.inner.with_headers(
241 builder,
242 options.request_id.as_deref(),
243 &options.headers,
244 Some("application/json"),
245 )?;
246
247 builder = self.inner.with_timeout(builder, options.timeout, true);
248 let retry = options
249 .retry
250 .clone()
251 .unwrap_or_else(|| self.inner.retry.clone());
252
253 let ctx = self.inner.make_context(
254 &Method::POST,
255 "/llm/proxy",
256 Some(req.model.clone()),
257 options.request_id.clone(),
258 );
259 let resp = self
260 .inner
261 .send_with_retry(builder, Method::POST, retry, ctx)
262 .await?;
263 let request_id = request_id_from_headers(resp.headers()).or(options.request_id);
264
265 let bytes = resp
266 .bytes()
267 .await
268 .map_err(|err| self.inner.to_transport_error(err, None))?;
269 let mut payload: ProxyResponse =
270 serde_json::from_slice(&bytes).map_err(Error::Serialization)?;
271 payload.request_id = request_id;
272 if self.inner.telemetry.usage_enabled() {
273 let ctx = RequestContext::new(Method::POST.as_str(), "/llm/proxy")
274 .with_model(Some(payload.model.clone()))
275 .with_request_id(payload.request_id.clone())
276 .with_response_id(Some(payload.id.clone()));
277 self.inner.telemetry.record_usage(TokenUsageMetrics {
278 usage: payload.usage.clone(),
279 context: ctx,
280 });
281 }
282 Ok(payload)
283 }
284
285 #[cfg(feature = "streaming")]
286 pub async fn proxy_stream(
287 &self,
288 req: ProxyRequest,
289 options: ProxyOptions,
290 ) -> Result<StreamHandle> {
291 self.inner.ensure_auth()?;
292 let req = self.inner.apply_metadata(req, &options.metadata);
293 req.validate()?;
294 let mut builder = self.inner.request(Method::POST, "/llm/proxy")?.json(&req);
295 let accept = match options.stream_format {
296 StreamFormat::Ndjson => "application/x-ndjson",
297 StreamFormat::Sse => "text/event-stream",
298 };
299 builder = self.inner.with_headers(
300 builder,
301 options.request_id.as_deref(),
302 &options.headers,
303 Some(accept),
304 )?;
305
306 builder = self.inner.with_timeout(builder, options.timeout, false);
307 let retry = options
308 .retry
309 .clone()
310 .unwrap_or_else(|| self.inner.retry.clone());
311
312 let mut ctx = self.inner.make_context(
313 &Method::POST,
314 "/llm/proxy",
315 Some(req.model.clone()),
316 options.request_id.clone(),
317 );
318 let stream_start = Instant::now();
319 let resp = self
320 .inner
321 .send_with_retry(builder, Method::POST, retry, ctx.clone())
322 .await?;
323 let request_id = request_id_from_headers(resp.headers()).or(options.request_id);
324 ctx = ctx.with_request_id(request_id.clone());
325 let stream_telemetry = self.inner.telemetry.stream_state(ctx, Some(stream_start));
326
327 Ok(match options.stream_format {
328 StreamFormat::Ndjson => StreamHandle::new_ndjson(resp, request_id, stream_telemetry),
329 StreamFormat::Sse => StreamHandle::new(resp, request_id, stream_telemetry),
330 })
331 }
332
333 #[cfg(all(feature = "client", feature = "streaming"))]
335 pub async fn proxy_stream_deltas(
336 &self,
337 req: ProxyRequest,
338 options: ProxyOptions,
339 ) -> Result<std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<String>> + Send>>> {
340 let stream = self.proxy_stream(req, options).await?;
341 Ok(Box::pin(
342 ChatStreamAdapter::<crate::StreamHandle>::new(stream).into_stream(),
343 ))
344 }
345
346 pub fn for_customer(&self, customer_id: impl Into<String>) -> CustomerChatRequestBuilder {
351 CustomerChatRequestBuilder::new(customer_id)
352 }
353
354 pub async fn proxy_customer(
358 &self,
359 customer_id: &str,
360 body: CustomerProxyRequestBody,
361 options: ProxyOptions,
362 ) -> Result<ProxyResponse> {
363 if customer_id.is_empty() {
364 return Err(Error::Validation(ValidationError::new(
365 "customer ID is required",
366 )));
367 }
368 self.inner.ensure_auth()?;
369 let mut builder = self.inner.request(Method::POST, "/llm/proxy")?.json(&body);
370 let options = options.with_header(crate::chat::CUSTOMER_ID_HEADER, customer_id);
372 builder = self.inner.with_headers(
373 builder,
374 options.request_id.as_deref(),
375 &options.headers,
376 Some("application/json"),
377 )?;
378
379 builder = self.inner.with_timeout(builder, options.timeout, true);
380 let retry = options
381 .retry
382 .clone()
383 .unwrap_or_else(|| self.inner.retry.clone());
384
385 let ctx = self.inner.make_context(
386 &Method::POST,
387 "/llm/proxy",
388 None,
389 options.request_id.clone(),
390 );
391 let resp = self
392 .inner
393 .send_with_retry(builder, Method::POST, retry, ctx)
394 .await?;
395 let request_id = request_id_from_headers(resp.headers()).or(options.request_id);
396
397 let bytes = resp
398 .bytes()
399 .await
400 .map_err(|err| self.inner.to_transport_error(err, None))?;
401 let mut payload: ProxyResponse =
402 serde_json::from_slice(&bytes).map_err(Error::Serialization)?;
403 payload.request_id = request_id;
404 if self.inner.telemetry.usage_enabled() {
405 let ctx = RequestContext::new(Method::POST.as_str(), "/llm/proxy")
406 .with_model(Some(payload.model.clone()))
407 .with_request_id(payload.request_id.clone())
408 .with_response_id(Some(payload.id.clone()));
409 self.inner.telemetry.record_usage(TokenUsageMetrics {
410 usage: payload.usage.clone(),
411 context: ctx,
412 });
413 }
414 Ok(payload)
415 }
416
417 #[cfg(feature = "streaming")]
421 pub async fn proxy_customer_stream(
422 &self,
423 customer_id: &str,
424 body: CustomerProxyRequestBody,
425 options: ProxyOptions,
426 ) -> Result<StreamHandle> {
427 if customer_id.is_empty() {
428 return Err(Error::Validation(ValidationError::new(
429 "customer ID is required",
430 )));
431 }
432 self.inner.ensure_auth()?;
433 let mut builder = self.inner.request(Method::POST, "/llm/proxy")?.json(&body);
434 let options = options.with_header(crate::chat::CUSTOMER_ID_HEADER, customer_id);
436 let accept = match options.stream_format {
437 StreamFormat::Ndjson => "application/x-ndjson",
438 StreamFormat::Sse => "text/event-stream",
439 };
440 builder = self.inner.with_headers(
441 builder,
442 options.request_id.as_deref(),
443 &options.headers,
444 Some(accept),
445 )?;
446
447 builder = self.inner.with_timeout(builder, options.timeout, false);
448 let retry = options
449 .retry
450 .clone()
451 .unwrap_or_else(|| self.inner.retry.clone());
452
453 let mut ctx = self.inner.make_context(
454 &Method::POST,
455 "/llm/proxy",
456 None,
457 options.request_id.clone(),
458 );
459 let stream_start = Instant::now();
460 let resp = self
461 .inner
462 .send_with_retry(builder, Method::POST, retry, ctx.clone())
463 .await?;
464 let request_id = request_id_from_headers(resp.headers()).or(options.request_id);
465 ctx = ctx.with_request_id(request_id.clone());
466 let stream_telemetry = self.inner.telemetry.stream_state(ctx, Some(stream_start));
467
468 Ok(match options.stream_format {
469 StreamFormat::Ndjson => StreamHandle::new_ndjson(resp, request_id, stream_telemetry),
470 StreamFormat::Sse => StreamHandle::new(resp, request_id, stream_telemetry),
471 })
472 }
473}
474
475#[derive(Clone)]
476pub struct AuthClient {
477 inner: Arc<ClientInner>,
478}
479
480impl AuthClient {
481 pub async fn frontend_token(&self, req: FrontendTokenRequest) -> Result<FrontendToken> {
483 if req.customer_id.trim().is_empty() {
484 return Err(Error::Validation(
485 ValidationError::new("customer_id is required").with_field("customer_id"),
486 ));
487 }
488 if req.publishable_key.trim().is_empty() {
489 return Err(Error::Validation(
490 ValidationError::new("publishable key is required").with_field("publishable_key"),
491 ));
492 }
493
494 self.send_frontend_token_request(&req).await
495 }
496
497 pub async fn frontend_token_auto_provision(
500 &self,
501 req: FrontendTokenAutoProvisionRequest,
502 ) -> Result<FrontendToken> {
503 if req.customer_id.trim().is_empty() {
504 return Err(Error::Validation(
505 ValidationError::new("customer_id is required").with_field("customer_id"),
506 ));
507 }
508 if req.publishable_key.trim().is_empty() {
509 return Err(Error::Validation(
510 ValidationError::new("publishable key is required").with_field("publishable_key"),
511 ));
512 }
513 if req.email.trim().is_empty() {
514 return Err(Error::Validation(
515 ValidationError::new("email is required for auto-provisioning").with_field("email"),
516 ));
517 }
518
519 self.send_frontend_token_request(&req).await
520 }
521
522 async fn send_frontend_token_request<T: serde::Serialize>(
523 &self,
524 req: &T,
525 ) -> Result<FrontendToken> {
526 let mut builder = self
527 .inner
528 .request(Method::POST, "/auth/frontend-token")?
529 .json(req);
530 builder = self.inner.with_headers(
531 builder,
532 None,
533 &HeaderList::default(),
534 Some("application/json"),
535 )?;
536
537 builder = self.inner.with_timeout(builder, None, true);
538 let ctx = self
539 .inner
540 .make_context(&Method::POST, "/auth/frontend-token", None, None);
541 self.inner
542 .execute_json(builder, Method::POST, None, ctx)
543 .await
544 }
545}
546
547impl ClientInner {
548 pub(crate) fn request(&self, method: Method, path: &str) -> Result<reqwest::RequestBuilder> {
549 let url = if path.starts_with("http://") || path.starts_with("https://") {
550 reqwest::Url::parse(path).map_err(|err| Error::Validation(err.to_string().into()))?
551 } else {
552 let rel = path.trim_start_matches('/');
553 self.base_url
554 .join(rel)
555 .map_err(|err| Error::Validation(format!("invalid path: {err}").into()))?
556 };
557 Ok(self.http.request(method, url))
558 }
559
560 pub(crate) fn with_headers(
561 &self,
562 mut builder: reqwest::RequestBuilder,
563 request_id: Option<&str>,
564 headers: &HeaderList,
565 accept: Option<&str>,
566 ) -> Result<reqwest::RequestBuilder> {
567 if let Some(accept) = accept {
568 builder = builder.header(ACCEPT, accept);
569 }
570 if let Some(req_id) = request_id {
571 if !req_id.trim().is_empty() {
572 builder = builder.header(REQUEST_ID_HEADER, req_id);
573 }
574 }
575 if let Some(client_header) = self.client_header.as_deref() {
576 builder = builder.header("X-ModelRelay-Client", client_header);
577 }
578 builder = self.apply_auth(builder);
579
580 if let Some(defaults) = &self.default_headers {
581 builder = apply_header_list(builder, defaults)?;
582 }
583 builder = apply_header_list(builder, headers)?;
584
585 Ok(builder)
586 }
587
588 pub(crate) fn with_timeout(
589 &self,
590 builder: reqwest::RequestBuilder,
591 timeout: Option<Duration>,
592 use_default: bool,
593 ) -> reqwest::RequestBuilder {
594 if let Some(duration) = timeout {
595 builder.timeout(duration)
596 } else if use_default {
597 builder.timeout(self.request_timeout)
598 } else {
599 builder
600 }
601 }
602
603 fn ensure_auth(&self) -> Result<()> {
604 if self
605 .api_key
606 .as_ref()
607 .map(|v| !v.trim().is_empty())
608 .unwrap_or(false)
609 || self
610 .access_token
611 .as_ref()
612 .map(|v| !v.trim().is_empty())
613 .unwrap_or(false)
614 {
615 return Ok(());
616 }
617 Err(Error::Validation(ValidationError::new(
618 "api key or access token is required",
619 )))
620 }
621
622 fn apply_auth(&self, mut builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
623 if let Some(token) = &self.access_token {
624 let bearer = token
625 .trim()
626 .strip_prefix("Bearer ")
627 .or_else(|| token.trim().strip_prefix("bearer "))
628 .unwrap_or(token.trim());
629 builder = builder.bearer_auth(bearer.to_string());
630 }
631 if let Some(key) = &self.api_key {
632 builder = builder.header(API_KEY_HEADER, key);
633 }
634 builder
635 }
636
637 fn apply_metadata(&self, mut req: ProxyRequest, metadata: &Option<HeaderList>) -> ProxyRequest {
638 if let Some(default_meta) = &self.default_metadata {
639 let mut map = req.metadata.unwrap_or_default();
640 for entry in default_meta.iter() {
641 if entry.is_valid() {
642 map.entry(entry.key.clone())
643 .or_insert_with(|| entry.value.clone());
644 }
645 }
646 req.metadata = Some(map);
647 }
648 if let Some(meta) = metadata {
649 let mut map = req.metadata.unwrap_or_default();
650 for entry in meta.iter() {
651 if entry.is_valid() {
652 map.insert(entry.key.clone(), entry.value.clone());
653 }
654 }
655 req.metadata = Some(map);
656 }
657 req
658 }
659
660 pub(crate) fn make_context(
661 &self,
662 method: &Method,
663 path: &str,
664 model: Option<Model>,
665 request_id: Option<String>,
666 ) -> RequestContext {
667 RequestContext::new(method.as_str(), path)
668 .with_model(model)
669 .with_request_id(request_id)
670 }
671 pub(crate) async fn execute_json<T: DeserializeOwned>(
672 &self,
673 builder: reqwest::RequestBuilder,
674 method: Method,
675 retry: Option<RetryConfig>,
676 ctx: RequestContext,
677 ) -> Result<T> {
678 let retry_cfg = retry.unwrap_or_else(|| self.retry.clone());
679 let resp = self
680 .send_with_retry(builder, method, retry_cfg, ctx)
681 .await?;
682 let bytes = resp
683 .bytes()
684 .await
685 .map_err(|err| self.to_transport_error(err, None))?;
686 let parsed = serde_json::from_slice::<T>(&bytes).map_err(Error::Serialization)?;
687 Ok(parsed)
688 }
689
690 pub(crate) async fn send_with_retry(
691 &self,
692 builder: reqwest::RequestBuilder,
693 method: Method,
694 retry: RetryConfig,
695 ctx: RequestContext,
696 ) -> Result<reqwest::Response> {
697 let max_attempts = retry.max_attempts.max(1);
698 let mut state = RetryState::new();
699 let start = Instant::now();
700
701 for attempt in 1..=max_attempts {
702 let attempt_builder = builder.try_clone().ok_or_else(|| {
703 Error::Validation("request body is not cloneable for retry".into())
704 })?;
705 #[cfg(feature = "tracing")]
706 let span = tracing::debug_span!(
707 "modelrelay.http",
708 method = %ctx.method,
709 path = %ctx.path,
710 attempt,
711 max_attempts
712 );
713 #[cfg(feature = "tracing")]
714 let _guard = span.enter();
715 let result = attempt_builder.send().await;
716
717 match result {
718 Ok(resp) => {
719 let status = resp.status();
720 if status.is_success() {
721 let mut http_ctx = ctx.clone();
722 if http_ctx.request_id.is_none() {
723 http_ctx.request_id =
724 request_id_from_headers(resp.headers()).or(http_ctx.request_id);
725 }
726 if self.telemetry.http_enabled() {
727 self.telemetry.record_http(HttpRequestMetrics {
728 latency: start.elapsed(),
729 status: Some(status.as_u16()),
730 error: None,
731 retries: state.metadata(),
732 context: http_ctx,
733 });
734 }
735 #[cfg(feature = "tracing")]
736 tracing::debug!(
737 status = %status,
738 elapsed_ms = start.elapsed().as_millis() as u64,
739 "request completed"
740 );
741 return Ok(resp);
742 }
743 state.record_attempt(attempt);
744 state.record_status(status);
745
746 let should_retry = retry.should_retry_status(&method, status);
747 if should_retry && attempt < max_attempts {
748 sleep(retry.backoff_delay(attempt)).await;
749 continue;
750 }
751
752 let retries = state.metadata();
753 let headers = resp.headers().clone();
754 let mut http_ctx = ctx.clone();
755 if http_ctx.request_id.is_none() {
756 http_ctx.request_id =
757 request_id_from_headers(&headers).or(http_ctx.request_id);
758 }
759 if self.telemetry.http_enabled() {
760 self.telemetry.record_http(HttpRequestMetrics {
761 latency: start.elapsed(),
762 status: Some(status.as_u16()),
763 error: Some(format!("http {}", status.as_u16())),
764 retries: retries.clone(),
765 context: http_ctx,
766 });
767 }
768 #[cfg(feature = "tracing")]
769 tracing::warn!(
770 status = %status,
771 attempt,
772 "request failed; returning error"
773 );
774 let body = resp.text().await.unwrap_or_default();
775 return Err(parse_api_error_parts(status, &headers, body, retries));
776 }
777 Err(err) => {
778 state.record_attempt(attempt);
779 state.record_error(&err);
780 let should_retry = retry.should_retry_error(&method, &err);
781 if should_retry && attempt < max_attempts {
782 sleep(retry.backoff_delay(attempt)).await;
783 continue;
784 }
785
786 let retries = state.metadata();
787 if self.telemetry.http_enabled() {
788 self.telemetry.record_http(HttpRequestMetrics {
789 latency: start.elapsed(),
790 status: None,
791 error: Some(err.to_string()),
792 retries: retries.clone(),
793 context: ctx.clone(),
794 });
795 }
796 #[cfg(feature = "tracing")]
797 tracing::warn!(attempt, error = %err, "transport error");
798 return Err(self.to_transport_error(err, retries));
799 }
800 }
801 }
802
803 Err(Error::Transport(TransportError {
804 kind: TransportErrorKind::Other,
805 message: "request failed".to_string(),
806 source: None,
807 retries: state.metadata(),
808 }))
809 }
810
811 fn to_transport_error(&self, err: reqwest::Error, retries: Option<RetryMetadata>) -> Error {
812 let kind = if err.is_timeout() {
813 TransportErrorKind::Timeout
814 } else if err.is_connect() {
815 TransportErrorKind::Connect
816 } else if err.is_request() {
817 TransportErrorKind::Request
818 } else {
819 TransportErrorKind::Other
820 };
821
822 TransportError {
823 kind,
824 message: err.to_string(),
825 source: Some(err),
826 retries,
827 }
828 .into()
829 }
830}
831
832#[derive(Default)]
833struct RetryState {
834 attempts: u32,
835 last_status: Option<u16>,
836 last_error: Option<String>,
837}
838
839impl RetryState {
840 fn new() -> Self {
841 Self {
842 attempts: 0,
843 last_status: None,
844 last_error: None,
845 }
846 }
847
848 fn record_attempt(&mut self, attempt: u32) {
849 self.attempts = attempt;
850 }
851
852 fn record_status(&mut self, status: StatusCode) {
853 self.last_status = Some(status.as_u16());
854 }
855
856 fn record_error(&mut self, err: &reqwest::Error) {
857 self.last_error = Some(err.to_string());
858 }
859
860 fn metadata(&self) -> Option<RetryMetadata> {
861 if self.attempts <= 1 {
862 None
863 } else {
864 Some(RetryMetadata {
865 attempts: self.attempts,
866 last_status: self.last_status,
867 last_error: self.last_error.clone(),
868 })
869 }
870 }
871}
872
873#[derive(serde::Deserialize)]
874struct APIKeyResponse {
875 #[serde(rename = "api_key")]
876 api_key: APIKey,
877}
878
879#[derive(Clone, Debug, Default)]
900pub struct ClientBuilder {
901 config: Config,
902}
903
904impl ClientBuilder {
905 pub fn new() -> Self {
907 Self {
908 config: Config::default(),
909 }
910 }
911
912 pub fn api_key(mut self, key: impl Into<String>) -> Self {
916 self.config.api_key = Some(key.into());
917 self
918 }
919
920 pub fn access_token(mut self, token: impl Into<String>) -> Self {
922 self.config.access_token = Some(token.into());
923 self
924 }
925
926 pub fn base_url(mut self, url: impl Into<String>) -> Self {
928 self.config.base_url = Some(url.into());
929 self
930 }
931
932 pub fn http_client(mut self, client: reqwest::Client) -> Self {
934 self.config.http_client = Some(client);
935 self
936 }
937
938 pub fn client_header(mut self, header: impl Into<String>) -> Self {
940 self.config.client_header = Some(header.into());
941 self
942 }
943
944 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
946 self.config.connect_timeout = Some(timeout);
947 self
948 }
949
950 pub fn timeout(mut self, timeout: Duration) -> Self {
952 self.config.timeout = Some(timeout);
953 self
954 }
955
956 pub fn retry(mut self, retry: RetryConfig) -> Self {
958 self.config.retry = Some(retry);
959 self
960 }
961
962 pub fn default_headers(mut self, headers: crate::http::HeaderList) -> Self {
964 self.config.default_headers = Some(headers);
965 self
966 }
967
968 pub fn default_metadata(mut self, metadata: crate::http::HeaderList) -> Self {
970 self.config.default_metadata = Some(metadata);
971 self
972 }
973
974 pub fn metrics(mut self, callbacks: crate::telemetry::MetricsCallbacks) -> Self {
976 self.config.metrics = Some(callbacks);
977 self
978 }
979
980 pub fn build(self) -> Result<Client> {
984 Client::new(self.config)
985 }
986}
987
988#[cfg(test)]
989mod tests {
990 use super::*;
991
992 #[test]
993 fn client_new_requires_auth() {
994 let result = Client::new(Config::default());
996 match result {
997 Err(Error::Validation(v)) => {
998 assert!(v.to_string().contains("api key or access token"));
999 }
1000 Err(e) => panic!("expected ValidationError, got {:?}", e),
1001 Ok(_) => panic!("expected error, got Ok"),
1002 }
1003 }
1004
1005 #[test]
1006 fn client_new_accepts_api_key() {
1007 let result = Client::new(Config {
1008 api_key: Some("mr_sk_test".to_string()),
1009 ..Config::default()
1010 });
1011 assert!(result.is_ok());
1012 }
1013
1014 #[test]
1015 fn client_new_accepts_access_token() {
1016 let result = Client::new(Config {
1017 access_token: Some("eyJ...".to_string()),
1018 ..Config::default()
1019 });
1020 assert!(result.is_ok());
1021 }
1022
1023 #[test]
1024 fn client_new_rejects_empty_strings() {
1025 let result = Client::new(Config {
1027 api_key: Some("".to_string()),
1028 ..Config::default()
1029 });
1030 assert!(result.is_err());
1031
1032 let result = Client::new(Config {
1034 api_key: Some(" ".to_string()),
1035 ..Config::default()
1036 });
1037 assert!(result.is_err());
1038 }
1039
1040 #[test]
1041 fn client_with_key_creates_builder() {
1042 let client = Client::with_key("mr_sk_test").build();
1043 assert!(client.is_ok());
1044 }
1045
1046 #[test]
1047 fn client_with_token_creates_builder() {
1048 let client = Client::with_token("eyJ...").build();
1049 assert!(client.is_ok());
1050 }
1051}