bamboo_engine/runtime/task_evaluation/
executor.rs1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4
5use bamboo_agent_core::{AgentError, AgentEvent, Session};
6use bamboo_domain::ReasoningEffort;
7use bamboo_domain::TaskItemStatus;
8use bamboo_infrastructure::{LLMProvider, LLMRequestOptions};
9
10use super::super::task_context::TaskLoopContext;
11use super::message_builder::build_task_evaluation_messages;
12use super::schema::get_task_evaluation_tools;
13use super::token_estimation::estimate_prompt_tokens;
14use super::TaskEvaluationResult;
15
16mod outcomes;
17
18fn has_tool_activity(ctx: &TaskLoopContext) -> bool {
19 ctx.items.iter().any(|item| !item.tool_calls.is_empty())
20}
21
22fn skipped_evaluation(reasoning: &str) -> TaskEvaluationResult {
23 TaskEvaluationResult {
24 needs_evaluation: false,
25 updates: Vec::new(),
26 reasoning: reasoning.to_string(),
27 prompt_tokens: 0,
28 completion_tokens: 0,
29 }
30}
31
32fn normalize_lightweight_reasoning_effort(
33 reasoning_effort: Option<ReasoningEffort>,
34) -> Option<ReasoningEffort> {
35 reasoning_effort.map(|effort| match effort {
36 ReasoningEffort::Xhigh | ReasoningEffort::Max => ReasoningEffort::High,
37 other => other,
38 })
39}
40
/// Runs a lightweight LLM pass over the session's in-progress task items and
/// returns any resulting evaluation (or a skipped result when there is
/// nothing useful to evaluate).
///
/// Skips early — without calling the LLM — when:
/// * no task item is currently `InProgress`, or
/// * the session is not in plan mode and no item has any tool calls yet.
///
/// Emits `AgentEvent::TaskEvaluationStarted` before issuing the request.
/// A failed LLM request is downgraded to a skipped result (logged as a
/// warning) rather than propagated, so evaluation never aborts the task
/// loop; only errors while consuming an already-open stream surface as
/// `AgentError::LLM`.
///
/// # Arguments
/// * `ctx` — task loop context holding the items under evaluation.
/// * `session` — current session; used for plan-mode detection and prompt building.
/// * `llm` — provider used to issue the streaming evaluation request.
/// * `event_tx` — channel for progress events.
/// * `session_id` — identifier used in logs and request attribution.
/// * `model` — model name for the evaluation request.
/// * `reasoning_effort` — requested effort; `Xhigh`/`Max` are capped to
///   `High` for this lightweight call.
pub async fn evaluate_task_progress(
    ctx: &TaskLoopContext,
    session: &Session,
    llm: Arc<dyn LLMProvider>,
    event_tx: &mpsc::Sender<AgentEvent>,
    session_id: &str,
    model: &str,
    reasoning_effort: Option<ReasoningEffort>,
) -> Result<TaskEvaluationResult, AgentError> {
    use crate::runtime::stream::handler::consume_llm_stream_silent;

    // Count items still marked InProgress; these are the only ones worth
    // re-evaluating.
    let in_progress_count = ctx
        .items
        .iter()
        .filter(|item| matches!(item.status, TaskItemStatus::InProgress))
        .count();

    if in_progress_count == 0 {
        return Ok(skipped_evaluation("No in-progress tasks to evaluate"));
    }

    // Plan mode is signalled by the presence of `plan_mode` in the runtime state.
    let is_plan_mode = session
        .agent_runtime_state
        .as_ref()
        .and_then(|s| s.plan_mode.as_ref())
        .is_some();

    // Outside plan mode, evaluating before any tool has run would be
    // premature: nothing has changed since the tasks were created.
    if !is_plan_mode && !has_tool_activity(ctx) {
        return Ok(skipped_evaluation(
            "No tool executions yet; skipping task evaluation.",
        ));
    }

    tracing::info!(
        "[{}] Evaluating {} in-progress task items",
        session_id,
        in_progress_count
    );

    // Best-effort notification: a closed event channel must not abort
    // evaluation, so the send result is deliberately ignored.
    let _ = event_tx
        .send(AgentEvent::TaskEvaluationStarted {
            session_id: session_id.to_string(),
            items_count: in_progress_count,
        })
        .await;

    let messages = build_task_evaluation_messages(ctx, session);
    // Token count is estimated locally rather than read from the provider.
    let prompt_tokens = estimate_prompt_tokens(&messages);
    let tools = get_task_evaluation_tools();

    tracing::debug!("[{}] Task evaluation using model: {}", session_id, model);

    // Cap Xhigh/Max effort to High for this lightweight request; log only
    // when a downgrade actually happened.
    let request_reasoning_effort = normalize_lightweight_reasoning_effort(reasoning_effort);
    if request_reasoning_effort != reasoning_effort {
        tracing::debug!(
            "[{}] Task evaluation downgraded reasoning effort from {:?} to {:?} for lightweight request",
            session_id,
            reasoning_effort,
            request_reasoning_effort
        );
    }

    let request_options = LLMRequestOptions {
        session_id: Some(session_id.to_string()),
        reasoning_effort: request_reasoning_effort,
        parallel_tool_calls: None,
        responses: None,
    };
    // NOTE(review): Some(500) is presumably a completion-token cap for this
    // small structured response — confirm against LLMProvider's signature.
    match llm
        .chat_stream_with_options(&messages, &tools, Some(500), model, Some(&request_options))
        .await
    {
        Ok(stream) => {
            // A fresh CancellationToken is passed, so this evaluation cannot
            // be cancelled externally once the stream is open.
            let stream_output = consume_llm_stream_silent(
                stream,
                &tokio_util::sync::CancellationToken::new(),
                session_id,
            )
            .await
            .map_err(|error| AgentError::LLM(error.to_string()))?;

            Ok(
                outcomes::build_success_result(stream_output, event_tx, session_id, prompt_tokens)
                    .await,
            )
        }
        Err(error) => {
            // Request failures are non-fatal: report a skipped evaluation
            // carrying the error text instead of propagating.
            tracing::warn!("[{}] Task evaluation failed: {}", session_id, error);
            Ok(skipped_evaluation(&format!("Evaluation failed: {}", error)))
        }
    }
}