//! Text streaming implementation for [`LanguageModelRequest`].

use crate::core::{
    AssistantMessage, LanguageModelStreamChunkType, Message, Messages, ToolCallInfo,
    ToolResultInfo,
    language_model::{
        LanguageModel, LanguageModelOptions, LanguageModelResponseContentType, LanguageModelStream,
        LanguageModelStreamChunk, Step, StopReason, Usage, request::LanguageModelRequest,
    },
    messages::TaggedMessage,
    utils::resolve_message,
};
use crate::error::Result;
use futures::StreamExt;
use std::sync::Arc;
use tokio::sync::Mutex;

impl<M: LanguageModel> LanguageModelRequest<M> {
    /// Streams text generation and tool execution using the language model.
    ///
    /// This method performs streaming text generation, providing real-time access to
    /// response chunks as they are produced. It supports tool calling and execution
    /// across multiple steps, streaming intermediate results and handling tool
    /// interactions dynamically.
    ///
    /// For non-streaming responses, use [`generate_text`](Self::generate_text) instead.
    ///
    /// # Returns
    ///
    /// A [`StreamTextResponse`] containing the stream of chunks and the final conversation state.
    ///
    /// # Errors
    ///
    /// Returns an `Error` if the underlying language model fails to generate a response
    /// or if tool execution encounters an error.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "openai")]
    /// # {
    /// use aisdk::{
    ///     core::{LanguageModelRequest, LanguageModelStreamChunkType},
    ///     providers::OpenAI,
    /// };
    /// use futures::StreamExt;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let openai = OpenAI::gpt_5();
    ///
    ///     let mut stream = LanguageModelRequest::builder()
    ///         .model(openai)
    ///         .prompt("What is the meaning of life?")
    ///         .build()
    ///         .stream_text()
    ///         .await?
    ///         .stream;
    ///
    ///     while let Some(chunk) = stream.next().await {
    ///         if let LanguageModelStreamChunkType::Text(text) = chunk {
    ///             println!("{}", text);
    ///         }
    ///     }
    ///
    ///     Ok(())
    /// }
    /// # }
    /// ```
    pub async fn stream_text(&mut self) -> Result<StreamTextResponse> {
        let (system_prompt, messages) = resolve_message(&self.options, &self.prompt);
        // Shared request state: the background task updates it step by step, and
        // `StreamTextResponse` reads the final values from it.
        let options = Arc::new(Mutex::new(LanguageModelOptions {
            system: (!system_prompt.is_empty()).then_some(system_prompt),
            messages,
            schema: self.options.schema.to_owned(),
            stop_sequences: self.options.stop_sequences.to_owned(),
            tools: self.options.tools.to_owned(),
            stop_when: self.options.stop_when.clone(),
            on_step_start: self.options.on_step_start.clone(),
            on_step_finish: self.options.on_step_finish.clone(),
            stop_reason: None,
            ..self.options
        }));
        let (tx, stream) = LanguageModelStream::new();
        let _ = tx.send(LanguageModelStreamChunkType::Start);
        // Drive the multi-step generation loop on a background task so the
        // caller can start consuming the stream immediately.
        let mut model = self.model.clone();
        let thread_options = options.clone();
        tokio::spawn(async move {
            loop {
                let mut options = thread_options.lock().await;
                // Update the current step
                options.current_step_id += 1;
                let current_step_id = options.current_step_id;
                // Prepare the next step
                if let Some(hook) = options.on_step_start.clone() {
                    hook(&mut options);
                }
                let response_result = model.stream_text(options.clone()).await;
                let mut response = match response_result {
                    Ok(r) => r,
                    Err(e) => {
                        options.stop_reason = Some(StopReason::Error(e.clone()));
                        let _ = tx.send(LanguageModelStreamChunkType::Failed(format!(
                            "Model streaming failed: {e}"
                        )));
                        return Err(e);
                    }
                };
                while let Some(ref chunk) = response.next().await {
                    match chunk {
                        Ok(chunk) => {
                            for output in chunk {
                                match output {
                                    LanguageModelStreamChunk::Done(final_msg) => {
                                        match final_msg.content {
                                            LanguageModelResponseContentType::Text(_) => {
                                                let assistant_msg =
                                                    Message::Assistant(AssistantMessage {
                                                        content: final_msg.content.clone(),
                                                        usage: final_msg.usage.clone(),
                                                    });
                                                options.messages.push(TaggedMessage::new(
                                                    current_step_id,
                                                    assistant_msg,
                                                ));
                                                options.stop_reason = Some(StopReason::Finish);
                                            }
                                            LanguageModelResponseContentType::Reasoning {
                                                ref content,
                                                ref extensions,
                                            } => {
                                                options.messages.push(TaggedMessage::new(
                                                    current_step_id,
                                                    Message::Assistant(AssistantMessage {
                                                        content:
                                                            LanguageModelResponseContentType::Reasoning {
                                                                content: content.clone(),
                                                                extensions: extensions.clone(),
                                                            },
                                                        usage: final_msg.usage.clone(),
                                                    }),
                                                ));
                                                options.stop_reason = Some(StopReason::Finish);
                                            }
                                            LanguageModelResponseContentType::ToolCall(
                                                ref tool_info,
                                            ) => {
                                                // Record the tool call, then execute the tool
                                                // and append its result to the conversation.
                                                let usage = final_msg.usage.clone();
                                                options.messages.push(TaggedMessage::new(
                                                    current_step_id,
                                                    Message::Assistant(AssistantMessage::new(
                                                        LanguageModelResponseContentType::ToolCall(
                                                            tool_info.clone(),
                                                        ),
                                                        usage,
                                                    )),
                                                ));
                                                options.handle_tool_call(tool_info).await;
                                            }
                                            _ => {}
                                        }
                                        // Finish the step
                                        if let Some(ref hook) = options.on_step_finish {
                                            hook(&options);
                                        }
                                        // Stop early if the user-provided `stop_when` hook requests it
                                        if let Some(hook) = &options.stop_when.clone()
                                            && hook(&options)
                                        {
                                            let _ =
                                                tx.send(LanguageModelStreamChunkType::Incomplete(
                                                    "Stopped by hook".to_string(),
                                                ));
                                            options.stop_reason = Some(StopReason::Hook);
                                            break;
                                        }
                                    }
                                    LanguageModelStreamChunk::Delta(other) => match other {
                                        // Propagate text and reasoning chunks
                                        LanguageModelStreamChunkType::Text(_)
                                        | LanguageModelStreamChunkType::Reasoning(_) => {
                                            let _ = tx.send(other.clone());
                                        }
                                        _ => {}
                                    },
                                }
                            }
                        }
                        Err(e) => {
                            let _ = tx.send(LanguageModelStreamChunkType::Failed(e.to_string()));
                            options.stop_reason = Some(StopReason::Error(e.clone()));
                            break;
                        }
                    }
                    // A set stop reason ends the current model stream.
                    if options.stop_reason.is_some() {
                        break;
                    }
                }
                // A set stop reason also ends the step loop; otherwise run another step.
                if options.stop_reason.is_some() {
                    break;
                }
            }
            // Dropping the sender closes the stream for consumers.
            drop(tx);
            Ok(())
        });
        let result = StreamTextResponse { stream, options };
        Ok(result)
    }
}

// ============================================================================
// Section: response types
// ============================================================================
/// Response from a streaming text generation call.
///
/// This struct contains the streaming response from a language model,
/// including the stream of chunks and the final options state.
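///
/// # Examples
///
/// A minimal sketch, assuming `request` is a built [`LanguageModelRequest`]:
///
/// ```rust,ignore
/// let mut response = request.stream_text().await?;
///
/// // Drain the stream first; the final conversation state is available
/// // from the accessors below once the stream ends.
/// while let Some(chunk) = response.stream.next().await {
///     /* handle each chunk */
/// }
///
/// println!("{:?}", response.text().await);
/// ```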
pub struct StreamTextResponse {
    /// The stream of response chunks from the language model.
    pub stream: LanguageModelStream,
    // Shared request state (messages, stop reason) finalized by the streaming task.
    options: Arc<Mutex<LanguageModelOptions>>,
}

impl StreamTextResponse {
    /// Returns the step IDs of all messages in the conversation.
    ///
    /// This is primarily used for testing and debugging purposes.
    #[cfg(any(test, feature = "test-access"))]
    pub async fn step_ids(&self) -> Vec<usize> {
        self.options
            .lock()
            .await
            .messages
            .iter()
            .map(|t| t.step_id)
            .collect()
    }
}

impl StreamTextResponse {
    /// Returns all messages from the conversation.
    ///
    /// This includes system prompts, user inputs, assistant responses,
    /// and any tool-related messages that occurred during streaming.
    ///
    /// # Returns
    ///
    /// A vector of all [`Message`] instances in the conversation.
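    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `response` came from a finished `stream_text` call:
    ///
    /// ```rust,ignore
    /// let messages = response.messages().await;
    /// // Inspect the full conversation, including tool calls and results.
    /// println!("{messages:?}");
    /// ```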
    pub async fn messages(&self) -> Messages {
        self.options.lock().await.messages()
    }

    /// Returns the conversation step with the specified index.
    ///
    /// A step represents all messages exchanged during one cycle of model interaction,
    /// including user input, assistant responses, and tool calls/results.
    ///
    /// # Parameters
    ///
    /// * `index` - The step ID to retrieve.
    ///
    /// # Returns
    ///
    /// An `Option<Step>` containing the step if it exists.
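    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `response` came from a finished `stream_text`
    /// call; step IDs are assigned sequentially as streaming progresses:
    ///
    /// ```rust,ignore
    /// if let Some(step) = response.step(1).await {
    ///     println!("{step:?}");
    /// }
    /// ```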
    pub async fn step(&self, index: usize) -> Option<Step> {
        self.options.lock().await.step(index)
    }

    /// Returns the most recent conversation step.
    ///
    /// This is equivalent to calling `step()` with the highest step ID.
    ///
    /// # Returns
    ///
    /// An `Option<Step>` containing the last step if any steps exist.
    pub async fn last_step(&self) -> Option<Step> {
        self.options.lock().await.last_step()
    }

    /// Returns all conversation steps in chronological order.
    ///
    /// Each step contains all messages exchanged during that cycle of interaction.
    ///
    /// # Returns
    ///
    /// A vector of all [`Step`] instances in order.
    pub async fn steps(&self) -> Vec<Step> {
        self.options.lock().await.steps()
    }

    /// Calculates the total token usage across all conversation steps.
    ///
    /// This aggregates input, output, reasoning, and cached token counts
    /// from all assistant messages in the conversation.
    ///
    /// # Returns
    ///
    /// A [`Usage`] struct containing the aggregated token statistics.
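    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `response` came from a finished `stream_text` call:
    ///
    /// ```rust,ignore
    /// let usage = response.usage().await;
    /// println!("aggregated usage: {usage:?}");
    /// ```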
    pub async fn usage(&self) -> Usage {
        self.options.lock().await.usage()
    }

    /// Returns the content of the last assistant message, excluding reasoning.
    ///
    /// This provides access to the final output content from the language model,
    /// filtering out any reasoning content that may be present.
    ///
    /// # Returns
    ///
    /// An `Option<LanguageModelResponseContentType>` containing the content if available.
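    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `response` came from a finished `stream_text` call:
    ///
    /// ```rust,ignore
    /// use aisdk::core::language_model::LanguageModelResponseContentType;
    ///
    /// match response.content().await {
    ///     Some(LanguageModelResponseContentType::Text(text)) => println!("{text}"),
    ///     Some(LanguageModelResponseContentType::ToolCall(call)) => println!("{call:?}"),
    ///     _ => {}
    /// }
    /// ```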
    pub async fn content(&self) -> Option<LanguageModelResponseContentType> {
        self.options.lock().await.content().cloned()
    }

    /// Returns the text content of the last assistant message.
    ///
    /// This extracts the plain text from the final assistant response,
    /// if the content type is text.
    ///
    /// # Returns
    ///
    /// An `Option<String>` containing the text if the last message is text content.
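    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `response` came from a finished `stream_text` call:
    ///
    /// ```rust,ignore
    /// if let Some(text) = response.text().await {
    ///     println!("{text}");
    /// }
    /// ```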
    pub async fn text(&self) -> Option<String> {
        self.options.lock().await.text()
    }

    /// Extracts all tool execution results from the conversation.
    ///
    /// This collects every tool result message generated while executing
    /// tool calls during the streaming process.
    ///
    /// # Returns
    ///
    /// An `Option<Vec<ToolResultInfo>>` containing all tool results if any exist.
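    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `response` came from a finished `stream_text` call:
    ///
    /// ```rust,ignore
    /// if let Some(results) = response.tool_results().await {
    ///     for result in results {
    ///         println!("tool result: {result:?}");
    ///     }
    /// }
    /// ```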
    pub async fn tool_results(&self) -> Option<Vec<ToolResultInfo>> {
        self.options.lock().await.tool_results()
    }

    /// Extracts all tool calls from the conversation.
    ///
    /// This collects all tool call requests that were made by the assistant
    /// during the streaming process.
    ///
    /// # Returns
    ///
    /// An `Option<Vec<ToolCallInfo>>` containing all tool calls if any exist.
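    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `response` came from a finished `stream_text` call:
    ///
    /// ```rust,ignore
    /// if let Some(calls) = response.tool_calls().await {
    ///     for call in calls {
    ///         println!("tool requested: {call:?}");
    ///     }
    /// }
    /// ```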
    pub async fn tool_calls(&self) -> Option<Vec<ToolCallInfo>> {
        self.options.lock().await.tool_calls()
    }

    /// Returns the reason why text generation stopped.
    ///
    /// This indicates how and why the streaming process terminated,
    /// such as completion, error, or user-defined stop conditions.
    ///
    /// # Returns
    ///
    /// An `Option<StopReason>` indicating the termination reason if available.
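    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `response` came from a finished `stream_text` call:
    ///
    /// ```rust,ignore
    /// use aisdk::core::language_model::StopReason;
    ///
    /// match response.stop_reason().await {
    ///     Some(StopReason::Finish) => println!("completed normally"),
    ///     Some(StopReason::Hook) => println!("stopped by the `stop_when` hook"),
    ///     Some(StopReason::Error(e)) => eprintln!("failed: {e:?}"),
    ///     _ => {}
    /// }
    /// ```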
    pub async fn stop_reason(&self) -> Option<StopReason> {
        self.options.lock().await.stop_reason()
    }
}