bamboo-server 2026.5.1

HTTP server and API layer for the Bamboo agent framework
Documentation
use actix_web::Error as ActixError;
use bytes::Bytes;
use futures::{Stream, StreamExt};

use bamboo_engine::{ForwardStatus, MetricsCollector};
use bamboo_infrastructure::{provider::LLMStream, LLMChunk};

use super::super::usage::{build_estimated_usage, estimate_completion_tokens};
use super::sse;

/// Per-request state handed to the streaming response builder so it can
/// report the outcome of the forward once the stream finishes.
#[derive(Clone)]
pub(super) struct StreamRuntimeContext {
    /// Collector on which `forward_completed` is invoked when the stream ends.
    pub(super) metrics: MetricsCollector,
    /// Identifier of the forward, passed as the first argument of `forward_completed`.
    pub(super) forward_id: String,
    /// Prompt-token estimate combined with the estimated completion tokens
    /// to build the usage reported on a successful stream.
    pub(super) estimated_prompt_tokens: u64,
}

/// Adapts an upstream [`LLMStream`] into a stream of SSE-encoded byte frames.
///
/// Text tokens and tool calls are encoded through the `sse` helpers and
/// yielded as they arrive. Response ids, reasoning tokens, cache usage and
/// usage summaries are dropped. Every text token is also accumulated so the
/// completion-token count can be estimated once streaming is over.
///
/// Metrics are reported through `context.metrics.forward_completed`:
///
/// * If the upstream yields an error, the forward is recorded as
///   `ForwardStatus::Error`, one error item is yielded and the stream ends.
/// * Otherwise, once the loop exits (on `LLMChunk::Done`, after the done frame
///   has been yielded, or when the upstream is exhausted), the forward is
///   recorded as `ForwardStatus::Success` with status 200 and a usage
///   estimate built from `context.estimated_prompt_tokens` and the
///   accumulated text.
///
/// A failure to encode a single token or tool call is yielded as an `Err`
/// item but does not stop the loop and does not mark the forward as failed.
pub(super) fn build_gemini_event_stream(
    mut stream: LLMStream,
    context: StreamRuntimeContext,
) -> impl Stream<Item = Result<Bytes, ActixError>> {
    async_stream::stream! {
        let mut upstream_failed = false;
        let mut completion_text = String::new();

        while let Some(item) = stream.next().await {
            match item {
                // Upstream failure: record it, surface it, and stop streaming.
                Err(cause) => {
                    let message = format!("Stream error: {cause}");
                    upstream_failed = true;
                    context.metrics.forward_completed(
                        context.forward_id.clone(),
                        chrono::Utc::now(),
                        None,
                        ForwardStatus::Error,
                        None,
                        Some(message.clone()),
                    );
                    yield Err(sse::stream_error(message));
                    break;
                }
                // Normal termination: emit the done frame and leave the loop.
                Ok(LLMChunk::Done) => {
                    yield Ok::<_, ActixError>(sse::done_chunk_bytes());
                    break;
                }
                Ok(LLMChunk::Token(text)) => {
                    // Keep the raw text for the completion-token estimate,
                    // whether or not the frame encodes successfully.
                    completion_text.push_str(&text);
                    // The encoder result is forwarded as-is; an encoding
                    // error becomes an item and the loop moves on.
                    yield sse::token_chunk_bytes(text);
                }
                Ok(LLMChunk::ToolCalls(calls)) => {
                    // One frame per tool call; a failed encode does not
                    // prevent the remaining calls from being emitted.
                    for call in calls {
                        yield sse::tool_call_chunk_bytes(call);
                    }
                }
                // Chunk kinds that produce no output on this endpoint.
                Ok(LLMChunk::ResponseId(_))
                | Ok(LLMChunk::ReasoningToken(_))
                | Ok(LLMChunk::CacheUsage { .. })
                | Ok(LLMChunk::UsageSummary { .. }) => {}
            }
        }

        if !upstream_failed {
            let usage = build_estimated_usage(
                context.estimated_prompt_tokens,
                estimate_completion_tokens(&completion_text),
            );
            context.metrics.forward_completed(
                context.forward_id,
                chrono::Utc::now(),
                Some(200),
                ForwardStatus::Success,
                Some(usage),
                None,
            );
        }
    }
}