//! bamboo-server 2026.4.26
//!
//! HTTP server and API layer for the Bamboo agent framework.
use actix_web::{web, HttpResponse};
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use crate::{app_state::AppState, error::AppError};
use bamboo_infrastructure::LLMRequestOptions;

use super::super::helpers::now_unix_ts;
use super::PreparedResponsesRequest;
use worker::{spawn_stream_worker, StreamWorkerArgs};

mod errors;
mod events;
#[cfg(test)]
mod tests;
mod worker;

/// Handles a streaming Responses-API request end to end: records metrics,
/// resolves the provider, opens the upstream token stream, hands it to a
/// background worker, and returns an SSE (`text/event-stream`) response
/// whose body is fed by that worker through an mpsc channel.
pub(super) async fn handle_streaming_response(
    app_state: web::Data<AppState>,
    prepared: PreparedResponsesRequest,
    forward_id: String,
) -> Result<HttpResponse, AppError> {
    // Metrics label: "<provider>/<model>" when a provider was explicitly
    // named, otherwise just the resolved model name.
    let display_model = match &prepared.provider_name {
        Some(provider) => format!("{}/{}", provider, prepared.resolved_model),
        None => prepared.resolved_model.clone(),
    };

    // Record the forward up front, before provider resolution, so this
    // forward_id is visible to the collector even if a later step errors.
    app_state.metrics_service.collector().forward_started(
        forward_id.clone(),
        "openai.responses",
        display_model.clone(),
        true,
        chrono::Utc::now(),
    );

    // An explicit provider name pins the lookup to that provider/model
    // pair; otherwise fall back to the application default provider.
    let provider = if let Some(name) = &prepared.provider_name {
        let model_ref = bamboo_domain::ProviderModelRef::new(name, &prepared.resolved_model);
        app_state.get_provider_for_model_ref(&model_ref)?
    } else {
        app_state.get_provider().await
    };

    let request_options = LLMRequestOptions {
        session_id: None,
        reasoning_effort: prepared.reasoning_effort,
        parallel_tool_calls: prepared.parallel_tool_calls,
        responses: Some(prepared.responses_options.clone()),
    };

    // Open the upstream stream; provider-side failures are translated into
    // this module's error shape before propagating to the caller.
    let stream_result = provider
        .chat_stream_with_options(
            &prepared.internal_messages,
            &prepared.internal_tools,
            prepared.max_tokens,
            prepared.resolved_model.as_str(),
            Some(&request_options),
        )
        .await
        .map_err(errors::map_provider_error)?;

    // Bounded channel (capacity 10) between the worker task (producer)
    // and the HTTP response body (consumer) — backpressure on slow clients.
    let (tx, rx) = mpsc::channel::<Result<bytes::Bytes, anyhow::Error>>(10);

    let message_id = format!("msg_{}", uuid::Uuid::new_v4());
    let created_at = now_unix_ts();

    // Detached worker: pumps upstream chunks into `tx` and reports
    // completion/usage to the metrics collector.
    spawn_stream_worker(StreamWorkerArgs {
        stream_result,
        tx,
        metrics: app_state.metrics_service.collector(),
        forward_id,
        fallback_response_id: format!("resp_{}", uuid::Uuid::new_v4()),
        message_id,
        created_at,
        resolved_model: display_model,
        estimated_prompt_tokens: prepared.estimated_prompt_tokens,
    });

    // Worker-side errors become internal errors; byte chunks pass through
    // untouched as SSE frames.
    let body = ReceiverStream::new(rx).map(|chunk| chunk.map_err(AppError::InternalError));

    Ok(HttpResponse::Ok()
        .content_type("text/event-stream")
        .streaming(body))
}