Skip to main content

aster/agents/
subagent_handler.rs

1use crate::{
2    agents::{subagent_task_config::TaskConfig, AgentEvent, SessionConfig},
3    conversation::{message::Message, Conversation},
4    execution::manager::AgentManager,
5    prompt_template::render_global_file,
6    recipe::Recipe,
7};
8use anyhow::{anyhow, Result};
9use futures::StreamExt;
10use rmcp::model::{ErrorCode, ErrorData};
11use serde::Serialize;
12use std::future::Future;
13use std::pin::Pin;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, info};
16
17#[derive(Serialize)]
18struct SubagentPromptContext {
19    max_turns: usize,
20    subagent_id: String,
21    task_instructions: String,
22    tool_count: usize,
23    available_tools: String,
24}
25
26type AgentMessagesFuture =
27    Pin<Box<dyn Future<Output = Result<(Conversation, Option<String>)>> + Send>>;
28
29/// Standalone function to run a complete subagent task with output options
30pub async fn run_complete_subagent_task(
31    recipe: Recipe,
32    task_config: TaskConfig,
33    return_last_only: bool,
34    session_id: String,
35    images: Option<Vec<crate::agents::subagent_tool::ImageData>>,
36    cancellation_token: Option<CancellationToken>,
37) -> Result<String, anyhow::Error> {
38    let (messages, final_output) =
39        get_agent_messages(recipe, task_config, session_id, images, cancellation_token)
40            .await
41            .map_err(|e| {
42                ErrorData::new(
43                    ErrorCode::INTERNAL_ERROR,
44                    format!("Failed to execute task: {}", e),
45                    None,
46                )
47            })?;
48
49    if let Some(output) = final_output {
50        return Ok(output);
51    }
52
53    let response_text = if return_last_only {
54        messages
55            .messages()
56            .last()
57            .and_then(|message| {
58                message.content.iter().find_map(|content| match content {
59                    crate::conversation::message::MessageContent::Text(text_content) => {
60                        Some(text_content.text.clone())
61                    }
62                    _ => None,
63                })
64            })
65            .unwrap_or_else(|| String::from("No text content in last message"))
66    } else {
67        let all_text_content: Vec<String> = messages
68            .iter()
69            .flat_map(|message| {
70                message.content.iter().filter_map(|content| {
71                    match content {
72                        crate::conversation::message::MessageContent::Text(text_content) => {
73                            Some(text_content.text.clone())
74                        }
75                        crate::conversation::message::MessageContent::ToolResponse(
76                            tool_response,
77                        ) => {
78                            // Extract text from tool response
79                            if let Ok(result) = &tool_response.tool_result {
80                                let texts: Vec<String> = result
81                                    .content
82                                    .iter()
83                                    .filter_map(|content| {
84                                        if let rmcp::model::RawContent::Text(raw_text_content) =
85                                            &content.raw
86                                        {
87                                            Some(raw_text_content.text.clone())
88                                        } else {
89                                            None
90                                        }
91                                    })
92                                    .collect();
93                                if !texts.is_empty() {
94                                    Some(format!("Tool result: {}", texts.join("\n")))
95                                } else {
96                                    None
97                                }
98                            } else {
99                                None
100                            }
101                        }
102                        _ => None,
103                    }
104                })
105            })
106            .collect();
107
108        all_text_content.join("\n")
109    };
110
111    Ok(response_text)
112}
113
114fn get_agent_messages(
115    recipe: Recipe,
116    task_config: TaskConfig,
117    session_id: String,
118    images: Option<Vec<crate::agents::subagent_tool::ImageData>>,
119    cancellation_token: Option<CancellationToken>,
120) -> AgentMessagesFuture {
121    Box::pin(async move {
122        let system_instructions = recipe.instructions.clone().unwrap_or_default();
123        let user_task = recipe
124            .prompt
125            .clone()
126            .unwrap_or_else(|| "Begin.".to_string());
127
128        let agent_manager = AgentManager::instance()
129            .await
130            .map_err(|e| anyhow!("Failed to create AgentManager: {}", e))?;
131
132        let agent = agent_manager
133            .get_or_create_agent(session_id.clone())
134            .await
135            .map_err(|e| anyhow!("Failed to get sub agent session file path: {}", e))?;
136
137        agent
138            .update_provider(task_config.provider, &session_id)
139            .await
140            .map_err(|e| anyhow!("Failed to set provider on sub agent: {}", e))?;
141
142        for extension in task_config.extensions {
143            if let Err(e) = agent.add_extension(extension.clone()).await {
144                debug!(
145                    "Failed to add extension '{}' to subagent: {}",
146                    extension.name(),
147                    e
148                );
149            }
150        }
151
152        let has_response_schema = recipe.response.is_some();
153        agent
154            .apply_recipe_components(recipe.sub_recipes.clone(), recipe.response.clone(), true)
155            .await;
156
157        let tools = agent.list_tools(None).await;
158        let subagent_prompt = render_global_file(
159            "subagent_system.md",
160            &SubagentPromptContext {
161                max_turns: task_config
162                    .max_turns
163                    .expect("TaskConfig always sets max_turns"),
164                subagent_id: session_id.clone(),
165                task_instructions: system_instructions,
166                tool_count: tools.len(),
167                available_tools: tools
168                    .iter()
169                    .map(|t| t.name.to_string())
170                    .collect::<Vec<_>>()
171                    .join(", "),
172            },
173        )
174        .map_err(|e| anyhow!("Failed to render subagent system prompt: {}", e))?;
175        agent.override_system_prompt(subagent_prompt).await;
176
177        let mut user_message = Message::user().with_text(user_task);
178
179        // 添加图片内容到用户消息中
180        if let Some(images) = images {
181            for image in images {
182                user_message = user_message.with_image(image.data, image.mime_type);
183            }
184        }
185
186        let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]);
187
188        if let Some(activities) = recipe.activities {
189            for activity in activities {
190                info!("Recipe activity: {}", activity);
191            }
192        }
193        let session_config = SessionConfig {
194            id: session_id.clone(),
195            schedule_id: None,
196            max_turns: task_config.max_turns.map(|v| v as u32),
197            retry_config: recipe.retry,
198            system_prompt: None,
199        };
200
201        let mut stream = crate::session_context::with_session_id(Some(session_id.clone()), async {
202            agent
203                .reply(user_message, session_config, cancellation_token)
204                .await
205        })
206        .await
207        .map_err(|e| anyhow!("Failed to get reply from agent: {}", e))?;
208        while let Some(message_result) = stream.next().await {
209            match message_result {
210                Ok(AgentEvent::Message(msg)) => conversation.push(msg),
211                Ok(AgentEvent::McpNotification(_)) | Ok(AgentEvent::ModelChange { .. }) => {}
212                Ok(AgentEvent::HistoryReplaced(updated_conversation)) => {
213                    conversation = updated_conversation;
214                }
215                Err(e) => {
216                    tracing::error!("Error receiving message from subagent: {}", e);
217                    break;
218                }
219            }
220        }
221
222        let final_output = if has_response_schema {
223            agent
224                .final_output_tool
225                .lock()
226                .await
227                .as_ref()
228                .and_then(|tool| tool.final_output.clone())
229        } else {
230            None
231        };
232
233        Ok((conversation, final_output))
234    })
235}