bamboo-engine 2026.4.30

Execution engine and orchestration for the Bamboo agent framework
use tokio::sync::mpsc;

use bamboo_agent_core::{AgentError, AgentEvent};
use bamboo_infrastructure::{provider, LLMChunk};

use super::stream_state::StreamAccumulationState;

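/// Handles a single chunk result from the LLM response stream.
///
/// Accumulates visible tokens, reasoning tokens, tool-call parts, the response
/// id, and usage/cache accounting into `state`, forwarding token events to
/// `event_tx` when a subscriber is present. Stream errors are mapped to
/// `AgentError::LLM` without emitting an error event so that the caller's
/// round-level retry logic can decide how to react.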
pub(super) async fn handle_chunk_result(
    chunk_result: provider::Result<LLMChunk>,
    state: &mut StreamAccumulationState,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
    session_id: &str,
) -> Result<(), AgentError> {
    match chunk_result {
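        // Provider-assigned response id: record it on the accumulation state.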
        Ok(LLMChunk::ResponseId(response_id)) => {
            tracing::debug!(
                "[{}] Received upstream response_id={}",
                session_id,
                response_id
            );
            state.set_response_id(response_id);
            Ok(())
        }
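        // Visible output token: accumulate it and forward to any event subscriber.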
        Ok(LLMChunk::Token(token)) => {
            state.append_token(&token);
            if let Some(event_tx) = event_tx {
                let _ = event_tx.send(AgentEvent::Token { content: token }).await;
            }
            Ok(())
        }
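        // Reasoning ("thinking") token: tracked separately and emitted as its own event.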
        Ok(LLMChunk::ReasoningToken(token)) => {
            state.append_reasoning_token(&token);
            if let Some(event_tx) = event_tx {
                let _ = event_tx
                    .send(AgentEvent::ReasoningToken { content: token })
                    .await;
            }
            Ok(())
        }
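        // Streaming tool-call fragments: appended to the accumulated tool calls.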
        Ok(LLMChunk::ToolCalls(partial_calls)) => {
            tracing::trace!(
                "[{}] Received {} tool call parts",
                session_id,
                partial_calls.len()
            );
            state.extend_tool_calls(partial_calls);
            Ok(())
        }
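        // Provider signaled the end of the stream; nothing further to record.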
        Ok(LLMChunk::Done) => {
            tracing::debug!("[{}] LLM stream completed", session_id);
            Ok(())
        }
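        // Prompt-cache token accounting reported by the provider.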
        Ok(LLMChunk::CacheUsage {
            cache_creation_input_tokens,
            cache_read_input_tokens,
        }) => {
            tracing::debug!(
                "[{}] Cache usage: creation={}, read={}",
                session_id,
                cache_creation_input_tokens,
                cache_read_input_tokens
            );
            state.record_cache(cache_creation_input_tokens, cache_read_input_tokens);
            Ok(())
        }
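        // Token usage summary (output and thinking tokens) for this response.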
        Ok(LLMChunk::UsageSummary {
            output_tokens,
            thinking_tokens,
        }) => {
            tracing::debug!(
                "[{}] Usage summary: output={}, thinking={}",
                session_id,
                output_tokens,
                thinking_tokens
            );
            state.record_usage(output_tokens, thinking_tokens);
            Ok(())
        }
        Err(error) => {
            let message = error.to_string();
            tracing::warn!("[{}] LLM stream error: {}", session_id, message);
            // Deliberately do not emit AgentEvent::Error here: round-level retry
            // logic may recover from this transient stream failure. Binding the
            // sender to `_` makes the intentional non-use in this branch explicit.
            let _ = event_tx;
            Err(AgentError::LLM(message))
        }
    }
}
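
// A minimal sketch of how a caller might drive this handler over a chunk
// stream. The `futures` dependency, the `drive_llm_stream` name, and the
// stream bound are assumptions for illustration only; the crate's real round
// loop may differ.
//
// use futures::{Stream, StreamExt};
//
// pub(super) async fn drive_llm_stream(
//     mut chunks: impl Stream<Item = provider::Result<LLMChunk>> + Unpin,
//     state: &mut StreamAccumulationState,
//     event_tx: Option<&mpsc::Sender<AgentEvent>>,
//     session_id: &str,
// ) -> Result<(), AgentError> {
//     // Feed every chunk through the handler, stopping on the first error so
//     // the caller can apply its round-level retry policy.
//     while let Some(chunk_result) = chunks.next().await {
//         handle_chunk_result(chunk_result, state, event_tx, session_id).await?;
//     }
//     Ok(())
// }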