//! bamboo-engine 2026.4.30
//!
//! Execution engine and orchestration for the Bamboo agent framework.
//!
//! Tests for the LLM stream-consumption helpers in the parent module.
use futures::stream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio_util::sync::CancellationToken;

use bamboo_agent_core::tools::{FunctionCall, ToolCall};
use bamboo_agent_core::{AgentError, AgentEvent};
use bamboo_infrastructure::provider::LLMError;
use bamboo_infrastructure::{LLMChunk, LLMStream};

use super::{consume_llm_stream, consume_llm_stream_silent};

/// Wraps a fixed sequence of chunk results in a pinned, boxed stream so the
/// consumers under test can poll it like a real provider response.
fn build_stream(items: Vec<bamboo_infrastructure::provider::Result<LLMChunk>>) -> LLMStream {
    let chunks = stream::iter(items);
    Box::pin(chunks)
}

/// Verifies that `consume_llm_stream` stitches together the response id,
/// reasoning text, content, and incrementally delivered tool-call fragments,
/// while forwarding reasoning/content tokens as agent events.
#[tokio::test]
async fn consume_llm_stream_accumulates_tokens_and_tool_calls() {
    // Two `ToolCalls` chunks share the same id: the second carries only the
    // tail of the arguments and should be merged into the first.
    let first_fragment = ToolCall {
        id: "call_1".to_string(),
        tool_type: "function".to_string(),
        function: FunctionCall {
            name: "test_tool".to_string(),
            arguments: "{".to_string(),
        },
    };
    let second_fragment = ToolCall {
        id: "call_1".to_string(),
        tool_type: "function".to_string(),
        function: FunctionCall {
            name: String::new(),
            arguments: "}".to_string(),
        },
    };
    let llm_stream = build_stream(vec![
        Ok(LLMChunk::ResponseId("resp_123".to_string())),
        Ok(LLMChunk::ReasoningToken("thinking".to_string())),
        Ok(LLMChunk::Token("hi".to_string())),
        Ok(LLMChunk::ToolCalls(vec![first_fragment])),
        Ok(LLMChunk::ToolCalls(vec![second_fragment])),
        Ok(LLMChunk::Done),
    ]);

    let (tx, mut rx) = mpsc::channel::<AgentEvent>(8);
    let output = consume_llm_stream(llm_stream, &tx, &CancellationToken::new(), "session-1")
        .await
        .expect("stream should succeed");

    // Accumulated output: id, content, reasoning, and merged tool call.
    assert_eq!(output.response_id.as_deref(), Some("resp_123"));
    assert_eq!(output.content, "hi");
    assert_eq!(output.reasoning_content, "thinking");
    assert_eq!(output.token_count, 2);
    assert_eq!(output.tool_calls.len(), 1);
    assert_eq!(output.tool_calls[0].function.name, "test_tool");
    assert_eq!(output.tool_calls[0].function.arguments, "{}");

    // Events are forwarded in stream order: reasoning first, then content.
    let reasoning_event = rx.recv().await.expect("missing reasoning event");
    assert!(matches!(reasoning_event, AgentEvent::ReasoningToken { .. }));

    let token_event = rx.recv().await.expect("missing token event");
    assert!(matches!(token_event, AgentEvent::Token { .. }));
}

/// Confirms the silent variant accumulates output without requiring an event
/// channel at all — content is collected but nothing is emitted.
#[tokio::test]
async fn consume_llm_stream_silent_does_not_emit_events() {
    let chunks = vec![
        Ok(LLMChunk::Token("hello".to_string())),
        Ok(LLMChunk::Done),
    ];
    let llm_stream = build_stream(chunks);

    let output = consume_llm_stream_silent(llm_stream, &CancellationToken::new(), "session-2")
        .await
        .expect("silent stream should succeed");

    assert!(output.response_id.is_none());
    assert_eq!(output.content, "hello");
    assert!(output.reasoning_content.is_empty());
    assert_eq!(output.token_count, 5);
    assert!(output.tool_calls.is_empty());
}

/// Ensures a provider stream error surfaces as `AgentError::LLM` with the
/// "Stream error: " prefix applied exactly once, and that nothing is sent on
/// the event channel before the failure.
#[tokio::test]
async fn consume_llm_stream_returns_single_prefix_stream_error_message() {
    let failing_stream = build_stream(vec![Err(LLMError::Stream(
        "Transport error: error decoding response body".to_string(),
    ))]);

    let (tx, mut rx) = mpsc::channel::<AgentEvent>(4);
    let result = consume_llm_stream(failing_stream, &tx, &CancellationToken::new(), "session-3").await;
    let err = match result {
        Ok(_) => panic!("stream should fail"),
        Err(err) => err,
    };

    match err {
        AgentError::LLM(message) => {
            // The consumer adds the prefix; the provider's own message must
            // not get double-wrapped.
            assert_eq!(
                message,
                "Stream error: Transport error: error decoding response body"
            );
            assert!(!message.starts_with("Stream error: Stream error:"));
        }
        unexpected => panic!("expected AgentError::LLM, got {unexpected:?}"),
    }

    // No events should have been forwarded before the error.
    assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
}