1use super::{
7 DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
8 orchestrator::Orchestrator,
9 subtask::{SubTask, SubTaskResult, SubTaskStatus},
10};
11use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
12
13pub use super::{Actor, ActorStatus, Handler, SwarmMessage};
15use crate::{
16 agent::Agent,
17 provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
18 rlm::RlmExecutor,
19 swarm::{SwarmArtifact, SwarmStats},
20 tool::ToolRegistry,
21 worktree::{WorktreeInfo, WorktreeManager},
22};
23use anyhow::Result;
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::Instant;
27use tokio::sync::{RwLock, mpsc};
28use tokio::time::{Duration, timeout};
29
/// Assumed model context window (in tokens) used when truncating sub-agent
/// conversation history. NOTE(review): one fixed value for all models —
/// confirm it matches the providers actually in use.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens held back from the history budget so the model has room to respond.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context window history may occupy before truncation kicks in.
const TRUNCATION_THRESHOLD: f64 = 0.85;
38
/// Rough token-count estimate for `text`, assuming ~3.5 characters per token
/// (byte length is used, so multi-byte characters weigh more).
fn estimate_tokens(text: &str) -> usize {
    let chars = text.len() as f64;
    (chars / 3.5).ceil() as usize
}
46
47fn estimate_message_tokens(message: &Message) -> usize {
49 let mut tokens = 4; for part in &message.content {
52 tokens += match part {
53 ContentPart::Text { text } => estimate_tokens(text),
54 ContentPart::ToolCall {
55 id,
56 name,
57 arguments,
58 } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
59 ContentPart::ToolResult {
60 tool_call_id,
61 content,
62 } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
63 ContentPart::Image { .. } | ContentPart::File { .. } => 2000, };
65 }
66
67 tokens
68}
69
70fn estimate_total_tokens(messages: &[Message]) -> usize {
72 messages.iter().map(estimate_message_tokens).sum()
73}
74
75fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
83 let target_tokens =
84 ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
85
86 let current_tokens = estimate_total_tokens(messages);
87 if current_tokens <= target_tokens {
88 return;
89 }
90
91 tracing::warn!(
92 current_tokens = current_tokens,
93 target_tokens = target_tokens,
94 context_limit = context_limit,
95 "Context approaching limit, truncating conversation history"
96 );
97
98 truncate_large_tool_results(messages, 2000); let after_tool_truncation = estimate_total_tokens(messages);
102 if after_tool_truncation <= target_tokens {
103 tracing::info!(
104 old_tokens = current_tokens,
105 new_tokens = after_tool_truncation,
106 "Truncated large tool results, context now within limits"
107 );
108 return;
109 }
110
111 if messages.len() <= 6 {
113 tracing::warn!(
114 tokens = after_tool_truncation,
115 target = target_tokens,
116 "Cannot truncate further - conversation too short"
117 );
118 return;
119 }
120
121 let keep_start = 2;
124 let keep_end = 4;
125 let removable_count = messages.len() - keep_start - keep_end;
126
127 if removable_count == 0 {
128 return;
129 }
130
131 let removed_messages: Vec<_> = messages
133 .drain(keep_start..keep_start + removable_count)
134 .collect();
135 let summary = summarize_removed_messages(&removed_messages);
136
137 messages.insert(
139 keep_start,
140 Message {
141 role: Role::User,
142 content: vec![ContentPart::Text {
143 text: format!(
144 "[Context truncated: {} earlier messages removed to fit context window]\n{}",
145 removed_messages.len(),
146 summary
147 ),
148 }],
149 },
150 );
151
152 let new_tokens = estimate_total_tokens(messages);
153 tracing::info!(
154 removed_messages = removed_messages.len(),
155 old_tokens = current_tokens,
156 new_tokens = new_tokens,
157 "Truncated conversation history"
158 );
159}
160
161fn summarize_removed_messages(messages: &[Message]) -> String {
163 let mut summary = String::new();
164 let mut tool_calls: Vec<String> = Vec::new();
165
166 for msg in messages {
167 for part in &msg.content {
168 if let ContentPart::ToolCall { name, .. } = part {
169 if !tool_calls.contains(name) {
170 tool_calls.push(name.clone());
171 }
172 }
173 }
174 }
175
176 if !tool_calls.is_empty() {
177 summary.push_str(&format!(
178 "Tools used in truncated history: {}",
179 tool_calls.join(", ")
180 ));
181 }
182
183 summary
184}
185
186fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
188 let char_limit = max_tokens_per_result * 3; let mut truncated_count = 0;
190 let mut saved_tokens = 0usize;
191
192 for message in messages.iter_mut() {
193 for part in message.content.iter_mut() {
194 if let ContentPart::ToolResult { content, .. } = part {
195 let tokens = estimate_tokens(content);
196 if tokens > max_tokens_per_result {
197 let old_len = content.len();
198 *content = truncate_single_result(content, char_limit);
199 saved_tokens += tokens.saturating_sub(estimate_tokens(content));
200 if content.len() < old_len {
201 truncated_count += 1;
202 }
203 }
204 }
205 }
206 }
207
208 if truncated_count > 0 {
209 tracing::info!(
210 truncated_count = truncated_count,
211 saved_tokens = saved_tokens,
212 max_tokens_per_result = max_tokens_per_result,
213 "Truncated large tool results"
214 );
215 }
216}
217
218fn truncate_single_result(content: &str, max_chars: usize) -> String {
220 if content.len() <= max_chars {
221 return content.to_string();
222 }
223
224 let safe_limit = {
226 let mut limit = max_chars.min(content.len());
227 while limit > 0 && !content.is_char_boundary(limit) {
228 limit -= 1;
229 }
230 limit
231 };
232
233 let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
235
236 let truncated = format!(
237 "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
238 &content[..break_point],
239 content.len(),
240 break_point
241 );
242
243 tracing::debug!(
244 original_len = content.len(),
245 truncated_len = truncated.len(),
246 "Truncated large result"
247 );
248
249 truncated
250}
251
/// Tool outputs longer than this (in chars) are summarized with the RLM
/// executor instead of plain truncation.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Character cap applied when simply tail-truncating a tool result.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// Why a sub-agent's agentic loop ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The model finished without requesting further tool calls.
    Completed,
    /// The step budget (`max_steps`) was exhausted before completion.
    MaxStepsReached,
    /// The loop's deadline elapsed before completion.
    TimedOut,
}
264
265async fn process_large_result_with_rlm(
267 content: &str,
268 tool_name: &str,
269 provider: Arc<dyn Provider>,
270 model: &str,
271) -> String {
272 if content.len() <= SIMPLE_TRUNCATE_CHARS {
273 return content.to_string();
274 }
275
276 if content.len() <= RLM_THRESHOLD_CHARS {
278 return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
279 }
280
281 tracing::info!(
283 tool = %tool_name,
284 content_len = content.len(),
285 "Using RLM to process large tool result"
286 );
287
288 let query = format!(
289 "Summarize the key information from this {} output. \
290 Focus on: errors, warnings, important findings, and actionable items. \
291 Be concise but thorough.",
292 tool_name
293 );
294
295 let mut executor =
296 RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
297
298 match executor.analyze(&query).await {
299 Ok(result) => {
300 tracing::info!(
301 tool = %tool_name,
302 original_len = content.len(),
303 summary_len = result.answer.len(),
304 iterations = result.iterations,
305 "RLM summarized large result"
306 );
307
308 format!(
309 "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
310 tool_name,
311 content.len(),
312 result.answer.len(),
313 result.answer
314 )
315 }
316 Err(e) => {
317 tracing::warn!(
318 tool = %tool_name,
319 error = %e,
320 "RLM analysis failed, falling back to truncation"
321 );
322 truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
323 }
324 }
325}
326
/// Drives swarm-style execution: decomposes a task via an `Orchestrator`,
/// runs sub-agents stage by stage, and aggregates their results.
pub struct SwarmExecutor {
    // Swarm-wide settings (concurrency limits, timeouts, worktree options, ...).
    config: SwarmConfig,
    // Optional shared coordinator agent supplied by the caller.
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    // Channel for streaming SwarmEvent progress updates; None = headless run.
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
}
335
336impl SwarmExecutor {
337 pub fn new(config: SwarmConfig) -> Self {
339 Self {
340 config,
341 coordinator_agent: None,
342 event_tx: None,
343 }
344 }
345
346 pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
348 self.event_tx = Some(tx);
349 self
350 }
351
352 pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
354 tracing::debug!("Setting coordinator agent for swarm execution");
355 self.coordinator_agent = Some(agent);
356 self
357 }
358
359 pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
361 self.coordinator_agent.as_ref()
362 }
363
364 fn try_send_event(&self, event: SwarmEvent) {
366 if let Some(ref tx) = self.event_tx {
367 let _ = tx.try_send(event);
368 }
369 }
370
    /// Executes `task` end-to-end: decompose via the orchestrator, run each
    /// stage's subtasks, record stats, then aggregate the outputs.
    ///
    /// Emits [`SwarmEvent`]s along the way when an event channel is attached.
    /// Returns `Ok` with `success = false` (rather than `Err`) when
    /// decomposition yields no subtasks or when some subtasks fail.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        // Ask the orchestrator to split the task according to `strategy`.
        let subtasks = orchestrator.decompose(task, strategy).await?;

        if subtasks.is_empty() {
            // Nothing to run: report a non-Err failure so callers still get
            // a well-formed result shape.
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Mirror the full plan to the UI: every subtask starts as Pending.
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        // Stages run sequentially; subtasks within a stage run concurrently.
        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        // Outputs of finished subtasks keyed by subtask id, shared with later
        // stages so dependents can inline their dependencies' results.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            // Stage numbers may be sparse; skip empty ones.
            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            let stage_results = self
                .execute_stage(&orchestrator, stage_subtasks, completed_results.clone())
                .await?;

            // Publish this stage's outputs for use by later stages.
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                }
            }

            // Per-stage bookkeeping for the final stats report.
            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        let provider_name = orchestrator.provider().to_string();

        // Finalize aggregate stats (wall clock vs. sequential estimate).
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        // Overall success requires every subtask to have succeeded.
        let success = all_results.iter().all(|r| r.success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }
543
    /// Runs all subtasks of one stage concurrently and returns their results.
    ///
    /// Each subtask is spawned as a tokio task that: waits out a stagger
    /// delay and acquires a concurrency permit, optionally gets an isolated
    /// git worktree, builds its prompts (inlining dependency results), then
    /// runs the agentic loop. Worktree merge/cleanup happens on the
    /// collecting side. Sub-agent failures are folded into failed
    /// `SubTaskResult`s rather than propagated as `Err`.
    async fn execute_stage(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
    ) -> Result<Vec<SubTaskResult>> {
        // Each handle yields the subtask result plus the worktree it ran in
        // (if isolation was enabled) so we can merge/cleanup afterwards.
        let mut handles: Vec<(
            String,
            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
        )> = Vec::new();

        // Bound concurrent LLM requests; starts are also staggered below to
        // avoid a burst of simultaneous calls.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(
            self.config.max_concurrent_requests,
        ));
        let delay_ms = self.config.request_delay_ms;

        let model = orchestrator.model().to_string();
        let provider_name = orchestrator.provider().to_string();
        let providers = orchestrator.providers();
        let provider = providers
            .get(&provider_name)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;

        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");

        // Sub-agents get the full tool set minus the interactive "question"
        // tool (no human to answer it).
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|t| t.name != "question")
            .collect();

        // Optional git-worktree isolation so parallel agents don't trample
        // each other's files; falls back to the shared directory on failure.
        let worktree_manager = if self.config.worktree_enabled {
            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| ".".to_string())
            });

            match WorktreeManager::new(&working_dir) {
                Ok(mgr) => {
                    tracing::info!(
                        working_dir = %working_dir,
                        "Worktree isolation enabled for parallel sub-agents"
                    );
                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Failed to create worktree manager, falling back to shared directory"
                    );
                    None
                }
            }
        } else {
            None
        };

        for (idx, subtask) in subtasks.into_iter().enumerate() {
            let model = model.clone();
            let _provider_name = provider_name.clone();
            let provider = Arc::clone(&provider);

            // Inline the outputs of this subtask's dependencies into its
            // prompt context.
            let context = {
                let completed = completed_results.read().await;
                let mut dep_context = String::new();
                for dep_id in &subtask.dependencies {
                    if let Some(result) = completed.get(dep_id) {
                        dep_context.push_str(&format!(
                            "\n--- Result from dependency {} ---\n{}\n",
                            dep_id, result
                        ));
                    }
                }
                dep_context
            };

            let instruction = subtask.instruction.clone();
            let subtask_name = subtask.name.clone();
            let specialty = subtask.specialty.clone().unwrap_or_default();
            let subtask_id = subtask.id.clone();
            let subtask_id_for_handle = subtask_id.clone();
            let max_steps = self.config.max_steps_per_subagent;
            let timeout_secs = self.config.subagent_timeout_secs;

            let tools = tool_definitions.clone();
            let registry = Arc::clone(&tool_registry);
            let sem = Arc::clone(&semaphore);
            // Linear stagger: task N starts N * delay_ms after the first.
            let stagger_delay = delay_ms * idx as u64;
            let worktree_mgr = worktree_manager.clone();
            let event_tx = self.event_tx.clone();

            let handle = tokio::spawn(async move {
                if stagger_delay > 0 {
                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
                }
                // The permit is held for the task's whole lifetime, bounding
                // the number of concurrently running sub-agents.
                let _permit = sem
                    .acquire()
                    .await
                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;

                let start = Instant::now();

                // Try to give this agent its own worktree; on failure,
                // continue in the shared directory.
                let worktree_info = if let Some(ref mgr) = worktree_mgr {
                    let task_slug = subtask_id.replace("-", "_");
                    match mgr.create(&task_slug) {
                        Ok(wt) => {
                            tracing::info!(
                                subtask_id = %subtask_id,
                                worktree_path = %wt.path.display(),
                                worktree_branch = %wt.branch,
                                "Created worktree for sub-agent"
                            );
                            Some(wt)
                        }
                        Err(e) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %e,
                                "Failed to create worktree, using shared directory"
                            );
                            None
                        }
                    }
                } else {
                    None
                };

                let working_dir = worktree_info
                    .as_ref()
                    .map(|wt| wt.path.display().to_string())
                    .unwrap_or_else(|| ".".to_string());

                // Inject project-level instructions (AGENTS.md) when present.
                let working_path = std::path::Path::new(&working_dir);
                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
                    .map(|(content, _)| {
                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
                    })
                    .unwrap_or_default();

                // Unique PRD filename per subtask so parallel agents don't
                // clash on the same file.
                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
                let system_prompt = format!(
                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.

WORKING DIRECTORY: {}
All file operations should be relative to this directory.

IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.

Available tools:
- read: Read file contents
- write: Write/create files
- edit: Edit existing files (search and replace)
- multiedit: Make multiple edits at once
- glob: Find files by pattern
- grep: Search file contents
- bash: Run shell commands (use cwd: \"{}\" parameter)
- webfetch: Fetch web pages
- prd: Generate structured PRD for complex tasks
- ralph: Run autonomous agent loop on a PRD

COMPLEX TASKS:
If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
1. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
2. Break down into user stories with acceptance criteria
3. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
4. Call ralph({{action: 'run', prd_path: '{}'}}) to execute

NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.

When done, provide a brief summary of what you accomplished.{agents_md_content}",
                    specialty,
                    subtask_id,
                    working_dir,
                    working_dir,
                    prd_filename,
                    prd_filename,
                    prd_filename
                );

                let user_prompt = if context.is_empty() {
                    format!("Complete this task:\n\n{}", instruction)
                } else {
                    format!(
                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
                        instruction, context
                    )
                };

                // Announce the agent to the UI before running the loop.
                if let Some(ref tx) = event_tx {
                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                        id: subtask_id.clone(),
                        name: subtask_name.clone(),
                        status: SubTaskStatus::Running,
                        agent_name: Some(format!("agent-{}", subtask_id)),
                    });
                    let _ = tx.try_send(SwarmEvent::AgentStarted {
                        subtask_id: subtask_id.clone(),
                        agent_name: format!("agent-{}", subtask_id),
                        specialty: specialty.clone(),
                    });
                }

                let result = run_agent_loop(
                    provider,
                    &model,
                    &system_prompt,
                    &user_prompt,
                    tools,
                    registry,
                    max_steps,
                    timeout_secs,
                    event_tx.clone(),
                    subtask_id.clone(),
                )
                .await;

                match result {
                    Ok((output, steps, tool_calls, exit_reason)) => {
                        // Map the loop's exit reason onto success/status/error.
                        let (success, status, error) = match exit_reason {
                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
                            AgentLoopExit::MaxStepsReached => (
                                false,
                                SubTaskStatus::Failed,
                                Some(format!("Sub-agent hit max steps ({max_steps})")),
                            ),
                            AgentLoopExit::TimedOut => (
                                false,
                                SubTaskStatus::TimedOut,
                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
                            ),
                        };

                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            if let Some(ref message) = error {
                                let _ = tx.try_send(SwarmEvent::AgentError {
                                    subtask_id: subtask_id.clone(),
                                    error: message.clone(),
                                });
                            }
                            let _ = tx.try_send(SwarmEvent::AgentOutput {
                                subtask_id: subtask_id.clone(),
                                output: output.clone(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success,
                                steps,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success,
                                result: output,
                                steps,
                                tool_calls,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error,
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                    Err(e) => {
                        // Loop-level failure: report it but still return Ok so
                        // one agent's error doesn't abort the whole stage.
                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status: SubTaskStatus::Failed,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentError {
                                subtask_id: subtask_id.clone(),
                                error: e.to_string(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success: false,
                                steps: 0,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success: false,
                                result: String::new(),
                                steps: 0,
                                tool_calls: 0,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error: Some(e.to_string()),
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                }
            });

            handles.push((subtask_id_for_handle, handle));
        }

        // Collect results in spawn order, handling worktree merge/cleanup.
        let mut results = Vec::new();
        let auto_merge = self.config.worktree_auto_merge;

        for (subtask_id, handle) in handles {
            match handle.await {
                Ok(Ok((mut result, worktree_info))) => {
                    if let Some(wt) = worktree_info {
                        if result.success && auto_merge {
                            if let Some(ref mgr) = worktree_manager {
                                match mgr.merge(&wt) {
                                    Ok(merge_result) => {
                                        if merge_result.success {
                                            tracing::info!(
                                                subtask_id = %result.subtask_id,
                                                files_changed = merge_result.files_changed,
                                                "Merged worktree changes successfully"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Result ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else if merge_result.aborted {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                summary = %merge_result.summary,
                                                "Merge was aborted"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Aborted ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                conflicts = ?merge_result.conflicts,
                                                "Merge had conflicts"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Conflicts ---\n{}",
                                                merge_result.summary
                                            ));
                                        }

                                        // Worktree is removed after any merge
                                        // attempt, successful or not.
                                        if let Err(e) = mgr.cleanup(&wt) {
                                            tracing::warn!(
                                                error = %e,
                                                "Failed to cleanup worktree"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            subtask_id = %result.subtask_id,
                                            error = %e,
                                            "Failed to merge worktree"
                                        );
                                    }
                                }
                            }
                        } else if !result.success {
                            // Keep failed agents' worktrees for debugging.
                            tracing::info!(
                                subtask_id = %result.subtask_id,
                                worktree_path = %wt.path.display(),
                                "Keeping worktree for debugging (task failed)"
                            );
                        }
                    }

                    results.push(result);
                }
                Ok(Err(e)) => {
                    // Task body returned an error (e.g. semaphore closed):
                    // record a failed result so stage bookkeeping stays whole.
                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: e.to_string(),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(e.to_string()),
                        artifacts: Vec::new(),
                    });
                }
                Err(e) => {
                    // Join error: the spawned task panicked or was cancelled.
                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: format!("Task join error: {}", e),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(format!("Task join error: {}", e)),
                        artifacts: Vec::new(),
                    });
                }
            }
        }

        Ok(results)
    }
1015
1016 async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
1018 let mut aggregated = String::new();
1019
1020 for (i, result) in results.iter().enumerate() {
1021 if result.success {
1022 aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
1023 } else {
1024 aggregated.push_str(&format!(
1025 "=== Subtask {} (FAILED) ===\nError: {}\n\n",
1026 i + 1,
1027 result.error.as_deref().unwrap_or("Unknown error")
1028 ));
1029 }
1030 }
1031
1032 Ok(aggregated)
1033 }
1034
1035 pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
1037 self.execute(task, DecompositionStrategy::None).await
1038 }
1039}
1040
/// Fluent builder for [`SwarmExecutor`], starting from `SwarmConfig::default()`.
pub struct SwarmExecutorBuilder {
    // Accumulated configuration; consumed by `build()`.
    config: SwarmConfig,
}
1045
1046impl SwarmExecutorBuilder {
1047 pub fn new() -> Self {
1048 Self {
1049 config: SwarmConfig::default(),
1050 }
1051 }
1052
1053 pub fn max_subagents(mut self, max: usize) -> Self {
1054 self.config.max_subagents = max;
1055 self
1056 }
1057
1058 pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
1059 self.config.max_steps_per_subagent = max;
1060 self
1061 }
1062
1063 pub fn max_total_steps(mut self, max: usize) -> Self {
1064 self.config.max_total_steps = max;
1065 self
1066 }
1067
1068 pub fn timeout_secs(mut self, secs: u64) -> Self {
1069 self.config.subagent_timeout_secs = secs;
1070 self
1071 }
1072
1073 pub fn parallel_enabled(mut self, enabled: bool) -> Self {
1074 self.config.parallel_enabled = enabled;
1075 self
1076 }
1077
1078 pub fn build(self) -> SwarmExecutor {
1079 SwarmExecutor::new(self.config)
1080 }
1081}
1082
1083impl Default for SwarmExecutorBuilder {
1084 fn default() -> Self {
1085 Self::new()
1086 }
1087}
1088
/// Drives one sub-agent's agentic loop against `provider`/`model`.
///
/// Each iteration sends the conversation (with tool definitions) to the LLM,
/// streams progress as [`SwarmEvent`]s over `event_tx`, executes any
/// requested tool calls via `registry`, and feeds the results back. The loop
/// ends when the model finishes without tool calls (`Completed`), the step
/// budget runs out (`MaxStepsReached`), or the deadline passes (`TimedOut`).
///
/// Returns `(final_text_output, steps_taken, total_tool_calls, exit_reason)`.
#[allow(clippy::too_many_arguments)]
pub async fn run_agent_loop(
    provider: Arc<dyn Provider>,
    model: &str,
    system_prompt: &str,
    user_prompt: &str,
    tools: Vec<crate::provider::ToolDefinition>,
    registry: Arc<ToolRegistry>,
    max_steps: usize,
    timeout_secs: u64,
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    subtask_id: String,
) -> Result<(String, usize, usize, AgentLoopExit)> {
    let temperature = 0.7;

    tracing::info!(
        model = %model,
        max_steps = max_steps,
        timeout_secs = timeout_secs,
        "Sub-agent starting agentic loop"
    );
    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");

    // Seed the conversation with the system prompt and the task request.
    let mut messages = vec![
        Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.to_string(),
            }],
        },
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: user_prompt.to_string(),
            }],
        },
    ];

    let mut steps = 0;
    let mut total_tool_calls = 0;
    let mut final_output = String::new();

    // NOTE(review): this deadline is re-armed after every round of tool
    // results (see the end of the loop), so `timeout_secs` effectively bounds
    // time per step, not total runtime — confirm this is intended.
    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);

    let exit_reason = loop {
        if steps >= max_steps {
            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
            break AgentLoopExit::MaxStepsReached;
        }

        if Instant::now() > deadline {
            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
            break AgentLoopExit::TimedOut;
        }

        steps += 1;
        tracing::info!(step = steps, "Sub-agent step starting");

        // Keep the conversation within the model's context window.
        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);

        let request = CompletionRequest {
            messages: messages.clone(),
            tools: tools.clone(),
            model: model.to_string(),
            temperature: Some(temperature),
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        // Hard cap of 120s on each individual LLM call, independent of the
        // loop-level deadline above.
        let step_start = Instant::now();
        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
        let step_duration = step_start.elapsed();

        tracing::info!(
            step = steps,
            duration_ms = step_duration.as_millis() as u64,
            finish_reason = ?response.finish_reason,
            prompt_tokens = response.usage.prompt_tokens,
            completion_tokens = response.usage.completion_tokens,
            "Sub-agent step completed LLM call"
        );

        // Separate the assistant's text from its tool-call requests.
        let mut text_parts = Vec::new();
        let mut tool_calls = Vec::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } => {
                    text_parts.push(text.clone());
                }
                ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                } => {
                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
                }
                _ => {}
            }
        }

        if !text_parts.is_empty() {
            // The latest text becomes the candidate final answer.
            final_output = text_parts.join("\n");
            tracing::info!(
                step = steps,
                output_len = final_output.len(),
                "Sub-agent text output"
            );
            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");

            if let Some(ref tx) = event_tx {
                // 500-char UI preview, backed off to a UTF-8 char boundary
                // so slicing cannot panic.
                let preview = if final_output.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !final_output.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &final_output[..end])
                } else {
                    final_output.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentMessage {
                    subtask_id: subtask_id.clone(),
                    entry: AgentMessageEntry {
                        role: "assistant".to_string(),
                        content: preview,
                        is_tool_call: false,
                    },
                });
            }
        }

        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        // Record the assistant turn before appending tool results.
        messages.push(response.message.clone());

        // Done when the model didn't stop for tool calls (or requested none).
        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break AgentLoopExit::Completed;
        }

        let mut tool_results = Vec::new();

        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            if let Some(ref tx) = event_tx {
                let _ = tx.try_send(SwarmEvent::AgentToolCall {
                    subtask_id: subtask_id.clone(),
                    tool_name: tool_name.clone(),
                });
            }

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let mut tool_success = true;
            let result = if let Some(tool) = registry.get(&tool_name) {
                // Malformed JSON arguments degrade to an empty object rather
                // than failing the step.
                let args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
                        serde_json::json!({})
                    });

                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tool_success = false;
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tool_success = false;
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                tool_success = false;
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            if let Some(ref tx) = event_tx {
                // Short previews for the UI, clipped at char boundaries.
                let input_preview = if arguments.len() > 200 {
                    let mut end = 200;
                    while end > 0 && !arguments.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &arguments[..end])
                } else {
                    arguments.clone()
                };
                let output_preview = if result.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !result.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &result[..end])
                } else {
                    result.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
                    subtask_id: subtask_id.clone(),
                    detail: AgentToolCallDetail {
                        tool_name: tool_name.clone(),
                        input_preview,
                        output_preview,
                        success: tool_success,
                    },
                });
            }

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            // Very large outputs are summarized via RLM; everything else is
            // capped by plain truncation before re-entering the context.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        // Feed every tool result back into the conversation.
        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }

        // Re-arm the deadline now that the agent has made progress.
        deadline = Instant::now() + Duration::from_secs(timeout_secs);
    };

    Ok((final_output, steps, total_tool_calls, exit_reason))
}