use crate::agent::config::AgentRole;
use crate::agent::react::{ReactAgent, StepType};
use crate::error::{AgentError, ReactError};
use crate::llm::types::Message;
use crate::tasks::executor::{TaskContext, TaskExecuteFn, TaskExecutor, TaskExecutorConfig};
use std::sync::Arc;
use tracing::{debug, info, warn};
impl ReactAgent {
pub async fn execute_with_planning(&self, task: &str) -> crate::error::Result<String> {
let agent = self.config.agent_name.clone();
self.reset_messages().await;
self.tools.task_manager.clear();
info!(agent = %agent, "🎯 Starting task planning mode");
info!(agent = %agent, task = %task, "📋 User task");
if !self.has_planning_tools() {
warn!(
agent = %agent,
"⚠️ Current agent does not have planning enabled or lacks complete planning toolset, automatically falling back to normal execution mode"
);
return self.run_direct(task).await;
}
info!(agent = %agent, phase = "planning", "📐 Phase 1: Creating plan");
self.plan_tasks(task).await?;
if self.tools.task_manager.get_all_tasks().is_empty() {
warn!(
agent = %agent,
"⚠️ No tasks created during planning phase, automatically falling back to normal execution mode"
);
return self.run_direct(task).await;
}
info!(agent = %agent, phase = "execution", "🚀 Phase 2: Executing tasks in parallel");
let config = TaskExecutorConfig {
max_concurrent: 5,
default_timeout_secs: self.config.max_iterations as u64 * 30,
enable_hooks: true,
..Default::default()
};
let execute_fn = self.build_execute_fn();
let executor =
TaskExecutor::new(self.tools.task_manager.clone(), config).with_execute_fn(execute_fn);
let results = executor.execute_all().await?;
for result in &results {
info!(
agent = %agent,
task_id = %result.task_id,
status = ?result.status,
duration_ms = result.duration.as_millis(),
" ✓ Task [{}] execution completed ({:?})",
result.task_id,
result.status
);
}
let (completed, total) = executor.get_progress();
info!(
agent = %agent,
"📊 Execution complete: {}/{} tasks succeeded",
completed,
total
);
info!(agent = %agent, phase = "summary", "📝 Phase 3: Generating final answer");
let task_results_summary = self
.tools
.task_manager
.get_all_tasks()
.iter()
.map(|t| {
let result_str = t.result.as_deref().unwrap_or("No result");
format!(
" - [{}] {:?}: {} → {}",
t.id, t.status, t.description, result_str
)
})
.collect::<Vec<_>>()
.join("\n");
self.memory.context.lock().await.push(Message::user(format!(
"All tasks have been completed. Below are the execution results for each task:\n{}\n\n\
Based on the above results, use the final_answer tool to provide the final answer.\n\
**Note**: Do not create new tasks or perform other operations; provide the final answer directly.",
task_results_summary
)));
for _ in 0..self.config.max_iterations {
let steps = self.think().await?;
if let Some(answer) = self.process_steps(steps).await? {
info!(agent = %agent, "🏁 Task planning mode execution complete");
return Ok(answer);
}
}
warn!(agent = %agent, max = self.config.max_iterations, "Reached maximum iteration count");
Err(ReactError::Agent(AgentError::MaxIterationsExceeded(
self.config.max_iterations,
)))
}
async fn plan_tasks(&self, task: &str) -> crate::error::Result<()> {
let agent = self.config.agent_name.clone();
let planning_prompt = format!(
"{}\n\n\
Please first use the think tool to analyze the problem, then use the plan tool to create a plan, and finally use create_task to create each sub-task one by one.\n\n\
**Important: Task decomposition rules**\n\
- Break the problem down into the most granular sub-tasks possible; each sub-task should do only one thing\n\
- Do not set dependencies for mutually independent sub-tasks; let them execute in parallel\n\
- Only set dependencies when one task truly needs another task's result\n\
- Build a wide and shallow DAG (directed acyclic graph) rather than a linear chain\n\
- **Planning is only complete after all sub-tasks are created; do not stop after creating only a portion**",
task
);
self.memory
.context
.lock()
.await
.push(Message::user(planning_prompt));
let planning_max_rounds = self.config.max_iterations;
let mut has_created_tasks = false;
for round in 0..planning_max_rounds {
debug!(agent = %agent, round = round + 1, "📐 Planning round");
let steps = self.think().await?;
let mut created_task_this_round = false;
for step in steps {
if let StepType::Call {
tool_call_id,
function_name,
arguments,
} = step
{
if function_name == "create_task" {
created_task_this_round = true;
}
let result = self.execute_tool(&function_name, &arguments).await?;
if function_name == "final_answer" {
info!(agent = %agent, "🏁 Final answer generated during planning phase");
return Ok(());
}
self.memory.context.lock().await.push(Message::tool_result(
tool_call_id,
function_name,
result,
));
}
}
if created_task_this_round {
has_created_tasks = true;
}
if has_created_tasks && !created_task_this_round {
let task_count = self.tools.task_manager.get_all_tasks().len();
info!(
agent = %agent,
task_count = task_count,
"📐 Planning complete, {} sub-tasks created",
task_count
);
break;
}
}
Ok(())
}
fn build_execute_fn(&self) -> TaskExecuteFn {
let agent_name = self.config.agent_name.clone();
let model = self.config.model_name.clone();
let is_orchestrator = self.config.role == AgentRole::Orchestrator;
let subagent_names: Vec<String> = if is_orchestrator {
self.tools
.subagent_registry
.agents_map()
.try_read()
.map(|agents| agents.keys().cloned().collect())
.unwrap_or_default()
} else {
Vec::new()
};
Arc::new(move |ctx: TaskContext| {
let agent_name = agent_name.clone();
let model = model.clone();
let _subagent_names = subagent_names.clone();
Box::pin(async move {
debug!(
agent = %agent_name,
task_id = %ctx.task_id,
"Executing task via LLM"
);
let upstream_context = ctx.format_upstream_context();
let user_prompt = if upstream_context.is_empty() {
ctx.description.clone()
} else {
format!("{}\n\n{}", upstream_context, ctx.description)
};
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.unwrap_or_default();
let messages = vec![
crate::llm::types::Message::system(
"You are a task execution assistant. Please complete the following task and provide the result directly.".to_string(),
),
crate::llm::types::Message::user(user_prompt),
];
let response = crate::llm::chat(
Arc::new(client),
&model,
&messages,
Some(0.3),
Some(4096u32),
Some(false),
None,
None,
None,
)
.await
.map_err(|e| ReactError::Other(format!("LLM execution failed: {}", e)))?;
let content = response
.choices
.first()
.and_then(|c| c.message.content.as_text())
.unwrap_or_else(|| format!("Task [{}] completed (no output)", ctx.task_id));
Ok(content)
})
})
}
pub async fn execute_with_planning_fn(
&self,
task: &str,
execute_fn: TaskExecuteFn,
) -> crate::error::Result<String> {
let _agent = self.config.agent_name.clone();
self.reset_messages().await;
self.tools.task_manager.clear();
if !self.has_planning_tools() {
return self.run_direct(task).await;
}
self.plan_tasks(task).await?;
if self.tools.task_manager.get_all_tasks().is_empty() {
return self.run_direct(task).await;
}
let config = TaskExecutorConfig::default();
let executor =
TaskExecutor::new(self.tools.task_manager.clone(), config).with_execute_fn(execute_fn);
let _results = executor.execute_all().await?;
let task_results_summary = self
.tools
.task_manager
.get_all_tasks()
.iter()
.map(|t| {
let result_str = t.result.as_deref().unwrap_or("No result");
format!(
" - [{}] {:?}: {} → {}",
t.id, t.status, t.description, result_str
)
})
.collect::<Vec<_>>()
.join("\n");
self.memory.context.lock().await.push(Message::user(format!(
"All tasks have been completed. Below are the execution results for each task:\n{}\n\n\
Based on the above results, use the final_answer tool to provide the final answer.",
task_results_summary
)));
for _ in 0..self.config.max_iterations {
let steps = self.think().await?;
if let Some(answer) = self.process_steps(steps).await? {
return Ok(answer);
}
}
Err(ReactError::Agent(AgentError::MaxIterationsExceeded(
self.config.max_iterations,
)))
}
}