use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::runtime::config::AgentLoopConfig;
use crate::runtime::guardian_state::{
ensure_guardian_state, guardian_read_only_disabled_tools, write_guardian_config,
write_guardian_state, GuardianPhase, GUARDIAN_REVIEW_RUBRIC,
};
use crate::runtime::runner::loop_execution::startup::{
resolve_auxiliary_models, InFlightTaskEvaluation, LoopRunState,
};
use crate::runtime::runner::prompt_context::PromptMemoryRuntimeContext;
use crate::runtime::runner::session_setup::tool_schemas::resolve_available_tool_schemas_for_session;
use crate::runtime::stream::handler::StreamHandlingOutput;
use crate::runtime::task_context::TaskLoopContext;
use bamboo_agent_core::tools::ToolExecutor;
use bamboo_agent_core::{AgentError, AgentEvent, Message, Session};
use bamboo_domain::session::runtime_state::{
AgentRuntimeState, AgentStatusState, ChildWaitPolicy, SuspensionState, WaitingForChildrenState,
};
use bamboo_llm::LLMProvider;
use bamboo_metrics::{
MetricsCollector, RoundStatus as MetricsRoundStatus, SessionStatus as MetricsSessionStatus,
TokenUsage as MetricsTokenUsage,
};
use super::super::to_event_token_usage;
use super::gold::{
apply_completed_gold_evaluation, drain_in_flight_gold_evaluation, evaluate_gold_terminal,
poll_completed_gold_evaluation, spawn_gold_evaluation_if_needed,
start_queued_gold_evaluation_if_idle, GoldTerminalDecision,
};
use crate::runtime::runner::state_bridge;
/// Upper bound on attempts for a single turn (the LLM call plus post-LLM tool
/// handling) before a retryable failure is treated as terminal.
const MAX_LLM_TURN_ATTEMPTS: usize = 3;
/// Base retry delay in milliseconds; the retry loop doubles it for every
/// further attempt (`base << (attempt - 1)`).
const LLM_RETRY_BASE_DELAY_MS: u64 = 400;
/// Decides whether a failed turn is worth another attempt.
///
/// Only `AgentError::LLM` errors are candidates. The message is trimmed and
/// ASCII-lowercased; a blank message, or one containing any of the
/// non-retryable markers below, yields `false`.
fn should_retry_turn_error(error: &AgentError) -> bool {
    // Substrings that indicate a failure which another attempt cannot fix.
    const NON_RETRYABLE_MARKERS: [&str; 9] = [
        "authentication error",
        "invalid api key",
        "invalid_request_error",
        "unsupported model",
        "model_name is required",
        "http 400",
        "http 401",
        "http 403",
        "http 404",
    ];

    match error {
        AgentError::LLM(raw) => {
            let normalized = raw.trim().to_ascii_lowercase();
            !normalized.is_empty()
                && NON_RETRYABLE_MARKERS
                    .iter()
                    .all(|marker| !normalized.contains(*marker))
        }
        _ => false,
    }
}
/// Returns `true` only for `AgentError::LLMOverflow`, the error kind for which
/// the round loop attempts a forced overflow recovery.
fn is_overflow_recoverable(error: &AgentError) -> bool {
    match error {
        AgentError::LLMOverflow(_) => true,
        _ => false,
    }
}
/// Result of handling one turn of the agent loop.
struct TurnOutcome {
/// When `true`, the round loop stops iterating after this turn.
should_break: bool,
/// `true` when an `AgentEvent::Complete` was already sent for this turn.
sent_complete: bool,
}
/// Reports whether a child-run status string denotes a finished run.
///
/// The comparison is exact (case- and whitespace-sensitive).
fn is_terminal_child_status(status: &str) -> bool {
    const TERMINAL_STATUSES: [&str; 5] = ["completed", "error", "timeout", "cancelled", "skipped"];
    TERMINAL_STATUSES.contains(&status)
}
/// Marks the session as suspended until the given child sessions finish.
///
/// Stores a `WaitingForChildrenState` in `runtime_state`, writes the runtime
/// state into the session, tags the session metadata with
/// `runtime.suspend_reason = waiting_for_children`, and then saves the session
/// through `persistence` when one is provided. A failed save is logged and
/// otherwise ignored.
///
/// Always returns an outcome that breaks the round loop without reporting a
/// `Complete` event.
async fn suspend_to_wait_for_children(
    session: &mut Session,
    runtime_state: &mut AgentRuntimeState,
    persistence: Option<&Arc<dyn bamboo_domain::RuntimeSessionPersistence>>,
    child_session_ids: Vec<String>,
    wait_for: ChildWaitPolicy,
) -> TurnOutcome {
    let suspended_at = Utc::now();
    let child_count = child_session_ids.len();

    let wait_state =
        WaitingForChildrenState::for_children(child_session_ids, wait_for, suspended_at);
    runtime_state.waiting_for_children = Some(wait_state);
    state_bridge::write_runtime_state(session, runtime_state);

    session.metadata.insert(
        "runtime.suspend_reason".to_string(),
        "waiting_for_children".to_string(),
    );
    session.updated_at = suspended_at;

    // Persisting is best-effort: the suspension stands even if the save fails.
    let save_error = match persistence {
        Some(store) => store.save_runtime_session(session).await.err(),
        None => None,
    };
    if let Some(error) = save_error {
        tracing::warn!(
            "[{}] suspend-to-wait failed to persist parent wait on {} child(ren): {}",
            session.id,
            child_count,
            error
        );
    }

    TurnOutcome {
        should_break: true,
        sent_complete: false,
    }
}
async fn maybe_suspend_for_orphaned_children(
session: &mut Session,
config: &AgentLoopConfig,
runtime_state: &mut AgentRuntimeState,
) -> Option<TurnOutcome> {
if runtime_state.waiting_for_children.is_some() {
return None;
}
let storage = config.storage.as_ref()?;
let mut active: Vec<String> = storage
.list_child_run_statuses(&session.id)
.await
.unwrap_or_default()
.into_iter()
.filter(|(_, status)| !status.as_deref().is_some_and(is_terminal_child_status))
.map(|(id, _)| id)
.collect();
if active.is_empty() {
return None;
}
active.sort();
active.dedup();
tracing::info!(
"[{}] end-of-turn safety net: suspending to wait for {} orphaned child session(s) the model did not explicitly wait on",
session.id,
active.len(),
);
Some(
suspend_to_wait_for_children(
session,
runtime_state,
config.persistence.as_ref(),
active,
ChildWaitPolicy::All,
)
.await,
)
}
/// Assembles the prompt handed to the guardian review child.
///
/// The prompt starts with `GUARDIAN_REVIEW_RUBRIC`, followed by a numbered list
/// of the active task item's completion criteria (if any) and the session goal
/// (if any). When neither is available, a generic review instruction is
/// appended instead.
fn build_guardian_review_prompt(
    task_context: &Option<TaskLoopContext>,
    config: &AgentLoopConfig,
) -> String {
    // The active item is the one whose id matches the context's active id.
    let active_item = task_context.as_ref().and_then(|ctx| {
        ctx.items
            .iter()
            .find(|item| Some(&item.id) == ctx.active_item_id.as_ref())
    });
    let criteria: &[String] = match active_item {
        Some(item) => item.completion_criteria.as_slice(),
        None => &[],
    };
    let goal = config.active_goal();

    let mut prompt = String::from(GUARDIAN_REVIEW_RUBRIC);

    if !criteria.is_empty() {
        prompt.push_str("\n\n## Completion criteria (verify EACH against real evidence)\n");
        for (number, criterion) in (1usize..).zip(criteria.iter()) {
            prompt.push_str(&format!("{}. {}\n", number, criterion));
        }
    }

    if let Some(goal_text) = goal {
        prompt.push_str("\n\n## Session goal\n");
        prompt.push_str(goal_text);
        prompt.push('\n');
    }

    let nothing_to_check = criteria.is_empty() && goal.is_none();
    if nothing_to_check {
        prompt.push_str(
            "\n\n(No explicit completion criteria or goal were provided; review the diff for correctness, completeness, and obvious bugs.)\n",
        );
    }

    prompt
}
/// Terminal-gate hook that may spawn a read-only guardian review child.
///
/// Returns `None` (letting the turn complete normally) when:
/// - the session is already waiting for children,
/// - the guardian is not active or no spawner is configured,
/// - a review is already pending, the last review approved, or the review
///   budget is exhausted,
/// - no reviewer model can be resolved, or
/// - spawning the review child fails.
///
/// Otherwise the spawn is recorded in the guardian state (together with the
/// current `iteration`) and the session is suspended until the child finishes.
async fn maybe_spawn_guardian_review(
    session: &mut Session,
    config: &AgentLoopConfig,
    task_context: &Option<TaskLoopContext>,
    runtime_state: &mut AgentRuntimeState,
    iteration: u32,
) -> Option<TurnOutcome> {
    if runtime_state.waiting_for_children.is_some() || !config.guardian_active() {
        return None;
    }
    let spawner = config.guardian_spawner.as_ref()?;
    let max_reviews = config.guardian_max_reviews();
    let mut guardian_state = ensure_guardian_state(session);

    // Decide from the current phase whether another review pass is warranted.
    let review_needed = match guardian_state.phase {
        GuardianPhase::Pending => false,
        GuardianPhase::Reviewed if guardian_state.last_approved() => false,
        GuardianPhase::Reviewed => {
            let exhausted = guardian_state.budget_exhausted(max_reviews);
            if exhausted {
                tracing::warn!(
                    "[{}] guardian: review budget ({}) exhausted with unresolved findings; allowing completion",
                    session.id,
                    max_reviews
                );
            }
            !exhausted
        }
        GuardianPhase::None => !guardian_state.budget_exhausted(max_reviews),
    };
    if !review_needed {
        return None;
    }

    if let Some(guardian_config) = config.guardian_config.as_ref() {
        write_guardian_config(session, guardian_config);
    }
    let review_prompt = build_guardian_review_prompt(task_context, config);

    // Prefer the dedicated guardian model; otherwise use the loop's model name.
    let reviewer_model = match config.guardian_model() {
        Some(name) => Some(name.to_string()),
        None => config.model_name.clone(),
    }
    .map(|name| name.trim().to_string())
    .filter(|name| !name.is_empty());
    let Some(model) = reviewer_model else {
        tracing::warn!(
            "[{}] guardian: no reviewer model resolved; skipping review at this terminal",
            session.id
        );
        return None;
    };

    let disabled_tools = Some(guardian_read_only_disabled_tools());
    let spawn_result = spawner
        .spawn_guardian_review(session, review_prompt, model, disabled_tools)
        .await;
    let child_id = match spawn_result {
        Ok(child_id) => child_id,
        Err(error) => {
            tracing::warn!(
                "[{}] guardian: failed to spawn review child: {}; allowing completion",
                session.id,
                error
            );
            return None;
        }
    };

    guardian_state.record_spawn(&child_id);
    guardian_state.last_reviewed_at_round = iteration;
    let pass = guardian_state.review_count;
    write_guardian_state(session, guardian_state);
    tracing::info!(
        "[{}] guardian: spawned read-only review child {} (pass {}/{}); suspending until verdict",
        session.id,
        child_id,
        pass,
        max_reviews
    );

    let outcome = suspend_to_wait_for_children(
        session,
        runtime_state,
        config.persistence.as_ref(),
        vec![child_id],
        ChildWaitPolicy::All,
    )
    .await;
    Some(outcome)
}
fn map_turn_error_status(error: &AgentError) -> (MetricsRoundStatus, MetricsSessionStatus) {
if matches!(error, AgentError::Cancelled) {
(
MetricsRoundStatus::Cancelled,
MetricsSessionStatus::Cancelled,
)
} else {
(MetricsRoundStatus::Error, MetricsSessionStatus::Error)
}
}
fn record_turn_failure(
metrics_collector: Option<&MetricsCollector>,
round_id: &str,
session_id: &str,
message_count: u32,
error: &AgentError,
) {
let (round_status, session_status) = map_turn_error_status(error);
crate::runtime::runner::metrics_lifecycle::record_round_and_session_error(
metrics_collector,
round_id,
session_id,
message_count,
round_status,
Some(error.to_string()),
session_status,
);
}
/// Non-blocking harvest of the background task evaluation.
///
/// Does nothing unless an evaluation is in flight and its join handle reports
/// it has finished. A finished evaluation is taken out of `in_flight`; its
/// result is stored in `completed`, or the join failure is logged.
async fn poll_completed_task_evaluation(state: &mut LoopRunState) {
    let ready = matches!(
        state.task_evaluation.in_flight.as_ref(),
        Some(pending) if pending.join_handle.is_finished()
    );
    if !ready {
        return;
    }

    if let Some(pending) = state.task_evaluation.in_flight.take() {
        match pending.join_handle.await {
            Err(join_error) => {
                tracing::warn!(
                    "[{}] Async task evaluation join failed for round {}: {}",
                    state.session_id,
                    pending.request.round_number,
                    join_error
                );
            }
            Ok(evaluation) => {
                state.task_evaluation.completed = Some(evaluation);
            }
        }
    }
}
/// Waits for the in-flight task evaluation, if any, to finish.
///
/// Skipped when a completed result is already waiting to be applied. The
/// awaited result is stored in `completed`; a join failure is only logged.
async fn drain_in_flight_task_evaluation(state: &mut LoopRunState) {
    let already_completed = state.task_evaluation.completed.is_some();
    if already_completed {
        return;
    }

    if let Some(pending) = state.task_evaluation.in_flight.take() {
        match pending.join_handle.await {
            Err(join_error) => {
                tracing::warn!(
                    "[{}] Async task evaluation join failed while draining round {}: {}",
                    state.session_id,
                    pending.request.round_number,
                    join_error
                );
            }
            Ok(evaluation) => {
                state.task_evaluation.completed = Some(evaluation);
            }
        }
    }
}
/// Applies a finished background task evaluation to the session.
///
/// No-op when no completed result is pending. Otherwise the result is applied
/// to the task context, a synthetic metrics round
/// (`<session>-task-evaluation-round-<n>`) is recorded as started and
/// completed (`Cancelled` when the result was stale, `Success` otherwise), and,
/// if the result was fresh and changed something, the task list is rebuilt,
/// persisted, and announced via `AgentEvent::TaskListUpdated`.
async fn apply_completed_task_evaluation(
    session: &mut Session,
    event_tx: &mpsc::Sender<AgentEvent>,
    config: &AgentLoopConfig,
    state: &mut LoopRunState,
) {
    use crate::runtime::runner::metrics_lifecycle::{record_round_completed, record_round_started};

    let Some(result) = state.task_evaluation.completed.take() else {
        return;
    };

    let apply_outcome = crate::runtime::runner::task_lifecycle::apply_task_evaluation_result(
        &mut state.task_context,
        session,
        &state.session_id,
        result.clone(),
    );

    let synthetic_round_id = format!(
        "{}-task-evaluation-round-{}",
        state.session_id, result.round_number
    );
    record_round_started(
        state.metrics_collector.as_ref(),
        &synthetic_round_id,
        &state.session_id,
        result.model_name.as_str(),
    );

    let round_status = if apply_outcome.stale {
        MetricsRoundStatus::Cancelled
    } else {
        MetricsRoundStatus::Success
    };
    // Cached-output count saturates at u32::MAX before narrowing.
    let cached_tool_outputs = session
        .token_usage
        .as_ref()
        .map_or(0, |usage| usage.prompt_cached_tool_outputs)
        .min(u32::MAX as usize) as u32;
    let cached_tokens_saved = session
        .token_usage
        .as_ref()
        .map_or(0, |usage| usage.prompt_cached_tool_tokens_saved);
    record_round_completed(
        state.metrics_collector.as_ref(),
        &synthetic_round_id,
        &state.session_id,
        session.messages.len() as u32,
        round_status,
        apply_outcome.usage,
        cached_tool_outputs,
        cached_tokens_saved,
        None,
    );

    let has_fresh_updates = !apply_outcome.stale && apply_outcome.applied_updates > 0;
    if !has_fresh_updates {
        return;
    }
    let Some(ctx) = state.task_context.as_ref() else {
        return;
    };

    // Title preference: evaluation result, then the session's current list, then a default.
    let task_list_title = match result.task_list_title {
        Some(title) => title,
        None => session
            .task_list
            .as_ref()
            .map_or_else(|| "Agent Tasks".to_string(), |list| list.title.clone()),
    };

    session.set_task_list_version_meta(ctx.version.to_string());
    let task_list = ctx.to_task_list_with_title(task_list_title);
    session.set_task_list(task_list.clone());
    crate::runtime::runner::tool_execution::persist_shared_task_list(
        config,
        session,
        &result.shared_session_id,
        &state.session_id,
        &task_list,
    )
    .await;
    // A closed event channel is not an error for this path.
    let _ = event_tx
        .send(AgentEvent::TaskListUpdated { task_list })
        .await;
}
/// Starts a background task evaluation and registers it as in flight.
///
/// A clone of `request` is moved into a spawned tokio task together with `llm`
/// and a clone of the event sender; the original request and the task's join
/// handle are stored in `state.task_evaluation.in_flight`, replacing whatever
/// was there.
fn spawn_task_evaluation_request(
    state: &mut LoopRunState,
    event_tx: &mpsc::Sender<AgentEvent>,
    request: crate::runtime::runner::task_lifecycle::AsyncTaskEvaluationRequest,
    llm: Arc<dyn LLMProvider>,
) {
    let round_number = request.round_number;
    let owner_session_id = state.session_id.clone();

    let join_handle = {
        let worker_request = request.clone();
        let worker_events = event_tx.clone();
        tokio::spawn(async move {
            crate::runtime::runner::task_lifecycle::execute_async_task_evaluation(
                worker_request,
                llm,
                worker_events,
            )
            .await
        })
    };

    tracing::debug!(
        "[{}] Spawned async task evaluation for round {}",
        owner_session_id,
        round_number
    );

    state.task_evaluation.in_flight = Some(InFlightTaskEvaluation {
        request,
        join_handle,
    });
}
/// Schedules a task evaluation after a round when the task list is dirty.
///
/// Returns `Ok(())` without doing anything when there is no task context or
/// its dirty flag is unset. Otherwise the flag is cleared and an evaluation
/// request is built (using the fast auxiliary model when configured, else the
/// main model). If an evaluation is already in flight the new request replaces
/// the queued one; if not, it is spawned immediately.
///
/// # Errors
/// Propagates the error from building the evaluation request.
fn spawn_task_evaluation_if_needed(
    turn: usize,
    session: &Session,
    event_tx: &mpsc::Sender<AgentEvent>,
    config: &AgentLoopConfig,
    state: &mut LoopRunState,
    llm: Arc<dyn LLMProvider>,
) -> Result<(), AgentError> {
    // Consume the dirty flag up front; a clean or absent context means no work.
    match state.task_context.as_mut() {
        Some(ctx) if ctx.task_list_dirty => ctx.task_list_dirty = false,
        _ => return Ok(()),
    }

    let eval_model = state
        .auxiliary_models
        .fast_model_name
        .as_deref()
        .unwrap_or(state.model_name.as_str());
    let Some(request) =
        crate::runtime::runner::task_lifecycle::build_async_task_evaluation_request(
            &state.task_context,
            session,
            &state.session_id,
            turn + 1,
            Some(eval_model),
            config.reasoning_effort,
        )?
    else {
        return Ok(());
    };

    if state.task_evaluation.in_flight.is_none() {
        spawn_task_evaluation_request(state, event_tx, request, llm);
        return Ok(());
    }

    // Only the most recent snapshot is kept while another evaluation runs.
    state.task_evaluation.queued_request = Some(request);
    tracing::debug!(
        "[{}] Queued latest async task evaluation snapshot for round {} while another evaluation is still in flight",
        state.session_id,
        turn + 1
    );
    Ok(())
}
/// Re-resolves the auxiliary models from `config` and mirrors the fast and
/// background model names into the runtime state.
fn refresh_auxiliary_models_for_round(state: &mut LoopRunState, config: &AgentLoopConfig) {
    let resolved = resolve_auxiliary_models(config);

    let llm_state = &mut state.runtime_state.llm;
    llm_state.fast_model_name = resolved.fast_model_name.clone();
    llm_state.background_model_name = resolved.background_model_name.clone();

    state.auxiliary_models = resolved;
}
/// Handles a round in which the model produced no tool calls.
///
/// The assistant message is appended to the session and the goal terminal gate
/// is consulted. On `Continue` the loop keeps going; on `Stop` an
/// `AgentEvent::Complete` carrying the round's token usage is sent and the
/// loop is told to break. In both cases the round is then recorded in metrics
/// as `Success`.
// The argument list mirrors the per-round state threaded through by the caller.
#[allow(clippy::too_many_arguments)]
async fn handle_no_tool_calls(
    content: String,
    reasoning: Option<String>,
    prompt_tokens: u64,
    completion_tokens: u64,
    round_usage: MetricsTokenUsage,
    session: &mut Session,
    event_tx: &mpsc::Sender<AgentEvent>,
    metrics_collector: Option<&MetricsCollector>,
    round_id: &str,
    session_id: &str,
    config: &AgentLoopConfig,
    task_context: &Option<TaskLoopContext>,
    eval_model: &str,
    iteration: u32,
    llm: Arc<dyn LLMProvider>,
) -> TurnOutcome {
    let assistant_message = Message::assistant_with_reasoning(content, None, reasoning);
    session.add_message(assistant_message);

    let decision = evaluate_gold_terminal(
        session,
        task_context,
        config,
        eval_model,
        config.reasoning_effort,
        session_id,
        iteration,
        llm,
        event_tx,
    )
    .await;

    // `finished` drives both outcome flags: stopping always implies Complete was sent.
    let finished = match decision {
        GoldTerminalDecision::Stop => {
            let usage = to_event_token_usage(prompt_tokens, completion_tokens);
            let _ = event_tx.send(AgentEvent::Complete { usage }).await;
            true
        }
        GoldTerminalDecision::Continue { continuation_count } => {
            tracing::info!(
                "[{}] Goal terminal gate: continuing toward goal (continuation {})",
                session_id,
                continuation_count
            );
            false
        }
    };

    // Cached-output count saturates at u32::MAX before narrowing.
    let cached_tool_outputs = session
        .token_usage
        .as_ref()
        .map_or(0, |usage| usage.prompt_cached_tool_outputs)
        .min(u32::MAX as usize) as u32;
    let cached_tokens_saved = session
        .token_usage
        .as_ref()
        .map_or(0, |usage| usage.prompt_cached_tool_tokens_saved);
    crate::runtime::runner::metrics_lifecycle::record_round_completed(
        metrics_collector,
        round_id,
        session_id,
        session.messages.len() as u32,
        MetricsRoundStatus::Success,
        round_usage,
        cached_tool_outputs,
        cached_tokens_saved,
        None,
    );

    TurnOutcome {
        should_break: finished,
        sent_complete: finished,
    }
}
/// Handles a round in which the model requested tool calls.
///
/// Appends the assistant message (with its tool calls) to the session, runs
/// the tool calls, and records the round in metrics.
///
/// The model used for mid-turn context compression is resolved in this order:
/// `model_name` if it is not blank, else the session's model if it is not
/// blank, else the auxiliary background model name. A warning is logged when
/// neither of the first two is available.
///
/// Returns an outcome that breaks the loop when tool execution reports that
/// the session is awaiting clarification or waiting for children; otherwise
/// the loop continues. `sent_complete` is always `false` on this path.
///
/// # Errors
/// Propagates any error returned by tool execution.
async fn handle_tool_calls_path(
    frame: &crate::runtime::runner::round_frame::RoundFrame<'_>,
    stream_output: StreamHandlingOutput,
    mut round_usage: MetricsTokenUsage,
    session: &mut Session,
    auxiliary_models: &crate::runtime::config::AuxiliaryModelConfig,
    model_name: &str,
    task_context: &mut Option<TaskLoopContext>,
) -> Result<TurnOutcome, AgentError> {
    let reasoning = (!stream_output.reasoning_content.trim().is_empty())
        .then_some(stream_output.reasoning_content);
    session.add_message(Message::assistant_with_reasoning(
        stream_output.content,
        Some(stream_output.tool_calls.clone()),
        reasoning,
    ));

    // A blank `model_name` must not shadow the session-model fallback; wrapping
    // it unconditionally in `Some` made the fallback and the warning unreachable.
    let compression_model = (!model_name.trim().is_empty())
        .then(|| model_name.to_string())
        .or_else(|| {
            let session_model = session.model.trim();
            (!session_model.is_empty()).then(|| session_model.to_string())
        });
    if compression_model.is_none() {
        tracing::warn!(
            "[{}] Skipping mid-turn context compression after tool execution: missing model name",
            frame.session_id
        );
    }

    let tool_schemas =
        resolve_available_tool_schemas_for_session(frame.config, frame.tools.as_ref(), session);
    let tool_execution = crate::runtime::runner::tool_execution::execute_round_tool_calls(
        &stream_output.tool_calls,
        frame,
        session,
        task_context,
        compression_model
            .as_deref()
            .or(auxiliary_models.background_model_name.as_deref()),
        auxiliary_models
            .summarization_model_provider
            .as_ref()
            .or(auxiliary_models.background_model_provider.as_ref()),
        &tool_schemas,
    )
    .await?;

    let round_status = tool_execution.round_status;
    let round_error: Option<String> = tool_execution.round_error;
    let must_suspend = tool_execution.awaiting_clarification || tool_execution.waiting_for_children;

    // Metric inputs shared by both exits below; nothing after this point
    // changes the message list or the token usage.
    let message_count = session.messages.len() as u32;
    // Cached-output count saturates at u32::MAX before narrowing.
    let cached_tool_outputs = session
        .token_usage
        .as_ref()
        .map(|usage| usage.prompt_cached_tool_outputs)
        .unwrap_or(0)
        .min(u32::MAX as usize) as u32;
    let cached_tokens_saved = session
        .token_usage
        .as_ref()
        .map(|usage| usage.prompt_cached_tool_tokens_saved)
        .unwrap_or(0);

    if must_suspend {
        crate::runtime::runner::metrics_lifecycle::record_round_completed(
            frame.metrics_collector,
            frame.round_id,
            frame.session_id,
            message_count,
            round_status,
            round_usage,
            cached_tool_outputs,
            cached_tokens_saved,
            round_error,
        );
        return Ok(TurnOutcome {
            should_break: true,
            sent_complete: false,
        });
    }

    if frame.debug_enabled {
        tracing::debug!(
            "[{}] round_complete: {}",
            frame.session_id,
            serde_json::json!({
                "round": frame.turn + 1,
                "message_count": session.messages.len(),
            })
        );
    }

    // Dynamic routing currently only classifies the round heuristically and
    // records the result in session metadata.
    if frame.config.features_dynamic_model_routing {
        let complexity = heuristic_complexity(&stream_output.tool_calls);
        tracing::info!(
            "[{}] Dynamic model routing: round {} complexity={:?}",
            frame.session_id,
            frame.turn + 1,
            complexity
        );
        session.metadata.insert(
            "last_round_complexity".to_string(),
            format!("{:?}", complexity),
        );
    }

    round_usage.recompute_total();
    crate::runtime::runner::metrics_lifecycle::record_round_completed(
        frame.metrics_collector,
        frame.round_id,
        frame.session_id,
        message_count,
        round_status,
        round_usage,
        cached_tool_outputs,
        cached_tokens_saved,
        round_error,
    );
    Ok(TurnOutcome {
        should_break: false,
        sent_complete: false,
    })
}
/// Runs the agent round loop for one session.
///
/// Each iteration refreshes auxiliary models, harvests finished background
/// task/Gold evaluations, performs one LLM round (with retry and a single
/// forced overflow recovery), and then either handles the tool calls or treats
/// the reply as a terminal candidate. The loop ends when a turn outcome asks
/// to break or when `turn_counter` reaches `config.max_rounds`. Remaining
/// background evaluations are drained and applied before returning.
///
/// Returns `Ok(true)` if any turn reported that it already sent
/// `AgentEvent::Complete`, `Ok(false)` otherwise.
///
/// # Errors
/// - `AgentError::Cancelled` when `cancel_token` is cancelled at round start.
/// - The terminal error of a turn once retries are exhausted or the error is
///   not retryable (recorded in metrics first).
/// - Any error propagated by the forced overflow recovery call.
pub(super) async fn run_pipeline(
session: &mut Session,
event_tx: &mpsc::Sender<AgentEvent>,
llm: Arc<dyn LLMProvider>,
tools: Arc<dyn ToolExecutor>,
cancel_token: &CancellationToken,
config: &AgentLoopConfig,
state: &mut LoopRunState,
) -> super::super::Result<bool> {
let mut sent_complete = false;
let mut turn_counter: u32 = 0;
loop {
// Round prelude: re-resolve auxiliary models, then apply any finished
// background task evaluation and start the queued one if nothing is running.
refresh_auxiliary_models_for_round(state, config);
poll_completed_task_evaluation(state).await;
apply_completed_task_evaluation(session, event_tx, config, state).await;
if state.task_evaluation.in_flight.is_none() {
if let Some(request) = state.task_evaluation.queued_request.take() {
// Background evaluations use the fast provider when configured, else the main one.
let eval_provider = state
.auxiliary_models
.fast_model_provider
.clone()
.unwrap_or_else(|| llm.clone());
spawn_task_evaluation_request(state, event_tx, request, eval_provider);
}
}
// Same harvest/apply/start sequence for the Gold evaluation.
poll_completed_gold_evaluation(state).await;
apply_completed_gold_evaluation(session, config, state).await;
start_queued_gold_evaluation_if_idle(
state,
event_tx,
state
.auxiliary_models
.fast_model_provider
.clone()
.unwrap_or_else(|| llm.clone()),
);
// Round ids are 1-based (`<session>-round-<n>`), while `current_round` is 0-based.
state.runtime_state.round.current_round = turn_counter;
let round_id = format!("{}-round-{}", state.session_id, turn_counter + 1);
state.runtime_state.round.last_round_id = Some(round_id.clone());
let runtime_context = PromptMemoryRuntimeContext {
llm: state
.auxiliary_models
.background_model_provider
.clone()
.unwrap_or_else(|| llm.clone()),
background_model_name: state.auxiliary_models.background_model_name.clone(),
};
crate::runtime::runner::round_prelude::refresh_round_prompt_context(
session,
config.prompt_memory_flags,
Some(&runtime_context),
)
.await;
if let Some(ctx) = state.task_context.as_mut() {
ctx.current_round = turn_counter;
ctx.max_rounds = config.max_rounds as u32;
}
if state.debug_logger.enabled {
tracing::debug!(
"[{}] round_start: {}",
state.session_id,
serde_json::json!({
"round": turn_counter + 1,
"total_rounds": config.max_rounds,
"message_count": session.messages.len(),
})
);
}
// Progress events are best-effort; a closed channel is ignored.
let _ = event_tx
.send(AgentEvent::RunnerProgress {
session_id: state.session_id.clone(),
round_count: turn_counter,
})
.await;
state_bridge::merge_pending_injected_messages(
session,
config.storage.as_ref(),
config.persistence.as_ref(),
)
.await;
// Cancellation is checked once per round, before the round is recorded as started.
if cancel_token.is_cancelled() {
crate::runtime::runner::metrics_lifecycle::record_session_cancelled(
state.metrics_collector.as_ref(),
&state.session_id,
session.messages.len() as u32,
);
return Err(AgentError::Cancelled);
}
crate::runtime::runner::metrics_lifecycle::record_round_started(
state.metrics_collector.as_ref(),
&round_id,
&state.session_id,
&state.model_name,
);
let tool_schemas =
resolve_available_tool_schemas_for_session(config, tools.as_ref(), session);
// Per-round bookkeeping: exactly one of `turn_outcome` / `terminal_error`
// is expected to be set when the attempt loop exits.
let mut overflow_recovery_attempted = false;
let mut turn_outcome: Option<TurnOutcome> = None;
let mut terminal_error: Option<AgentError> = None;
for attempt in 1..=MAX_LLM_TURN_ATTEMPTS {
let llm_output = match crate::runtime::runner::round_lifecycle::execute_llm_round(
session,
config,
&llm,
event_tx,
cancel_token,
&state.session_id,
&state.model_name,
&tool_schemas,
)
.await
{
Ok(output) => output,
Err(error) => {
// Overflow errors get one forced recovery per round, unless the
// circuit breaker in `state.overflow_recovery` refuses it.
if is_overflow_recoverable(&error) && !overflow_recovery_attempted {
overflow_recovery_attempted = true;
if !state.overflow_recovery.can_attempt_recovery() {
let breaker_error = AgentError::LLMOverflow(format!(
"overflow recovery circuit breaker opened after {} consecutive recoveries",
state.overflow_recovery.consecutive_recoveries
));
tracing::error!(
"[{}] Turn {} overflow recovery skipped by circuit breaker: {}",
state.session_id,
turn_counter + 1,
breaker_error,
);
terminal_error = Some(breaker_error);
break;
}
tracing::warn!(
"[{}] Turn {} detected overflow error (attempt {}/{}): {}. Trying forced overflow recovery.",
state.session_id,
turn_counter + 1,
attempt,
MAX_LLM_TURN_ATTEMPTS,
error,
);
// NOTE(review): an error here returns via `?` without going through
// `record_turn_failure` — confirm that is intended.
let recovered =
crate::runtime::runner::round_lifecycle::force_overflow_context_recovery(
session,
config,
&state.model_name,
&state.session_id,
&llm,
Some(event_tx),
)
.await?;
if recovered {
state
.overflow_recovery
.record_recovery(turn_counter as usize);
tracing::info!(
"[{}] Overflow recovery applied: total_recoveries={}, consecutive_recoveries={}, turn={}",
state.session_id,
state.overflow_recovery.total_recoveries,
state.overflow_recovery.consecutive_recoveries,
turn_counter + 1,
);
// Tool schemas are re-resolved because recovery may have changed the session.
let tool_schemas_after_recovery =
resolve_available_tool_schemas_for_session(
config,
tools.as_ref(),
session,
);
// Retry the LLM round once within the same attempt; a second failure is terminal.
match crate::runtime::runner::round_lifecycle::execute_llm_round(
session,
config,
&llm,
event_tx,
cancel_token,
&state.session_id,
&state.model_name,
&tool_schemas_after_recovery,
)
.await
{
Ok(output) => output,
Err(recovery_error) => {
tracing::error!(
"[{}] Turn {} overflow recovery retry failed: {}",
state.session_id,
turn_counter + 1,
recovery_error,
);
terminal_error = Some(recovery_error);
break;
}
}
} else {
tracing::error!(
"[{}] Turn {} overflow recovery was attempted but no compression was applied.",
state.session_id,
turn_counter + 1,
);
terminal_error = Some(error);
break;
}
} else if should_retry_turn_error(&error) && attempt < MAX_LLM_TURN_ATTEMPTS {
// Backoff doubles per attempt: base, 2*base, ...
let delay_ms = LLM_RETRY_BASE_DELAY_MS * (1u64 << (attempt - 1));
tracing::warn!(
"[{}] Turn {} LLM call failed (attempt {}/{}): {}. Retrying in {}ms",
state.session_id,
turn_counter + 1,
attempt,
MAX_LLM_TURN_ATTEMPTS,
error,
delay_ms
);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
continue;
} else {
tracing::error!(
"[{}] Turn {} LLM call failed terminally (attempt {}/{}): {}",
state.session_id,
turn_counter + 1,
attempt,
MAX_LLM_TURN_ATTEMPTS,
error,
);
terminal_error = Some(error);
break;
}
}
};
let stream_output = llm_output.stream_output;
if stream_output.tool_calls.is_empty() {
// No tool calls: before treating the reply as terminal, first suspend for
// any still-active children, then give the guardian a chance to review.
// Either one ends the turn with a suspending outcome.
if let Some(suspend) =
maybe_suspend_for_orphaned_children(session, config, &mut state.runtime_state)
.await
{
turn_outcome = Some(suspend);
break;
}
if let Some(review) = maybe_spawn_guardian_review(
session,
config,
&state.task_context,
&mut state.runtime_state,
turn_counter + 1,
)
.await
{
turn_outcome = Some(review);
break;
}
let reasoning = (!stream_output.reasoning_content.trim().is_empty())
.then_some(stream_output.reasoning_content);
let eval_model = state
.auxiliary_models
.fast_model_name
.clone()
.unwrap_or_else(|| state.model_name.clone());
turn_outcome = Some(
handle_no_tool_calls(
stream_output.content,
reasoning,
llm_output.prompt_tokens,
llm_output.completion_tokens,
llm_output.round_usage,
session,
event_tx,
state.metrics_collector.as_ref(),
&round_id,
&state.session_id,
config,
&state.task_context,
&eval_model,
turn_counter + 1,
llm.clone(),
)
.await,
);
break;
}
// Tool-call path: bundle the per-round references for tool execution.
let frame = crate::runtime::runner::round_frame::RoundFrame {
session_id: &state.session_id,
round_id: &round_id,
turn: turn_counter as usize,
debug_enabled: state.debug_logger.enabled,
event_tx,
metrics_collector: state.metrics_collector.as_ref(),
config,
llm: &llm,
tools: &tools,
};
match handle_tool_calls_path(
&frame,
stream_output,
llm_output.round_usage,
session,
&state.auxiliary_models,
&state.model_name,
&mut state.task_context,
)
.await
{
Ok(outcome) => {
turn_outcome = Some(outcome);
break;
}
Err(error) => {
// A retryable failure here re-runs the whole attempt, including the LLM call.
if should_retry_turn_error(&error) && attempt < MAX_LLM_TURN_ATTEMPTS {
let delay_ms = LLM_RETRY_BASE_DELAY_MS * (1u64 << (attempt - 1));
tracing::warn!(
"[{}] Turn {} post-LLM handling failed (attempt {}/{}): {}. Retrying in {}ms",
state.session_id,
turn_counter + 1,
attempt,
MAX_LLM_TURN_ATTEMPTS,
error,
delay_ms
);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
continue;
}
tracing::error!(
"[{}] Turn {} post-LLM handling failed terminally (attempt {}/{}): {}",
state.session_id,
turn_counter + 1,
attempt,
MAX_LLM_TURN_ATTEMPTS,
error,
);
terminal_error = Some(error);
break;
}
}
}
// A terminal error is recorded in metrics and ends the pipeline.
if let Some(error) = terminal_error {
record_turn_failure(
state.metrics_collector.as_ref(),
&round_id,
&state.session_id,
session.messages.len() as u32,
&error,
);
return Err(error);
}
// Defensive: the attempt loop ended with neither an outcome nor an error.
let Some(outcome) = turn_outcome else {
let error = AgentError::LLM(format!(
"[{}] turn {} completed without outcome",
state.session_id,
turn_counter + 1
));
record_turn_failure(
state.metrics_collector.as_ref(),
&round_id,
&state.session_id,
session.messages.len() as u32,
&error,
);
return Err(error);
};
// A round that needed no overflow recovery resets the consecutive-recovery tracking.
if !overflow_recovery_attempted {
state.overflow_recovery.reset_after_stable_round();
}
state.runtime_state.memory.overflow_recovery_total =
state.overflow_recovery.total_recoveries as u32;
state.runtime_state.memory.overflow_recovery_consecutive =
state.overflow_recovery.consecutive_recoveries as u32;
// Mirror the suspend reason stored in session metadata into the runtime state.
match session
.metadata
.get("runtime.suspend_reason")
.map(String::as_str)
{
Some("awaiting_clarification") => {
state.runtime_state.status = AgentStatusState::Suspended;
state.runtime_state.suspension = Some(SuspensionState {
reason: "awaiting_clarification".to_string(),
suspended_at: Utc::now(),
resumable: true,
hook_point: Some("AfterToolExecution".to_string()),
});
}
Some("awaiting_parent_approval") => {
state.runtime_state.status = AgentStatusState::Suspended;
state.runtime_state.suspension = Some(SuspensionState {
reason: "awaiting_parent_approval".to_string(),
suspended_at: Utc::now(),
resumable: true,
hook_point: Some("AfterToolExecution".to_string()),
});
}
Some("waiting_for_children") => {
state.runtime_state.status = AgentStatusState::Suspended;
state.runtime_state.suspension = Some(SuspensionState {
reason: "waiting_for_children".to_string(),
suspended_at: Utc::now(),
resumable: true,
hook_point: Some("AfterToolExecution".to_string()),
});
// Reconcile with the persisted session: adopt its waiting-for-children state
// and pull in hidden resume messages that the in-memory session lacks.
if let Some(storage) = config.storage.as_ref() {
if let Ok(Some(persisted)) = storage.load_session(&state.session_id).await {
if let Some(runtime_state) = persisted.agent_runtime_state {
state.runtime_state.waiting_for_children =
runtime_state.waiting_for_children;
}
let existing_ids: std::collections::HashSet<String> = session
.messages
.iter()
.map(|message| message.id.clone())
.collect();
let mut appended = 0usize;
for message in persisted.messages {
// Only messages tagged with one of these `runtime_kind` values are copied over.
let hidden_runtime_resume = message
.metadata
.as_ref()
.and_then(|metadata| metadata.get("runtime_kind"))
.and_then(|value| value.as_str())
.is_some_and(|kind| {
matches!(
kind,
"child_completion_resume" | "guardian_review_resume"
)
});
if hidden_runtime_resume && !existing_ids.contains(message.id.as_str())
{
session.messages.push(message);
appended += 1;
}
}
if appended > 0 {
tracing::info!(
"[{}] Preserved {} hidden child-completion resume message(s) during parent suspension save",
state.session_id,
appended
);
}
}
}
}
_ => {}
}
state_bridge::write_runtime_state(session, &state.runtime_state);
sent_complete = sent_complete || outcome.sent_complete;
if outcome.should_break {
break;
}
// The loop continues: schedule background evaluations for the round just finished.
// Failures to schedule are logged and do not stop the loop.
if let Err(error) = spawn_task_evaluation_if_needed(
turn_counter as usize,
session,
event_tx,
config,
state,
state
.auxiliary_models
.fast_model_provider
.clone()
.unwrap_or_else(|| llm.clone()),
) {
tracing::warn!(
"[{}] Failed to spawn async task evaluation after round {}: {}",
state.session_id,
turn_counter + 1,
error
);
}
if let Err(error) = spawn_gold_evaluation_if_needed(
turn_counter as usize,
session,
event_tx,
config,
state,
state
.auxiliary_models
.fast_model_provider
.clone()
.unwrap_or_else(|| llm.clone()),
) {
tracing::warn!(
"[{}] Failed to spawn async Gold evaluation after round {}: {}",
state.session_id,
turn_counter + 1,
error
);
}
turn_counter += 1;
// Stop once the configured round budget is used up.
if turn_counter >= config.max_rounds as u32 {
break;
}
}
// Loop epilogue: wait for the in-flight task evaluation, apply it, then run
// and apply the queued one (if any) so no evaluation result is left behind.
drain_in_flight_task_evaluation(state).await;
apply_completed_task_evaluation(session, event_tx, config, state).await;
if state.task_evaluation.in_flight.is_none() {
if let Some(request) = state.task_evaluation.queued_request.take() {
let eval_provider = state
.auxiliary_models
.fast_model_provider
.clone()
.unwrap_or_else(|| llm.clone());
spawn_task_evaluation_request(state, event_tx, request, eval_provider);
drain_in_flight_task_evaluation(state).await;
apply_completed_task_evaluation(session, event_tx, config, state).await;
}
}
drain_in_flight_gold_evaluation(state).await;
apply_completed_gold_evaluation(session, config, state).await;
Ok(sent_complete)
}
/// Classifies a round's complexity from the names of its tool calls.
///
/// - Any call to a "complex" tool makes the round `Complex`.
/// - Otherwise, a non-empty round consisting only of "simple" tools is `Simple`.
/// - Everything else (including an empty list) is `Standard`.
fn heuristic_complexity(
    tool_calls: &[bamboo_agent_core::tools::ToolCall],
) -> crate::runtime::complexity_classifier::TaskComplexity {
    use crate::runtime::complexity_classifier::TaskComplexity;

    const SIMPLE_TOOLS: [&str; 4] = ["Read", "Glob", "Grep", "Bash"];
    const COMPLEX_TOOLS: [&str; 3] = ["Agent", "SubAgent", "TodoWrite"];

    // An empty round can never be "all simple".
    let mut only_simple = !tool_calls.is_empty();
    for call in tool_calls {
        let tool_name = call.function.name.as_str();
        if COMPLEX_TOOLS.contains(&tool_name) {
            return TaskComplexity::Complex;
        }
        only_simple = only_simple && SIMPLE_TOOLS.contains(&tool_name);
    }

    if only_simple {
        TaskComplexity::Simple
    } else {
        TaskComplexity::Standard
    }
}
#[cfg(test)]
mod tests {
use super::super::startup::OverflowRecoveryState;
use super::{
is_overflow_recoverable, is_terminal_child_status, map_turn_error_status,
maybe_spawn_guardian_review, maybe_suspend_for_orphaned_children, should_retry_turn_error,
};
use crate::runtime::config::{AgentLoopConfig, GuardianConfig, GuardianSpawner};
use crate::runtime::goal_state::{
ensure_goal_state, read_goal_state, write_goal_state, GoalDeclaredStatus, GoalRuntimeStatus,
};
use crate::runtime::guardian_state::{
ensure_guardian_state, read_guardian_state, write_guardian_state, GuardianPhase,
GuardianVerdict,
};
use crate::runtime::runner::state_bridge;
use bamboo_agent_core::storage::Storage;
use bamboo_agent_core::{AgentError, AgentEvent, Message, Session};
use bamboo_domain::AgentRuntimeState;
use bamboo_llm::{LLMChunk, LLMError, LLMProvider, LLMStream};
use bamboo_metrics::{
RoundStatus as MetricsRoundStatus, SessionStatus as MetricsSessionStatus,
TokenUsage as MetricsTokenUsage,
};
use futures::stream;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Guardian spawner double that never starts a real child session.
struct MockGuardianSpawner {
    /// Session id handed back from every `spawn_guardian_review` call.
    child_id: String,
}

#[async_trait::async_trait]
impl GuardianSpawner for MockGuardianSpawner {
    /// Ignores every argument and succeeds with a copy of `child_id`.
    async fn spawn_guardian_review(
        &self,
        _parent_session: &Session,
        _review_prompt: String,
        _model: String,
        _disabled_tools: Option<std::collections::BTreeSet<String>>,
    ) -> Result<String, String> {
        let spawned_id = self.child_id.to_owned();
        Ok(spawned_id)
    }
}
/// Builds a loop config with the guardian gate switched on.
///
/// The guardian uses the model name `guardian-test-model`, the given review
/// budget, and a [`MockGuardianSpawner`] that reports the child session id
/// `guardian-child`. Every other field keeps its default value.
fn guardian_enabled_config(max_reviews: u32) -> AgentLoopConfig {
    let guardian = GuardianConfig {
        enabled: true,
        model_name: Some(String::from("guardian-test-model")),
        max_reviews,
    };
    let mock: Arc<dyn GuardianSpawner> = Arc::new(MockGuardianSpawner {
        child_id: String::from("guardian-child"),
    });
    AgentLoopConfig {
        guardian_config: Some(guardian),
        guardian_spawner: Some(mock),
        ..Default::default()
    }
}
/// First terminal round with the guardian enabled: the gate returns an outcome
/// that breaks the loop without completing, registers a child wait, and
/// persists a pending guardian state with one review counted.
#[tokio::test]
async fn guardian_gate_spawns_and_suspends_on_first_terminal() {
    let config = guardian_enabled_config(2);
    let mut parent = Session::new("s1", "model");
    let mut runtime = AgentRuntimeState::new("s1".to_string());

    let gate = maybe_spawn_guardian_review(&mut parent, &config, &None, &mut runtime, 1).await;
    let outcome = gate.expect("guardian should engage a review and suspend");

    assert!(outcome.should_break && !outcome.sent_complete);
    assert!(runtime.waiting_for_children.is_some());

    let persisted = read_guardian_state(&parent).expect("guardian state persisted");
    assert_eq!(persisted.phase, GuardianPhase::Pending);
    assert_eq!(
        persisted.guardian_child_id.as_deref(),
        Some("guardian-child")
    );
    assert_eq!(persisted.review_count, 1);
}
/// With a default loop config the guardian gate returns `None` and registers
/// no child wait.
#[tokio::test]
async fn guardian_gate_inert_without_config() {
    let mut parent = Session::new("s1", "model");
    let default_config = AgentLoopConfig::default();
    let mut runtime = AgentRuntimeState::new("s1".to_string());

    let gate =
        maybe_spawn_guardian_review(&mut parent, &default_config, &None, &mut runtime, 1).await;

    assert!(gate.is_none());
    assert!(runtime.waiting_for_children.is_none());
}
/// Guardian enabled but configured without a model name: the gate returns
/// `None`, nothing is suspended and no guardian state is stored on the session.
#[tokio::test]
async fn guardian_gate_skips_when_no_model_resolves() {
    let modelless_guardian = GuardianConfig {
        enabled: true,
        model_name: None,
        max_reviews: 2,
    };
    let mock: Arc<dyn GuardianSpawner> = Arc::new(MockGuardianSpawner {
        child_id: String::from("guardian-child"),
    });
    let config = AgentLoopConfig {
        guardian_config: Some(modelless_guardian),
        guardian_spawner: Some(mock),
        ..Default::default()
    };
    let mut parent = Session::new("s1", "model");
    let mut runtime = AgentRuntimeState::new("s1".to_string());

    let gate = maybe_spawn_guardian_review(&mut parent, &config, &None, &mut runtime, 1).await;

    assert!(gate.is_none());
    assert!(runtime.waiting_for_children.is_none());
    assert!(
        read_guardian_state(&parent).is_none(),
        "no guardian review budget should be charged when skipped"
    );
}
/// When the stored guardian state already carries an approved verdict, the
/// gate returns `None` and no child wait is registered.
#[tokio::test]
async fn guardian_gate_completes_after_approval() {
    let mut parent = Session::new("s1", "model");

    // Seed the session with a spawned review that has been approved.
    let mut approved = ensure_guardian_state(&parent);
    approved.record_spawn("guardian-child");
    approved.record_verdict(GuardianVerdict::approved(), 1);
    write_guardian_state(&mut parent, approved);

    let mut runtime = AgentRuntimeState::new("s1".to_string());
    let config = guardian_enabled_config(2);
    let gate = maybe_spawn_guardian_review(&mut parent, &config, &None, &mut runtime, 2).await;

    assert!(gate.is_none());
    assert!(runtime.waiting_for_children.is_none());
}
/// A rejection inside the review budget (2) triggers a second review and a
/// suspend; a further rejection after that lets the gate return `None`.
#[tokio::test]
async fn guardian_gate_re_reviews_after_reject_then_completes_on_budget() {
    let config = guardian_enabled_config(2);
    let mut parent = Session::new("s1", "model");

    // Phase 1: one review spawned and rejected.
    let mut rejected_once = ensure_guardian_state(&parent);
    rejected_once.record_spawn("guardian-child");
    rejected_once.record_verdict(GuardianVerdict::rejected(vec!["bug".to_string()]), 1);
    write_guardian_state(&mut parent, rejected_once);

    let mut first_runtime = AgentRuntimeState::new("s1".to_string());
    let second_review =
        maybe_spawn_guardian_review(&mut parent, &config, &None, &mut first_runtime, 2)
            .await
            .expect("rejected within budget → re-review (suspend)");
    assert!(second_review.should_break && !second_review.sent_complete);

    let after = read_guardian_state(&parent).expect("state persisted");
    assert_eq!(after.review_count, 2, "second review spawned");
    assert_eq!(after.phase, GuardianPhase::Pending);

    // Phase 2: the second review is rejected as well.
    let mut exhausted = ensure_guardian_state(&parent);
    exhausted.record_verdict(GuardianVerdict::rejected(vec!["still".to_string()]), 3);
    write_guardian_state(&mut parent, exhausted);

    let mut second_runtime = AgentRuntimeState::new("s1".to_string());
    let final_gate =
        maybe_spawn_guardian_review(&mut parent, &config, &None, &mut second_runtime, 4).await;
    assert!(
        final_gate.is_none(),
        "budget exhausted → allow completion despite unresolved findings"
    );
}
/// Provider double whose stream contains nothing but a `Done` chunk.
struct StubProvider;

#[async_trait::async_trait]
impl LLMProvider for StubProvider {
    /// Ignores all inputs and yields a single `LLMChunk::Done`.
    async fn chat_stream(
        &self,
        _messages: &[Message],
        _tools: &[bamboo_agent_core::tools::ToolSchema],
        _max_output_tokens: Option<u32>,
        _model: &str,
    ) -> Result<LLMStream, LLMError> {
        let chunks = vec![Ok(LLMChunk::Done)];
        Ok(Box::pin(stream::iter(chunks)))
    }
}
/// Provider double that always answers with one scripted
/// `report_gold_evaluation` tool call.
struct ScriptedGoldProvider {
    /// Value placed in the `decision` field of the tool-call arguments.
    decision: &'static str,
    /// Value placed in the `confidence` field of the tool-call arguments.
    confidence: &'static str,
}

#[async_trait::async_trait]
impl LLMProvider for ScriptedGoldProvider {
    /// Ignores all inputs and streams the scripted tool call followed by `Done`.
    async fn chat_stream(
        &self,
        _messages: &[Message],
        _tools: &[bamboo_agent_core::tools::ToolSchema],
        _max_output_tokens: Option<u32>,
        _model: &str,
    ) -> Result<LLMStream, LLMError> {
        use bamboo_agent_core::tools::{FunctionCall, ToolCall};

        let verdict_call = ToolCall {
            id: String::from("gold-call-1"),
            tool_type: String::from("function"),
            function: FunctionCall {
                name: String::from("report_gold_evaluation"),
                arguments: format!(
                    r#"{{"decision":"{}","confidence":"{}","reasoning":"gate test"}}"#,
                    self.decision, self.confidence
                ),
            },
        };
        let script = vec![
            Ok(LLMChunk::ToolCalls(vec![verdict_call])),
            Ok(LLMChunk::Done),
        ];
        Ok(Box::pin(stream::iter(script)))
    }
}
/// Loop config with gold evaluation and auto-continue enabled, the goal
/// `finish the task`, and at most three auto-continuations. All other fields
/// keep their defaults.
fn gold_continue_config() -> crate::runtime::config::AgentLoopConfig {
    use crate::runtime::config::{AgentLoopConfig, GoldConfig};

    let gold = GoldConfig {
        goal: Some(String::from("finish the task")),
        enabled: true,
        auto_continue_enabled: true,
        max_auto_continuations: 3,
        ..GoldConfig::default()
    };
    AgentLoopConfig {
        gold_config: Some(gold),
        ..AgentLoopConfig::default()
    }
}
/// Minimal token usage fixture: one prompt token plus one completion token.
fn round_usage() -> MetricsTokenUsage {
    let prompt_tokens = 1;
    let completion_tokens = 1;
    MetricsTokenUsage {
        prompt_tokens,
        completion_tokens,
        total_tokens: 2,
    }
}
/// A scripted `continue` verdict keeps the loop running: the outcome neither
/// breaks nor completes, a user message tagged `goal_continue` is appended
/// after the assistant message, and no `Complete` event is emitted.
#[tokio::test]
async fn no_tool_calls_does_not_complete_when_gold_continues() {
    let (tx, mut events) = tokio::sync::mpsc::channel(8);
    let mut session = Session::new("session-1", "model");
    let config = gold_continue_config();
    let evaluator = Arc::new(ScriptedGoldProvider {
        decision: "continue",
        confidence: "high",
    });

    let outcome = super::handle_no_tool_calls(
        String::from("tentative answer"),
        None,
        5,
        5,
        round_usage(),
        &mut session,
        &tx,
        None,
        "round-1",
        "session-1",
        &config,
        &None,
        "model",
        1,
        evaluator,
    )
    .await;

    assert!(!outcome.should_break);
    assert!(!outcome.sent_complete);
    assert_eq!(session.messages.len(), 2);

    let nudge = session.messages.last().unwrap();
    assert!(matches!(nudge.role, bamboo_agent_core::Role::User));
    let metadata = nudge.metadata.as_ref().expect("runtime metadata");
    assert_eq!(
        metadata.get("runtime_kind").and_then(|v| v.as_str()),
        Some("goal_continue")
    );

    // Close the sender so the drain loop below terminates.
    drop(tx);
    let mut saw_complete = false;
    while let Some(event) = events.recv().await {
        saw_complete |= matches!(event, AgentEvent::Complete { .. });
    }
    assert!(
        !saw_complete,
        "Complete must not be emitted on gold continue"
    );
}
/// A scripted `achieved` verdict ends the loop: the outcome breaks and reports
/// completion, only the assistant message is stored, and a `Complete` event is
/// emitted.
#[tokio::test]
async fn no_tool_calls_completes_when_gold_achieved() {
    let (tx, mut events) = tokio::sync::mpsc::channel(8);
    let mut session = Session::new("session-1", "model");
    let config = gold_continue_config();
    let evaluator = Arc::new(ScriptedGoldProvider {
        decision: "achieved",
        confidence: "high",
    });

    let outcome = super::handle_no_tool_calls(
        String::from("final answer"),
        None,
        5,
        5,
        round_usage(),
        &mut session,
        &tx,
        None,
        "round-1",
        "session-1",
        &config,
        &None,
        "model",
        1,
        evaluator,
    )
    .await;

    assert!(outcome.should_break);
    assert!(outcome.sent_complete);
    assert_eq!(session.messages.len(), 1);

    // Close the sender so the drain loop below terminates.
    drop(tx);
    let mut saw_complete = false;
    while let Some(event) = events.recv().await {
        saw_complete |= matches!(event, AgentEvent::Complete { .. });
    }
    assert!(
        saw_complete,
        "Complete must be emitted when gold is achieved"
    );
}
/// Two-round goal scenario driven through `handle_no_tool_calls`.
///
/// Round 1: nothing is declared and the scripted evaluator says `continue`;
/// the test expects the loop to keep going and the goal state to record one
/// continuation. Round 2: the goal is declared complete on the session first
/// and the evaluator says `achieved`; the test expects the loop to stop with
/// exactly one `Complete` event over both rounds.
#[tokio::test]
async fn e2e_goal_loop_continue_then_declare_then_complete() {
let mut session = Session::new("session-e2e", "model");
let config = gold_continue_config();
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
// --- Round 1: undeclared goal, evaluator answers "continue". ---
let r1 = super::handle_no_tool_calls(
"I think that's everything.".to_string(),
None,
5,
5,
round_usage(),
&mut session,
&tx,
None,
"round-1",
"session-e2e",
&config,
&None,
"model",
1,
Arc::new(ScriptedGoldProvider {
decision: "continue",
confidence: "high",
}),
)
.await;
assert!(!r1.should_break, "undeclared + continue → keep working");
assert!(!r1.sent_complete);
let st = read_goal_state(&session).expect("goal state persisted after round 1");
assert_eq!(st.continuation_count, 1);
assert_eq!(st.status, GoalRuntimeStatus::Active);
assert_eq!(st.eval_history.len(), 1);
// The last message appended in round 1 must mention `update_goal`.
assert!(session
.messages
.last()
.unwrap()
.content
.contains("update_goal"));
// --- Between rounds: declare the goal complete directly on the session. ---
let mut st = ensure_goal_state(&session, "finish the task");
st.declare(GoalDeclaredStatus::Complete, 2);
write_goal_state(&mut session, st);
// --- Round 2: declared complete, evaluator answers "achieved". ---
let r2 = super::handle_no_tool_calls(
"Done — shipped and verified.".to_string(),
None,
5,
5,
round_usage(),
&mut session,
&tx,
None,
"round-2",
"session-e2e",
&config,
&None,
"model",
2,
Arc::new(ScriptedGoldProvider {
decision: "achieved",
confidence: "high",
}),
)
.await;
assert!(r2.should_break, "declared complete + achieved → stop");
assert!(r2.sent_complete);
let st = read_goal_state(&session).expect("goal state persisted after round 2");
assert_eq!(st.status, GoalRuntimeStatus::Complete);
assert_eq!(st.declared_status, None, "declaration cleared after acting");
assert_eq!(st.eval_history.len(), 2, "both double-checks persisted");
// Drop the sender so `recv` returns `None` once the buffered events are drained.
drop(tx);
let mut completes = 0;
while let Some(event) = rx.recv().await {
if matches!(event, AgentEvent::Complete { .. }) {
completes += 1;
}
}
assert_eq!(
completes, 1,
"exactly one terminal Complete across the whole loop"
);
}
/// The goal is declared complete up front but the scripted evaluator answers
/// `continue`: the loop must keep going, the goal stays active, the
/// declaration is cleared and one continuation is counted.
#[tokio::test]
async fn e2e_goal_loop_double_check_vetoes_premature_complete() {
    let config = gold_continue_config();
    // The receiver is kept alive (but unread) for the duration of the test.
    let (tx, _events) = tokio::sync::mpsc::channel(16);
    let mut session = Session::new("session-e2e2", "model");

    let mut declared = ensure_goal_state(&session, "finish the task");
    declared.declare(GoalDeclaredStatus::Complete, 1);
    write_goal_state(&mut session, declared);

    let evaluator = Arc::new(ScriptedGoldProvider {
        decision: "continue",
        confidence: "high",
    });
    let outcome = super::handle_no_tool_calls(
        String::from("All done!"),
        None,
        5,
        5,
        round_usage(),
        &mut session,
        &tx,
        None,
        "round-1",
        "session-e2e2",
        &config,
        &None,
        "model",
        1,
        evaluator,
    )
    .await;

    assert!(!outcome.should_break, "premature completion vetoed");
    assert!(!outcome.sent_complete);

    let goal = read_goal_state(&session).expect("goal state persisted");
    assert_eq!(goal.status, GoalRuntimeStatus::Active);
    assert_eq!(
        goal.declared_status, None,
        "stale declaration cleared on veto"
    );
    assert_eq!(goal.continuation_count, 1);
}
/// Scripted provider for the full-pipeline goal test.
///
/// Its reply depends on the request purpose and on how many non-evaluation
/// requests it has already served.
struct GoalLoopE2eProvider {
// Number of requests served so far whose purpose was not "gold_evaluation".
main_calls: std::sync::atomic::AtomicUsize,
}
#[async_trait::async_trait]
impl LLMProvider for GoalLoopE2eProvider {
/// Plain entry point: ignores its inputs and yields only a `Done` chunk.
async fn chat_stream(
&self,
_messages: &[Message],
_tools: &[bamboo_agent_core::tools::ToolSchema],
_max_output_tokens: Option<u32>,
_model: &str,
) -> Result<LLMStream, LLMError> {
Ok(Box::pin(stream::iter(vec![Ok(LLMChunk::Done)])))
}
/// Scripted entry point.
///
/// * purpose `gold_evaluation` → a `report_gold_evaluation` tool call with
///   decision `achieved`;
/// * first other request → an `update_goal` tool call with status `complete`;
/// * every later request → a plain text token followed by `Done`.
async fn chat_stream_with_options(
&self,
_messages: &[Message],
_tools: &[bamboo_agent_core::tools::ToolSchema],
_max_output_tokens: Option<u32>,
_model: &str,
options: Option<&bamboo_llm::LLMRequestOptions>,
) -> Result<LLMStream, LLMError> {
// A missing options value or missing purpose is treated as "agent_loop".
let purpose = options
.and_then(|o| o.request_purpose.as_deref())
.unwrap_or("agent_loop");
if purpose == "gold_evaluation" {
let call = bamboo_agent_core::tools::ToolCall {
id: "gold-1".to_string(),
tool_type: "function".to_string(),
function: bamboo_agent_core::tools::FunctionCall {
name: "report_gold_evaluation".to_string(),
arguments: r#"{"decision":"achieved","confidence":"high","reasoning":"objective verified"}"#.to_string(),
},
};
return Ok(Box::pin(stream::iter(vec![
Ok(LLMChunk::ToolCalls(vec![call])),
Ok(LLMChunk::Done),
])));
}
// `fetch_add` returns the previous value, so `n == 0` marks the first
// non-evaluation request. Evaluation requests returned above and are not counted.
let n = self
.main_calls
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if n == 0 {
let call = bamboo_agent_core::tools::ToolCall {
id: "ug-1".to_string(),
tool_type: "function".to_string(),
function: bamboo_agent_core::tools::FunctionCall {
name: "update_goal".to_string(),
arguments: r#"{"status":"complete"}"#.to_string(),
},
};
Ok(Box::pin(stream::iter(vec![
Ok(LLMChunk::ToolCalls(vec![call])),
Ok(LLMChunk::Done),
])))
} else {
Ok(Box::pin(stream::iter(vec![
Ok(LLMChunk::Token("Done — shipped and verified.".to_string())),
Ok(LLMChunk::Done),
])))
}
}
}
/// Builds a blank `LoopRunState` for `session_id`: model `model`, no metrics
/// collector, no task context, debug logger created with `false`, default
/// recovery/task-evaluation/auxiliary settings and an idle gold evaluation.
fn e2e_loop_state(
    session_id: &str,
) -> crate::runtime::runner::loop_execution::startup::LoopRunState {
    use crate::runtime::config::AuxiliaryModelConfig;
    use crate::runtime::runner::logging::DebugLogger;
    use crate::runtime::runner::loop_execution::startup::{
        GoldEvaluationState, LoopRunState, OverflowRecoveryState, TaskEvaluationState,
    };

    // Nothing running, nothing finished, nothing queued.
    let idle_gold = GoldEvaluationState {
        in_flight: None,
        completed: None,
        queued_request: None,
    };

    LoopRunState {
        session_id: session_id.to_string(),
        model_name: String::from("model"),
        metrics_collector: None,
        debug_logger: DebugLogger::new(false),
        task_context: None,
        overflow_recovery: OverflowRecoveryState::default(),
        task_evaluation: TaskEvaluationState::default(),
        gold_evaluation: idle_gold,
        auxiliary_models: AuxiliaryModelConfig::default(),
        runtime_state: AgentRuntimeState::new(session_id),
    }
}
/// Runs `run_pipeline` end to end against `GoalLoopE2eProvider` and the
/// builtin tool executor, then checks that the run reports completion, the
/// goal state ends up `Complete` with its declaration cleared and a non-empty
/// evaluation history, and exactly one `Complete` event was emitted.
#[tokio::test]
async fn e2e_full_loop_update_goal_tool_then_double_check_completes() {
use crate::runtime::config::PromptMemoryFlags;
let mut session = Session::new("session-full-e2e", "model");
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
let llm: Arc<dyn LLMProvider> = Arc::new(GoalLoopE2eProvider {
main_calls: std::sync::atomic::AtomicUsize::new(0),
});
let tools: Arc<dyn bamboo_agent_core::tools::ToolExecutor> =
Arc::new(bamboo_tools::BuiltinToolExecutor::new());
// Gold evaluation with auto-continue on; every prompt-memory flag off.
let config = AgentLoopConfig {
gold_config: Some(crate::runtime::config::GoldConfig {
enabled: true,
auto_continue_enabled: true,
goal: Some("ship it".to_string()),
max_auto_continuations: 3,
..crate::runtime::config::GoldConfig::default()
}),
prompt_memory_flags: PromptMemoryFlags {
project_prompt_injection: false,
relevant_recall: false,
relevant_recall_rerank: false,
project_first_dream: false,
},
model_name: Some("model".to_string()),
max_rounds: 5,
..AgentLoopConfig::default()
};
let mut state = e2e_loop_state("session-full-e2e");
let cancel = tokio_util::sync::CancellationToken::new();
let sent_complete =
super::run_pipeline(&mut session, &tx, llm, tools, &cancel, &config, &mut state)
.await
.expect("pipeline runs to completion");
assert!(sent_complete, "the run emits a terminal Complete");
let goal_state = read_goal_state(&session).expect("goal state persisted");
assert_eq!(goal_state.status, GoalRuntimeStatus::Complete);
assert_eq!(
goal_state.declared_status, None,
"declaration cleared after the terminal gate acted"
);
assert!(
!goal_state.eval_history.is_empty(),
"the double-check verdict was persisted into the goal's eval trail"
);
// Drop the sender so `recv` returns `None` once the buffered events are drained.
drop(tx);
let mut completes = 0;
while let Some(event) = rx.recv().await {
if matches!(event, AgentEvent::Complete { .. }) {
completes += 1;
}
}
assert_eq!(completes, 1, "exactly one terminal Complete");
}
/// In-memory `Storage` double keyed by session id.
#[derive(Default)]
struct TestStorage {
    /// Saved sessions, guarded by an async read/write lock.
    sessions: RwLock<HashMap<String, Session>>,
}

#[async_trait::async_trait]
impl Storage for TestStorage {
    /// Stores a clone of `session` under its id, replacing any previous entry.
    async fn save_session(&self, session: &Session) -> std::io::Result<()> {
        let mut sessions = self.sessions.write().await;
        sessions.insert(session.id.clone(), session.clone());
        Ok(())
    }

    /// Returns a clone of the stored session, or `None` when the id is unknown.
    async fn load_session(&self, session_id: &str) -> std::io::Result<Option<Session>> {
        let sessions = self.sessions.read().await;
        Ok(sessions.get(session_id).cloned())
    }

    /// Removes the session and reports whether an entry existed.
    async fn delete_session(&self, session_id: &str) -> std::io::Result<bool> {
        let mut sessions = self.sessions.write().await;
        let removed = sessions.remove(session_id);
        Ok(removed.is_some())
    }
}
/// Persistence double that forwards runtime saves to the wrapped `Storage`.
struct TestPersistence(Arc<dyn Storage>);

#[async_trait::async_trait]
impl bamboo_domain::RuntimeSessionPersistence for TestPersistence {
    /// Saves `session` through the wrapped storage's `save_session`.
    async fn save_runtime_session(&self, session: &mut Session) -> std::io::Result<()> {
        let Self(storage) = self;
        storage.save_session(session).await
    }
}
/// A queued message stored under the `pending_injected_messages` metadata key
/// of the persisted session is merged into the running session, removed from
/// storage, and not merged again by a second call.
#[tokio::test]
async fn pending_injected_messages_are_merged_once_and_cleared_from_storage() {
let storage: Arc<dyn Storage> = Arc::new(TestStorage::default());
let persistence: Arc<dyn bamboo_domain::RuntimeSessionPersistence> =
Arc::new(TestPersistence(storage.clone()));
// Persisted copy: two ordinary messages plus one queued injected message.
let mut persisted = Session::new_child("child-merge", "parent", "model", "Child");
persisted.add_message(Message::system("system"));
persisted.add_message(Message::user("original task"));
persisted.metadata.insert(
"pending_injected_messages".to_string(),
serde_json::json!([
{
"content": "queued correction",
"created_at": chrono::Utc::now(),
}
])
.to_string(),
);
storage
.save_session(&persisted)
.await
.expect("persisted child should be saved");
// The in-memory copy does not carry the queue; only the stored session does.
let mut running = persisted.clone();
running.metadata.remove("pending_injected_messages");
state_bridge::merge_pending_injected_messages(
&mut running,
Some(&storage),
Some(&persistence),
)
.await;
assert_eq!(
running
.messages
.last()
.map(|message| message.content.as_str()),
Some("queued correction")
);
assert!(!running.metadata.contains_key("pending_injected_messages"));
let saved = storage
.load_session("child-merge")
.await
.expect("load should succeed")
.expect("session should exist");
assert!(!saved.metadata.contains_key("pending_injected_messages"));
// A second merge must not append anything further.
let count_after_first_merge = running.messages.len();
state_bridge::merge_pending_injected_messages(
&mut running,
Some(&storage),
Some(&persistence),
)
.await;
assert_eq!(running.messages.len(), count_after_first_merge);
}
/// Timeout, HTTP 503 and empty-response LLM errors are all retryable.
#[test]
fn retries_transient_llm_errors() {
    let transient_messages = [
        "HTTP error: timeout while connecting",
        "API error: HTTP 503: Service Unavailable",
        "empty assistant response",
    ];
    for message in transient_messages {
        let error = AgentError::LLM(message.to_string());
        assert!(should_retry_turn_error(&error));
    }
}
/// A "error sending request" transport failure is retryable.
#[test]
fn retries_reqwest_transport_errors() {
    let message = String::from(
        "HTTP error: error sending request for url (https://api.githubcopilot.com/chat/completions)",
    );
    let error = AgentError::LLM(message);
    assert!(should_retry_turn_error(&error));
}
/// A stream body-decoding transport failure is retryable.
#[test]
fn retries_stream_decode_transport_errors() {
    let message = String::from("Stream error: Transport error: error decoding response body");
    let error = AgentError::LLM(message);
    assert!(should_retry_turn_error(&error));
}
/// An LLM error whose text is not otherwise recognised is retried.
#[test]
fn retries_unknown_llm_errors_by_default() {
    let unrecognised = AgentError::LLM(String::from("some completely unknown error"));
    assert!(should_retry_turn_error(&unrecognised));
}
/// Authentication failures and HTTP 400 responses are never retried.
#[test]
fn does_not_retry_non_retryable_llm_errors() {
    let permanent_messages = [
        "Authentication error: Invalid API key",
        "API error: HTTP 400: invalid request",
    ];
    for message in permanent_messages {
        let error = AgentError::LLM(message.to_string());
        assert!(!should_retry_turn_error(&error));
    }
}
/// Only `AgentError::LLM` is eligible for retry; cancellation, tool and budget
/// errors are not.
#[test]
fn does_not_retry_non_llm_errors() {
    let non_llm_errors = [
        AgentError::Cancelled,
        AgentError::Tool(String::from("tool failed")),
        AgentError::Budget(String::from("budget exceeded")),
    ];
    for error in &non_llm_errors {
        assert!(!should_retry_turn_error(error));
    }
}
/// LLM errors with an empty or whitespace-only message are not retried.
#[test]
fn does_not_retry_empty_llm_error() {
    let blank_messages = ["", " "];
    for message in blank_messages {
        let error = AgentError::LLM(message.to_string());
        assert!(!should_retry_turn_error(&error));
    }
}
/// `LLMOverflow` is overflow-recoverable and not retryable; a plain `LLM`
/// error is not overflow-recoverable.
#[test]
fn overflow_errors_use_dedicated_recovery_path() {
    let prompt_overflow = AgentError::LLMOverflow(String::from("prompt too long"));
    let plain_timeout = AgentError::LLM(String::from("timeout while connecting"));
    let context_overflow =
        AgentError::LLMOverflow(String::from("maximum context length exceeded"));

    assert!(is_overflow_recoverable(&prompt_overflow));
    assert!(!is_overflow_recoverable(&plain_timeout));
    assert!(!should_retry_turn_error(&context_overflow));
}
/// A fresh state permits recovery; after recoveries are recorded for rounds
/// 0, 1 and 2 it no longer does.
#[test]
fn overflow_recovery_state_opens_circuit_breaker_after_threshold() {
    let mut breaker = OverflowRecoveryState::default();
    assert!(breaker.can_attempt_recovery());

    for round in 0..3 {
        breaker.record_recovery(round);
    }

    assert!(!breaker.can_attempt_recovery());
}
/// `Cancelled` maps to the cancelled round and session statuses.
#[test]
fn test_map_turn_error_status_cancelled() {
    let (round, session) = map_turn_error_status(&AgentError::Cancelled);
    assert_eq!(round, MetricsRoundStatus::Cancelled);
    assert_eq!(session, MetricsSessionStatus::Cancelled);
}
/// A tool error maps to the error round and session statuses.
#[test]
fn test_map_turn_error_status_tool_error() {
    let tool_failure = AgentError::Tool(String::from("Tool failed"));
    let (round, session) = map_turn_error_status(&tool_failure);
    assert_eq!(round, MetricsRoundStatus::Error);
    assert_eq!(session, MetricsSessionStatus::Error);
}
/// An LLM error maps to the error round and session statuses.
#[test]
fn test_map_turn_error_status_llm_error() {
    let llm_failure = AgentError::LLM(String::from("LLM provider error"));
    let (round, session) = map_turn_error_status(&llm_failure);
    assert_eq!(round, MetricsRoundStatus::Error);
    assert_eq!(session, MetricsSessionStatus::Error);
}
/// A missing-session error maps to the error round and session statuses.
#[test]
fn test_map_turn_error_status_session_not_found() {
    let missing_session = AgentError::SessionNotFound(String::from("session-123"));
    let (round, session) = map_turn_error_status(&missing_session);
    assert_eq!(round, MetricsRoundStatus::Error);
    assert_eq!(session, MetricsSessionStatus::Error);
}
/// A budget error maps to the error round and session statuses.
#[test]
fn test_map_turn_error_status_budget_error() {
    let over_budget = AgentError::Budget(String::from("Budget exceeded"));
    let (round, session) = map_turn_error_status(&over_budget);
    assert_eq!(round, MetricsRoundStatus::Error);
    assert_eq!(session, MetricsSessionStatus::Error);
}
/// The statuses produced for `Cancelled` differ from those of a tool error.
#[test]
fn test_map_turn_error_status_cancelled_is_distinct() {
    let (tool_round, tool_session) =
        map_turn_error_status(&AgentError::Tool(String::from("Tool error")));
    let (cancel_round, cancel_session) = map_turn_error_status(&AgentError::Cancelled);

    assert_ne!(cancel_round, tool_round);
    assert_ne!(cancel_session, tool_session);
}
/// LLM, tool, missing-session and budget errors all map to `Error`; only
/// `Cancelled` maps to `Cancelled`.
#[test]
fn test_map_turn_error_only_cancelled_gets_cancelled_status() {
    let failures = [
        AgentError::LLM(String::from("error")),
        AgentError::Tool(String::from("error")),
        AgentError::SessionNotFound(String::from("id")),
        AgentError::Budget(String::from("error")),
    ];
    for failure in &failures {
        let (round, session) = map_turn_error_status(failure);
        assert_eq!(round, MetricsRoundStatus::Error);
        assert_eq!(session, MetricsSessionStatus::Error);
    }

    let (round, session) = map_turn_error_status(&AgentError::Cancelled);
    assert_eq!(round, MetricsRoundStatus::Cancelled);
    assert_eq!(session, MetricsSessionStatus::Cancelled);
}
/// With a default config the handler stops the loop, stores the assistant
/// reply together with its reasoning, and emits `Complete` carrying the round's
/// token usage as the first event.
#[tokio::test]
async fn handle_no_tool_calls_emits_complete_and_appends_assistant_message() {
    let (tx, mut events) = tokio::sync::mpsc::channel(4);
    let mut session = Session::new("session-1", "model");
    let config = crate::runtime::config::AgentLoopConfig::default();
    let usage = MetricsTokenUsage {
        prompt_tokens: 11,
        completion_tokens: 7,
        total_tokens: 18,
    };

    let outcome = super::handle_no_tool_calls(
        String::from("final answer"),
        Some(String::from("reasoning trace")),
        11,
        7,
        usage,
        &mut session,
        &tx,
        None,
        "round-1",
        "session-1",
        &config,
        &None,
        "model",
        1,
        Arc::new(StubProvider),
    )
    .await;

    assert!(outcome.should_break);
    assert!(outcome.sent_complete);

    assert_eq!(session.messages.len(), 1);
    let reply = &session.messages[0];
    assert!(matches!(reply.role, bamboo_agent_core::Role::Assistant));
    assert_eq!(reply.content, "final answer");
    assert_eq!(reply.reasoning.as_deref(), Some("reasoning trace"));

    let first_event = events.recv().await.expect("complete event should be sent");
    match first_event {
        AgentEvent::Complete { usage } => {
            assert_eq!(usage.prompt_tokens, 11);
            assert_eq!(usage.completion_tokens, 7);
            assert_eq!(usage.total_tokens, 18);
        }
        other => panic!("unexpected event: {other:?}"),
    }
}
/// A completed task evaluation already stored in the loop state is applied to
/// the session: the in-progress item `task-1` becomes `Completed` and a
/// `TaskListUpdated` event carrying the new status is emitted.
#[tokio::test]
async fn apply_completed_task_evaluation_updates_task_list_and_emits_event() {
let storage: Arc<dyn Storage> = Arc::new(TestStorage::default());
let persistence: Arc<dyn bamboo_domain::RuntimeSessionPersistence> =
Arc::new(TestPersistence(storage.clone()));
// Session with a single in-progress task and task-list version "1".
let mut session = Session::new("session-task-eval", "model");
session.set_task_list(bamboo_domain::TaskList {
session_id: "session-task-eval".to_string(),
title: "Eval Tasks".to_string(),
items: vec![bamboo_domain::TaskItem {
id: "task-1".to_string(),
description: "Do work".to_string(),
status: bamboo_domain::TaskItemStatus::InProgress,
..bamboo_domain::TaskItem::default()
}],
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
});
session
.metadata
.insert("task_list_version".to_string(), "1".to_string());
// Loop state whose only pending work is a finished evaluation for `task-1`.
let mut state = super::super::startup::LoopRunState {
session_id: "session-task-eval".to_string(),
model_name: "model".to_string(),
metrics_collector: None,
debug_logger: crate::runtime::runner::logging::DebugLogger::new(false),
task_context: crate::runtime::task_context::TaskLoopContext::from_session(&session),
overflow_recovery: super::super::startup::OverflowRecoveryState::default(),
task_evaluation: super::super::startup::TaskEvaluationState {
in_flight: None,
completed: Some(
crate::runtime::runner::task_lifecycle::AsyncTaskEvaluationResult {
shared_session_id: "session-task-eval".to_string(),
round_number: 1,
// NOTE(review): presumably chosen to match the "1" written to
// `task_list_version` above — confirm against the version check.
based_on_task_context_version: 1,
task_list_title: Some("Eval Tasks".to_string()),
model_name: "fast-model".to_string(),
evaluation_result: crate::runtime::task_evaluation::TaskEvaluationResult {
needs_evaluation: true,
updates: vec![crate::runtime::task_evaluation::TaskItemUpdate {
item_id: "task-1".to_string(),
status: bamboo_domain::TaskItemStatus::Completed,
notes: Some("done".to_string()),
evidence: Some("verified".to_string()),
blocker: None,
criteria_met: None,
}],
reasoning: "complete".to_string(),
prompt_tokens: 4,
completion_tokens: 2,
},
},
),
queued_request: None,
},
gold_evaluation: super::super::startup::GoldEvaluationState::default(),
auxiliary_models: crate::runtime::config::AuxiliaryModelConfig::default(),
runtime_state: AgentRuntimeState::new("session-task-eval"),
};
let config = crate::runtime::config::AgentLoopConfig {
storage: Some(storage.clone()),
persistence: Some(persistence),
..Default::default()
};
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
super::apply_completed_task_evaluation(&mut session, &tx, &config, &mut state).await;
assert_eq!(
session.task_list.as_ref().unwrap().items[0].status,
bamboo_domain::TaskItemStatus::Completed
);
// The first event on the channel must be the task-list update.
let event = rx
.recv()
.await
.expect("task update event should be emitted");
match event {
AgentEvent::TaskListUpdated { task_list } => {
assert_eq!(
task_list.items[0].status,
bamboo_domain::TaskItemStatus::Completed
);
}
other => panic!("unexpected event: {other:?}"),
}
}
/// Pins the `<session>-round-<n>` text layout.
///
/// NOTE(review): this only exercises `format!` directly; it does not call any
/// round-id helper from the parent module.
#[test]
fn test_build_round_id() {
    let cases = [
        ("session-123", 1, "session-123-round-1"),
        ("test", 4 + 1, "test-round-5"),
    ];
    for (session, round, expected) in cases {
        let id = format!("{}-round-{}", session, round);
        assert_eq!(id, expected);
    }
}
/// A freshly created cancellation token reports that it is not cancelled.
///
/// NOTE(review): despite its name, this test never calls an
/// `ensure_not_cancelled` helper; it only inspects the token itself.
#[tokio::test]
async fn ensure_not_cancelled_returns_ok_when_not_cancelled() {
    let fresh = tokio_util::sync::CancellationToken::new();
    let cancelled = fresh.is_cancelled();
    assert!(!cancelled);
}
/// A token reports itself cancelled once `cancel` has been called.
///
/// NOTE(review): despite its name, this test never calls an
/// `ensure_not_cancelled` helper; it only inspects the token itself.
#[tokio::test]
async fn ensure_not_cancelled_returns_error_when_cancelled() {
    let tripped = tokio_util::sync::CancellationToken::new();
    tripped.cancel();
    let cancelled = tripped.is_cancelled();
    assert!(cancelled);
}
/// Adding a delta to near-`u64::MAX` counters with `saturating_add` clamps
/// both components at `u64::MAX`, and `recompute_total` then yields
/// `u64::MAX` as well.
#[test]
fn accumulate_round_usage_saturates_components_and_recomputes_total() {
    let delta = MetricsTokenUsage {
        prompt_tokens: 10,
        completion_tokens: 20,
        total_tokens: 30,
    };
    let mut running = MetricsTokenUsage {
        prompt_tokens: u64::MAX - 5,
        completion_tokens: u64::MAX - 9,
        total_tokens: 0,
    };

    let prompt_sum = running.prompt_tokens.saturating_add(delta.prompt_tokens);
    let completion_sum = running
        .completion_tokens
        .saturating_add(delta.completion_tokens);
    running.prompt_tokens = prompt_sum;
    running.completion_tokens = completion_sum;
    running.recompute_total();

    assert_eq!(running.prompt_tokens, u64::MAX);
    assert_eq!(running.completion_tokens, u64::MAX);
    assert_eq!(running.total_tokens, u64::MAX);
}
/// `completed`, `error`, `timeout`, `cancelled` and `skipped` are terminal;
/// `running`, `pending`, `queued` and the empty string are not.
#[test]
fn is_terminal_child_status_classifies_correctly() {
    const TERMINAL: [&str; 5] = ["completed", "error", "timeout", "cancelled", "skipped"];
    const ACTIVE: [&str; 4] = ["running", "pending", "queued", ""];

    for s in TERMINAL {
        let terminal = is_terminal_child_status(s);
        assert!(terminal, "{s} should be terminal");
    }
    for s in ACTIVE {
        let terminal = is_terminal_child_status(s);
        assert!(!terminal, "{s} should be active");
    }
}
/// `Storage` double that delegates session CRUD to a `TestStorage` and
/// answers child-status queries from a fixed list.
struct ChildIndexStorage {
    /// Backing store for save/load/delete.
    inner: Arc<TestStorage>,
    /// Canned `(child id, optional status)` pairs returned for any parent.
    children: Vec<(String, Option<String>)>,
}

#[async_trait::async_trait]
impl Storage for ChildIndexStorage {
    /// Delegates to the inner storage.
    async fn save_session(&self, session: &Session) -> std::io::Result<()> {
        let backing = &self.inner;
        backing.save_session(session).await
    }

    /// Delegates to the inner storage.
    async fn load_session(&self, session_id: &str) -> std::io::Result<Option<Session>> {
        let backing = &self.inner;
        backing.load_session(session_id).await
    }

    /// Delegates to the inner storage.
    async fn delete_session(&self, session_id: &str) -> std::io::Result<bool> {
        let backing = &self.inner;
        backing.delete_session(session_id).await
    }

    /// Returns a copy of the canned child list, regardless of the parent id.
    async fn list_child_run_statuses(
        &self,
        _parent_session_id: &str,
    ) -> std::io::Result<Vec<(String, Option<String>)>> {
        let canned = self.children.to_vec();
        Ok(canned)
    }
}
/// Loop config that uses `storage` both directly and, wrapped in
/// [`TestPersistence`], as the runtime persistence layer. Other fields keep
/// their defaults.
fn config_with_storage(storage: Arc<dyn Storage>) -> AgentLoopConfig {
    let wrapper = TestPersistence(Arc::clone(&storage));
    let persistence: Arc<dyn bamboo_domain::RuntimeSessionPersistence> = Arc::new(wrapper);
    AgentLoopConfig {
        persistence: Some(persistence),
        storage: Some(storage),
        ..AgentLoopConfig::default()
    }
}
/// With one running child, one child without a status and one completed
/// child, the safety net suspends the parent, waits on the first two (in the
/// order `c-pend`, `c-run`), records the suspend reason and persists the wait.
#[tokio::test]
async fn safety_net_suspends_on_orphaned_active_children() {
    let backing = Arc::new(TestStorage::default());
    let child_index = vec![
        ("c-run".to_string(), Some("running".to_string())),
        ("c-pend".to_string(), None),
        ("c-done".to_string(), Some("completed".to_string())),
    ];
    let storage: Arc<dyn Storage> = Arc::new(ChildIndexStorage {
        inner: backing.clone(),
        children: child_index,
    });
    let config = config_with_storage(storage.clone());
    let mut parent = Session::new("parent-orphan", "model");
    let mut runtime = AgentRuntimeState::new("parent-orphan");

    let outcome = maybe_suspend_for_orphaned_children(&mut parent, &config, &mut runtime)
        .await
        .expect("must suspend when active children remain");
    assert!(outcome.should_break && !outcome.sent_complete);

    let wait = runtime
        .waiting_for_children
        .expect("durable wait registered");
    let expected_children = vec!["c-pend".to_string(), "c-run".to_string()];
    assert_eq!(wait.child_session_ids, expected_children);

    let suspend_reason = parent
        .metadata
        .get("runtime.suspend_reason")
        .map(String::as_str);
    assert_eq!(suspend_reason, Some("waiting_for_children"));

    let stored_parent = storage
        .load_session("parent-orphan")
        .await
        .unwrap()
        .unwrap();
    let stored_wait = stored_parent
        .agent_runtime_state
        .and_then(|s| s.waiting_for_children);
    assert!(stored_wait.is_some());
}
/// When every listed child is `completed` or `error`, the safety net returns
/// `None` and registers no wait.
#[tokio::test]
async fn safety_net_noop_when_all_children_terminal() {
    let finished_children = vec![
        ("a".to_string(), Some("completed".to_string())),
        ("b".to_string(), Some("error".to_string())),
    ];
    let storage: Arc<dyn Storage> = Arc::new(ChildIndexStorage {
        inner: Arc::new(TestStorage::default()),
        children: finished_children,
    });
    let config = config_with_storage(storage);
    let mut parent = Session::new("parent-done", "model");
    let mut runtime = AgentRuntimeState::new("parent-done");

    let outcome = maybe_suspend_for_orphaned_children(&mut parent, &config, &mut runtime).await;

    assert!(outcome.is_none(), "no active children → must not suspend");
    assert!(runtime.waiting_for_children.is_none());
}
/// When the runtime state already holds a child wait, the safety net returns
/// `None` even though a running child is listed.
#[tokio::test]
async fn safety_net_noop_when_already_waiting() {
    let storage: Arc<dyn Storage> = Arc::new(ChildIndexStorage {
        inner: Arc::new(TestStorage::default()),
        children: vec![("x".to_string(), Some("running".to_string()))],
    });
    let config = config_with_storage(storage);
    let mut parent = Session::new("parent-waiting", "model");

    let existing_wait = super::WaitingForChildrenState {
        child_session_ids: vec!["x".to_string()],
        wait_for: super::ChildWaitPolicy::All,
        registered_at: chrono::Utc::now(),
        timeout_at: None,
        registered_by_tool_call_id: None,
    };
    let mut runtime = AgentRuntimeState::new("parent-waiting");
    runtime.waiting_for_children = Some(existing_wait);

    let outcome = maybe_suspend_for_orphaned_children(&mut parent, &config, &mut runtime).await;
    assert!(outcome.is_none());
}
}