strands_agents/streaming/
mod.rs

1//! Streaming utilities for processing model responses.
2
3use futures::{Stream, StreamExt};
4
5use crate::types::errors::StrandsError;
6use crate::types::streaming::StreamEvent;
7
8/// Collects text from a stream of events.
9pub async fn collect_text<S>(stream: S) -> Result<String, StrandsError>
10where
11    S: Stream<Item = Result<StreamEvent, StrandsError>>,
12{
13    let mut text = String::new();
14    futures::pin_mut!(stream);
15
16    while let Some(event) = stream.next().await {
17        let event = event?;
18        if let Some(delta) = event.as_text_delta() {
19            text.push_str(delta);
20        }
21    }
22
23    Ok(text)
24}
25
26/// Prints a stream of events to stdout.
27pub async fn print_stream<S>(stream: S) -> Result<(), StrandsError>
28where
29    S: Stream<Item = Result<StreamEvent, StrandsError>>,
30{
31    use std::io::{self, Write};
32
33    futures::pin_mut!(stream);
34
35    while let Some(event) = stream.next().await {
36        let event = event?;
37        if let Some(text) = event.as_text_delta() {
38            print!("{text}");
39            io::stdout().flush().ok();
40        }
41    }
42    println!();
43
44    Ok(())
45}