Skip to main content

async_openai/
client.rs

1#[cfg(not(target_family = "wasm"))]
2use std::pin::Pin;
3
4use bytes::Bytes;
5#[cfg(not(target_family = "wasm"))]
6use futures::{stream::StreamExt, Stream};
7use reqwest::{header::HeaderMap, multipart::Form, Response};
8use serde::{de::DeserializeOwned, Serialize};
9
10#[cfg(not(target_family = "wasm"))]
11use crate::error::StreamError;
12use crate::{
13    config::{Config, OpenAIConfig},
14    error::{map_deserialization_error, ApiError, OpenAIError, WrappedError},
15    traits::AsyncTryFrom,
16    RequestOptions,
17};
18
19#[cfg(feature = "administration")]
20use crate::admin::Admin;
21#[cfg(feature = "chatkit")]
22use crate::chatkit::Chatkit;
23#[cfg(feature = "file")]
24use crate::file::Files;
25#[cfg(feature = "image")]
26use crate::image::Images;
27#[cfg(feature = "moderation")]
28use crate::moderation::Moderations;
29#[cfg(feature = "assistant")]
30#[allow(deprecated)]
31use crate::Assistants;
32#[cfg(feature = "audio")]
33use crate::Audio;
34#[cfg(feature = "batch")]
35use crate::Batches;
36#[cfg(feature = "chat-completion")]
37use crate::Chat;
38#[cfg(feature = "completions")]
39use crate::Completions;
40#[cfg(feature = "container")]
41use crate::Containers;
42#[cfg(feature = "responses")]
43use crate::Conversations;
44#[cfg(feature = "embedding")]
45use crate::Embeddings;
46#[cfg(feature = "evals")]
47use crate::Evals;
48#[cfg(feature = "finetuning")]
49use crate::FineTuning;
50#[cfg(feature = "model")]
51use crate::Models;
52#[cfg(feature = "realtime")]
53use crate::Realtime;
54#[cfg(feature = "responses")]
55use crate::Responses;
56#[cfg(feature = "skill")]
57use crate::Skills;
58#[cfg(feature = "assistant")]
59#[allow(deprecated)]
60use crate::Threads;
61#[cfg(feature = "upload")]
62use crate::Uploads;
63#[cfg(feature = "vectorstore")]
64use crate::VectorStores;
65#[cfg(feature = "video")]
66use crate::Videos;
67
68#[derive(Debug, Clone)]
69/// Client is a container for config, backoff and http_client
70/// used to make API calls.
71pub struct Client<C: Config> {
72    http_client: reqwest::Client,
73    config: C,
74    #[cfg(not(target_family = "wasm"))]
75    backoff: backoff::ExponentialBackoff,
76}
77
78impl<C: Config> Default for Client<C>
79where
80    C: Default,
81{
82    fn default() -> Self {
83        Self {
84            http_client: reqwest::Client::new(),
85            config: C::default(),
86            #[cfg(not(target_family = "wasm"))]
87            backoff: Default::default(),
88        }
89    }
90}
91
92impl Client<OpenAIConfig> {
93    /// Client with default [OpenAIConfig]
94    pub fn new() -> Self {
95        Self::default()
96    }
97}
98
99impl<C: Config> Client<C> {
100    /// Create client with a custom HTTP client, OpenAI config, and backoff.
101    #[cfg(not(target_family = "wasm"))]
102    pub fn build(
103        http_client: reqwest::Client,
104        config: C,
105        backoff: backoff::ExponentialBackoff,
106    ) -> Self {
107        Self {
108            http_client,
109            config,
110            backoff,
111        }
112    }
113
114    /// Create client with a custom HTTP client and config (WASM version without backoff).
115    #[cfg(target_family = "wasm")]
116    pub fn build(http_client: reqwest::Client, config: C) -> Self {
117        Self {
118            http_client,
119            config,
120        }
121    }
122
123    /// Create client with [OpenAIConfig] or [crate::config::AzureConfig]
124    pub fn with_config(config: C) -> Self {
125        Self {
126            http_client: reqwest::Client::new(),
127            config,
128            #[cfg(not(target_family = "wasm"))]
129            backoff: Default::default(),
130        }
131    }
132
133    /// Provide your own [client] to make HTTP requests with.
134    ///
135    /// [client]: reqwest::Client
136    pub fn with_http_client(mut self, http_client: reqwest::Client) -> Self {
137        self.http_client = http_client;
138        self
139    }
140
141    /// Exponential backoff for retrying [rate limited](https://platform.openai.com/docs/guides/rate-limits) requests.
142    #[cfg(not(target_family = "wasm"))]
143    pub fn with_backoff(mut self, backoff: backoff::ExponentialBackoff) -> Self {
144        self.backoff = backoff;
145        self
146    }
147
148    // API groups
149
150    /// To call [Models] group related APIs using this client.
151    #[cfg(feature = "model")]
152    pub fn models(&self) -> Models<'_, C> {
153        Models::new(self)
154    }
155
156    /// To call [Completions] group related APIs using this client.
157    #[cfg(feature = "completions")]
158    pub fn completions(&self) -> Completions<'_, C> {
159        Completions::new(self)
160    }
161
162    /// To call [Chat] group related APIs using this client.
163    #[cfg(feature = "chat-completion")]
164    pub fn chat(&self) -> Chat<'_, C> {
165        Chat::new(self)
166    }
167
168    /// To call [Images] group related APIs using this client.
169    #[cfg(feature = "image")]
170    pub fn images(&self) -> Images<'_, C> {
171        Images::new(self)
172    }
173
174    /// To call [Moderations] group related APIs using this client.
175    #[cfg(feature = "moderation")]
176    pub fn moderations(&self) -> Moderations<'_, C> {
177        Moderations::new(self)
178    }
179
180    /// To call [Files] group related APIs using this client.
181    #[cfg(feature = "file")]
182    pub fn files(&self) -> Files<'_, C> {
183        Files::new(self)
184    }
185
186    /// To call [Uploads] group related APIs using this client.
187    #[cfg(feature = "upload")]
188    pub fn uploads(&self) -> Uploads<'_, C> {
189        Uploads::new(self)
190    }
191
192    /// To call [FineTuning] group related APIs using this client.
193    #[cfg(feature = "finetuning")]
194    pub fn fine_tuning(&self) -> FineTuning<'_, C> {
195        FineTuning::new(self)
196    }
197
198    /// To call [Embeddings] group related APIs using this client.
199    #[cfg(feature = "embedding")]
200    pub fn embeddings(&self) -> Embeddings<'_, C> {
201        Embeddings::new(self)
202    }
203
204    /// To call [Audio] group related APIs using this client.
205    #[cfg(feature = "audio")]
206    pub fn audio(&self) -> Audio<'_, C> {
207        Audio::new(self)
208    }
209
210    /// To call [Videos] group related APIs using this client.
211    #[cfg(feature = "video")]
212    pub fn videos(&self) -> Videos<'_, C> {
213        Videos::new(self)
214    }
215
216    /// To call [Assistants] group related APIs using this client.
217    #[cfg(feature = "assistant")]
218    #[deprecated(
219        note = "Assistants API is deprecated and will be removed in August 2026. Use the Responses API."
220    )]
221    #[allow(deprecated)]
222    pub fn assistants(&self) -> Assistants<'_, C> {
223        Assistants::new(self)
224    }
225
226    /// To call [Threads] group related APIs using this client.
227    #[cfg(feature = "assistant")]
228    #[deprecated(
229        note = "Assistants API is deprecated and will be removed in August 2026. Use the Responses API."
230    )]
231    #[allow(deprecated)]
232    pub fn threads(&self) -> Threads<'_, C> {
233        Threads::new(self)
234    }
235
236    /// To call [VectorStores] group related APIs using this client.
237    #[cfg(feature = "vectorstore")]
238    pub fn vector_stores(&self) -> VectorStores<'_, C> {
239        VectorStores::new(self)
240    }
241
242    /// To call [Batches] group related APIs using this client.
243    #[cfg(feature = "batch")]
244    pub fn batches(&self) -> Batches<'_, C> {
245        Batches::new(self)
246    }
247
248    /// To call [Admin] group related APIs using this client.
249    /// This groups together admin API keys, invites, users, projects, audit logs, and certificates.
250    #[cfg(feature = "administration")]
251    pub fn admin(&self) -> Admin<'_, C> {
252        Admin::new(self)
253    }
254
255    /// To call [Responses] group related APIs using this client.
256    #[cfg(feature = "responses")]
257    pub fn responses(&self) -> Responses<'_, C> {
258        Responses::new(self)
259    }
260
261    /// To call [Conversations] group related APIs using this client.
262    #[cfg(feature = "responses")]
263    pub fn conversations(&self) -> Conversations<'_, C> {
264        Conversations::new(self)
265    }
266
267    /// To call [Containers] group related APIs using this client.
268    #[cfg(feature = "container")]
269    pub fn containers(&self) -> Containers<'_, C> {
270        Containers::new(self)
271    }
272
273    /// To call [Skills] group related APIs using this client.
274    #[cfg(feature = "skill")]
275    pub fn skills(&self) -> Skills<'_, C> {
276        Skills::new(self)
277    }
278
279    /// To call [Evals] group related APIs using this client.
280    #[cfg(feature = "evals")]
281    pub fn evals(&self) -> Evals<'_, C> {
282        Evals::new(self)
283    }
284
285    #[cfg(feature = "chatkit")]
286    pub fn chatkit(&self) -> Chatkit<'_, C> {
287        Chatkit::new(self)
288    }
289
290    /// To call [Realtime] group related APIs using this client.
291    #[cfg(feature = "realtime")]
292    pub fn realtime(&self) -> Realtime<'_, C> {
293        Realtime::new(self)
294    }
295
296    pub fn config(&self) -> &C {
297        &self.config
298    }
299
300    /// Helper function to build a request builder with common configuration
301    fn build_request_builder(
302        &self,
303        method: reqwest::Method,
304        path: &str,
305        request_options: &RequestOptions,
306    ) -> reqwest::RequestBuilder {
307        let mut request_builder = if let Some(path) = request_options.path() {
308            self.http_client
309                .request(method, self.config.url(path.as_str()))
310        } else {
311            self.http_client.request(method, self.config.url(path))
312        };
313
314        request_builder = request_builder
315            .query(&self.config.query())
316            .headers(self.config.headers());
317
318        if let Some(headers) = request_options.headers() {
319            request_builder = request_builder.headers(headers.clone());
320        }
321
322        if !request_options.query().is_empty() {
323            request_builder = request_builder.query(request_options.query());
324        }
325
326        request_builder
327    }
328
329    /// Make a GET request to {path} and deserialize the response body
330    #[allow(unused)]
331    pub(crate) async fn get<O>(
332        &self,
333        path: &str,
334        request_options: &RequestOptions,
335    ) -> Result<O, OpenAIError>
336    where
337        O: DeserializeOwned,
338    {
339        let request_maker = || async {
340            Ok(self
341                .build_request_builder(reqwest::Method::GET, path, request_options)
342                .build()?)
343        };
344
345        self.execute(request_maker).await
346    }
347
348    /// Make a DELETE request to {path} and deserialize the response body
349    #[allow(unused)]
350    pub(crate) async fn delete<O>(
351        &self,
352        path: &str,
353        request_options: &RequestOptions,
354    ) -> Result<O, OpenAIError>
355    where
356        O: DeserializeOwned,
357    {
358        let request_maker = || async {
359            Ok(self
360                .build_request_builder(reqwest::Method::DELETE, path, request_options)
361                .build()?)
362        };
363
364        self.execute(request_maker).await
365    }
366
367    /// Make a GET request to {path} and return the response body
368    #[allow(unused)]
369    pub(crate) async fn get_raw(
370        &self,
371        path: &str,
372        request_options: &RequestOptions,
373    ) -> Result<(Bytes, HeaderMap), OpenAIError> {
374        let request_maker = || async {
375            Ok(self
376                .build_request_builder(reqwest::Method::GET, path, request_options)
377                .build()?)
378        };
379
380        self.execute_raw(request_maker).await
381    }
382
383    /// Make a POST request to {path} and return the response body
384    #[allow(unused)]
385    pub(crate) async fn post_raw<I>(
386        &self,
387        path: &str,
388        request: I,
389        request_options: &RequestOptions,
390    ) -> Result<(Bytes, HeaderMap), OpenAIError>
391    where
392        I: Serialize,
393    {
394        let request_maker = || async {
395            Ok(self
396                .build_request_builder(reqwest::Method::POST, path, request_options)
397                .json(&request)
398                .build()?)
399        };
400
401        self.execute_raw(request_maker).await
402    }
403
404    /// Make a POST request to {path} and deserialize the response body
405    #[allow(unused)]
406    pub(crate) async fn post<I, O>(
407        &self,
408        path: &str,
409        request: I,
410        request_options: &RequestOptions,
411    ) -> Result<O, OpenAIError>
412    where
413        I: Serialize,
414        O: DeserializeOwned,
415    {
416        let request_maker = || async {
417            Ok(self
418                .build_request_builder(reqwest::Method::POST, path, request_options)
419                .json(&request)
420                .build()?)
421        };
422
423        self.execute(request_maker).await
424    }
425
426    /// POST a form at {path} and return the response body
427    #[allow(unused)]
428    pub(crate) async fn post_form_raw<F>(
429        &self,
430        path: &str,
431        form: F,
432        request_options: &RequestOptions,
433    ) -> Result<(Bytes, HeaderMap), OpenAIError>
434    where
435        Form: AsyncTryFrom<F, Error = OpenAIError>,
436        F: Clone,
437    {
438        let request_maker = || async {
439            Ok(self
440                .build_request_builder(reqwest::Method::POST, path, request_options)
441                .multipart(<Form as AsyncTryFrom<F>>::try_from(form.clone()).await?)
442                .build()?)
443        };
444
445        self.execute_raw(request_maker).await
446    }
447
448    /// POST a form at {path} and deserialize the response body
449    #[allow(unused)]
450    pub(crate) async fn post_form<O, F>(
451        &self,
452        path: &str,
453        form: F,
454        request_options: &RequestOptions,
455    ) -> Result<O, OpenAIError>
456    where
457        O: DeserializeOwned,
458        Form: AsyncTryFrom<F, Error = OpenAIError>,
459        F: Clone,
460    {
461        let request_maker = || async {
462            Ok(self
463                .build_request_builder(reqwest::Method::POST, path, request_options)
464                .multipart(<Form as AsyncTryFrom<F>>::try_from(form.clone()).await?)
465                .build()?)
466        };
467
468        self.execute(request_maker).await
469    }
470
471    #[allow(unused)]
472    #[cfg(not(target_family = "wasm"))]
473    pub(crate) async fn post_form_stream<O, F>(
474        &self,
475        path: &str,
476        form: F,
477        request_options: &RequestOptions,
478    ) -> Result<Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>, OpenAIError>
479    where
480        F: Clone,
481        Form: AsyncTryFrom<F, Error = OpenAIError>,
482        O: DeserializeOwned + std::marker::Send + 'static,
483    {
484        // Build and execute request manually since multipart::Form is not Clone
485        let request_builder = self
486            .build_request_builder(reqwest::Method::POST, path, request_options)
487            .multipart(<Form as AsyncTryFrom<F>>::try_from(form.clone()).await?);
488
489        Ok(stream(request_builder).await)
490    }
491
492    /// Execute a HTTP request and retry on rate limit (non-WASM version with backoff)
493    ///
494    /// request_maker serves one purpose: to be able to create request again
495    /// to retry API call after getting rate limited. request_maker is async because
496    /// reqwest::multipart::Form is created by async calls to read files for uploads.
497    #[cfg(not(target_family = "wasm"))]
498    async fn execute_raw<M, Fut>(&self, request_maker: M) -> Result<(Bytes, HeaderMap), OpenAIError>
499    where
500        M: Fn() -> Fut,
501        Fut: core::future::Future<Output = Result<reqwest::Request, OpenAIError>>,
502    {
503        let client = self.http_client.clone();
504
505        backoff::future::retry(self.backoff.clone(), || async {
506            let request = request_maker().await.map_err(backoff::Error::Permanent)?;
507            let response = client
508                .execute(request)
509                .await
510                .map_err(OpenAIError::Reqwest)
511                .map_err(backoff::Error::Permanent)?;
512
513            let status = response.status();
514
515            match read_response(response).await {
516                Ok((bytes, headers)) => Ok((bytes, headers)),
517                Err(e) => {
518                    match e {
519                        OpenAIError::ApiError(api_error) => {
520                            if status.is_server_error() {
521                                Err(backoff::Error::Transient {
522                                    err: OpenAIError::ApiError(api_error),
523                                    retry_after: None,
524                                })
525                            } else if status.as_u16() == 429
526                                && api_error.r#type != Some("insufficient_quota".to_string())
527                            {
528                                // Rate limited retry...
529                                tracing::warn!("Rate limited: {}", api_error.message);
530                                Err(backoff::Error::Transient {
531                                    err: OpenAIError::ApiError(api_error),
532                                    retry_after: None,
533                                })
534                            } else {
535                                Err(backoff::Error::Permanent(OpenAIError::ApiError(api_error)))
536                            }
537                        }
538                        _ => Err(backoff::Error::Permanent(e)),
539                    }
540                }
541            }
542        })
543        .await
544    }
545
546    /// Execute a HTTP request (WASM version - single attempt, no retry)
547    #[cfg(target_family = "wasm")]
548    async fn execute_raw<M, Fut>(&self, request_maker: M) -> Result<(Bytes, HeaderMap), OpenAIError>
549    where
550        M: Fn() -> Fut,
551        Fut: core::future::Future<Output = Result<reqwest::Request, OpenAIError>>,
552    {
553        let request = request_maker().await?;
554        let response = self
555            .http_client
556            .execute(request)
557            .await
558            .map_err(OpenAIError::Reqwest)?;
559
560        read_response(response).await
561    }
562
563    /// Execute a HTTP request and retry on rate limit
564    ///
565    /// request_maker serves one purpose: to be able to create request again
566    /// to retry API call after getting rate limited. request_maker is async because
567    /// reqwest::multipart::Form is created by async calls to read files for uploads.
568    async fn execute<O, M, Fut>(&self, request_maker: M) -> Result<O, OpenAIError>
569    where
570        O: DeserializeOwned,
571        M: Fn() -> Fut,
572        Fut: core::future::Future<Output = Result<reqwest::Request, OpenAIError>>,
573    {
574        let (bytes, _headers) = self.execute_raw(request_maker).await?;
575
576        let response: O = serde_json::from_slice(bytes.as_ref())
577            .map_err(|e| map_deserialization_error(e, bytes.as_ref()))?;
578
579        Ok(response)
580    }
581
582    /// Make HTTP POST request to receive SSE
583    #[allow(unused)]
584    #[cfg(not(target_family = "wasm"))]
585    pub(crate) async fn post_stream<I, O>(
586        &self,
587        path: &str,
588        request: I,
589        request_options: &RequestOptions,
590    ) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
591    where
592        I: Serialize,
593        O: DeserializeOwned + std::marker::Send + 'static,
594    {
595        let request_builder = self
596            .build_request_builder(reqwest::Method::POST, path, request_options)
597            .json(&request);
598
599        stream(request_builder).await
600    }
601
602    #[allow(unused)]
603    #[cfg(not(target_family = "wasm"))]
604    pub(crate) async fn post_stream_mapped_raw_events<I, O>(
605        &self,
606        path: &str,
607        request: I,
608        request_options: &RequestOptions,
609        event_mapper: impl Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static,
610    ) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
611    where
612        I: Serialize,
613        O: DeserializeOwned + std::marker::Send + 'static,
614    {
615        let request_builder = self
616            .build_request_builder(reqwest::Method::POST, path, request_options)
617            .json(&request);
618
619        stream_mapped_raw_events(request_builder, event_mapper).await
620    }
621
622    /// Make HTTP GET request to receive SSE
623    #[allow(unused)]
624    #[cfg(not(target_family = "wasm"))]
625    pub(crate) async fn get_stream<O>(
626        &self,
627        path: &str,
628        request_options: &RequestOptions,
629    ) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
630    where
631        O: DeserializeOwned + std::marker::Send + 'static,
632    {
633        let request_builder =
634            self.build_request_builder(reqwest::Method::GET, path, request_options);
635
636        stream(request_builder).await
637    }
638}
639
640async fn read_response(response: Response) -> Result<(Bytes, HeaderMap), OpenAIError> {
641    let status = response.status();
642    let headers = response.headers().clone();
643    let bytes = response.bytes().await.map_err(OpenAIError::Reqwest)?;
644
645    if status.is_server_error() {
646        // OpenAI does not guarantee server errors are returned as JSON so we cannot deserialize them.
647        let message: String = String::from_utf8_lossy(&bytes).into_owned();
648        tracing::warn!("Server error: {status} - {message}");
649        return Err(OpenAIError::ApiError(ApiError {
650            message,
651            r#type: None,
652            param: None,
653            code: None,
654        }));
655    }
656
657    // Deserialize response body from either error object or actual response object
658    if !status.is_success() {
659        let wrapped_error: WrappedError = serde_json::from_slice(bytes.as_ref())
660            .map_err(|e| map_deserialization_error(e, bytes.as_ref()))?;
661
662        return Err(OpenAIError::ApiError(wrapped_error.error));
663    }
664
665    Ok((bytes, headers))
666}
667
668/// Request which responds with SSE.
669/// [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format)
670#[cfg(not(target_family = "wasm"))]
671pub(crate) async fn stream<O>(
672    request_builder: reqwest::RequestBuilder,
673) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
674where
675    O: DeserializeOwned + std::marker::Send + 'static,
676{
677    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
678
679    tokio::spawn(async move {
680        let response = match request_builder.send().await {
681            Ok(r) => r,
682            Err(e) => {
683                let _ = tx.send(Err(OpenAIError::Reqwest(e)));
684                return;
685            }
686        };
687        if !response.status().is_success() {
688            if let Err(e) = read_response(response).await {
689                let _ = tx.send(Err(e));
690            }
691            return;
692        }
693        let byte_stream = response
694            .bytes_stream()
695            .map(|r| r.map_err(std::io::Error::other));
696        let mut event_stream = std::pin::pin!(eventsource_stream::EventStream::new(byte_stream));
697
698        while let Some(ev) = event_stream.next().await {
699            let event = match ev {
700                Ok(e) => e,
701                Err(e) => {
702                    let _ = tx.send(Err(OpenAIError::StreamError(Box::new(
703                        StreamError::EventStream(e.to_string()),
704                    ))));
705                    break;
706                }
707            };
708            if event.data == "[DONE]" {
709                break;
710            }
711            if event.event == "keepalive" {
712                continue;
713            }
714
715            let response = serde_json::from_str::<O>(&event.data)
716                .map_err(|e| map_deserialization_error(e, event.data.as_bytes()));
717
718            if tx.send(response).is_err() {
719                break;
720            }
721        }
722    });
723
724    Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx))
725}
726
727#[cfg(not(target_family = "wasm"))]
728pub(crate) async fn stream_mapped_raw_events<O>(
729    request_builder: reqwest::RequestBuilder,
730    event_mapper: impl Fn(eventsource_stream::Event) -> Result<O, OpenAIError> + Send + 'static,
731) -> Pin<Box<dyn Stream<Item = Result<O, OpenAIError>> + Send>>
732where
733    O: DeserializeOwned + std::marker::Send + 'static,
734{
735    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
736
737    tokio::spawn(async move {
738        let response = match request_builder.send().await {
739            Ok(r) => r,
740            Err(e) => {
741                let _ = tx.send(Err(OpenAIError::Reqwest(e)));
742                return;
743            }
744        };
745        if !response.status().is_success() {
746            if let Err(e) = read_response(response).await {
747                let _ = tx.send(Err(e));
748            }
749            return;
750        }
751        let byte_stream = response
752            .bytes_stream()
753            .map(|r| r.map_err(std::io::Error::other));
754        let mut event_stream = std::pin::pin!(eventsource_stream::EventStream::new(byte_stream));
755
756        while let Some(ev) = event_stream.next().await {
757            let event = match ev {
758                Ok(e) => e,
759                Err(e) => {
760                    let _ = tx.send(Err(OpenAIError::StreamError(Box::new(
761                        StreamError::EventStream(e.to_string()),
762                    ))));
763                    break;
764                }
765            };
766            let done = event.data == "[DONE]";
767
768            if event.event == "keepalive" {
769                continue;
770            }
771
772            let response = event_mapper(event);
773
774            if tx.send(response).is_err() {
775                break;
776            }
777
778            if done {
779                break;
780            }
781        }
782    });
783
784    Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx))
785}