async_openai_wasm/types/
assistant_stream.rs

1use serde::Deserialize;
2
3use crate::client::OpenAIEventMappedStream;
4use crate::error::{map_deserialization_error, ApiError, OpenAIError};
5
6use super::{
7    MessageDeltaObject, MessageObject, RunObject, RunStepDeltaObject, RunStepObject, ThreadObject,
8};
9
10/// Represents an event emitted when streaming a Run.
11///
12/// Each event in a server-sent events stream has an `event` and `data` property:
13///
14/// ```text
15/// event: thread.created
16/// data: {"id": "thread_123", "object": "thread", ...}
17/// ```
18///
19/// We emit events whenever a new object is created, transitions to a new state, or is being
20/// streamed in parts (deltas). For example, we emit `thread.run.created` when a new run
21/// is created, `thread.run.completed` when a run completes, and so on. When an Assistant chooses
22/// to create a message during a run, we emit a `thread.message.created event`, a
23/// `thread.message.in_progress` event, many `thread.message.delta` events, and finally a
24/// `thread.message.completed` event.
25///
26/// We may add additional events over time, so we recommend handling unknown events gracefully
27/// in your code. See the [Assistants API quickstart](https://platform.openai.com/docs/assistants/overview) to learn how to
28/// integrate the Assistants API with streaming.
29#[derive(Debug, Deserialize, Clone)]
30#[serde(tag = "event", content = "data")]
31#[non_exhaustive]
32pub enum AssistantStreamEvent {
33    /// Occurs when a new [thread](https://platform.openai.com/docs/api-reference/threads/object) is created.
34    #[serde(rename = "thread.created")]
35    TreadCreated(ThreadObject),
36    /// Occurs when a new [run](https://platform.openai.com/docs/api-reference/runs/object) is created.
37    #[serde(rename = "thread.run.created")]
38    ThreadRunCreated(RunObject),
39    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `queued` status.
40    #[serde(rename = "thread.run.queued")]
41    ThreadRunQueued(RunObject),
42    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to an `in_progress` status.
43    #[serde(rename = "thread.run.in_progress")]
44    ThreadRunInProgress(RunObject),
45    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `requires_action` status.
46    #[serde(rename = "thread.run.requires_action")]
47    ThreadRunRequiresAction(RunObject),
48    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) is completed.
49    #[serde(rename = "thread.run.completed")]
50    ThreadRunCompleted(RunObject),
51    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) ends with status `incomplete`.
52    #[serde(rename = "thread.run.incomplete")]
53    ThreadRunIncomplete(RunObject),
54    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) fails.
55    #[serde(rename = "thread.run.failed")]
56    ThreadRunFailed(RunObject),
57    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) moves to a `cancelling` status.
58    #[serde(rename = "thread.run.cancelling")]
59    ThreadRunCancelling(RunObject),
60    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) is cancelled.
61    #[serde(rename = "thread.run.cancelled")]
62    ThreadRunCancelled(RunObject),
63    /// Occurs when a [run](https://platform.openai.com/docs/api-reference/runs/object) expires.
64    #[serde(rename = "thread.run.expired")]
65    ThreadRunExpired(RunObject),
66    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is created.
67    #[serde(rename = "thread.run.step.created")]
68    ThreadRunStepCreated(RunStepObject),
69    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) moves to an `in_progress` state.
70    #[serde(rename = "thread.run.step.in_progress")]
71    ThreadRunStepInProgress(RunStepObject),
72    /// Occurs when parts of a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) are being streamed.
73    #[serde(rename = "thread.run.step.delta")]
74    ThreadRunStepDelta(RunStepDeltaObject),
75    ///  Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is completed.
76    #[serde(rename = "thread.run.step.completed")]
77    ThreadRunStepCompleted(RunStepObject),
78    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) fails.
79    #[serde(rename = "thread.run.step.failed")]
80    ThreadRunStepFailed(RunStepObject),
81    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) is cancelled.
82    #[serde(rename = "thread.run.step.cancelled")]
83    ThreadRunStepCancelled(RunStepObject),
84    /// Occurs when a [run step](https://platform.openai.com/docs/api-reference/run-steps/step-object) expires.
85    #[serde(rename = "thread.run.step.expired")]
86    ThreadRunStepExpired(RunStepObject),
87    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) is created.
88    #[serde(rename = "thread.message.created")]
89    ThreadMessageCreated(MessageObject),
90    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) moves to an `in_progress` state.
91    #[serde(rename = "thread.message.in_progress")]
92    ThreadMessageInProgress(MessageObject),
93    /// Occurs when parts of a [Message](https://platform.openai.com/docs/api-reference/messages/object) are being streamed.
94    #[serde(rename = "thread.message.delta")]
95    ThreadMessageDelta(MessageDeltaObject),
96    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) is completed.
97    #[serde(rename = "thread.message.completed")]
98    ThreadMessageCompleted(MessageObject),
99    /// Occurs when a [message](https://platform.openai.com/docs/api-reference/messages/object) ends before it is completed.
100    #[serde(rename = "thread.message.incomplete")]
101    ThreadMessageIncomplete(MessageObject),
102    /// Occurs when an [error](https://platform.openai.com/docs/guides/error-codes/api-errors) occurs. This can happen due to an internal server error or a timeout.
103    #[serde(rename = "error")]
104    ErrorEvent(ApiError),
105    /// Occurs when a stream ends.
106    #[serde(rename = "done")]
107    Done(String),
108}
109
110pub type AssistantEventStream = OpenAIEventMappedStream<AssistantStreamEvent>;
111
112impl TryFrom<eventsource_stream::Event> for AssistantStreamEvent {
113    type Error = OpenAIError;
114    fn try_from(value: eventsource_stream::Event) -> Result<Self, Self::Error> {
115        match value.event.as_str() {
116            "thread.created" => serde_json::from_str::<ThreadObject>(value.data.as_str())
117                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
118                .map(AssistantStreamEvent::TreadCreated),
119            "thread.run.created" => serde_json::from_str::<RunObject>(value.data.as_str())
120                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
121                .map(AssistantStreamEvent::ThreadRunCreated),
122            "thread.run.queued" => serde_json::from_str::<RunObject>(value.data.as_str())
123                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
124                .map(AssistantStreamEvent::ThreadRunQueued),
125            "thread.run.in_progress" => serde_json::from_str::<RunObject>(value.data.as_str())
126                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
127                .map(AssistantStreamEvent::ThreadRunInProgress),
128            "thread.run.requires_action" => serde_json::from_str::<RunObject>(value.data.as_str())
129                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
130                .map(AssistantStreamEvent::ThreadRunRequiresAction),
131            "thread.run.completed" => serde_json::from_str::<RunObject>(value.data.as_str())
132                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
133                .map(AssistantStreamEvent::ThreadRunCompleted),
134            "thread.run.incomplete" => serde_json::from_str::<RunObject>(value.data.as_str())
135                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
136                .map(AssistantStreamEvent::ThreadRunIncomplete),
137            "thread.run.failed" => serde_json::from_str::<RunObject>(value.data.as_str())
138                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
139                .map(AssistantStreamEvent::ThreadRunFailed),
140            "thread.run.cancelling" => serde_json::from_str::<RunObject>(value.data.as_str())
141                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
142                .map(AssistantStreamEvent::ThreadRunCancelling),
143            "thread.run.cancelled" => serde_json::from_str::<RunObject>(value.data.as_str())
144                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
145                .map(AssistantStreamEvent::ThreadRunCancelled),
146            "thread.run.expired" => serde_json::from_str::<RunObject>(value.data.as_str())
147                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
148                .map(AssistantStreamEvent::ThreadRunExpired),
149            "thread.run.step.created" => serde_json::from_str::<RunStepObject>(value.data.as_str())
150                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
151                .map(AssistantStreamEvent::ThreadRunStepCreated),
152            "thread.run.step.in_progress" => {
153                serde_json::from_str::<RunStepObject>(value.data.as_str())
154                    .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
155                    .map(AssistantStreamEvent::ThreadRunStepInProgress)
156            }
157            "thread.run.step.delta" => {
158                serde_json::from_str::<RunStepDeltaObject>(value.data.as_str())
159                    .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
160                    .map(AssistantStreamEvent::ThreadRunStepDelta)
161            }
162            "thread.run.step.completed" => {
163                serde_json::from_str::<RunStepObject>(value.data.as_str())
164                    .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
165                    .map(AssistantStreamEvent::ThreadRunStepCompleted)
166            }
167            "thread.run.step.failed" => serde_json::from_str::<RunStepObject>(value.data.as_str())
168                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
169                .map(AssistantStreamEvent::ThreadRunStepFailed),
170            "thread.run.step.cancelled" => {
171                serde_json::from_str::<RunStepObject>(value.data.as_str())
172                    .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
173                    .map(AssistantStreamEvent::ThreadRunStepCancelled)
174            }
175            "thread.run.step.expired" => serde_json::from_str::<RunStepObject>(value.data.as_str())
176                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
177                .map(AssistantStreamEvent::ThreadRunStepExpired),
178            "thread.message.created" => serde_json::from_str::<MessageObject>(value.data.as_str())
179                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
180                .map(AssistantStreamEvent::ThreadMessageCreated),
181            "thread.message.in_progress" => {
182                serde_json::from_str::<MessageObject>(value.data.as_str())
183                    .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
184                    .map(AssistantStreamEvent::ThreadMessageInProgress)
185            }
186            "thread.message.delta" => {
187                serde_json::from_str::<MessageDeltaObject>(value.data.as_str())
188                    .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
189                    .map(AssistantStreamEvent::ThreadMessageDelta)
190            }
191            "thread.message.completed" => {
192                serde_json::from_str::<MessageObject>(value.data.as_str())
193                    .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
194                    .map(AssistantStreamEvent::ThreadMessageCompleted)
195            }
196            "thread.message.incomplete" => {
197                serde_json::from_str::<MessageObject>(value.data.as_str())
198                    .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
199                    .map(AssistantStreamEvent::ThreadMessageIncomplete)
200            }
201            "error" => serde_json::from_str::<ApiError>(value.data.as_str())
202                .map_err(|e| map_deserialization_error(e, value.data.as_bytes()))
203                .map(AssistantStreamEvent::ErrorEvent),
204            "done" => Ok(AssistantStreamEvent::Done(value.data)),
205
206            _ => Err(OpenAIError::StreamError(
207                "Unrecognized event: {value:?#}".into(),
208            )),
209        }
210    }
211}