1use super::{
7 DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
8 orchestrator::Orchestrator,
9 subtask::{SubTask, SubTaskResult, SubTaskStatus},
10};
11use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
12
13pub use super::{Actor, ActorStatus, Handler, SwarmMessage};
15use crate::{
16 agent::Agent,
17 provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
18 rlm::RlmExecutor,
19 swarm::{SwarmArtifact, SwarmStats},
20 telemetry::SwarmTelemetryCollector,
21 tool::ToolRegistry,
22 worktree::{WorktreeInfo, WorktreeManager},
23};
24use anyhow::Result;
25use std::collections::HashMap;
26use std::sync::Arc;
27use std::time::Instant;
28use tokio::sync::{RwLock, mpsc};
29use tokio::time::{Duration, timeout};
30
/// Assumed model context window (tokens) used when truncating sub-agent
/// conversation history (see `truncate_messages_to_fit` / `run_agent_loop`).
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens held back from the truncation budget so the model has room to emit
/// its response.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context window history may occupy before truncation kicks in.
const TRUNCATION_THRESHOLD: f64 = 0.85;
39
/// Rough token estimate for `text`, assuming ~3.5 bytes per token.
///
/// Heuristic only (byte length, not graphemes); it merely has to be good
/// enough to decide when conversation history needs truncating.
fn estimate_tokens(text: &str) -> usize {
    const BYTES_PER_TOKEN: f64 = 3.5;
    (text.len() as f64 / BYTES_PER_TOKEN).ceil() as usize
}
47
48fn estimate_message_tokens(message: &Message) -> usize {
50 let mut tokens = 4; for part in &message.content {
53 tokens += match part {
54 ContentPart::Text { text } => estimate_tokens(text),
55 ContentPart::ToolCall {
56 id,
57 name,
58 arguments,
59 } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
60 ContentPart::ToolResult {
61 tool_call_id,
62 content,
63 } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
64 ContentPart::Image { .. } | ContentPart::File { .. } => 2000, };
66 }
67
68 tokens
69}
70
71fn estimate_total_tokens(messages: &[Message]) -> usize {
73 messages.iter().map(estimate_message_tokens).sum()
74}
75
76fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
84 let target_tokens =
85 ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
86
87 let current_tokens = estimate_total_tokens(messages);
88 if current_tokens <= target_tokens {
89 return;
90 }
91
92 tracing::warn!(
93 current_tokens = current_tokens,
94 target_tokens = target_tokens,
95 context_limit = context_limit,
96 "Context approaching limit, truncating conversation history"
97 );
98
99 truncate_large_tool_results(messages, 2000); let after_tool_truncation = estimate_total_tokens(messages);
103 if after_tool_truncation <= target_tokens {
104 tracing::info!(
105 old_tokens = current_tokens,
106 new_tokens = after_tool_truncation,
107 "Truncated large tool results, context now within limits"
108 );
109 return;
110 }
111
112 if messages.len() <= 6 {
114 tracing::warn!(
115 tokens = after_tool_truncation,
116 target = target_tokens,
117 "Cannot truncate further - conversation too short"
118 );
119 return;
120 }
121
122 let keep_start = 2;
125 let keep_end = 4;
126 let removable_count = messages.len() - keep_start - keep_end;
127
128 if removable_count == 0 {
129 return;
130 }
131
132 let removed_messages: Vec<_> = messages
134 .drain(keep_start..keep_start + removable_count)
135 .collect();
136 let summary = summarize_removed_messages(&removed_messages);
137
138 messages.insert(
140 keep_start,
141 Message {
142 role: Role::User,
143 content: vec![ContentPart::Text {
144 text: format!(
145 "[Context truncated: {} earlier messages removed to fit context window]\n{}",
146 removed_messages.len(),
147 summary
148 ),
149 }],
150 },
151 );
152
153 let new_tokens = estimate_total_tokens(messages);
154 tracing::info!(
155 removed_messages = removed_messages.len(),
156 old_tokens = current_tokens,
157 new_tokens = new_tokens,
158 "Truncated conversation history"
159 );
160}
161
162fn summarize_removed_messages(messages: &[Message]) -> String {
164 let mut summary = String::new();
165 let mut tool_calls: Vec<String> = Vec::new();
166
167 for msg in messages {
168 for part in &msg.content {
169 if let ContentPart::ToolCall { name, .. } = part {
170 if !tool_calls.contains(name) {
171 tool_calls.push(name.clone());
172 }
173 }
174 }
175 }
176
177 if !tool_calls.is_empty() {
178 summary.push_str(&format!(
179 "Tools used in truncated history: {}",
180 tool_calls.join(", ")
181 ));
182 }
183
184 summary
185}
186
187fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
189 let char_limit = max_tokens_per_result * 3; let mut truncated_count = 0;
191 let mut saved_tokens = 0usize;
192
193 for message in messages.iter_mut() {
194 for part in message.content.iter_mut() {
195 if let ContentPart::ToolResult { content, .. } = part {
196 let tokens = estimate_tokens(content);
197 if tokens > max_tokens_per_result {
198 let old_len = content.len();
199 *content = truncate_single_result(content, char_limit);
200 saved_tokens += tokens.saturating_sub(estimate_tokens(content));
201 if content.len() < old_len {
202 truncated_count += 1;
203 }
204 }
205 }
206 }
207 }
208
209 if truncated_count > 0 {
210 tracing::info!(
211 truncated_count = truncated_count,
212 saved_tokens = saved_tokens,
213 max_tokens_per_result = max_tokens_per_result,
214 "Truncated large tool results"
215 );
216 }
217}
218
219fn truncate_single_result(content: &str, max_chars: usize) -> String {
221 if content.len() <= max_chars {
222 return content.to_string();
223 }
224
225 let safe_limit = {
227 let mut limit = max_chars.min(content.len());
228 while limit > 0 && !content.is_char_boundary(limit) {
229 limit -= 1;
230 }
231 limit
232 };
233
234 let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
236
237 let truncated = format!(
238 "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
239 &content[..break_point],
240 content.len(),
241 break_point
242 );
243
244 tracing::debug!(
245 original_len = content.len(),
246 truncated_len = truncated.len(),
247 "Truncated large result"
248 );
249
250 truncated
251}
252
/// Tool results larger than this many chars are summarized via an RLM pass
/// (see `process_large_result_with_rlm`).
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Results at or below this size pass through untouched; larger results that
/// stay under the RLM threshold are truncated down to roughly this size.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;
258
/// Why `run_agent_loop` stopped iterating.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The model finished (no further tool calls requested).
    Completed,
    /// The loop hit the configured per-sub-agent step ceiling.
    MaxStepsReached,
    /// The loop exceeded its wall-clock deadline.
    TimedOut,
}
265
266async fn process_large_result_with_rlm(
268 content: &str,
269 tool_name: &str,
270 provider: Arc<dyn Provider>,
271 model: &str,
272) -> String {
273 if content.len() <= SIMPLE_TRUNCATE_CHARS {
274 return content.to_string();
275 }
276
277 if content.len() <= RLM_THRESHOLD_CHARS {
279 return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
280 }
281
282 tracing::info!(
284 tool = %tool_name,
285 content_len = content.len(),
286 "Using RLM to process large tool result"
287 );
288
289 let query = format!(
290 "Summarize the key information from this {} output. \
291 Focus on: errors, warnings, important findings, and actionable items. \
292 Be concise but thorough.",
293 tool_name
294 );
295
296 let mut executor =
297 RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
298
299 match executor.analyze(&query).await {
300 Ok(result) => {
301 tracing::info!(
302 tool = %tool_name,
303 original_len = content.len(),
304 summary_len = result.answer.len(),
305 iterations = result.iterations,
306 "RLM summarized large result"
307 );
308
309 format!(
310 "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
311 tool_name,
312 content.len(),
313 result.answer.len(),
314 result.answer
315 )
316 }
317 Err(e) => {
318 tracing::warn!(
319 tool = %tool_name,
320 error = %e,
321 "RLM analysis failed, falling back to truncation"
322 );
323 truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
324 }
325 }
326}
327
/// Drives a full swarm run: decomposes a task, fans sub-agents out per
/// dependency stage, and aggregates their results.
pub struct SwarmExecutor {
    config: SwarmConfig,
    // Optional coordinator agent, attached via `with_coordinator_agent`.
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    // Channel for streaming SwarmEvents to the TUI; sends are best-effort.
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    // Shared metrics collector (may be externally provided via `with_telemetry`).
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
}
338
339impl SwarmExecutor {
340 pub fn new(config: SwarmConfig) -> Self {
342 Self {
343 config,
344 coordinator_agent: None,
345 event_tx: None,
346 telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
347 }
348 }
349
350 pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
352 self.event_tx = Some(tx);
353 self
354 }
355
356 pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
358 tracing::debug!("Setting coordinator agent for swarm execution");
359 self.coordinator_agent = Some(agent);
360 self
361 }
362
363 pub fn with_telemetry(
365 mut self,
366 telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
367 ) -> Self {
368 self.telemetry = telemetry;
369 self
370 }
371
372 pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
374 Arc::clone(&self.telemetry)
375 }
    /// Borrow the coordinator agent handle, if one was attached.
    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        self.coordinator_agent.as_ref()
    }
380
381 fn try_send_event(&self, event: SwarmEvent) {
383 if let Some(ref tx) = self.event_tx {
384 let _ = tx.try_send(event);
385 }
386 }
387
    /// Execute `task` end-to-end: decompose it via the orchestrator, run each
    /// dependency stage in order (subtasks within a stage run in parallel via
    /// `execute_stage`), then aggregate all sub-agent outputs.
    ///
    /// Per-subtask failures are captured in the returned `SwarmResult`
    /// (`success == false`); `Err` is reserved for infrastructure failures
    /// (orchestrator construction, decomposition, stage execution).
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        let subtasks = orchestrator.decompose(task, strategy).await?;

        // An empty decomposition is reported as a failed-but-Ok result.
        if subtasks.is_empty() {
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Announce the full plan to the UI before any stage starts.
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        let swarm_id = uuid::Uuid::new_v4().to_string();
        self.telemetry.lock().await.start_swarm(
            swarm_id.clone(),
            subtasks.len(),
            &format!("{:?}", strategy),
        );

        // Outputs of finished subtasks keyed by subtask id; later stages read
        // these to build dependency context for their sub-agents.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            // Publish this stage's outputs for dependents in later stages.
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                }
            }

            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        let provider_name = orchestrator.provider().to_string();

        self.telemetry
            .lock()
            .await
            .record_swarm_latency("total_execution", start_time.elapsed());

        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        // "Sequential estimate" = sum of per-subtask wall time, used for speedup.
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        // Overall success requires every subtask to have succeeded.
        let success = all_results.iter().all(|r| r.success);

        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }
582
    /// Run all subtasks of one stage in parallel and collect their results.
    ///
    /// Each subtask runs in its own tokio task, rate-limited by a semaphore
    /// (`max_concurrent_requests`) and staggered by `request_delay_ms * idx`.
    /// When worktree isolation is enabled, each sub-agent gets its own git
    /// worktree, merged back (and cleaned up) on success when auto-merge is on.
    ///
    /// Per-subtask failures (agent errors and join errors alike) are converted
    /// into failed `SubTaskResult`s rather than propagated as `Err`.
    async fn execute_stage(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
        swarm_id: &str,
    ) -> Result<Vec<SubTaskResult>> {
        // (subtask_id, join handle) pairs; the id is kept outside the task so
        // failures can still be attributed after an agent or join error.
        let mut handles: Vec<(
            String,
            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
        )> = Vec::new();

        // Bounds how many sub-agents hit the provider concurrently.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(
            self.config.max_concurrent_requests,
        ));
        let delay_ms = self.config.request_delay_ms;

        let model = orchestrator.model().to_string();
        let provider_name = orchestrator.provider().to_string();
        let providers = orchestrator.providers();
        let provider = providers
            .get(&provider_name)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;

        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");

        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        // The interactive "question" tool is excluded: sub-agents run unattended.
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|t| t.name != "question")
            .collect();

        // Optional git-worktree isolation so parallel agents don't clobber each
        // other's edits; falls back to the shared directory on any error.
        let worktree_manager = if self.config.worktree_enabled {
            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| ".".to_string())
            });

            match WorktreeManager::new(&working_dir) {
                Ok(mgr) => {
                    tracing::info!(
                        working_dir = %working_dir,
                        "Worktree isolation enabled for parallel sub-agents"
                    );
                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Failed to create worktree manager, falling back to shared directory"
                    );
                    None
                }
            }
        } else {
            None
        };

        for (idx, subtask) in subtasks.into_iter().enumerate() {
            let model = model.clone();
            let _provider_name = provider_name.clone();
            let provider = Arc::clone(&provider);

            // Inline results of this subtask's dependencies as prompt context.
            let context = {
                let completed = completed_results.read().await;
                let mut dep_context = String::new();
                for dep_id in &subtask.dependencies {
                    if let Some(result) = completed.get(dep_id) {
                        dep_context.push_str(&format!(
                            "\n--- Result from dependency {} ---\n{}\n",
                            dep_id, result
                        ));
                    }
                }
                dep_context
            };

            let instruction = subtask.instruction.clone();
            let subtask_name = subtask.name.clone();
            let specialty = subtask.specialty.clone().unwrap_or_default();
            let subtask_id = subtask.id.clone();
            let subtask_id_for_handle = subtask_id.clone();
            let max_steps = self.config.max_steps_per_subagent;
            let timeout_secs = self.config.subagent_timeout_secs;

            let tools = tool_definitions.clone();
            let registry = Arc::clone(&tool_registry);
            let sem = Arc::clone(&semaphore);
            // Stagger start times to avoid a thundering herd on the provider.
            let stagger_delay = delay_ms * idx as u64;
            let worktree_mgr = worktree_manager.clone();
            let event_tx = self.event_tx.clone();

            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());

            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");

            let handle = tokio::spawn(async move {
                if stagger_delay > 0 {
                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
                }
                // Permit held for the task's whole lifetime to bound concurrency.
                let _permit = sem
                    .acquire()
                    .await
                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;

                let _agent_start = Instant::now();

                let start = Instant::now();

                // Create this sub-agent's isolated worktree when enabled.
                let worktree_info = if let Some(ref mgr) = worktree_mgr {
                    let task_slug = subtask_id.replace("-", "_");
                    match mgr.create(&task_slug) {
                        Ok(wt) => {
                            tracing::info!(
                                subtask_id = %subtask_id,
                                worktree_path = %wt.path.display(),
                                worktree_branch = %wt.branch,
                                "Created worktree for sub-agent"
                            );
                            Some(wt)
                        }
                        Err(e) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %e,
                                "Failed to create worktree, using shared directory"
                            );
                            None
                        }
                    }
                } else {
                    None
                };

                let working_dir = worktree_info
                    .as_ref()
                    .map(|wt| wt.path.display().to_string())
                    .unwrap_or_else(|| ".".to_string());

                // Fold any AGENTS.md project instructions into the prompt.
                let working_path = std::path::Path::new(&working_dir);
                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
                    .map(|(content, _)| {
                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
                    })
                    .unwrap_or_default();

                // Unique PRD filename per subtask so parallel agents don't collide.
                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
                let system_prompt = format!(
                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.

WORKING DIRECTORY: {}
All file operations should be relative to this directory.

IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.

Available tools:
- read: Read file contents
- write: Write/create files
- edit: Edit existing files (search and replace)
- multiedit: Make multiple edits at once
- glob: Find files by pattern
- grep: Search file contents
- bash: Run shell commands (use cwd: \"{}\" parameter)
- webfetch: Fetch web pages
- prd: Generate structured PRD for complex tasks
- ralph: Run autonomous agent loop on a PRD

COMPLEX TASKS:
If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
1. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
2. Break down into user stories with acceptance criteria
3. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
4. Call ralph({{action: 'run', prd_path: '{}'}}) to execute

NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.

When done, provide a brief summary of what you accomplished.{agents_md_content}",
                    specialty,
                    subtask_id,
                    working_dir,
                    working_dir,
                    prd_filename,
                    prd_filename,
                    prd_filename
                );

                let user_prompt = if context.is_empty() {
                    format!("Complete this task:\n\n{}", instruction)
                } else {
                    format!(
                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
                        instruction, context
                    )
                };

                // Best-effort UI notifications (dropped if the channel is full).
                if let Some(ref tx) = event_tx {
                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                        id: subtask_id.clone(),
                        name: subtask_name.clone(),
                        status: SubTaskStatus::Running,
                        agent_name: Some(format!("agent-{}", subtask_id)),
                    });
                    let _ = tx.try_send(SwarmEvent::AgentStarted {
                        subtask_id: subtask_id.clone(),
                        agent_name: format!("agent-{}", subtask_id),
                        specialty: specialty.clone(),
                    });
                }

                let result = run_agent_loop(
                    provider,
                    &model,
                    &system_prompt,
                    &user_prompt,
                    tools,
                    registry,
                    max_steps,
                    timeout_secs,
                    event_tx.clone(),
                    subtask_id.clone(),
                )
                .await;

                match result {
                    Ok((output, steps, tool_calls, exit_reason)) => {
                        // Map the loop's exit reason onto (success, status, error).
                        let (success, status, error) = match exit_reason {
                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
                            AgentLoopExit::MaxStepsReached => (
                                false,
                                SubTaskStatus::Failed,
                                Some(format!("Sub-agent hit max steps ({max_steps})")),
                            ),
                            AgentLoopExit::TimedOut => (
                                false,
                                SubTaskStatus::TimedOut,
                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
                            ),
                        };

                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            if let Some(ref message) = error {
                                let _ = tx.try_send(SwarmEvent::AgentError {
                                    subtask_id: subtask_id.clone(),
                                    error: message.clone(),
                                });
                            }
                            let _ = tx.try_send(SwarmEvent::AgentOutput {
                                subtask_id: subtask_id.clone(),
                                output: output.clone(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success,
                                steps,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success,
                                result: output,
                                steps,
                                tool_calls,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error,
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                    Err(e) => {
                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status: SubTaskStatus::Failed,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentError {
                                subtask_id: subtask_id.clone(),
                                error: e.to_string(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success: false,
                                steps: 0,
                            });
                        }
                        // Agent errors become failed results, not task errors.
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success: false,
                                result: String::new(),
                                steps: 0,
                                tool_calls: 0,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error: Some(e.to_string()),
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                }
            });

            handles.push((subtask_id_for_handle, handle));
        }

        let mut results = Vec::new();
        let auto_merge = self.config.worktree_auto_merge;

        for (subtask_id, handle) in handles {
            match handle.await {
                Ok(Ok((mut result, worktree_info))) => {
                    // Merge (and clean up) this agent's worktree on success;
                    // keep it around for debugging when the task failed.
                    if let Some(wt) = worktree_info {
                        if result.success && auto_merge {
                            if let Some(ref mgr) = worktree_manager {
                                match mgr.merge(&wt) {
                                    Ok(merge_result) => {
                                        if merge_result.success {
                                            tracing::info!(
                                                subtask_id = %result.subtask_id,
                                                files_changed = merge_result.files_changed,
                                                "Merged worktree changes successfully"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Result ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else if merge_result.aborted {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                summary = %merge_result.summary,
                                                "Merge was aborted"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Aborted ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                conflicts = ?merge_result.conflicts,
                                                "Merge had conflicts"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Conflicts ---\n{}",
                                                merge_result.summary
                                            ));
                                        }

                                        // Worktree is only removed after a merge attempt.
                                        if let Err(e) = mgr.cleanup(&wt) {
                                            tracing::warn!(
                                                error = %e,
                                                "Failed to cleanup worktree"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            subtask_id = %result.subtask_id,
                                            error = %e,
                                            "Failed to merge worktree"
                                        );
                                    }
                                }
                            }
                        } else if !result.success {
                            tracing::info!(
                                subtask_id = %result.subtask_id,
                                worktree_path = %wt.path.display(),
                                "Keeping worktree for debugging (task failed)"
                            );
                        }
                    }

                    results.push(result);
                }
                Ok(Err(e)) => {
                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: e.to_string(),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(e.to_string()),
                        artifacts: Vec::new(),
                    });
                }
                Err(e) => {
                    // The tokio task itself panicked or was cancelled.
                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: format!("Task join error: {}", e),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(format!("Task join error: {}", e)),
                        artifacts: Vec::new(),
                    });
                }
            }
        }

        Ok(results)
    }
1063
1064 async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
1066 let mut aggregated = String::new();
1067
1068 for (i, result) in results.iter().enumerate() {
1069 if result.success {
1070 aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
1071 } else {
1072 aggregated.push_str(&format!(
1073 "=== Subtask {} (FAILED) ===\nError: {}\n\n",
1074 i + 1,
1075 result.error.as_deref().unwrap_or("Unknown error")
1076 ));
1077 }
1078 }
1079
1080 Ok(aggregated)
1081 }
1082
    /// Convenience wrapper: run `task` with `DecompositionStrategy::None`
    /// (no decomposition).
    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
1087}
1088
/// Fluent builder for [`SwarmExecutor`], starting from `SwarmConfig::default()`.
pub struct SwarmExecutorBuilder {
    config: SwarmConfig,
}
1093
impl SwarmExecutorBuilder {
    /// Start from the default swarm configuration.
    pub fn new() -> Self {
        Self {
            config: SwarmConfig::default(),
        }
    }

    /// Set `SwarmConfig::max_subagents` (cap on sub-agent count).
    pub fn max_subagents(mut self, max: usize) -> Self {
        self.config.max_subagents = max;
        self
    }

    /// Set `SwarmConfig::max_steps_per_subagent` (per-agent step budget).
    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
        self.config.max_steps_per_subagent = max;
        self
    }

    /// Set `SwarmConfig::max_total_steps` (swarm-wide step budget).
    pub fn max_total_steps(mut self, max: usize) -> Self {
        self.config.max_total_steps = max;
        self
    }

    /// Set `SwarmConfig::subagent_timeout_secs` (per-agent wall-clock limit).
    pub fn timeout_secs(mut self, secs: u64) -> Self {
        self.config.subagent_timeout_secs = secs;
        self
    }

    /// Set `SwarmConfig::parallel_enabled`.
    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
        self.config.parallel_enabled = enabled;
        self
    }

    /// Consume the builder and produce a configured executor.
    pub fn build(self) -> SwarmExecutor {
        SwarmExecutor::new(self.config)
    }
}
1130
1131impl Default for SwarmExecutorBuilder {
1132 fn default() -> Self {
1133 Self::new()
1134 }
1135}
1136
1137#[allow(clippy::too_many_arguments)]
1139pub async fn run_agent_loop(
1141 provider: Arc<dyn Provider>,
1142 model: &str,
1143 system_prompt: &str,
1144 user_prompt: &str,
1145 tools: Vec<crate::provider::ToolDefinition>,
1146 registry: Arc<ToolRegistry>,
1147 max_steps: usize,
1148 timeout_secs: u64,
1149 event_tx: Option<mpsc::Sender<SwarmEvent>>,
1150 subtask_id: String,
1151) -> Result<(String, usize, usize, AgentLoopExit)> {
1152 let temperature = 0.7;
1154
1155 tracing::info!(
1156 model = %model,
1157 max_steps = max_steps,
1158 timeout_secs = timeout_secs,
1159 "Sub-agent starting agentic loop"
1160 );
1161 tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
1162 tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
1163
1164 let mut messages = vec![
1166 Message {
1167 role: Role::System,
1168 content: vec![ContentPart::Text {
1169 text: system_prompt.to_string(),
1170 }],
1171 },
1172 Message {
1173 role: Role::User,
1174 content: vec![ContentPart::Text {
1175 text: user_prompt.to_string(),
1176 }],
1177 },
1178 ];
1179
1180 let mut steps = 0;
1181 let mut total_tool_calls = 0;
1182 let mut final_output = String::new();
1183
1184 let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
1185
1186 let exit_reason = loop {
1187 if steps >= max_steps {
1188 tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
1189 break AgentLoopExit::MaxStepsReached;
1190 }
1191
1192 if Instant::now() > deadline {
1193 tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
1194 break AgentLoopExit::TimedOut;
1195 }
1196
1197 steps += 1;
1198 tracing::info!(step = steps, "Sub-agent step starting");
1199
1200 truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
1202
1203 let request = CompletionRequest {
1204 messages: messages.clone(),
1205 tools: tools.clone(),
1206 model: model.to_string(),
1207 temperature: Some(temperature),
1208 top_p: None,
1209 max_tokens: Some(8192),
1210 stop: Vec::new(),
1211 };
1212
1213 let step_start = Instant::now();
1214 let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
1215 let step_duration = step_start.elapsed();
1216
1217 tracing::info!(
1218 step = steps,
1219 duration_ms = step_duration.as_millis() as u64,
1220 finish_reason = ?response.finish_reason,
1221 prompt_tokens = response.usage.prompt_tokens,
1222 completion_tokens = response.usage.completion_tokens,
1223 "Sub-agent step completed LLM call"
1224 );
1225
1226 let mut text_parts = Vec::new();
1228 let mut tool_calls = Vec::new();
1229
1230 for part in &response.message.content {
1231 match part {
1232 ContentPart::Text { text } => {
1233 text_parts.push(text.clone());
1234 }
1235 ContentPart::ToolCall {
1236 id,
1237 name,
1238 arguments,
1239 } => {
1240 tool_calls.push((id.clone(), name.clone(), arguments.clone()));
1241 }
1242 _ => {}
1243 }
1244 }
1245
1246 if !text_parts.is_empty() {
1248 let step_output = text_parts.join("\n");
1249 if !final_output.is_empty() {
1250 final_output.push('\n');
1251 }
1252 final_output.push_str(&step_output);
1253 tracing::info!(
1254 step = steps,
1255 output_len = final_output.len(),
1256 "Sub-agent text output"
1257 );
1258 tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
1259
1260 if let Some(ref tx) = event_tx {
1262 let preview = if step_output.len() > 500 {
1263 let mut end = 500;
1264 while end > 0 && !step_output.is_char_boundary(end) {
1265 end -= 1;
1266 }
1267 format!("{}...", &step_output[..end])
1268 } else {
1269 step_output.clone()
1270 };
1271 let _ = tx.try_send(SwarmEvent::AgentMessage {
1272 subtask_id: subtask_id.clone(),
1273 entry: AgentMessageEntry {
1274 role: "assistant".to_string(),
1275 content: preview,
1276 is_tool_call: false,
1277 },
1278 });
1279 }
1280 }
1281
1282 if !tool_calls.is_empty() {
1284 tracing::info!(
1285 step = steps,
1286 num_tool_calls = tool_calls.len(),
1287 tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
1288 "Sub-agent requesting tool calls"
1289 );
1290 }
1291
1292 messages.push(response.message.clone());
1294
1295 if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
1297 tracing::info!(
1298 steps = steps,
1299 total_tool_calls = total_tool_calls,
1300 "Sub-agent finished"
1301 );
1302 break AgentLoopExit::Completed;
1303 }
1304
1305 let mut tool_results = Vec::new();
1307
1308 for (call_id, tool_name, arguments) in tool_calls {
1309 total_tool_calls += 1;
1310
1311 if let Some(ref tx) = event_tx {
1313 let _ = tx.try_send(SwarmEvent::AgentToolCall {
1314 subtask_id: subtask_id.clone(),
1315 tool_name: tool_name.clone(),
1316 });
1317 }
1318
1319 tracing::info!(
1320 step = steps,
1321 tool_call_id = %call_id,
1322 tool = %tool_name,
1323 "Executing tool"
1324 );
1325 tracing::debug!(
1326 tool = %tool_name,
1327 arguments = %arguments,
1328 "Tool call arguments"
1329 );
1330
1331 let tool_start = Instant::now();
1332 let mut tool_success = true;
1333 let result = if let Some(tool) = registry.get(&tool_name) {
1334 let args: serde_json::Value =
1336 serde_json::from_str(&arguments).unwrap_or_else(|e| {
1337 tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
1338 serde_json::json!({})
1339 });
1340
1341 match tool.execute(args).await {
1342 Ok(r) => {
1343 if r.success {
1344 tracing::info!(
1345 tool = %tool_name,
1346 duration_ms = tool_start.elapsed().as_millis() as u64,
1347 success = true,
1348 "Tool execution completed"
1349 );
1350 r.output
1351 } else {
1352 tool_success = false;
1353 tracing::warn!(
1354 tool = %tool_name,
1355 error = %r.output,
1356 "Tool returned error"
1357 );
1358 format!("Tool error: {}", r.output)
1359 }
1360 }
1361 Err(e) => {
1362 tool_success = false;
1363 tracing::error!(
1364 tool = %tool_name,
1365 error = %e,
1366 "Tool execution failed"
1367 );
1368 format!("Tool execution failed: {}", e)
1369 }
1370 }
1371 } else {
1372 tool_success = false;
1373 tracing::error!(tool = %tool_name, "Unknown tool requested");
1374 format!("Unknown tool: {}", tool_name)
1375 };
1376
1377 if let Some(ref tx) = event_tx {
1379 let input_preview = if arguments.len() > 200 {
1380 let mut end = 200;
1381 while end > 0 && !arguments.is_char_boundary(end) {
1382 end -= 1;
1383 }
1384 format!("{}...", &arguments[..end])
1385 } else {
1386 arguments.clone()
1387 };
1388 let output_preview = if result.len() > 500 {
1389 let mut end = 500;
1390 while end > 0 && !result.is_char_boundary(end) {
1391 end -= 1;
1392 }
1393 format!("{}...", &result[..end])
1394 } else {
1395 result.clone()
1396 };
1397 let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
1398 subtask_id: subtask_id.clone(),
1399 detail: AgentToolCallDetail {
1400 tool_name: tool_name.clone(),
1401 input_preview,
1402 output_preview,
1403 success: tool_success,
1404 },
1405 });
1406 }
1407
1408 tracing::debug!(
1409 tool = %tool_name,
1410 result_len = result.len(),
1411 "Tool result"
1412 );
1413
1414 let result = if result.len() > RLM_THRESHOLD_CHARS {
1416 process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
1418 .await
1419 } else {
1420 truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
1422 };
1423
1424 tool_results.push((call_id, tool_name, result));
1425 }
1426
1427 for (call_id, _tool_name, result) in tool_results {
1429 messages.push(Message {
1430 role: Role::Tool,
1431 content: vec![ContentPart::ToolResult {
1432 tool_call_id: call_id,
1433 content: result,
1434 }],
1435 });
1436 }
1437
1438 deadline = Instant::now() + Duration::from_secs(timeout_secs);
1440 };
1441
1442 Ok((final_output, steps, total_tool_calls, exit_reason))
1443}