bamboo_engine/runtime/task_evaluation/executor.rs

use std::sync::Arc;

use tokio::sync::mpsc;

use bamboo_agent_core::{AgentError, AgentEvent, Session};
use bamboo_domain::{ReasoningEffort, TaskItemStatus};
use bamboo_infrastructure::{LLMProvider, LLMRequestOptions};

use super::super::task_context::TaskLoopContext;
use super::message_builder::build_task_evaluation_messages;
use super::schema::get_task_evaluation_tools;
use super::token_estimation::estimate_prompt_tokens;
use super::TaskEvaluationResult;

mod outcomes;

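/// Returns true when at least one task item has recorded a tool call, i.e.
/// there is concrete activity worth evaluating.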
fn has_tool_activity(ctx: &TaskLoopContext) -> bool {
    ctx.items.iter().any(|item| !item.tool_calls.is_empty())
}

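/// Builds an empty `TaskEvaluationResult` that carries only the reason the
/// evaluation pass was skipped; no token usage is counted against it.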
fn skipped_evaluation(reasoning: &str) -> TaskEvaluationResult {
    TaskEvaluationResult {
        needs_evaluation: false,
        updates: Vec::new(),
        reasoning: reasoning.to_string(),
        prompt_tokens: 0,
        completion_tokens: 0,
    }
}

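/// Caps reasoning effort for this lightweight request: `Xhigh` and `Max` are
/// downgraded to `High`; lower efforts pass through unchanged.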
fn normalize_lightweight_reasoning_effort(
    reasoning_effort: Option<ReasoningEffort>,
) -> Option<ReasoningEffort> {
    reasoning_effort.map(|effort| match effort {
        ReasoningEffort::Xhigh | ReasoningEffort::Max => ReasoningEffort::High,
        other => other,
    })
}

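/// Runs a lightweight LLM pass over the session's in-progress task items and
/// returns any resulting task updates.
///
/// Skips without issuing a request when no items are in progress or no tools
/// have run yet, and degrades LLM request failures into a skipped result so
/// the surrounding task loop keeps running.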
pub async fn evaluate_task_progress(
    ctx: &TaskLoopContext,
    session: &Session,
    llm: Arc<dyn LLMProvider>,
    event_tx: &mpsc::Sender<AgentEvent>,
    session_id: &str,
    model: &str,
    reasoning_effort: Option<ReasoningEffort>,
) -> Result<TaskEvaluationResult, AgentError> {
    use crate::runtime::stream::handler::consume_llm_stream_silent;

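    // Count the task items currently marked in-progress; only those are evaluated.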
    let in_progress_count = ctx
        .items
        .iter()
        .filter(|item| matches!(item.status, TaskItemStatus::InProgress))
        .count();

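    // Skip the LLM round-trip entirely when there is nothing to evaluate yet.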
    if in_progress_count == 0 {
        return Ok(skipped_evaluation("No in-progress tasks to evaluate"));
    }

    if !has_tool_activity(ctx) {
        return Ok(skipped_evaluation(
            "No tool executions yet; skipping task evaluation.",
        ));
    }

    tracing::info!(
        "[{}] Evaluating {} in-progress task items",
        session_id,
        in_progress_count
    );

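    // Best-effort notification; a dropped receiver must not abort the evaluation.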
    let _ = event_tx
        .send(AgentEvent::TaskEvaluationStarted {
            session_id: session_id.to_string(),
            items_count: in_progress_count,
        })
        .await;

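    // Assemble the evaluation prompt, estimate its size, and fetch the tool schema.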
    let messages = build_task_evaluation_messages(ctx, session);
    let prompt_tokens = estimate_prompt_tokens(&messages);
    let tools = get_task_evaluation_tools();

    tracing::debug!("[{}] Task evaluation using model: {}", session_id, model);

    let request_reasoning_effort = normalize_lightweight_reasoning_effort(reasoning_effort);
    if request_reasoning_effort != reasoning_effort {
        tracing::debug!(
            "[{}] Task evaluation downgraded reasoning effort from {:?} to {:?} for lightweight request",
            session_id,
            reasoning_effort,
            request_reasoning_effort
        );
    }

    let request_options = LLMRequestOptions {
        session_id: Some(session_id.to_string()),
        reasoning_effort: request_reasoning_effort,
        parallel_tool_calls: None,
        responses: None,
    };
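    // Issue the streaming request; failures are logged and downgraded to a
    // skipped evaluation rather than propagated as errors.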
    match llm
        .chat_stream_with_options(&messages, &tools, Some(500), model, Some(&request_options))
        .await
    {
        Ok(stream) => {
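            // Drain the stream silently, using a fresh (never-cancelled) token
            // since this internal request has no external cancellation path.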
            let stream_output = consume_llm_stream_silent(
                stream,
                &tokio_util::sync::CancellationToken::new(),
                session_id,
            )
            .await
            .map_err(|error| AgentError::LLM(error.to_string()))?;

            Ok(
                outcomes::build_success_result(stream_output, event_tx, session_id, prompt_tokens)
                    .await,
            )
        }
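        // A failed request is non-fatal: log a warning and report a skipped
        // evaluation with the error message as the reason.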
        Err(error) => {
            tracing::warn!("[{}] Task evaluation failed: {}", session_id, error);
            Ok(skipped_evaluation(&format!("Evaluation failed: {}", error)))
        }
    }
}