bamboo-agent 2026.4.2

A fully self-contained AI agent backend framework with built-in web services, multi-LLM provider support, and comprehensive tool execution
Documentation
use crate::agent::core::budget::{
    apply_compression_plan, build_forced_compression_plan_with_summary,
    estimate_context_compression_exposure, normalized_trigger_percent, prepare_hybrid_context,
    summary_source_messages, HeuristicTokenCounter, LlmSummarizer, PreparedContext, Summarizer,
    TokenBudget,
};
use crate::agent::core::tools::ToolSchema;
use crate::agent::core::{AgentError, AgentEvent, Role, Session};
use crate::agent::llm::LLMProvider;
use crate::agent::loop_module::config::AgentLoopConfig;
use std::sync::Arc;
use tokio::sync::mpsc;

use super::super::prompt_context::{
    strip_existing_skill_context, strip_existing_tool_guide_context,
};

mod logging;
mod ocr_cache;
mod transforms;

const FORCE_CONTEXT_COMPRESSION_PERCENT: f64 = 98.0;

pub(super) struct PreparedRoundContext {
    pub prepared_context: PreparedContext,
    pub budget: TokenBudget,
}

async fn emit_context_compression_status(
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
    phase_label: &str,
    status: &str,
) {
    let Some(tx) = event_tx else {
        return;
    };
    let _ = tx
        .send(AgentEvent::ContextCompressionStatus {
            phase: phase_label.to_string(),
            status: status.to_string(),
        })
        .await;
}

fn degrade_prompt_context_sections_for_overflow(session: &mut Session) -> Option<&'static str> {
    let Some(system_message) = session
        .messages
        .iter_mut()
        .find(|message| matches!(message.role, Role::System))
    else {
        return None;
    };

    let without_tool_guide = strip_existing_tool_guide_context(&system_message.content);
    if without_tool_guide != system_message.content {
        system_message.content = without_tool_guide;
        return Some("tool_guide_context");
    }

    let without_skill = strip_existing_skill_context(&system_message.content);
    if without_skill != system_message.content {
        system_message.content = without_skill;
        return Some("skill_context");
    }

    None
}

async fn maybe_apply_host_context_compression_with_budget(
    session: &mut Session,
    config: &AgentLoopConfig,
    model_name: &str,
    session_id: &str,
    llm: &Arc<dyn LLMProvider>,
    budget: &TokenBudget,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
    phase_label: &str,
) -> Result<bool, AgentError> {
    let exposure = estimate_context_compression_exposure(session, model_name, Some(budget));
    let usage_percent = exposure.active_usage_percent;
    let auto_threshold = normalized_trigger_percent(exposure.budget.compression_trigger_percent);
    let host_auto_requested = usage_percent >= auto_threshold;
    let critical_fallback_requested = usage_percent >= FORCE_CONTEXT_COMPRESSION_PERCENT;
    if !host_auto_requested && !critical_fallback_requested {
        return Ok(false);
    }

    let messages = summary_source_messages(session);
    if messages.len() < 3 {
        tracing::warn!(
            "[{}] {} context compression skipped: usage={:.1}%, auto_threshold={:.1}%, critical_threshold={}%, not enough active messages ({})",
            session_id,
            phase_label,
            usage_percent,
            auto_threshold,
            FORCE_CONTEXT_COMPRESSION_PERCENT,
            messages.len()
        );
        return Ok(false);
    }

    let summary_model = config
        .fast_model_name
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .unwrap_or(model_name);
    let existing_summary = session
        .conversation_summary
        .as_ref()
        .map(|summary| summary.content.clone());
    let task_list_prompt = session
        .task_list
        .as_ref()
        .map(|_| session.format_task_list_for_prompt())
        .filter(|value| !value.trim().is_empty());

    let summarizer = LlmSummarizer::new(
        Arc::clone(llm),
        summary_model.to_string(),
        existing_summary,
        task_list_prompt,
    );
    emit_context_compression_status(event_tx, phase_label, "started").await;
    let summary = match summarizer.summarize(&messages).await {
        Ok(summary) => summary,
        Err(error) => {
            emit_context_compression_status(event_tx, phase_label, "failed").await;
            return Err(AgentError::Budget(error.to_string()));
        }
    };

    let plan = match build_forced_compression_plan_with_summary(
        session,
        model_name,
        Some(budget),
        summary,
    ) {
        Ok(plan) => plan,
        Err(reason) => {
            tracing::warn!(
                "[{}] {} context compression attempted (usage={:.1}%) but plan build failed: {}",
                session_id,
                phase_label,
                usage_percent,
                reason
            );
            emit_context_compression_status(event_tx, phase_label, "failed").await;
            return Ok(false);
        }
    };

    let compressed_count = apply_compression_plan(session, plan.clone());
    if compressed_count == 0 {
        tracing::warn!(
            "[{}] {} context compression attempted (usage={:.1}%) but did not archive messages",
            session_id,
            phase_label,
            usage_percent
        );
        emit_context_compression_status(event_tx, phase_label, "skipped").await;
        return Ok(false);
    }

    if let Some(storage) = config.storage.as_ref() {
        if let Err(error) = storage.save_session(session).await {
            tracing::warn!(
                "[{}] Failed to persist forced context compression result: {}",
                session_id,
                error
            );
        }
    }

    tracing::info!(
        "[{}] {} context compression applied: usage={:.1}%, auto_threshold={:.1}%, critical_threshold={}%, compressed_messages={}, usage_after_context_window={:.1}%",
        session_id,
        phase_label,
        usage_percent,
        auto_threshold,
        FORCE_CONTEXT_COMPRESSION_PERCENT,
        compressed_count,
        plan.active_usage_after_percent
    );
    emit_context_compression_status(event_tx, phase_label, "completed").await;
    Ok(true)
}

pub(super) async fn maybe_apply_host_context_compression(
    session: &mut Session,
    config: &AgentLoopConfig,
    model_name: &str,
    session_id: &str,
    _tool_schemas: &[ToolSchema],
    llm: &Arc<dyn LLMProvider>,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
    phase_label: &str,
) -> Result<bool, AgentError> {
    let budget =
        super::token_budget::resolve_token_budget(session, config, model_name, llm.as_ref()).await;
    maybe_apply_host_context_compression_with_budget(
        session,
        config,
        model_name,
        session_id,
        llm,
        &budget,
        event_tx,
        phase_label,
    )
    .await
}

pub(crate) async fn force_overflow_context_recovery(
    session: &mut Session,
    config: &AgentLoopConfig,
    model_name: &str,
    session_id: &str,
    llm: &Arc<dyn LLMProvider>,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) -> Result<bool, AgentError> {
    if let Some(degraded_section) = degrade_prompt_context_sections_for_overflow(session) {
        tracing::info!(
            "[{}] Overflow recovery pre-pass degraded prompt section: {}",
            session_id,
            degraded_section,
        );
        emit_context_compression_status(event_tx, "overflow-recovery", "degraded_sections").await;
        return Ok(true);
    }

    let budget =
        super::token_budget::resolve_token_budget(session, config, model_name, llm.as_ref()).await;
    maybe_apply_host_context_compression_with_budget(
        session,
        config,
        model_name,
        session_id,
        llm,
        &budget,
        event_tx,
        "overflow-recovery",
    )
    .await
}

pub(super) async fn prepare_round_context(
    session: &mut Session,
    config: &AgentLoopConfig,
    model_name: &str,
    session_id: &str,
    _tool_schemas: &[ToolSchema],
    llm: &Arc<dyn LLMProvider>,
    event_tx: Option<&mpsc::Sender<AgentEvent>>,
) -> Result<PreparedRoundContext, AgentError> {
    ocr_cache::maybe_cache_ocr_results(session, config, session_id).await;

    let budget =
        super::token_budget::resolve_token_budget(session, config, model_name, llm.as_ref()).await;

    let counter = HeuristicTokenCounter::default();

    if maybe_apply_host_context_compression_with_budget(
        session, config, model_name, session_id, llm, &budget, event_tx, "pre-turn",
    )
    .await?
    {
        tracing::debug!(
            "[{}] Recomputing prepared context after forced compression fallback",
            session_id
        );
    }

    let mut prepared_context = prepare_hybrid_context(session, &budget, &counter)
        .map_err(|error| AgentError::Budget(error.to_string()))?;

    transforms::apply_message_transforms(config, &mut prepared_context, llm, session_id).await?;
    logging::log_context_truncation(session_id, &prepared_context);

    Ok(PreparedRoundContext {
        prepared_context,
        budget,
    })
}

#[cfg(test)]
mod tests;