use tokio::sync::mpsc;
use bamboo_agent_core::{AgentError, AgentEvent};
use bamboo_infrastructure::{provider, LLMChunk};
use super::stream_state::StreamAccumulationState;
/// Applies a single item from the LLM stream to `state`, forwarding token
/// events to `event_tx` when a sender is supplied.
///
/// Per chunk variant:
/// - `ResponseId`: logged and stored via `state.set_response_id`.
/// - `Token` / `ReasoningToken`: appended to `state`, then sent as the matching
///   `AgentEvent` if `event_tx` is `Some`. The result of the send is ignored.
/// - `ToolCalls`: logged and handed to `state.extend_tool_calls`.
/// - `Done`: logged only.
/// - `CacheUsage` / `UsageSummary`: logged and recorded on `state`.
///
/// `session_id` is used only as a prefix in log messages.
///
/// # Errors
///
/// When `chunk_result` is `Err`, the error is logged at `warn` level and
/// returned as `AgentError::LLM` carrying the error's `to_string()` text.
/// `state` is not modified and no event is sent in that case.
pub(super) async fn handle_chunk_result(
    chunk_result: provider::Result<LLMChunk>,
    state: &mut StreamAccumulationState,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
    session_id: &str,
) -> Result<(), AgentError> {
    // Deal with the failure path first so the dispatch below only sees chunks.
    let chunk = chunk_result.map_err(|stream_error| {
        let message = stream_error.to_string();
        tracing::warn!("[{}] LLM stream error: {}", session_id, message);
        AgentError::LLM(message)
    })?;

    match chunk {
        LLMChunk::ResponseId(upstream_id) => {
            tracing::debug!(
                "[{}] Received upstream response_id={}",
                session_id,
                upstream_id
            );
            state.set_response_id(upstream_id);
        }
        LLMChunk::Token(text) => {
            // Accumulate before forwarding: the event takes ownership of the text.
            state.append_token(&text);
            if let Some(sender) = event_tx {
                // Best-effort delivery; a failed send is not treated as an error.
                let _ = sender.send(AgentEvent::Token { content: text }).await;
            }
        }
        LLMChunk::ReasoningToken(text) => {
            state.append_reasoning_token(&text);
            if let Some(sender) = event_tx {
                // Best-effort delivery; a failed send is not treated as an error.
                let _ = sender
                    .send(AgentEvent::ReasoningToken { content: text })
                    .await;
            }
        }
        LLMChunk::ToolCalls(call_parts) => {
            tracing::trace!(
                "[{}] Received {} tool call parts",
                session_id,
                call_parts.len()
            );
            state.extend_tool_calls(call_parts);
        }
        LLMChunk::Done => {
            tracing::debug!("[{}] LLM stream completed", session_id);
        }
        LLMChunk::CacheUsage {
            cache_creation_input_tokens,
            cache_read_input_tokens,
        } => {
            tracing::debug!(
                "[{}] Cache usage: creation={}, read={}",
                session_id,
                cache_creation_input_tokens,
                cache_read_input_tokens
            );
            state.record_cache(cache_creation_input_tokens, cache_read_input_tokens);
        }
        LLMChunk::UsageSummary {
            output_tokens,
            thinking_tokens,
        } => {
            tracing::debug!(
                "[{}] Usage summary: output={}, thinking={}",
                session_id,
                output_tokens,
                thinking_tokens
            );
            state.record_usage(output_tokens, thinking_tokens);
        }
    }

    Ok(())
}