1use super::{
7 DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
8 orchestrator::Orchestrator,
9 subtask::{SubTask, SubTaskResult, SubTaskStatus},
10};
11use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
12
13pub use super::{Actor, ActorStatus, Handler, SwarmMessage};
15use crate::{
16 agent::Agent,
17 provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
18 rlm::RlmExecutor,
19 telemetry::SwarmTelemetryCollector,
20 swarm::{SwarmArtifact, SwarmStats},
21 tool::ToolRegistry,
22 worktree::{WorktreeInfo, WorktreeManager},
23};
24use anyhow::Result;
25use std::collections::HashMap;
26use std::sync::Arc;
27use std::time::Instant;
28use tokio::sync::{RwLock, mpsc};
29use tokio::time::{Duration, timeout};
30
/// Fallback context-window size (in tokens) assumed for the sub-agent model.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens held back from the window so the model has room to produce a response.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context window history may occupy before truncation kicks in.
const TRUNCATION_THRESHOLD: f64 = 0.85;
/// Rough token-count estimate for `text`, assuming ~3.5 characters per token
/// (a conservative heuristic for mixed English prose and code).
fn estimate_tokens(text: &str) -> usize {
    let chars = text.len() as f64;
    (chars / 3.5).ceil() as usize
}
47
48fn estimate_message_tokens(message: &Message) -> usize {
50 let mut tokens = 4; for part in &message.content {
53 tokens += match part {
54 ContentPart::Text { text } => estimate_tokens(text),
55 ContentPart::ToolCall {
56 id,
57 name,
58 arguments,
59 } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
60 ContentPart::ToolResult {
61 tool_call_id,
62 content,
63 } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
64 ContentPart::Image { .. } | ContentPart::File { .. } => 2000, };
66 }
67
68 tokens
69}
70
71fn estimate_total_tokens(messages: &[Message]) -> usize {
73 messages.iter().map(estimate_message_tokens).sum()
74}
75
76fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
84 let target_tokens =
85 ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
86
87 let current_tokens = estimate_total_tokens(messages);
88 if current_tokens <= target_tokens {
89 return;
90 }
91
92 tracing::warn!(
93 current_tokens = current_tokens,
94 target_tokens = target_tokens,
95 context_limit = context_limit,
96 "Context approaching limit, truncating conversation history"
97 );
98
99 truncate_large_tool_results(messages, 2000); let after_tool_truncation = estimate_total_tokens(messages);
103 if after_tool_truncation <= target_tokens {
104 tracing::info!(
105 old_tokens = current_tokens,
106 new_tokens = after_tool_truncation,
107 "Truncated large tool results, context now within limits"
108 );
109 return;
110 }
111
112 if messages.len() <= 6 {
114 tracing::warn!(
115 tokens = after_tool_truncation,
116 target = target_tokens,
117 "Cannot truncate further - conversation too short"
118 );
119 return;
120 }
121
122 let keep_start = 2;
125 let keep_end = 4;
126 let removable_count = messages.len() - keep_start - keep_end;
127
128 if removable_count == 0 {
129 return;
130 }
131
132 let removed_messages: Vec<_> = messages
134 .drain(keep_start..keep_start + removable_count)
135 .collect();
136 let summary = summarize_removed_messages(&removed_messages);
137
138 messages.insert(
140 keep_start,
141 Message {
142 role: Role::User,
143 content: vec![ContentPart::Text {
144 text: format!(
145 "[Context truncated: {} earlier messages removed to fit context window]\n{}",
146 removed_messages.len(),
147 summary
148 ),
149 }],
150 },
151 );
152
153 let new_tokens = estimate_total_tokens(messages);
154 tracing::info!(
155 removed_messages = removed_messages.len(),
156 old_tokens = current_tokens,
157 new_tokens = new_tokens,
158 "Truncated conversation history"
159 );
160}
161
162fn summarize_removed_messages(messages: &[Message]) -> String {
164 let mut summary = String::new();
165 let mut tool_calls: Vec<String> = Vec::new();
166
167 for msg in messages {
168 for part in &msg.content {
169 if let ContentPart::ToolCall { name, .. } = part {
170 if !tool_calls.contains(name) {
171 tool_calls.push(name.clone());
172 }
173 }
174 }
175 }
176
177 if !tool_calls.is_empty() {
178 summary.push_str(&format!(
179 "Tools used in truncated history: {}",
180 tool_calls.join(", ")
181 ));
182 }
183
184 summary
185}
186
187fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
189 let char_limit = max_tokens_per_result * 3; let mut truncated_count = 0;
191 let mut saved_tokens = 0usize;
192
193 for message in messages.iter_mut() {
194 for part in message.content.iter_mut() {
195 if let ContentPart::ToolResult { content, .. } = part {
196 let tokens = estimate_tokens(content);
197 if tokens > max_tokens_per_result {
198 let old_len = content.len();
199 *content = truncate_single_result(content, char_limit);
200 saved_tokens += tokens.saturating_sub(estimate_tokens(content));
201 if content.len() < old_len {
202 truncated_count += 1;
203 }
204 }
205 }
206 }
207 }
208
209 if truncated_count > 0 {
210 tracing::info!(
211 truncated_count = truncated_count,
212 saved_tokens = saved_tokens,
213 max_tokens_per_result = max_tokens_per_result,
214 "Truncated large tool results"
215 );
216 }
217}
218
219fn truncate_single_result(content: &str, max_chars: usize) -> String {
221 if content.len() <= max_chars {
222 return content.to_string();
223 }
224
225 let safe_limit = {
227 let mut limit = max_chars.min(content.len());
228 while limit > 0 && !content.is_char_boundary(limit) {
229 limit -= 1;
230 }
231 limit
232 };
233
234 let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
236
237 let truncated = format!(
238 "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
239 &content[..break_point],
240 content.len(),
241 break_point
242 );
243
244 tracing::debug!(
245 original_len = content.len(),
246 truncated_len = truncated.len(),
247 "Truncated large result"
248 );
249
250 truncated
251}
252
/// Tool outputs larger than this (in chars) are summarized via RLM rather than
/// mechanically truncated.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Target size (in chars) for mechanically truncated tool output.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;
258
/// Why a sub-agent's agentic loop stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The model finished on its own (no further tool calls requested).
    Completed,
    /// The loop exhausted its configured step budget.
    MaxStepsReached,
    /// The wall-clock deadline elapsed before the loop finished.
    TimedOut,
}
265
266async fn process_large_result_with_rlm(
268 content: &str,
269 tool_name: &str,
270 provider: Arc<dyn Provider>,
271 model: &str,
272) -> String {
273 if content.len() <= SIMPLE_TRUNCATE_CHARS {
274 return content.to_string();
275 }
276
277 if content.len() <= RLM_THRESHOLD_CHARS {
279 return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
280 }
281
282 tracing::info!(
284 tool = %tool_name,
285 content_len = content.len(),
286 "Using RLM to process large tool result"
287 );
288
289 let query = format!(
290 "Summarize the key information from this {} output. \
291 Focus on: errors, warnings, important findings, and actionable items. \
292 Be concise but thorough.",
293 tool_name
294 );
295
296 let mut executor =
297 RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
298
299 match executor.analyze(&query).await {
300 Ok(result) => {
301 tracing::info!(
302 tool = %tool_name,
303 original_len = content.len(),
304 summary_len = result.answer.len(),
305 iterations = result.iterations,
306 "RLM summarized large result"
307 );
308
309 format!(
310 "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
311 tool_name,
312 content.len(),
313 result.answer.len(),
314 result.answer
315 )
316 }
317 Err(e) => {
318 tracing::warn!(
319 tool = %tool_name,
320 error = %e,
321 "RLM analysis failed, falling back to truncation"
322 );
323 truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
324 }
325 }
326}
327
/// Executes a decomposed task as a swarm of parallel sub-agents.
pub struct SwarmExecutor {
    // Swarm-wide limits and feature flags (concurrency, worktrees, timeouts).
    config: SwarmConfig,
    // Optional coordinator agent supplied by the caller; only stored and
    // exposed here — NOTE(review): its consumer is outside this file, confirm usage.
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    // Channel for streaming progress events to the TUI; sends are best-effort
    // (try_send) and silently dropped when the channel is full.
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    // Telemetry sink shared across the whole swarm run.
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
}
338
impl SwarmExecutor {
    /// Creates an executor with the given config and a fresh telemetry collector.
    pub fn new(config: SwarmConfig) -> Self {
        Self {
            config,
            coordinator_agent: None,
            event_tx: None,
            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
        }
    }

    /// Attaches a channel for streaming `SwarmEvent`s to a UI.
    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
        self.event_tx = Some(tx);
        self
    }

    /// Attaches a shared coordinator agent (stored for external retrieval).
    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
        tracing::debug!("Setting coordinator agent for swarm execution");
        self.coordinator_agent = Some(agent);
        self
    }

    /// Replaces the default telemetry collector with a shared one.
    pub fn with_telemetry(mut self, telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>) -> Self {
        self.telemetry = telemetry;
        self
    }

    /// Returns a clone of the telemetry handle for external observation.
    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
        Arc::clone(&self.telemetry)
    }
    /// Returns the coordinator agent, if one was attached.
    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        self.coordinator_agent.as_ref()
    }

    /// Best-effort event emission: silently drops the event if no channel is
    /// attached or the channel is full.
    fn try_send_event(&self, event: SwarmEvent) {
        if let Some(ref tx) = self.event_tx {
            let _ = tx.try_send(event);
        }
    }

    /// Decomposes `task` into staged subtasks and executes each stage's
    /// subtasks in parallel, threading results between stages via a shared map.
    /// Returns an aggregated `SwarmResult`; `success` requires every subtask
    /// to have succeeded.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        let subtasks = orchestrator.decompose(task, strategy).await?;

        // An empty decomposition is reported as a failed (but non-error) result.
        if subtasks.is_empty() {
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Publish the full plan so the UI can render all subtasks up front.
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        let swarm_id = uuid::Uuid::new_v4().to_string();
        self.telemetry.lock().await.start_swarm(swarm_id.clone(), subtasks.len(), &format!("{:?}", strategy));

        // Results of completed subtasks, keyed by subtask id; later stages read
        // these to build dependency context for their sub-agents.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        // Stages run sequentially; subtasks within a stage run in parallel.
        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            let stage_results = self
                .execute_stage(&orchestrator, stage_subtasks, completed_results.clone(), &swarm_id)
                .await?;

            // Record this stage's outputs for dependency context in later stages.
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                }
            }

            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        let provider_name = orchestrator.provider().to_string();

        self.telemetry.lock().await.record_swarm_latency("total_execution", start_time.elapsed());

        // Finalize aggregate stats (speedup vs. estimated sequential time).
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        let success = all_results.iter().all(|r| r.success);

        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }

    /// Spawns one tokio task per subtask in the stage, throttled by a
    /// semaphore (`max_concurrent_requests`) and staggered by
    /// `request_delay_ms * index`. Each sub-agent optionally runs in an
    /// isolated git worktree that is merged back (and cleaned up) on success.
    /// Failures are converted into failed `SubTaskResult`s rather than
    /// propagated, so one bad subtask never aborts the stage.
    async fn execute_stage(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
        swarm_id: &str,
    ) -> Result<Vec<SubTaskResult>> {
        let mut handles: Vec<(
            String,
            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
        )> = Vec::new();

        // Caps the number of sub-agents talking to the provider at once.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(
            self.config.max_concurrent_requests,
        ));
        let delay_ms = self.config.request_delay_ms;

        let model = orchestrator.model().to_string();
        let provider_name = orchestrator.provider().to_string();
        let providers = orchestrator.providers();
        let provider = providers
            .get(&provider_name)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;

        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");

        // Shared tool registry; the "question" tool is excluded because
        // sub-agents run unattended and cannot prompt the user.
        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|t| t.name != "question")
            .collect();

        // Worktree isolation is best-effort: on any setup failure we fall back
        // to the shared working directory.
        let worktree_manager = if self.config.worktree_enabled {
            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| ".".to_string())
            });

            match WorktreeManager::new(&working_dir) {
                Ok(mgr) => {
                    tracing::info!(
                        working_dir = %working_dir,
                        "Worktree isolation enabled for parallel sub-agents"
                    );
                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Failed to create worktree manager, falling back to shared directory"
                    );
                    None
                }
            }
        } else {
            None
        };

        for (idx, subtask) in subtasks.into_iter().enumerate() {
            let model = model.clone();
            let _provider_name = provider_name.clone();
            let provider = Arc::clone(&provider);

            // Build context from dependency results completed in earlier stages.
            let context = {
                let completed = completed_results.read().await;
                let mut dep_context = String::new();
                for dep_id in &subtask.dependencies {
                    if let Some(result) = completed.get(dep_id) {
                        dep_context.push_str(&format!(
                            "\n--- Result from dependency {} ---\n{}\n",
                            dep_id, result
                        ));
                    }
                }
                dep_context
            };

            // Clone everything the spawned task needs to own.
            let instruction = subtask.instruction.clone();
            let subtask_name = subtask.name.clone();
            let specialty = subtask.specialty.clone().unwrap_or_default();
            let subtask_id = subtask.id.clone();
            let subtask_id_for_handle = subtask_id.clone();
            let max_steps = self.config.max_steps_per_subagent;
            let timeout_secs = self.config.subagent_timeout_secs;

            let tools = tool_definitions.clone();
            let registry = Arc::clone(&tool_registry);
            let sem = Arc::clone(&semaphore);
            // Stagger launches to avoid a thundering herd against the provider.
            let stagger_delay = delay_ms * idx as u64;
            let worktree_mgr = worktree_manager.clone();
            let event_tx = self.event_tx.clone();

            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());

            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");

            let handle = tokio::spawn(async move {
                if stagger_delay > 0 {
                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
                }
                // Acquire fails only when the semaphore is closed (shutdown).
                let _permit = sem
                    .acquire()
                    .await
                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;

                let agent_start = Instant::now();

                let start = Instant::now();

                // Per-subtask worktree; failure degrades to the shared dir.
                let worktree_info = if let Some(ref mgr) = worktree_mgr {
                    let task_slug = subtask_id.replace("-", "_");
                    match mgr.create(&task_slug) {
                        Ok(wt) => {
                            tracing::info!(
                                subtask_id = %subtask_id,
                                worktree_path = %wt.path.display(),
                                worktree_branch = %wt.branch,
                                "Created worktree for sub-agent"
                            );
                            Some(wt)
                        }
                        Err(e) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %e,
                                "Failed to create worktree, using shared directory"
                            );
                            None
                        }
                    }
                } else {
                    None
                };

                let working_dir = worktree_info
                    .as_ref()
                    .map(|wt| wt.path.display().to_string())
                    .unwrap_or_else(|| ".".to_string());

                // Fold project-level AGENTS.md instructions into the prompt, if present.
                let working_path = std::path::Path::new(&working_dir);
                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
                    .map(|(content, _)| {
                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
                    })
                    .unwrap_or_default();

                // Unique PRD filename so parallel agents never clobber each other.
                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
                let system_prompt = format!(
                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.

WORKING DIRECTORY: {}
All file operations should be relative to this directory.

IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.

Available tools:
- read: Read file contents
- write: Write/create files
- edit: Edit existing files (search and replace)
- multiedit: Make multiple edits at once
- glob: Find files by pattern
- grep: Search file contents
- bash: Run shell commands (use cwd: \"{}\" parameter)
- webfetch: Fetch web pages
- prd: Generate structured PRD for complex tasks
- ralph: Run autonomous agent loop on a PRD

COMPLEX TASKS:
If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
1. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
2. Break down into user stories with acceptance criteria
3. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
4. Call ralph({{action: 'run', prd_path: '{}'}}) to execute

NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.

When done, provide a brief summary of what you accomplished.{agents_md_content}",
                    specialty,
                    subtask_id,
                    working_dir,
                    working_dir,
                    prd_filename,
                    prd_filename,
                    prd_filename
                );

                let user_prompt = if context.is_empty() {
                    format!("Complete this task:\n\n{}", instruction)
                } else {
                    format!(
                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
                        instruction, context
                    )
                };

                // Announce the sub-agent's start to the UI.
                if let Some(ref tx) = event_tx {
                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                        id: subtask_id.clone(),
                        name: subtask_name.clone(),
                        status: SubTaskStatus::Running,
                        agent_name: Some(format!("agent-{}", subtask_id)),
                    });
                    let _ = tx.try_send(SwarmEvent::AgentStarted {
                        subtask_id: subtask_id.clone(),
                        agent_name: format!("agent-{}", subtask_id),
                        specialty: specialty.clone(),
                    });
                }

                let result = run_agent_loop(
                    provider,
                    &model,
                    &system_prompt,
                    &user_prompt,
                    tools,
                    registry,
                    max_steps,
                    timeout_secs,
                    event_tx.clone(),
                    subtask_id.clone(),
                )
                .await;

                match result {
                    Ok((output, steps, tool_calls, exit_reason)) => {
                        // Map the loop's exit reason onto success/status/error.
                        let (success, status, error) = match exit_reason {
                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
                            AgentLoopExit::MaxStepsReached => (
                                false,
                                SubTaskStatus::Failed,
                                Some(format!("Sub-agent hit max steps ({max_steps})")),
                            ),
                            AgentLoopExit::TimedOut => (
                                false,
                                SubTaskStatus::TimedOut,
                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
                            ),
                        };

                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            if let Some(ref message) = error {
                                let _ = tx.try_send(SwarmEvent::AgentError {
                                    subtask_id: subtask_id.clone(),
                                    error: message.clone(),
                                });
                            }
                            let _ = tx.try_send(SwarmEvent::AgentOutput {
                                subtask_id: subtask_id.clone(),
                                output: output.clone(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success,
                                steps,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success,
                                result: output,
                                steps,
                                tool_calls,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error,
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                    Err(e) => {
                        // Loop-level error: report a failed result, don't propagate.
                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status: SubTaskStatus::Failed,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentError {
                                subtask_id: subtask_id.clone(),
                                error: e.to_string(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success: false,
                                steps: 0,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success: false,
                                result: String::new(),
                                steps: 0,
                                tool_calls: 0,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error: Some(e.to_string()),
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                }
            });

            handles.push((subtask_id_for_handle, handle));
        }

        // Join all sub-agents; merge/cleanup worktrees for successful tasks,
        // keep worktrees of failed tasks around for debugging.
        let mut results = Vec::new();
        let auto_merge = self.config.worktree_auto_merge;

        for (subtask_id, handle) in handles {
            match handle.await {
                Ok(Ok((mut result, worktree_info))) => {
                    if let Some(wt) = worktree_info {
                        if result.success && auto_merge {
                            if let Some(ref mgr) = worktree_manager {
                                match mgr.merge(&wt) {
                                    Ok(merge_result) => {
                                        if merge_result.success {
                                            tracing::info!(
                                                subtask_id = %result.subtask_id,
                                                files_changed = merge_result.files_changed,
                                                "Merged worktree changes successfully"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Result ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else if merge_result.aborted {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                summary = %merge_result.summary,
                                                "Merge was aborted"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Aborted ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                conflicts = ?merge_result.conflicts,
                                                "Merge had conflicts"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Conflicts ---\n{}",
                                                merge_result.summary
                                            ));
                                        }

                                        // Worktree is removed after any merge attempt,
                                        // regardless of the merge outcome.
                                        if let Err(e) = mgr.cleanup(&wt) {
                                            tracing::warn!(
                                                error = %e,
                                                "Failed to cleanup worktree"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            subtask_id = %result.subtask_id,
                                            error = %e,
                                            "Failed to merge worktree"
                                        );
                                    }
                                }
                            }
                        } else if !result.success {
                            tracing::info!(
                                subtask_id = %result.subtask_id,
                                worktree_path = %wt.path.display(),
                                "Keeping worktree for debugging (task failed)"
                            );
                        }
                    }

                    results.push(result);
                }
                Ok(Err(e)) => {
                    // The spawned task itself returned an error (e.g. semaphore closed).
                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: e.to_string(),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(e.to_string()),
                        artifacts: Vec::new(),
                    });
                }
                Err(e) => {
                    // The tokio task panicked or was cancelled.
                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: format!("Task join error: {}", e),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(format!("Task join error: {}", e)),
                        artifacts: Vec::new(),
                    });
                }
            }
        }

        Ok(results)
    }

    /// Concatenates all subtask outputs into one labeled report; failed
    /// subtasks contribute their error text instead of a result.
    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
        let mut aggregated = String::new();

        for (i, result) in results.iter().enumerate() {
            if result.success {
                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
            } else {
                aggregated.push_str(&format!(
                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
                    i + 1,
                    result.error.as_deref().unwrap_or("Unknown error")
                ));
            }
        }

        Ok(aggregated)
    }

    /// Runs `task` as a single sub-agent (no decomposition).
    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
}
1075
/// Fluent builder for [`SwarmExecutor`], layering overrides on top of the
/// default `SwarmConfig`.
pub struct SwarmExecutorBuilder {
    // Accumulated configuration; consumed by `build()`.
    config: SwarmConfig,
}
1080
1081impl SwarmExecutorBuilder {
1082 pub fn new() -> Self {
1083 Self {
1084 config: SwarmConfig::default(),
1085 }
1086 }
1087
1088 pub fn max_subagents(mut self, max: usize) -> Self {
1089 self.config.max_subagents = max;
1090 self
1091 }
1092
1093 pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
1094 self.config.max_steps_per_subagent = max;
1095 self
1096 }
1097
1098 pub fn max_total_steps(mut self, max: usize) -> Self {
1099 self.config.max_total_steps = max;
1100 self
1101 }
1102
1103 pub fn timeout_secs(mut self, secs: u64) -> Self {
1104 self.config.subagent_timeout_secs = secs;
1105 self
1106 }
1107
1108 pub fn parallel_enabled(mut self, enabled: bool) -> Self {
1109 self.config.parallel_enabled = enabled;
1110 self
1111 }
1112
1113 pub fn build(self) -> SwarmExecutor {
1114 SwarmExecutor::new(self.config)
1115 }
1116}
1117
impl Default for SwarmExecutorBuilder {
    /// Equivalent to [`SwarmExecutorBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
1123
1124#[allow(clippy::too_many_arguments)]
1126pub async fn run_agent_loop(
1128 provider: Arc<dyn Provider>,
1129 model: &str,
1130 system_prompt: &str,
1131 user_prompt: &str,
1132 tools: Vec<crate::provider::ToolDefinition>,
1133 registry: Arc<ToolRegistry>,
1134 max_steps: usize,
1135 timeout_secs: u64,
1136 event_tx: Option<mpsc::Sender<SwarmEvent>>,
1137 subtask_id: String,
1138) -> Result<(String, usize, usize, AgentLoopExit)> {
1139 let temperature = 0.7;
1141
1142 tracing::info!(
1143 model = %model,
1144 max_steps = max_steps,
1145 timeout_secs = timeout_secs,
1146 "Sub-agent starting agentic loop"
1147 );
1148 tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
1149 tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
1150
1151 let mut messages = vec![
1153 Message {
1154 role: Role::System,
1155 content: vec![ContentPart::Text {
1156 text: system_prompt.to_string(),
1157 }],
1158 },
1159 Message {
1160 role: Role::User,
1161 content: vec![ContentPart::Text {
1162 text: user_prompt.to_string(),
1163 }],
1164 },
1165 ];
1166
1167 let mut steps = 0;
1168 let mut total_tool_calls = 0;
1169 let mut final_output = String::new();
1170
1171 let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
1172
1173 let exit_reason = loop {
1174 if steps >= max_steps {
1175 tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
                break AgentLoopExit::MaxStepsReached;
            }

            // Wall-clock cutoff. NOTE(review): `deadline` is re-armed at the
            // bottom of every iteration (after tool execution), so this check
            // fires after `timeout_secs` of *inactivity since the last completed
            // step*, not after `timeout_secs` of total runtime — confirm that
            // rolling-deadline behavior is intentional.
            if Instant::now() > deadline {
                tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
                break AgentLoopExit::TimedOut;
            }

            steps += 1;
            tracing::info!(step = steps, "Sub-agent step starting");

            // Trim the conversation to fit the model's context window before
            // every LLM call (see truncate_messages_to_fit at the top of file).
            truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);

            let request = CompletionRequest {
                messages: messages.clone(),
                tools: tools.clone(),
                model: model.to_string(),
                temperature: Some(temperature),
                top_p: None,
                max_tokens: Some(8192),
                stop: Vec::new(),
            };

            // One LLM round-trip, hard-capped at 120s per call. `??` unwraps
            // both the tokio timeout (Elapsed) and the provider's own Result;
            // either failure aborts the whole function via `?`.
            let step_start = Instant::now();
            let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
            let step_duration = step_start.elapsed();

            tracing::info!(
                step = steps,
                duration_ms = step_duration.as_millis() as u64,
                finish_reason = ?response.finish_reason,
                prompt_tokens = response.usage.prompt_tokens,
                completion_tokens = response.usage.completion_tokens,
                "Sub-agent step completed LLM call"
            );

            // Split the assistant turn into plain text and requested tool calls.
            let mut text_parts = Vec::new();
            let mut tool_calls = Vec::new();

            for part in &response.message.content {
                match part {
                    ContentPart::Text { text } => {
                        text_parts.push(text.clone());
                    }
                    ContentPart::ToolCall {
                        id,
                        name,
                        arguments,
                    } => {
                        tool_calls.push((id.clone(), name.clone(), arguments.clone()));
                    }
                    // Other parts (images, files, tool results) are not expected
                    // in an assistant turn and are deliberately ignored.
                    _ => {}
                }
            }

            if !text_parts.is_empty() {
                // Each text turn overwrites `final_output`; the last one before
                // the loop exits is what gets returned.
                final_output = text_parts.join("\n");
                tracing::info!(
                    step = steps,
                    output_len = final_output.len(),
                    "Sub-agent text output"
                );
                tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");

                // Forward a 500-byte preview to the TUI, if anyone is listening.
                if let Some(ref tx) = event_tx {
                    let preview = if final_output.len() > 500 {
                        // Walk back to a char boundary so slicing never panics
                        // mid-way through a multi-byte UTF-8 sequence.
                        let mut end = 500;
                        while end > 0 && !final_output.is_char_boundary(end) {
                            end -= 1;
                        }
                        format!("{}...", &final_output[..end])
                    } else {
                        final_output.clone()
                    };
                    // try_send: drop the event rather than block the agent loop
                    // if the UI channel is full or closed.
                    let _ = tx.try_send(SwarmEvent::AgentMessage {
                        subtask_id: subtask_id.clone(),
                        entry: AgentMessageEntry {
                            role: "assistant".to_string(),
                            content: preview,
                            is_tool_call: false,
                        },
                    });
                }
            }

            if !tool_calls.is_empty() {
                tracing::info!(
                    step = steps,
                    num_tool_calls = tool_calls.len(),
                    tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                    "Sub-agent requesting tool calls"
                );
            }

            // Record the assistant turn before appending tool results, so the
            // transcript stays in provider-expected order.
            messages.push(response.message.clone());

            // Exit when the model stops asking for tools (or claims ToolCalls
            // but produced none — treated as finished to avoid spinning).
            if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
                tracing::info!(
                    steps = steps,
                    total_tool_calls = total_tool_calls,
                    "Sub-agent finished"
                );
                break AgentLoopExit::Completed;
            }

            // Execute every requested tool sequentially, collecting
            // (call_id, tool_name, result) triples.
            let mut tool_results = Vec::new();

            for (call_id, tool_name, arguments) in tool_calls {
                total_tool_calls += 1;

                if let Some(ref tx) = event_tx {
                    let _ = tx.try_send(SwarmEvent::AgentToolCall {
                        subtask_id: subtask_id.clone(),
                        tool_name: tool_name.clone(),
                    });
                }

                tracing::info!(
                    step = steps,
                    tool_call_id = %call_id,
                    tool = %tool_name,
                    "Executing tool"
                );
                tracing::debug!(
                    tool = %tool_name,
                    arguments = %arguments,
                    "Tool call arguments"
                );

                let tool_start = Instant::now();
                let mut tool_success = true;
                // Tool failures are reported back to the model as text instead
                // of aborting the loop — the model can recover or re-try.
                let result = if let Some(tool) = registry.get(&tool_name) {
                    // Malformed JSON arguments degrade to `{}` rather than
                    // failing the step; the parse error is logged.
                    let args: serde_json::Value =
                        serde_json::from_str(&arguments).unwrap_or_else(|e| {
                            tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
                            serde_json::json!({})
                        });

                    match tool.execute(args).await {
                        Ok(r) => {
                            if r.success {
                                tracing::info!(
                                    tool = %tool_name,
                                    duration_ms = tool_start.elapsed().as_millis() as u64,
                                    success = true,
                                    "Tool execution completed"
                                );
                                r.output
                            } else {
                                // Tool ran but reported a domain-level error.
                                tool_success = false;
                                tracing::warn!(
                                    tool = %tool_name,
                                    error = %r.output,
                                    "Tool returned error"
                                );
                                format!("Tool error: {}", r.output)
                            }
                        }
                        Err(e) => {
                            // Tool invocation itself failed (infrastructure error).
                            tool_success = false;
                            tracing::error!(
                                tool = %tool_name,
                                error = %e,
                                "Tool execution failed"
                            );
                            format!("Tool execution failed: {}", e)
                        }
                    }
                } else {
                    // Model hallucinated a tool name not in the registry.
                    tool_success = false;
                    tracing::error!(tool = %tool_name, "Unknown tool requested");
                    format!("Unknown tool: {}", tool_name)
                };

                // Emit a detail event with truncated previews (200 bytes of
                // input, 500 of output), again backing off to char boundaries.
                if let Some(ref tx) = event_tx {
                    let input_preview = if arguments.len() > 200 {
                        let mut end = 200;
                        while end > 0 && !arguments.is_char_boundary(end) {
                            end -= 1;
                        }
                        format!("{}...", &arguments[..end])
                    } else {
                        arguments.clone()
                    };
                    let output_preview = if result.len() > 500 {
                        let mut end = 500;
                        while end > 0 && !result.is_char_boundary(end) {
                            end -= 1;
                        }
                        format!("{}...", &result[..end])
                    } else {
                        result.clone()
                    };
                    let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
                        subtask_id: subtask_id.clone(),
                        detail: AgentToolCallDetail {
                            tool_name: tool_name.clone(),
                            input_preview,
                            output_preview,
                            success: tool_success,
                        },
                    });
                }

                tracing::debug!(
                    tool = %tool_name,
                    result_len = result.len(),
                    "Tool result"
                );

                // Shrink oversized results before they enter the transcript:
                // very large outputs go through RLM summarization, moderately
                // large ones get a simple truncation. (RLM_THRESHOLD_CHARS /
                // SIMPLE_TRUNCATE_CHARS and both helpers are defined elsewhere
                // in this file.)
                let result = if result.len() > RLM_THRESHOLD_CHARS {
                    process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                        .await
                } else {
                    truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
                };

                tool_results.push((call_id, tool_name, result));
            }

            // Append one Role::Tool message per call, keyed by tool_call_id so
            // the provider can pair results with the assistant's requests.
            for (call_id, _tool_name, result) in tool_results {
                messages.push(Message {
                    role: Role::Tool,
                    content: vec![ContentPart::ToolResult {
                        tool_call_id: call_id,
                        content: result,
                    }],
                });
            }

            // Re-arm the inactivity deadline after a successful step
            // (see the NOTE on the timeout check above).
            deadline = Instant::now() + Duration::from_secs(timeout_secs);
        };

        Ok((final_output, steps, total_tool_calls, exit_reason))
}