aster/agents/
subagent_handler.rs1use crate::{
2 agents::{subagent_task_config::TaskConfig, AgentEvent, SessionConfig},
3 conversation::{message::Message, Conversation},
4 execution::manager::AgentManager,
5 prompt_template::render_global_file,
6 recipe::Recipe,
7};
8use anyhow::{anyhow, Result};
9use futures::StreamExt;
10use rmcp::model::{ErrorCode, ErrorData};
11use serde::Serialize;
12use std::future::Future;
13use std::pin::Pin;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, info};
16
17#[derive(Serialize)]
18struct SubagentPromptContext {
19 max_turns: usize,
20 subagent_id: String,
21 task_instructions: String,
22 tool_count: usize,
23 available_tools: String,
24}
25
26type AgentMessagesFuture =
27 Pin<Box<dyn Future<Output = Result<(Conversation, Option<String>)>> + Send>>;
28
29pub async fn run_complete_subagent_task(
31 recipe: Recipe,
32 task_config: TaskConfig,
33 return_last_only: bool,
34 session_id: String,
35 images: Option<Vec<crate::agents::subagent_tool::ImageData>>,
36 cancellation_token: Option<CancellationToken>,
37) -> Result<String, anyhow::Error> {
38 let (messages, final_output) =
39 get_agent_messages(recipe, task_config, session_id, images, cancellation_token)
40 .await
41 .map_err(|e| {
42 ErrorData::new(
43 ErrorCode::INTERNAL_ERROR,
44 format!("Failed to execute task: {}", e),
45 None,
46 )
47 })?;
48
49 if let Some(output) = final_output {
50 return Ok(output);
51 }
52
53 let response_text = if return_last_only {
54 messages
55 .messages()
56 .last()
57 .and_then(|message| {
58 message.content.iter().find_map(|content| match content {
59 crate::conversation::message::MessageContent::Text(text_content) => {
60 Some(text_content.text.clone())
61 }
62 _ => None,
63 })
64 })
65 .unwrap_or_else(|| String::from("No text content in last message"))
66 } else {
67 let all_text_content: Vec<String> = messages
68 .iter()
69 .flat_map(|message| {
70 message.content.iter().filter_map(|content| {
71 match content {
72 crate::conversation::message::MessageContent::Text(text_content) => {
73 Some(text_content.text.clone())
74 }
75 crate::conversation::message::MessageContent::ToolResponse(
76 tool_response,
77 ) => {
78 if let Ok(result) = &tool_response.tool_result {
80 let texts: Vec<String> = result
81 .content
82 .iter()
83 .filter_map(|content| {
84 if let rmcp::model::RawContent::Text(raw_text_content) =
85 &content.raw
86 {
87 Some(raw_text_content.text.clone())
88 } else {
89 None
90 }
91 })
92 .collect();
93 if !texts.is_empty() {
94 Some(format!("Tool result: {}", texts.join("\n")))
95 } else {
96 None
97 }
98 } else {
99 None
100 }
101 }
102 _ => None,
103 }
104 })
105 })
106 .collect();
107
108 all_text_content.join("\n")
109 };
110
111 Ok(response_text)
112}
113
114fn get_agent_messages(
115 recipe: Recipe,
116 task_config: TaskConfig,
117 session_id: String,
118 images: Option<Vec<crate::agents::subagent_tool::ImageData>>,
119 cancellation_token: Option<CancellationToken>,
120) -> AgentMessagesFuture {
121 Box::pin(async move {
122 let system_instructions = recipe.instructions.clone().unwrap_or_default();
123 let user_task = recipe
124 .prompt
125 .clone()
126 .unwrap_or_else(|| "Begin.".to_string());
127
128 let agent_manager = AgentManager::instance()
129 .await
130 .map_err(|e| anyhow!("Failed to create AgentManager: {}", e))?;
131
132 let agent = agent_manager
133 .get_or_create_agent(session_id.clone())
134 .await
135 .map_err(|e| anyhow!("Failed to get sub agent session file path: {}", e))?;
136
137 agent
138 .update_provider(task_config.provider, &session_id)
139 .await
140 .map_err(|e| anyhow!("Failed to set provider on sub agent: {}", e))?;
141
142 for extension in task_config.extensions {
143 if let Err(e) = agent.add_extension(extension.clone()).await {
144 debug!(
145 "Failed to add extension '{}' to subagent: {}",
146 extension.name(),
147 e
148 );
149 }
150 }
151
152 let has_response_schema = recipe.response.is_some();
153 agent
154 .apply_recipe_components(recipe.sub_recipes.clone(), recipe.response.clone(), true)
155 .await;
156
157 let tools = agent.list_tools(None).await;
158 let subagent_prompt = render_global_file(
159 "subagent_system.md",
160 &SubagentPromptContext {
161 max_turns: task_config
162 .max_turns
163 .expect("TaskConfig always sets max_turns"),
164 subagent_id: session_id.clone(),
165 task_instructions: system_instructions,
166 tool_count: tools.len(),
167 available_tools: tools
168 .iter()
169 .map(|t| t.name.to_string())
170 .collect::<Vec<_>>()
171 .join(", "),
172 },
173 )
174 .map_err(|e| anyhow!("Failed to render subagent system prompt: {}", e))?;
175 agent.override_system_prompt(subagent_prompt).await;
176
177 let mut user_message = Message::user().with_text(user_task);
178
179 if let Some(images) = images {
181 for image in images {
182 user_message = user_message.with_image(image.data, image.mime_type);
183 }
184 }
185
186 let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]);
187
188 if let Some(activities) = recipe.activities {
189 for activity in activities {
190 info!("Recipe activity: {}", activity);
191 }
192 }
193 let session_config = SessionConfig {
194 id: session_id.clone(),
195 schedule_id: None,
196 max_turns: task_config.max_turns.map(|v| v as u32),
197 retry_config: recipe.retry,
198 system_prompt: None,
199 };
200
201 let mut stream = crate::session_context::with_session_id(Some(session_id.clone()), async {
202 agent
203 .reply(user_message, session_config, cancellation_token)
204 .await
205 })
206 .await
207 .map_err(|e| anyhow!("Failed to get reply from agent: {}", e))?;
208 while let Some(message_result) = stream.next().await {
209 match message_result {
210 Ok(AgentEvent::Message(msg)) => conversation.push(msg),
211 Ok(AgentEvent::McpNotification(_)) | Ok(AgentEvent::ModelChange { .. }) => {}
212 Ok(AgentEvent::HistoryReplaced(updated_conversation)) => {
213 conversation = updated_conversation;
214 }
215 Err(e) => {
216 tracing::error!("Error receiving message from subagent: {}", e);
217 break;
218 }
219 }
220 }
221
222 let final_output = if has_response_schema {
223 agent
224 .final_output_tool
225 .lock()
226 .await
227 .as_ref()
228 .and_then(|tool| tool.final_output.clone())
229 } else {
230 None
231 };
232
233 Ok((conversation, final_output))
234 })
235}