Skip to main content

async_openai/types/assistants/
stream.rs

1use serde::Deserialize;
2
3use crate::error::ApiError;
4use crate::types::assistants::{
5    MessageDeltaObject, MessageObject, RunObject, RunStepDeltaObject, RunStepObject, ThreadObject,
6};
7
8/// Represents an event emitted when streaming a Run.
9///
10/// Each event in a server-sent events stream has an `event` and `data` property:
11///
12/// ```text
13/// event: thread.created
14/// data: {"id": "thread_123", "object": "thread", ...}
15/// ```
16///
17/// We emit events whenever a new object is created, transitions to a new state, or is being
18/// streamed in parts (deltas). For example, we emit `thread.run.created` when a new run
19/// is created, `thread.run.completed` when a run completes, and so on. When an Assistant chooses
20/// to create a message during a run, we emit a `thread.message.created event`, a
21/// `thread.message.in_progress` event, many `thread.message.delta` events, and finally a
22/// `thread.message.completed` event.
23///
24/// We may add additional events over time, so we recommend handling unknown events gracefully
25/// in your code. See the [Assistants API quickstart](https://platform.openai.com/docs/assistants/overview) to learn how to
26/// integrate the Assistants API with streaming.
27
28#[derive(Debug, Deserialize, Clone)]
29#[serde(tag = "event", content = "data")]
30pub enum AssistantStreamEvent {
31    /// Occurs when a new [thread](https://platform.openai.com/docs/api-reference/threads/object) is created.
32    #[serde(rename = "thread.created")]
33    ThreadCreated(ThreadObject),
34    /// Occurs when a new [run](https://platform.openai.com/docs/api-reference/runs/object) is created.
35    #[serde(rename = "thread.run.created")]
36    ThreadRunCreated(RunObject),
37    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `queued` status.
38    #[serde(rename = "thread.run.queued")]
39    ThreadRunQueued(RunObject),
40    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to an `in_progress` status.
41    #[serde(rename = "thread.run.in_progress")]
42    ThreadRunInProgress(RunObject),
43    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `requires_action` status.
44    #[serde(rename = "thread.run.requires_action")]
45    ThreadRunRequiresAction(RunObject),
46    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) is completed.
47    #[serde(rename = "thread.run.completed")]
48    ThreadRunCompleted(RunObject),
49    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) ends with status `incomplete`.
50    #[serde(rename = "thread.run.incomplete")]
51    ThreadRunIncomplete(RunObject),
52    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) fails.
53    #[serde(rename = "thread.run.failed")]
54    ThreadRunFailed(RunObject),
55    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `cancelling` status.
56    #[serde(rename = "thread.run.cancelling")]
57    ThreadRunCancelling(RunObject),
58    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) is cancelled.
59    #[serde(rename = "thread.run.cancelled")]
60    ThreadRunCancelled(RunObject),
61    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) expires.
62    #[serde(rename = "thread.run.expired")]
63    ThreadRunExpired(RunObject),
64    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is created.
65    #[serde(rename = "thread.run.step.created")]
66    ThreadRunStepCreated(RunStepObject),
67    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) moves to an `in_progress` state.
68    #[serde(rename = "thread.run.step.in_progress")]
69    ThreadRunStepInProgress(RunStepObject),
70    /// Occurs when parts of a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) are being streamed.
71    #[serde(rename = "thread.run.step.delta")]
72    ThreadRunStepDelta(RunStepDeltaObject),
73    ///  Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is completed.
74    #[serde(rename = "thread.run.step.completed")]
75    ThreadRunStepCompleted(RunStepObject),
76    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) fails.
77    #[serde(rename = "thread.run.step.failed")]
78    ThreadRunStepFailed(RunStepObject),
79    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is cancelled.
80    #[serde(rename = "thread.run.step.cancelled")]
81    ThreadRunStepCancelled(RunStepObject),
82    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) expires.
83    #[serde(rename = "thread.run.step.expired")]
84    ThreadRunStepExpired(RunStepObject),
85    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) is created.
86    #[serde(rename = "thread.message.created")]
87    ThreadMessageCreated(MessageObject),
88    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) moves to an `in_progress` state.
89    #[serde(rename = "thread.message.in_progress")]
90    ThreadMessageInProgress(MessageObject),
91    /// Occurs when parts of a [Message](https://platform.openai.com/docs/api-reference/messages/object) are being streamed.
92    #[serde(rename = "thread.message.delta")]
93    ThreadMessageDelta(MessageDeltaObject),
94    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) is completed.
95    #[serde(rename = "thread.message.completed")]
96    ThreadMessageCompleted(MessageObject),
97    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) ends before it is completed.
98    #[serde(rename = "thread.message.incomplete")]
99    ThreadMessageIncomplete(MessageObject),
100    /// Occurs when an [error](https://platform.openai.com/docs/guides/error-codes/api-errors) occurs. This can happen due to an internal server error or a timeout.
101    #[serde(rename = "error")]
102    ErrorEvent(ApiError),
103    /// Occurs when a stream ends.
104    #[serde(rename = "done")]
105    Done(String),
106}
107
/// Stream of [`AssistantStreamEvent`] items.
///
/// Alias for the crate's `StreamResponse` specialised to assistant stream events.
/// Only available when the `_api` feature is enabled.
#[cfg(feature = "_api")]
pub type AssistantEventStream = crate::types::stream::StreamResponse<AssistantStreamEvent>;
110
/// Converts a raw server-sent event into a typed [`AssistantStreamEvent`].
///
/// The SSE `event` name selects the variant and the SSE `data` field is parsed as JSON
/// into that variant's payload type. The `done` event is the exception: its `data` is
/// stored verbatim and is not parsed as JSON.
///
/// # Errors
///
/// * If `data` cannot be deserialized into the payload type expected for the event, the
///   `serde_json` error is passed to `crate::error::map_deserialization_error` together
///   with the raw payload bytes, and the resulting error is returned.
/// * If the event name is not recognised, `OpenAIError::StreamError` wrapping
///   `StreamError::UnknownEvent` (which owns the original event) is returned.
#[cfg(feature = "_api")]
impl TryFrom<eventsource_stream::Event> for AssistantStreamEvent {
    type Error = crate::error::OpenAIError;

    fn try_from(value: eventsource_stream::Event) -> Result<Self, Self::Error> {
        /// Parses `payload` as JSON into `T`, routing failures through the crate's
        /// deserialization-error mapper along with the raw payload bytes.
        fn decode<T>(payload: &str) -> Result<T, crate::error::OpenAIError>
        where
            T: serde::de::DeserializeOwned,
        {
            serde_json::from_str(payload)
                .map_err(|e| crate::error::map_deserialization_error(e, payload.as_bytes()))
        }

        match value.event.as_str() {
            // Thread lifecycle.
            "thread.created" => decode::<ThreadObject>(&value.data).map(Self::ThreadCreated),

            // Run lifecycle.
            "thread.run.created" => decode::<RunObject>(&value.data).map(Self::ThreadRunCreated),
            "thread.run.queued" => decode::<RunObject>(&value.data).map(Self::ThreadRunQueued),
            "thread.run.in_progress" => {
                decode::<RunObject>(&value.data).map(Self::ThreadRunInProgress)
            }
            "thread.run.requires_action" => {
                decode::<RunObject>(&value.data).map(Self::ThreadRunRequiresAction)
            }
            "thread.run.completed" => {
                decode::<RunObject>(&value.data).map(Self::ThreadRunCompleted)
            }
            "thread.run.incomplete" => {
                decode::<RunObject>(&value.data).map(Self::ThreadRunIncomplete)
            }
            "thread.run.failed" => decode::<RunObject>(&value.data).map(Self::ThreadRunFailed),
            "thread.run.cancelling" => {
                decode::<RunObject>(&value.data).map(Self::ThreadRunCancelling)
            }
            "thread.run.cancelled" => {
                decode::<RunObject>(&value.data).map(Self::ThreadRunCancelled)
            }
            "thread.run.expired" => decode::<RunObject>(&value.data).map(Self::ThreadRunExpired),

            // Run-step lifecycle.
            "thread.run.step.created" => {
                decode::<RunStepObject>(&value.data).map(Self::ThreadRunStepCreated)
            }
            "thread.run.step.in_progress" => {
                decode::<RunStepObject>(&value.data).map(Self::ThreadRunStepInProgress)
            }
            "thread.run.step.delta" => {
                decode::<RunStepDeltaObject>(&value.data).map(Self::ThreadRunStepDelta)
            }
            "thread.run.step.completed" => {
                decode::<RunStepObject>(&value.data).map(Self::ThreadRunStepCompleted)
            }
            "thread.run.step.failed" => {
                decode::<RunStepObject>(&value.data).map(Self::ThreadRunStepFailed)
            }
            "thread.run.step.cancelled" => {
                decode::<RunStepObject>(&value.data).map(Self::ThreadRunStepCancelled)
            }
            "thread.run.step.expired" => {
                decode::<RunStepObject>(&value.data).map(Self::ThreadRunStepExpired)
            }

            // Message lifecycle.
            "thread.message.created" => {
                decode::<MessageObject>(&value.data).map(Self::ThreadMessageCreated)
            }
            "thread.message.in_progress" => {
                decode::<MessageObject>(&value.data).map(Self::ThreadMessageInProgress)
            }
            "thread.message.delta" => {
                decode::<MessageDeltaObject>(&value.data).map(Self::ThreadMessageDelta)
            }
            "thread.message.completed" => {
                decode::<MessageObject>(&value.data).map(Self::ThreadMessageCompleted)
            }
            "thread.message.incomplete" => {
                decode::<MessageObject>(&value.data).map(Self::ThreadMessageIncomplete)
            }

            // Stream-level events.
            "error" => decode::<ApiError>(&value.data).map(Self::ErrorEvent),
            // The `done` payload is kept as-is; it is not parsed as JSON.
            "done" => Ok(Self::Done(value.data)),

            // Hand the whole event back to the caller so it can be inspected or logged.
            _ => Err(crate::error::OpenAIError::StreamError(Box::new(
                crate::error::StreamError::UnknownEvent(value),
            ))),
        }
    }
}
212
/// Maps each [`AssistantStreamEvent`] variant to its event name string.
///
/// The returned names are the same strings used in the variants' `#[serde(rename = ...)]`
/// attributes.
#[cfg(feature = "_api")]
impl crate::traits::EventType for AssistantStreamEvent {
    /// Returns the event name for this variant.
    fn event_type(&self) -> &'static str {
        match self {
            // Thread lifecycle.
            Self::ThreadCreated(..) => "thread.created",

            // Run lifecycle.
            Self::ThreadRunCreated(..) => "thread.run.created",
            Self::ThreadRunQueued(..) => "thread.run.queued",
            Self::ThreadRunInProgress(..) => "thread.run.in_progress",
            Self::ThreadRunRequiresAction(..) => "thread.run.requires_action",
            Self::ThreadRunCompleted(..) => "thread.run.completed",
            Self::ThreadRunIncomplete(..) => "thread.run.incomplete",
            Self::ThreadRunFailed(..) => "thread.run.failed",
            Self::ThreadRunCancelling(..) => "thread.run.cancelling",
            Self::ThreadRunCancelled(..) => "thread.run.cancelled",
            Self::ThreadRunExpired(..) => "thread.run.expired",

            // Run-step lifecycle.
            Self::ThreadRunStepCreated(..) => "thread.run.step.created",
            Self::ThreadRunStepInProgress(..) => "thread.run.step.in_progress",
            Self::ThreadRunStepDelta(..) => "thread.run.step.delta",
            Self::ThreadRunStepCompleted(..) => "thread.run.step.completed",
            Self::ThreadRunStepFailed(..) => "thread.run.step.failed",
            Self::ThreadRunStepCancelled(..) => "thread.run.step.cancelled",
            Self::ThreadRunStepExpired(..) => "thread.run.step.expired",

            // Message lifecycle.
            Self::ThreadMessageCreated(..) => "thread.message.created",
            Self::ThreadMessageInProgress(..) => "thread.message.in_progress",
            Self::ThreadMessageDelta(..) => "thread.message.delta",
            Self::ThreadMessageCompleted(..) => "thread.message.completed",
            Self::ThreadMessageIncomplete(..) => "thread.message.incomplete",

            // Stream-level events.
            Self::ErrorEvent(..) => "error",
            Self::Done(..) => "done",
        }
    }
}