1#[cfg(not(target_family = "wasm"))]
2use std::pin::Pin;
3
4use bytes::Bytes;
5#[cfg(not(target_family = "wasm"))]
6use futures::{stream::StreamExt, Stream};
7use reqwest::{header::HeaderMap, multipart::Form, Response};
8use serde::{de::DeserializeOwned, Serialize};
9
10#[cfg(not(target_family = "wasm"))]
11use crate::error::StreamError;
12use crate::{
13 config::{Config, OpenAIConfig},
14 error::{map_deserialization_error, ApiError, OpenAIError, WrappedError},
15 traits::AsyncTryFrom,
16 RequestOptions,
17};
18
19#[cfg(feature = "administration")]
20use crate::admin::Admin;
21#[cfg(feature = "chatkit")]
22use crate::chatkit::Chatkit;
23#[cfg(feature = "file")]
24use crate::file::Files;
25#[cfg(feature = "image")]
26use crate::image::Images;
27#[cfg(feature = "moderation")]
28use crate::moderation::Moderations;
29#[cfg(feature = "assistant")]
30#[allow(deprecated)]
31use crate::Assistants;
32#[cfg(feature = "audio")]
33use crate::Audio;
34#[cfg(feature = "batch")]
35use crate::Batches;
36#[cfg(feature = "chat-completion")]
37use crate::Chat;
38#[cfg(feature = "completions")]
39use crate::Completions;
40#[cfg(feature = "container")]
41use crate::Containers;
42#[cfg(feature = "responses")]
43use crate::Conversations;
44#[cfg(feature = "embedding")]
45use crate::Embeddings;
46#[cfg(feature = "evals")]
47use crate::Evals;
48#[cfg(feature = "finetuning")]
49use crate::FineTuning;
50#[cfg(feature = "model")]
51use crate::Models;
52#[cfg(feature = "realtime")]
53use crate::Realtime;
54#[cfg(feature = "responses")]
55use crate::Responses;
56#[cfg(feature = "skill")]
57use crate::Skills;
58#[cfg(feature = "assistant")]
59#[allow(deprecated)]
60use crate::Threads;
61#[cfg(feature = "upload")]
62use crate::Uploads;
63#[cfg(feature = "vectorstore")]
64use crate::VectorStores;
65#[cfg(feature = "video")]
66use crate::Videos;
67
68#[derive(Debug, Clone)]
69pub struct Client<C: Config> {
72 http_client: reqwest::Client,
73 config: C,
74 #[cfg(not(target_family = "wasm"))]
75 backoff: backoff::ExponentialBackoff,
76}
77
78impl<C: Config> Default for Client<C>
79where
80 C: Default,
81{
82 fn default() -> Self {
83 Self {
84 http_client: reqwest::Client::new(),
85 config: C::default(),
86 #[cfg(not(target_family = "wasm"))]
87 backoff: Default::default(),
88 }
89 }
90}
91
92impl Client<OpenAIConfig> {
93 pub fn new() -> Self {
95 Self::default()
96 }
97}
98
99impl<C: Config> Client<C> {
100 #[cfg(not(target_family = "wasm"))]
102 pub fn build(
103 http_client: reqwest::Client,
104 config: C,
105 backoff: backoff::ExponentialBackoff,
106 ) -> Self {
107 Self {
108 http_client,
109 config,
110 backoff,
111 }
112 }
113
114 #[cfg(target_family = "wasm")]
116 pub fn build(http_client: reqwest::Client, config: C) -> Self {
117 Self {
118 http_client,
119 config,
120 }
121 }
122
123 pub fn with_config(config: C) -> Self {
125 Self {
126 http_client: reqwest::Client::new(),
127 config,
128 #[cfg(not(target_family = "wasm"))]
129 backoff: Default::default(),
130 }
131 }
132
133 pub fn with_http_client(mut self, http_client: reqwest::Client) -> Self {
137 self.http_client = http_client;
138 self
139 }
140
141 #[cfg(not(target_family = "wasm"))]
143 pub fn with_backoff(mut self, backoff: backoff::ExponentialBackoff) -> Self {
144 self.backoff = backoff;
145 self
146 }
147
148 #[cfg(feature = "model")]
152 pub fn models(&self) -> Models<'_, C> {
153 Models::new(self)
154 }
155
156 #[cfg(feature = "completions")]
158 pub fn completions(&self) -> Completions<'_, C> {
159 Completions::new(self)
160 }
161
162 #[cfg(feature = "chat-completion")]
164 pub fn chat(&self) -> Chat<'_, C> {
165 Chat::new(self)
166 }
167
168 #[cfg(feature = "image")]
170 pub fn images(&self) -> Images<'_, C> {
171 Images::new(self)
172 }
173
174 #[cfg(feature = "moderation")]
176 pub fn moderations(&self) -> Moderations<'_, C> {
177 Moderations::new(self)
178 }
179
180 #[cfg(feature = "file")]
182 pub fn files(&self) -> Files<'_, C> {
183 Files::new(self)
184 }
185
186 #[cfg(feature = "upload")]
188 pub fn uploads(&self) -> Uploads<'_, C> {
189 Uploads::new(self)
190 }
191
192 #[cfg(feature = "finetuning")]
194 pub fn fine_tuning(&self) -> FineTuning<'_, C> {
195 FineTuning::new(self)
196 }
197
198 #[cfg(feature = "embedding")]
200 pub fn embeddings(&self) -> Embeddings<'_, C> {
201 Embeddings::new(self)
202 }
203
204 #[cfg(feature = "audio")]
206 pub fn audio(&self) -> Audio<'_, C> {
207 Audio::new(self)
208 }
209
210 #[cfg(feature = "video")]
212 pub fn videos(&self) -> Videos<'_, C> {
213 Videos::new(self)
214 }
215
216 #[cfg(feature = "assistant")]
218 #[deprecated(
219 note = "Assistants API is deprecated and will be removed in August 2026. Use the Responses API."
220 )]
221 #[allow(deprecated)]
222 pub fn assistants(&self) -> Assistants<'_, C> {
223 Assistants::new(self)
224 }
225
226 #[cfg(feature = "assistant")]
228 #[deprecated(
229 note = "Assistants API is deprecated and will be removed in August 2026. Use the Responses API."
230 )]
231 #[allow(deprecated)]
232 pub fn threads(&self) -> Threads<'_, C> {
233 Threads::new(self)
234 }
235
236 #[cfg(feature = "vectorstore")]
238 pub fn vector_stores(&self) -> VectorStores<'_, C> {
239 VectorStores::new(self)
240 }
241
242 #[cfg(feature = "batch")]
244 pub fn batches(&self) -> Batches<'_, C> {
245 Batches::new(self)
246 }
247
248 #[cfg(feature = "administration")]
251 pub fn admin(&self) -> Admin<'_, C> {
252 Admin::new(self)
253 }
254
255 #[cfg(feature = "responses")]
257 pub fn responses(&self) -> Responses<'_, C> {
258 Responses::new(self)
259 }
260
261 #[cfg(feature = "responses")]
263 pub fn conversations(&self) -> Conversations<'_, C> {
264 Conversations::new(self)
265 }
266
267 #[cfg(feature = "container")]
269 pub fn containers(&self) -> Containers<'_, C> {
270 Containers::new(self)
271 }
272
273 #[cfg(feature = "skill")]
275 pub fn skills(&self) -> Skills<'_, C> {
276 Skills::new(self)
277 }
278
279 #[cfg(feature = "evals")]
281 pub fn evals(&self) -> Evals<'_, C> {
282 Evals::new(self)
283 }
284
285 #[cfg(feature = "chatkit")]
286 pub fn chatkit(&self) -> Chatkit<'_, C> {
287 Chatkit::new(self)
288 }
289
290 #[cfg(feature = "realtime")]
292 pub fn realtime(&self) -> Realtime<'_, C> {
293 Realtime::new(self)
294 }
295
296 pub fn config(&self) -> &C {
297 &self.config
298 }
299
300 fn build_request_builder(
302 &self,
303 method: reqwest::Method,
304 path: &str,
305 request_options: &RequestOptions,
306 ) -> reqwest::RequestBuilder {
307 let mut request_builder = if let Some(path) = request_options.path() {
308 self.http_client
309 .request(method, self.config.url(path.as_str()))
310 } else {
311 self.http_client.request(method, self.config.url(path))
312 };
313
314 request_builder = request_builder
315 .query(&self.config.query())
316 .headers(self.config.headers());
317
318 if let Some(headers) = request_options.headers() {
319 request_builder = request_builder.headers(headers.clone());
320 }
321
322 if !request_options.query().is_empty() {
323 request_builder = request_builder.query(request_options.query());
324 }
325
326 request_builder
327 }
328
329 #[allow(unused)]
331 pub(crate) async fn get<O>(
332 &self,
333 path: &str,
334 request_options: &RequestOptions,
335 ) -> Result<O, OpenAIError>
336 where
337 O: DeserializeOwned,
338 {
339 let request_maker = || async {
340 Ok(self
341 .build_request_builder(reqwest::Method::GET, path, request_options)
342 .build()?)
343 };
344
345 self.execute(request_maker).await
346 }
347
348 #[allow(unused)]
350 pub(crate) async fn delete<O>(
351 &self,
352 path: &str,
353 request_options: &RequestOptions,
354 ) -> Result<O, OpenAIError>
355 where
356 O: DeserializeOwned,
357 {
358 let request_maker = || async {
359 Ok(self
360 .build_request_builder(reqwest::Method::DELETE, path, request_options)
361 .build()?)
362 };
363
364 self.execute(request_maker).await
365 }
366
367 #[allow(unused)]
369 pub(crate) async fn get_raw(
370 &self,
371 path: &str,
372 request_options: &RequestOptions,
373 ) -> Result<(Bytes, HeaderMap), OpenAIError> {
374 let request_maker = || async {
375 Ok(self
376 .build_request_builder(reqwest::Method::GET, path, request_options)
377 .build()?)
378 };
379
380 self.execute_raw(request_maker).await
381 }
382
383 #[allow(unused)]
385 pub(crate) async fn post_raw<I>(
386 &self,
387 path: &str,
388 request: I,
389 request_options: &RequestOptions,
390 ) -> Result<(Bytes, HeaderMap), OpenAIError>
391 where
392 I: Serialize,
393 {
394 let request_maker = || async {
395 Ok(self
396 .build_request_builder(reqwest::Method::POST, path, request_options)
397 .json(&request)
398 .build()?)
399 };
400
401 self.execute_raw(request_maker).await
402 }
403
404 #[allow(unused)]
406 pub(crate) async fn post<I, O>(
407 &self,
408 path: &str,
409 request: I,
410 request_options: &RequestOptions,
411 ) -> Result<O, OpenAIError>
412 where
413 I: Serialize,
414 O: DeserializeOwned,
415 {
416 let request_maker = || async {
417 Ok(self
418 .build_request_builder(reqwest::Method::POST, path, request_options)
419 .json(&request)
420 .build()?)
421 };
422
423 self.execute(request_maker).await
424 }
425
426 #[allow(unused)]
428 pub(crate) async fn post_form_raw<F>(
429 &self,
430 path: &str,
431 form: F,
432 request_options: &RequestOptions,
433 ) -> Result<(Bytes, HeaderMap), OpenAIError>
434 where
435 Form: AsyncTryFrom<F, Error = OpenAIError>,
436 F: Clone,
437 {
438 let request_maker = || async {
439 Ok(self
440 .build_request_builder(reqwest::Method::POST, path, request_options)
441 .multipart(<Form as AsyncTryFrom<F>>::try_from(form.clone()).await?)
442 .build()?)
443 };
444
445 self.execute_raw(request_maker).await
446 }
447
448 #[allow(unused)]
450 pub(crate) async fn post_form<O, F>(
451 &self,
452 path: &str,
453 form: F,
454 request_options: &RequestOptions,
455 ) -> Result<O, OpenAIError>
456 where
457 O: DeserializeOwned,
458 Form: AsyncTryFrom<F, Error = OpenAIError>,
459 F: Clone,
460 {
461 let request_maker = || async {
462 Ok(self
463 .build_request_builder(reqwest::Method::POST, path, request_options)
464 .multipart(<Form as AsyncTryFrom<F>>::try_from(form.clone()).await?)
465 .build()?)
466 };
467
468 self.execute(request_maker).await
469 }
470
471 #[allow(unused)]
472 #[cfg(not(target_family = "wasm"))]
473 pub(crate) async fn post_form_stream<O, F>(
474 &self,
475 path: &str,
476 form: F,
477 request_options: &RequestOptions,
478 ) -> Result<Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>, OpenAIError>
479 where
480 F: Clone,
481 Form: AsyncTryFrom<F, Error = OpenAIError>,
482 O: DeserializeOwned + std::marker::Send + 'static,
483 {
484 let request_builder = self
486 .build_request_builder(reqwest::Method::POST, path, request_options)
487 .multipart(<Form as AsyncTryFrom<F>>::try_from(form.clone()).await?);
488
489 Ok(stream(request_builder).await)
490 }
491
492 #[cfg(not(target_family = "wasm"))]
498 async fn execute_raw<M, Fut>(&self, request_maker: M) -> Result<(Bytes, HeaderMap), OpenAIError>
499 where
500 M: Fn() -> Fut,
501 Fut: core::future::Future<Output = Result<reqwest::Request, OpenAIError>>,
502 {
503 let client = self.http_client.clone();
504
505 backoff::future::retry(self.backoff.clone(), || async {
506 let request = request_maker().await.map_err(backoff::Error::Permanent)?;
507 let response = client
508 .execute(request)
509 .await
510 .map_err(OpenAIError::Reqwest)
511 .map_err(backoff::Error::Permanent)?;
512
513 let status = response.status();
514
515 match read_response(response).await {
516 Ok((bytes, headers)) => Ok((bytes, headers)),
517 Err(e) => {
518 match e {
519 OpenAIError::ApiError(api_error) => {
520 if status.is_server_error() {
521 Err(backoff::Error::Transient {
522 err: OpenAIError::ApiError(api_error),
523 retry_after: None,
524 })
525 } else if status.as_u16() == 429
526 && api_error.r#type != Some("insufficient_quota".to_string())
527 {
528 tracing::warn!("Rate limited: {}", api_error.message);
530 Err(backoff::Error::Transient {
531 err: OpenAIError::ApiError(api_error),
532 retry_after: None,
533 })
534 } else {
535 Err(backoff::Error::Permanent(OpenAIError::ApiError(api_error)))
536 }
537 }
538 _ => Err(backoff::Error::Permanent(e)),
539 }
540 }
541 }
542 })
543 .await
544 }
545
546 #[cfg(target_family = "wasm")]
548 async fn execute_raw<M, Fut>(&self, request_maker: M) -> Result<(Bytes, HeaderMap), OpenAIError>
549 where
550 M: Fn() -> Fut,
551 Fut: core::future::Future<Output = Result<reqwest::Request, OpenAIError>>,
552 {
553 let request = request_maker().await?;
554 let response = self
555 .http_client
556 .execute(request)
557 .await
558 .map_err(OpenAIError::Reqwest)?;
559
560 read_response(response).await
561 }
562
563 async fn execute<O, M, Fut>(&self, request_maker: M) -> Result<O, OpenAIError>
569 where
570 O: DeserializeOwned,
571 M: Fn() -> Fut,
572 Fut: core::future::Future<Output = Result<reqwest::Request, OpenAIError>>,
573 {
574 let (bytes, _headers) = self.execute_raw(request_maker).await?;
575
576 let response: O = serde_json::from_slice(bytes.as_ref())
577 .map_err(|e| map_deserialization_error(e, bytes.as_ref()))?;
578
579 Ok(response)
580 }
581
582 #[allow(unused)]
584 #[cfg(not(target_family = "wasm"))]
585 pub(crate) async fn post_stream<I, O>(
586 &self,
587 path: &str,
588 request: I,
589 request_options: &RequestOptions,
590 ) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
591 where
592 I: Serialize,
593 O: DeserializeOwned + std::marker::Send + 'static,
594 {
595 let request_builder = self
596 .build_request_builder(reqwest::Method::POST, path, request_options)
597 .json(&request);
598
599 stream(request_builder).await
600 }
601
602 #[allow(unused)]
603 #[cfg(not(target_family = "wasm"))]
604 pub(crate) async fn post_stream_mapped_raw_events<I, O>(
605 &self,
606 path: &str,
607 request: I,
608 request_options: &RequestOptions,
609 event_mapper: impl Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static,
610 ) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
611 where
612 I: Serialize,
613 O: DeserializeOwned + std::marker::Send + 'static,
614 {
615 let request_builder = self
616 .build_request_builder(reqwest::Method::POST, path, request_options)
617 .json(&request);
618
619 stream_mapped_raw_events(request_builder, event_mapper).await
620 }
621
622 #[allow(unused)]
624 #[cfg(not(target_family = "wasm"))]
625 pub(crate) async fn get_stream<O>(
626 &self,
627 path: &str,
628 request_options: &RequestOptions,
629 ) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
630 where
631 O: DeserializeOwned + std::marker::Send + 'static,
632 {
633 let request_builder =
634 self.build_request_builder(reqwest::Method::GET, path, request_options);
635
636 stream(request_builder).await
637 }
638}
639
640async fn read_response(response: Response) -> Result<(Bytes, HeaderMap), OpenAIError> {
641 let status = response.status();
642 let headers = response.headers().clone();
643 let bytes = response.bytes().await.map_err(OpenAIError::Reqwest)?;
644
645 if status.is_server_error() {
646 let message: String = String::from_utf8_lossy(&bytes).into_owned();
648 tracing::warn!("Server error: {status} - {message}");
649 return Err(OpenAIError::ApiError(ApiError {
650 message,
651 r#type: None,
652 param: None,
653 code: None,
654 }));
655 }
656
657 if !status.is_success() {
659 let wrapped_error: WrappedError = serde_json::from_slice(bytes.as_ref())
660 .map_err(|e| map_deserialization_error(e, bytes.as_ref()))?;
661
662 return Err(OpenAIError::ApiError(wrapped_error.error));
663 }
664
665 Ok((bytes, headers))
666}
667
668#[cfg(not(target_family = "wasm"))]
671pub(crate) async fn stream<O>(
672 request_builder: reqwest::RequestBuilder,
673) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
674where
675 O: DeserializeOwned + std::marker::Send + 'static,
676{
677 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
678
679 tokio::spawn(async move {
680 let response = match request_builder.send().await {
681 Ok(r) => r,
682 Err(e) => {
683 let _ = tx.send(Err(OpenAIError::Reqwest(e)));
684 return;
685 }
686 };
687 if !response.status().is_success() {
688 if let Err(e) = read_response(response).await {
689 let _ = tx.send(Err(e));
690 }
691 return;
692 }
693 let byte_stream = response
694 .bytes_stream()
695 .map(|r| r.map_err(std::io::Error::other));
696 let mut event_stream = std::pin::pin!(eventsource_stream::EventStream::new(byte_stream));
697
698 while let Some(ev) = event_stream.next().await {
699 let event = match ev {
700 Ok(e) => e,
701 Err(e) => {
702 let _ = tx.send(Err(OpenAIError::StreamError(Box::new(
703 StreamError::EventStream(e.to_string()),
704 ))));
705 break;
706 }
707 };
708 if event.data == "[DONE]" {
709 break;
710 }
711 if event.event == "keepalive" {
712 continue;
713 }
714
715 let response = serde_json::from_str::<O>(&event.data)
716 .map_err(|e| map_deserialization_error(e, event.data.as_bytes()));
717
718 if tx.send(response).is_err() {
719 break;
720 }
721 }
722 });
723
724 Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx))
725}
726
727#[cfg(not(target_family = "wasm"))]
728pub(crate) async fn stream_mapped_raw_events<O>(
729 request_builder: reqwest::RequestBuilder,
730 event_mapper: impl Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static,
731) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
732where
733 O: DeserializeOwned + std::marker::Send + 'static,
734{
735 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
736
737 tokio::spawn(async move {
738 let response = match request_builder.send().await {
739 Ok(r) => r,
740 Err(e) => {
741 let _ = tx.send(Err(OpenAIError::Reqwest(e)));
742 return;
743 }
744 };
745 if !response.status().is_success() {
746 if let Err(e) = read_response(response).await {
747 let _ = tx.send(Err(e));
748 }
749 return;
750 }
751 let byte_stream = response
752 .bytes_stream()
753 .map(|r| r.map_err(std::io::Error::other));
754 let mut event_stream = std::pin::pin!(eventsource_stream::EventStream::new(byte_stream));
755
756 while let Some(ev) = event_stream.next().await {
757 let event = match ev {
758 Ok(e) => e,
759 Err(e) => {
760 let _ = tx.send(Err(OpenAIError::StreamError(Box::new(
761 StreamError::EventStream(e.to_string()),
762 ))));
763 break;
764 }
765 };
766 let done = event.data == "[DONE]";
767
768 if event.event == "keepalive" {
769 continue;
770 }
771
772 let response = event_mapper(event);
773
774 if tx.send(response).is_err() {
775 break;
776 }
777
778 if done {
779 break;
780 }
781 }
782 });
783
784 Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx))
785}