adk_model/gemini/
streaming.rs

1use adk_core::{Content, LlmResponse, Part};
2use futures::Stream;
3use std::pin::Pin;
4
5/// Aggregate streaming responses into a single response
6pub async fn aggregate_stream(
7    mut stream: Pin<Box<dyn Stream<Item = adk_core::Result<LlmResponse>> + Send>>,
8) -> adk_core::Result<LlmResponse> {
9    use futures::StreamExt;
10
11    let mut aggregated_text = String::new();
12    let mut last_response: Option<LlmResponse> = None;
13
14    while let Some(result) = stream.next().await {
15        let response = result?;
16
17        if let Some(content) = &response.content {
18            for part in &content.parts {
19                if let Part::Text { text } = part {
20                    aggregated_text.push_str(text);
21                }
22            }
23        }
24
25        last_response = Some(response);
26    }
27
28    let mut final_response = last_response.ok_or_else(|| {
29        adk_core::AdkError::Model("No responses received from stream".to_string())
30    })?;
31
32    final_response.content = Some(Content {
33        role: "model".to_string(),
34        parts: vec![Part::Text { text: aggregated_text }],
35    });
36    final_response.partial = false;
37    final_response.turn_complete = true;
38
39    Ok(final_response)
40}