async_openai/types/assistants/
stream.rs

1use serde::Deserialize;
2
3use crate::error::ApiError;
4use crate::types::assistants::{
5    MessageDeltaObject, MessageObject, RunObject, RunStepDeltaObject, RunStepObject, ThreadObject,
6};
7
8/// Represents an event emitted when streaming a Run.
9///
10/// Each event in a server-sent events stream has an `event` and `data` property:
11///
12/// ```text
13/// event: thread.created
14/// data: {"id": "thread_123", "object": "thread", ...}
15/// ```
16///
17/// We emit events whenever a new object is created, transitions to a new state, or is being
18/// streamed in parts (deltas). For example, we emit `thread.run.created` when a new run
19/// is created, `thread.run.completed` when a run completes, and so on. When an Assistant chooses
20/// to create a message during a run, we emit a `thread.message.created event`, a
21/// `thread.message.in_progress` event, many `thread.message.delta` events, and finally a
22/// `thread.message.completed` event.
23///
24/// We may add additional events over time, so we recommend handling unknown events gracefully
25/// in your code. See the [Assistants API quickstart](https://platform.openai.com/docs/assistants/overview) to learn how to
26/// integrate the Assistants API with streaming.
27
28#[derive(Debug, Deserialize, Clone)]
29#[serde(tag = "event", content = "data")]
30pub enum AssistantStreamEvent {
31    /// Occurs when a new [thread](https://platform.openai.com/docs/api-reference/threads/object) is created.
32    #[serde(rename = "thread.created")]
33    ThreadCreated(ThreadObject),
34    /// Occurs when a new [run](https://platform.openai.com/docs/api-reference/runs/object) is created.
35    #[serde(rename = "thread.run.created")]
36    ThreadRunCreated(RunObject),
37    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `queued` status.
38    #[serde(rename = "thread.run.queued")]
39    ThreadRunQueued(RunObject),
40    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to an `in_progress` status.
41    #[serde(rename = "thread.run.in_progress")]
42    ThreadRunInProgress(RunObject),
43    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `requires_action` status.
44    #[serde(rename = "thread.run.requires_action")]
45    ThreadRunRequiresAction(RunObject),
46    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) is completed.
47    #[serde(rename = "thread.run.completed")]
48    ThreadRunCompleted(RunObject),
49    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) ends with status `incomplete`.
50    #[serde(rename = "thread.run.incomplete")]
51    ThreadRunIncomplete(RunObject),
52    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) fails.
53    #[serde(rename = "thread.run.failed")]
54    ThreadRunFailed(RunObject),
55    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `cancelling` status.
56    #[serde(rename = "thread.run.cancelling")]
57    ThreadRunCancelling(RunObject),
58    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) is cancelled.
59    #[serde(rename = "thread.run.cancelled")]
60    ThreadRunCancelled(RunObject),
61    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) expires.
62    #[serde(rename = "thread.run.expired")]
63    ThreadRunExpired(RunObject),
64    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is created.
65    #[serde(rename = "thread.run.step.created")]
66    ThreadRunStepCreated(RunStepObject),
67    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) moves to an `in_progress` state.
68    #[serde(rename = "thread.run.step.in_progress")]
69    ThreadRunStepInProgress(RunStepObject),
70    /// Occurs when parts of a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) are being streamed.
71    #[serde(rename = "thread.run.step.delta")]
72    ThreadRunStepDelta(RunStepDeltaObject),
73    ///  Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is completed.
74    #[serde(rename = "thread.run.step.completed")]
75    ThreadRunStepCompleted(RunStepObject),
76    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) fails.
77    #[serde(rename = "thread.run.step.failed")]
78    ThreadRunStepFailed(RunStepObject),
79    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is cancelled.
80    #[serde(rename = "thread.run.step.cancelled")]
81    ThreadRunStepCancelled(RunStepObject),
82    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) expires.
83    #[serde(rename = "thread.run.step.expired")]
84    ThreadRunStepExpired(RunStepObject),
85    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) is created.
86    #[serde(rename = "thread.message.created")]
87    ThreadMessageCreated(MessageObject),
88    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) moves to an `in_progress` state.
89    #[serde(rename = "thread.message.in_progress")]
90    ThreadMessageInProgress(MessageObject),
91    /// Occurs when parts of a [Message](https://platform.openai.com/docs/api-reference/messages/object) are being streamed.
92    #[serde(rename = "thread.message.delta")]
93    ThreadMessageDelta(MessageDeltaObject),
94    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) is completed.
95    #[serde(rename = "thread.message.completed")]
96    ThreadMessageCompleted(MessageObject),
97    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) ends before it is completed.
98    #[serde(rename = "thread.message.incomplete")]
99    ThreadMessageIncomplete(MessageObject),
100    /// Occurs when an [error](https://platform.openai.com/docs/guides/error-codes/api-errors) occurs. This can happen due to an internal server error or a timeout.
101    #[serde(rename = "error")]
102    ErrorEvent(ApiError),
103    /// Occurs when a stream ends.
104    #[serde(rename = "done")]
105    Done(String),
106}
107
/// Boxed, pinned, `Send` stream of assistant events.
///
/// Each item is a `Result` holding either an [`AssistantStreamEvent`] or a
/// `crate::error::OpenAIError`. The alias is only compiled when the `_api` feature is enabled.
#[cfg(feature = "_api")]
pub type AssistantEventStream = std::pin::Pin<
    Box<
        dyn futures::Stream<Item = Result<AssistantStreamEvent, crate::error::OpenAIError>>
            + Send,
    >,
>;
112
#[cfg(feature = "_api")]
impl TryFrom<eventsource_stream::Event> for AssistantStreamEvent {
    type Error = crate::error::OpenAIError;

    /// Converts a raw server-sent event into a typed [`AssistantStreamEvent`].
    ///
    /// The event's `event` name selects the variant, and its `data` field is parsed as JSON
    /// into that variant's payload type. The `done` event is the one exception: its `data`
    /// string is stored as-is, without JSON parsing.
    ///
    /// # Errors
    ///
    /// - When `data` cannot be deserialized into the payload type for a known event name, the
    ///   error is whatever `crate::error::map_deserialization_error` builds from the
    ///   `serde_json` error and the raw payload bytes.
    /// - When the event name is not recognized, returns `OpenAIError::StreamError` wrapping
    ///   `StreamError::UnknownEvent`, which takes ownership of the original event.
    fn try_from(value: eventsource_stream::Event) -> Result<Self, Self::Error> {
        // Shared decode step for every JSON-carrying event: parse `data` into `T` and, on
        // failure, hand the serde error plus the raw bytes to the crate's error mapper.
        fn parse<T>(data: &str) -> Result<T, crate::error::OpenAIError>
        where
            T: serde::de::DeserializeOwned,
        {
            serde_json::from_str::<T>(data)
                .map_err(|e| crate::error::map_deserialization_error(e, data.as_bytes()))
        }

        match value.event.as_str() {
            // Thread lifecycle.
            "thread.created" => parse::<ThreadObject>(&value.data).map(Self::ThreadCreated),

            // Run lifecycle.
            "thread.run.created" => parse::<RunObject>(&value.data).map(Self::ThreadRunCreated),
            "thread.run.queued" => parse::<RunObject>(&value.data).map(Self::ThreadRunQueued),
            "thread.run.in_progress" => {
                parse::<RunObject>(&value.data).map(Self::ThreadRunInProgress)
            }
            "thread.run.requires_action" => {
                parse::<RunObject>(&value.data).map(Self::ThreadRunRequiresAction)
            }
            "thread.run.completed" => {
                parse::<RunObject>(&value.data).map(Self::ThreadRunCompleted)
            }
            "thread.run.incomplete" => {
                parse::<RunObject>(&value.data).map(Self::ThreadRunIncomplete)
            }
            "thread.run.failed" => parse::<RunObject>(&value.data).map(Self::ThreadRunFailed),
            "thread.run.cancelling" => {
                parse::<RunObject>(&value.data).map(Self::ThreadRunCancelling)
            }
            "thread.run.cancelled" => {
                parse::<RunObject>(&value.data).map(Self::ThreadRunCancelled)
            }
            "thread.run.expired" => parse::<RunObject>(&value.data).map(Self::ThreadRunExpired),

            // Run step lifecycle.
            "thread.run.step.created" => {
                parse::<RunStepObject>(&value.data).map(Self::ThreadRunStepCreated)
            }
            "thread.run.step.in_progress" => {
                parse::<RunStepObject>(&value.data).map(Self::ThreadRunStepInProgress)
            }
            "thread.run.step.delta" => {
                parse::<RunStepDeltaObject>(&value.data).map(Self::ThreadRunStepDelta)
            }
            "thread.run.step.completed" => {
                parse::<RunStepObject>(&value.data).map(Self::ThreadRunStepCompleted)
            }
            "thread.run.step.failed" => {
                parse::<RunStepObject>(&value.data).map(Self::ThreadRunStepFailed)
            }
            "thread.run.step.cancelled" => {
                parse::<RunStepObject>(&value.data).map(Self::ThreadRunStepCancelled)
            }
            "thread.run.step.expired" => {
                parse::<RunStepObject>(&value.data).map(Self::ThreadRunStepExpired)
            }

            // Message lifecycle.
            "thread.message.created" => {
                parse::<MessageObject>(&value.data).map(Self::ThreadMessageCreated)
            }
            "thread.message.in_progress" => {
                parse::<MessageObject>(&value.data).map(Self::ThreadMessageInProgress)
            }
            "thread.message.delta" => {
                parse::<MessageDeltaObject>(&value.data).map(Self::ThreadMessageDelta)
            }
            "thread.message.completed" => {
                parse::<MessageObject>(&value.data).map(Self::ThreadMessageCompleted)
            }
            "thread.message.incomplete" => {
                parse::<MessageObject>(&value.data).map(Self::ThreadMessageIncomplete)
            }

            // Stream-level events.
            "error" => parse::<ApiError>(&value.data).map(Self::ErrorEvent),
            // `done` data is kept verbatim rather than parsed as JSON.
            "done" => Ok(Self::Done(value.data)),

            // Anything else is surfaced to the caller together with the original event.
            _ => Err(crate::error::OpenAIError::StreamError(Box::new(
                crate::error::StreamError::UnknownEvent(value),
            ))),
        }
    }
}
214
#[cfg(feature = "_api")]
impl crate::traits::EventType for AssistantStreamEvent {
    /// Returns the dotted event name for this variant as a static string, e.g.
    /// `"thread.run.created"` for [`AssistantStreamEvent::ThreadRunCreated`].
    ///
    /// The payload is ignored; only the variant determines the result.
    fn event_type(&self) -> &'static str {
        match self {
            // Thread lifecycle.
            Self::ThreadCreated(..) => "thread.created",

            // Run lifecycle.
            Self::ThreadRunCreated(..) => "thread.run.created",
            Self::ThreadRunQueued(..) => "thread.run.queued",
            Self::ThreadRunInProgress(..) => "thread.run.in_progress",
            Self::ThreadRunRequiresAction(..) => "thread.run.requires_action",
            Self::ThreadRunCompleted(..) => "thread.run.completed",
            Self::ThreadRunIncomplete(..) => "thread.run.incomplete",
            Self::ThreadRunFailed(..) => "thread.run.failed",
            Self::ThreadRunCancelling(..) => "thread.run.cancelling",
            Self::ThreadRunCancelled(..) => "thread.run.cancelled",
            Self::ThreadRunExpired(..) => "thread.run.expired",

            // Run step lifecycle.
            Self::ThreadRunStepCreated(..) => "thread.run.step.created",
            Self::ThreadRunStepInProgress(..) => "thread.run.step.in_progress",
            Self::ThreadRunStepDelta(..) => "thread.run.step.delta",
            Self::ThreadRunStepCompleted(..) => "thread.run.step.completed",
            Self::ThreadRunStepFailed(..) => "thread.run.step.failed",
            Self::ThreadRunStepCancelled(..) => "thread.run.step.cancelled",
            Self::ThreadRunStepExpired(..) => "thread.run.step.expired",

            // Message lifecycle.
            Self::ThreadMessageCreated(..) => "thread.message.created",
            Self::ThreadMessageInProgress(..) => "thread.message.in_progress",
            Self::ThreadMessageDelta(..) => "thread.message.delta",
            Self::ThreadMessageCompleted(..) => "thread.message.completed",
            Self::ThreadMessageIncomplete(..) => "thread.message.incomplete",

            // Stream-level events.
            Self::ErrorEvent(..) => "error",
            Self::Done(..) => "done",
        }
    }
}