use super::types::{StreamChunk, AccumulatedResponse};
use std::collections::HashMap;
#[derive(Debug, Default)]
pub struct StreamingAccumulator {
text: String,
reasoning: String,
tool_calls: HashMap<usize, crate::ToolCall>,
}
impl StreamingAccumulator {
pub fn new() -> Self {
Self::default()
}
pub fn process_chunk(&mut self, chunk: StreamChunk) -> bool {
match chunk {
StreamChunk::Text(text) => {
self.text.push_str(&text);
false }
StreamChunk::Reasoning(reasoning) => {
self.reasoning.push_str(&reasoning);
false }
StreamChunk::ToolCallDelta { index, id, name, arguments_delta } => {
let tool_call = self.tool_calls.entry(index).or_insert_with(|| {
crate::ToolCall {
id: String::new(),
r#type: "function".to_string(),
function: crate::FunctionCall {
name: String::new(),
arguments: String::new(),
},
}
});
if let Some(id_value) = id {
tool_call.id = id_value;
}
if let Some(name_value) = name {
tool_call.function.name = name_value;
}
if let Some(args_delta) = arguments_delta {
if std::env::var("RUST_LOG").map(|v| v.to_lowercase().contains("debug")).unwrap_or(false) {
eprintln!("[UMF ACCUMULATOR] Tool call {} args_delta: '{}'", index, args_delta);
eprintln!("[UMF ACCUMULATOR] Tool call {} total args so far: '{}'", index, tool_call.function.arguments);
}
tool_call.function.arguments.push_str(&args_delta);
}
false }
StreamChunk::Done => true, }
}
pub fn finish(self) -> AccumulatedResponse {
let tool_calls: Vec<crate::ToolCall> = self.tool_calls
.into_iter()
.map(|(_, tool_call)| tool_call)
.filter(|tc| !tc.function.name.is_empty())
.collect();
AccumulatedResponse {
text: self.text,
reasoning: self.reasoning,
tool_calls,
}
}
pub async fn accumulate_stream<S, E>(mut stream: S) -> Result<AccumulatedResponse, E>
where
S: futures_util::Stream<Item = Result<StreamChunk, E>> + Unpin,
{
use futures_util::StreamExt;
let mut accumulator = Self::new();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
if accumulator.process_chunk(chunk) {
break; }
}
Ok(accumulator.finish())
}
}