adk_model/gemini/
streaming.rs1use adk_core::{Content, LlmResponse, Part};
2use futures::Stream;
3use std::pin::Pin;
4
5pub async fn aggregate_stream(
7 mut stream: Pin<Box<dyn Stream<Item = adk_core::Result<LlmResponse>> + Send>>,
8) -> adk_core::Result<LlmResponse> {
9 use futures::StreamExt;
10
11 let mut aggregated_text = String::new();
12 let mut last_response: Option<LlmResponse> = None;
13
14 while let Some(result) = stream.next().await {
15 let response = result?;
16
17 if let Some(content) = &response.content {
18 for part in &content.parts {
19 if let Part::Text { text } = part {
20 aggregated_text.push_str(text);
21 }
22 }
23 }
24
25 last_response = Some(response);
26 }
27
28 let mut final_response = last_response.ok_or_else(|| {
29 adk_core::AdkError::Model("No responses received from stream".to_string())
30 })?;
31
32 final_response.content = Some(Content {
33 role: "model".to_string(),
34 parts: vec![Part::Text { text: aggregated_text }],
35 });
36 final_response.partial = false;
37 final_response.turn_complete = true;
38
39 Ok(final_response)
40}