//! bamboo-server 2026.5.3
//!
//! HTTP server and API layer for the Bamboo agent framework.
//! Documentation
use actix_web::{http::StatusCode, web, HttpResponse};
use async_stream::stream;
use bytes::Bytes;
use serde_json::json;

use crate::{app_state::AppState, error::AppError};
use bamboo_engine::metrics::types::ForwardStatus;
use bamboo_infrastructure::LLMRequestOptions;

use super::PreparedCompleteRequest;
use crate::handlers::anthropic::{
    conversion::convert_llm_chunk_to_openai,
    errors::{anthropic_error_response, AnthropicError},
    stream::{format_sse_data, map_completion_stream_chunk},
    usage::{build_estimated_usage, estimate_completion_tokens},
};

/// Handle a streaming Anthropic-compatible completion request.
///
/// Forwards the prepared request to the configured LLM provider, converts
/// each upstream chunk into the Anthropic SSE wire format, and records
/// forward metrics for start, success, and error outcomes.
///
/// Upstream failures are reported to the client (a 502 error envelope if
/// the stream cannot be opened, or an in-stream `error` SSE event once
/// streaming has begun) rather than propagated as `AppError`.
///
/// Bug fix: the stream-error branch previously yielded `data: [DONE]` and
/// then fell through to the post-loop terminator, sending `[DONE]` twice.
/// The terminator is now emitted exactly once for both paths.
pub(super) async fn handle_streaming_complete(
    app_state: web::Data<AppState>,
    prepared: PreparedCompleteRequest,
    forward_id: String,
) -> Result<HttpResponse, AppError> {
    let PreparedCompleteRequest {
        mapped_model,
        response_model,
        internal_messages,
        internal_tools,
        max_tokens,
        reasoning_effort,
        estimated_prompt_tokens,
    } = prepared;

    // Record the forward start before contacting the provider so every
    // completion event below is paired with a started event.
    app_state.metrics_service.collector().forward_started(
        forward_id.clone(),
        "anthropic.complete",
        mapped_model.clone(),
        true,
        chrono::Utc::now(),
    );

    let provider = app_state.get_provider().await;
    let stream_result = provider
        .chat_stream_with_options(
            &internal_messages,
            &internal_tools,
            max_tokens,
            mapped_model.as_str(),
            Some(&LLMRequestOptions {
                session_id: None,
                reasoning_effort,
                parallel_tool_calls: None,
                responses: None,
            }),
        )
        .await;

    // If the provider refuses to open the stream, answer with a 502 in
    // Anthropic's error envelope instead of an opaque server error.
    let mut stream = match stream_result {
        Ok(stream) => stream,
        Err(error) => {
            app_state.metrics_service.collector().forward_completed(
                forward_id.clone(),
                chrono::Utc::now(),
                None,
                ForwardStatus::Error,
                None,
                Some(format!("Upstream API error: {error}")),
            );
            return Ok(anthropic_error_response(AnthropicError::new(
                StatusCode::BAD_GATEWAY,
                "api_error",
                format!("Upstream API error: {error}"),
            )));
        }
    };

    let metrics = app_state.metrics_service.collector();
    let forward_id_clone = forward_id.clone();

    let stream = stream! {
        use futures::StreamExt;
        let mut had_error = false;
        // Accumulated completion text, used only to estimate token usage
        // for metrics once the stream ends.
        let mut completion_text = String::new();

        while let Some(chunk_result) = stream.next().await {
            match chunk_result {
                Ok(chunk) => {
                    if let bamboo_infrastructure::types::LLMChunk::Token(text) = &chunk {
                        completion_text.push_str(text);
                    }
                    if let Some(openai_chunk) = convert_llm_chunk_to_openai(chunk, &response_model) {
                        let payload = map_completion_stream_chunk(&openai_chunk, &response_model);
                        if !payload.is_empty() {
                            yield Ok::<Bytes, AppError>(Bytes::from(payload));
                        }
                    }
                }
                Err(error) => {
                    had_error = true;
                    metrics.forward_completed(
                        forward_id_clone.clone(),
                        chrono::Utc::now(),
                        None,
                        ForwardStatus::Error,
                        None,
                        Some(format!("Stream error: {error}")),
                    );
                    let payload = format_sse_data(json!({
                        "type": "error",
                        "error": {
                            "type": "api_error",
                            "message": format!("Stream error: {}", error)
                        }
                    }));
                    yield Ok::<Bytes, AppError>(Bytes::from(payload));
                    // Do NOT yield [DONE] here: the single terminator after
                    // the loop covers this path. Yielding it here as well
                    // produced a duplicated [DONE] on stream errors.
                    break;
                }
            }
        }
        // Exactly one SSE terminator, for both success and error paths.
        yield Ok::<Bytes, AppError>(Bytes::from("data: [DONE]\n\n"));

        if !had_error {
            // The upstream stream does not report token counts, so usage
            // is estimated locally from the accumulated text.
            let completion_tokens = estimate_completion_tokens(&completion_text);
            metrics.forward_completed(
                forward_id_clone,
                chrono::Utc::now(),
                Some(200),
                ForwardStatus::Success,
                Some(build_estimated_usage(
                    estimated_prompt_tokens,
                    completion_tokens,
                )),
                None,
            );
        }
    };

    Ok(HttpResponse::Ok()
        .content_type("text/event-stream")
        .streaming(stream))
}