1use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use anyhow::Result;
8use tokio::sync::RwLock;
9use tokio::task::JoinHandle;
10use tracing::{debug, error, info};
11use uuid::Uuid;
12
13use agent_diva_core::bus::{InboundMessage, MessageBus};
14use agent_diva_core::security::{SecurityConfig, SecurityLevel, SecurityPolicy};
15use agent_diva_core::utils::truncate;
16use agent_diva_providers::base::{LLMProvider, Message};
17use agent_diva_tools::registry::ToolRegistry;
18use agent_diva_tools::{
19 filesystem::{ListDirTool, ReadFileTool, WriteFileTool},
20 shell::ExecTool,
21 web::{WebFetchTool, WebSearchTool},
22};
23
24use crate::tool_config::network::NetworkToolConfig;
25
26pub struct SubagentManager {
32 provider: Arc<dyn LLMProvider>,
33 workspace: PathBuf,
34 bus: MessageBus,
35 model: String,
36 network_config: Arc<RwLock<NetworkToolConfig>>,
37 exec_timeout: u64,
38 restrict_to_workspace: bool,
39 running_tasks: Arc<tokio::sync::Mutex<HashMap<String, JoinHandle<()>>>>,
40}
41
42impl SubagentManager {
43 #[allow(clippy::too_many_arguments)]
45 pub fn new(
46 provider: Arc<dyn LLMProvider>,
47 workspace: PathBuf,
48 bus: MessageBus,
49 model: Option<String>,
50 network_config: NetworkToolConfig,
51 exec_timeout: Option<u64>,
52 restrict_to_workspace: bool,
53 ) -> Self {
54 let model = model.unwrap_or_else(|| provider.get_default_model());
55 let exec_timeout = exec_timeout.unwrap_or(30);
56
57 Self {
58 provider,
59 workspace,
60 bus,
61 model,
62 network_config: Arc::new(RwLock::new(network_config)),
63 exec_timeout,
64 restrict_to_workspace,
65 running_tasks: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
66 }
67 }
68
69 pub async fn update_network_config(&self, network_config: NetworkToolConfig) {
70 let mut guard = self.network_config.write().await;
71 *guard = network_config;
72 }
73
74 pub async fn spawn(
85 &self,
86 task: String,
87 label: Option<String>,
88 origin_channel: String,
89 origin_chat_id: String,
90 ) -> Result<String> {
91 let task_id = Uuid::new_v4().to_string()[..8].to_string();
92 let display_label = label.unwrap_or_else(|| {
93 if task.len() > 30 {
94 let mut end = 30;
95 while !task.is_char_boundary(end) {
96 end -= 1;
97 }
98 format!("{}...", &task[..end])
99 } else {
100 task.clone()
101 }
102 });
103
104 let provider = Arc::clone(&self.provider);
105 let workspace = self.workspace.clone();
106 let bus = self.bus.clone();
107 let model = self.model.clone();
108 let network_config = self.network_config.read().await.clone();
109 let exec_timeout = self.exec_timeout;
110 let restrict_to_workspace = self.restrict_to_workspace;
111
112 let task_id_clone = task_id.clone();
113 let display_label_clone = display_label.clone();
114 let running_tasks = Arc::clone(&self.running_tasks);
115
116 let bg_task = tokio::spawn(async move {
118 Self::run_subagent(
119 task_id_clone.clone(),
120 task.clone(),
121 display_label_clone.clone(),
122 origin_channel,
123 origin_chat_id,
124 provider,
125 workspace,
126 bus.clone(),
127 model,
128 network_config,
129 exec_timeout,
130 restrict_to_workspace,
131 )
132 .await;
133
134 let mut tasks = running_tasks.lock().await;
136 tasks.remove(&task_id_clone);
137 });
138
139 let mut tasks = self.running_tasks.lock().await;
141 tasks.insert(task_id.clone(), bg_task);
142 drop(tasks);
143
144 info!("Spawned subagent [{}]: {}", task_id, display_label);
145 Ok(format!(
146 "Subagent [{}] started (id: {}). I'll notify you when it completes.",
147 display_label, task_id
148 ))
149 }
150
151 #[allow(clippy::too_many_arguments)]
153 async fn run_subagent(
154 task_id: String,
155 task: String,
156 label: String,
157 origin_channel: String,
158 origin_chat_id: String,
159 provider: Arc<dyn LLMProvider>,
160 workspace: PathBuf,
161 bus: MessageBus,
162 model: String,
163 network_config: NetworkToolConfig,
164 exec_timeout: u64,
165 restrict_to_workspace: bool,
166 ) {
167 info!("Subagent [{}] starting task: {}", task_id, label);
168
169 let result = Self::execute_subagent_task(
170 &task_id,
171 &task,
172 &provider,
173 &workspace,
174 &model,
175 &network_config,
176 exec_timeout,
177 restrict_to_workspace,
178 )
179 .await;
180
181 let (final_result, status) = match result {
182 Ok(content) => {
183 info!("Subagent [{}] completed successfully", task_id);
184 (content, "ok")
185 }
186 Err(e) => {
187 let error_msg = format!("Error: {}", e);
188 error!("Subagent [{}] failed: {}", task_id, e);
189 (error_msg, "error")
190 }
191 };
192
193 Self::announce_result(
194 &task_id,
195 &label,
196 &task,
197 &final_result,
198 &origin_channel,
199 &origin_chat_id,
200 status,
201 &bus,
202 )
203 .await;
204 }
205
206 #[allow(clippy::too_many_arguments)]
208 async fn execute_subagent_task(
209 task_id: &str,
210 task: &str,
211 provider: &Arc<dyn LLMProvider>,
212 workspace: &Path,
213 model: &str,
214 network_config: &NetworkToolConfig,
215 _exec_timeout: u64,
216 restrict_to_workspace: bool,
217 ) -> Result<String> {
218 let mut tools = ToolRegistry::new();
220 let security_config = if restrict_to_workspace {
221 SecurityConfig {
222 level: SecurityLevel::Standard,
223 workspace_only: true,
224 ..SecurityConfig::default()
225 }
226 } else {
227 SecurityConfig::default()
228 };
229 let security = Arc::new(SecurityPolicy::with_config(
230 workspace.to_path_buf(),
231 security_config,
232 ));
233
234 tools.register(Arc::new(ReadFileTool::new(security.clone())));
235 tools.register(Arc::new(WriteFileTool::new(security.clone())));
236 tools.register(Arc::new(ListDirTool::new(security)));
237 tools.register(Arc::new(ExecTool::new()));
238 if network_config.web.search.enabled {
239 tools.register(Arc::new(WebSearchTool::with_provider_and_max_results(
240 network_config.web.search.provider.clone(),
241 network_config.web.search.api_key.clone(),
242 network_config.web.search.normalized_max_results(),
243 )));
244 }
245 if network_config.web.fetch.enabled {
246 tools.register(Arc::new(WebFetchTool::new()));
247 }
248
249 let system_prompt = Self::build_subagent_prompt(task, workspace);
251 let mut messages = vec![
252 Message::system(system_prompt),
253 Message::user(task.to_string()),
254 ];
255
256 let max_iterations = 15;
258 let mut iteration = 0;
259 let mut final_result: Option<String> = None;
260
261 while iteration < max_iterations {
262 iteration += 1;
263
264 let response = provider
265 .chat(
266 messages.clone(),
267 Some(tools.get_definitions()),
268 Some(model.to_string()),
269 2000,
270 0.7,
271 )
272 .await?;
273
274 if response.has_tool_calls() {
275 messages.push(Message {
277 role: "assistant".to_string(),
278 content: response.content.clone().unwrap_or_default(),
279 name: None,
280 tool_call_id: None,
281 tool_calls: Some(response.tool_calls.clone()),
282 reasoning_content: response.reasoning_content.clone(),
283 thinking_blocks: None,
284 });
285
286 for tool_call in &response.tool_calls {
288 let args_json = serde_json::to_value(&tool_call.arguments)?;
289 let args_str = serde_json::to_string(&tool_call.arguments)?;
290 debug!(
291 "Subagent [{}] executing: {} with arguments: {}",
292 task_id, tool_call.name, args_str
293 );
294 let result = tools.execute(&tool_call.name, args_json).await;
295 messages.push(Message::tool(result, tool_call.id.clone()));
296 }
297 } else {
298 final_result = response.content;
299 break;
300 }
301 }
302
303 Ok(final_result
304 .unwrap_or_else(|| "Task completed but no final response was generated.".to_string()))
305 }
306
307 #[allow(clippy::too_many_arguments)]
309 async fn announce_result(
310 task_id: &str,
311 label: &str,
312 task: &str,
313 result: &str,
314 origin_channel: &str,
315 origin_chat_id: &str,
316 status: &str,
317 bus: &MessageBus,
318 ) {
319 let status_text = if status == "ok" {
320 "completed successfully"
321 } else {
322 "failed"
323 };
324
325 let announce_content = format!(
326 "[Subagent '{}' {}]\n\nTask: {}\n\nResult:\n{}\n\nSummarize this naturally for the user. Keep it brief (1-2 sentences). Do not mention technical details like \"subagent\" or task IDs.",
327 label, status_text, task, result
328 );
329
330 let msg = InboundMessage::new(origin_channel, "subagent", origin_chat_id, announce_content);
333
334 if let Err(e) = bus.publish_inbound(msg) {
335 error!("Failed to announce subagent result: {}", e);
336 }
337
338 debug!(
339 "Subagent [{}] announced result to {}:{}",
340 task_id, origin_channel, origin_chat_id
341 );
342 }
343
344 fn build_subagent_prompt(task: &str, workspace: &Path) -> String {
346 let soul_summary = Self::build_identity_summary(workspace);
347 format!(
348 r#"# Subagent
349
350You are a subagent spawned by the main agent to complete a specific task.
351
352## Your Task
353{}
354
355## Inherited Identity
356{}
357
358## Rules
3591. Stay focused - complete only the assigned task, nothing else
3602. Your final response will be reported back to the main agent
3613. Do not initiate conversations or take on side tasks
3624. Be concise but informative in your findings
363
364## What You Can Do
365- Read and write files in the workspace
366- Execute shell commands
367- Search the web and fetch web pages
368- Complete the task thoroughly
369
370## What You Cannot Do
371- Send messages directly to users (no message tool available)
372- Spawn other subagents
373- Access the main agent's conversation history
374
375## Workspace
376Your workspace is at: {}
377
378When you have completed the task, provide a clear summary of your findings or actions."#,
379 task,
380 soul_summary,
381 workspace.display()
382 )
383 }
384
385 fn build_identity_summary(workspace: &Path) -> String {
386 let mut sections = Vec::new();
387 for file in ["SOUL.md", "IDENTITY.md", "USER.md"] {
388 let path = workspace.join(file);
389 let Ok(raw) = std::fs::read_to_string(path) else {
390 continue;
391 };
392 let trimmed = raw.trim();
393 if trimmed.is_empty() {
394 continue;
395 }
396 let content = if trimmed.chars().count() > 800 {
397 truncate(trimmed, 3200)
398 } else {
399 trimmed.to_string()
400 };
401 sections.push(format!("### {}\n{}", file, content));
402 }
403
404 if sections.is_empty() {
405 "No persisted soul identity found. Follow the task faithfully, remain concise, and preserve user intent.".to_string()
406 } else {
407 sections.join("\n\n")
408 }
409 }
410
411 pub async fn get_running_count(&self) -> usize {
413 let tasks = self.running_tasks.lock().await;
414 tasks.len()
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use super::SubagentManager;
421
422 #[test]
423 fn test_build_subagent_prompt_includes_identity_summary() {
424 let temp = tempfile::tempdir().unwrap();
425 std::fs::write(temp.path().join("SOUL.md"), "# Soul\n\nKeep concise.").unwrap();
426 std::fs::write(temp.path().join("IDENTITY.md"), "# Identity\n\nAgent Diva.").unwrap();
427 std::fs::write(
428 temp.path().join("USER.md"),
429 "# User\n\nPrefer direct replies.",
430 )
431 .unwrap();
432
433 let prompt = SubagentManager::build_subagent_prompt("analyze logs", temp.path());
434 assert!(prompt.contains("## Inherited Identity"));
435 assert!(prompt.contains("### SOUL.md"));
436 assert!(prompt.contains("### IDENTITY.md"));
437 assert!(prompt.contains("### USER.md"));
438 }
439
440 #[test]
441 fn test_build_subagent_prompt_fallback_without_identity_files() {
442 let temp = tempfile::tempdir().unwrap();
443 let prompt = SubagentManager::build_subagent_prompt("analyze logs", temp.path());
444 assert!(prompt.contains("No persisted soul identity found"));
445 }
446}