Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
8    orchestrator::Orchestrator,
9    subtask::{SubTask, SubTaskResult, SubTaskStatus},
10};
11use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
12
13// Re-export swarm types for convenience
14pub use super::{Actor, ActorStatus, Handler, SwarmMessage};
15use crate::{
16    agent::Agent,
17    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
18    rlm::RlmExecutor,
19    swarm::{SwarmArtifact, SwarmStats},
20    tool::ToolRegistry,
21    worktree::{WorktreeInfo, WorktreeManager},
22};
23use anyhow::Result;
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::Instant;
27use tokio::sync::{RwLock, mpsc};
28use tokio::time::{Duration, timeout};
29
30/// Default context limit (256k tokens - conservative for most models)
31const DEFAULT_CONTEXT_LIMIT: usize = 256_000;
32
33/// Reserve tokens for the response generation
34const RESPONSE_RESERVE_TOKENS: usize = 8_192;
35
/// Safety margin before we start truncating (85% of limit)
37const TRUNCATION_THRESHOLD: f64 = 0.85;
38
/// Approximate how many tokens `text` occupies.
///
/// Real tokenizers differ per model; this uses a fixed ratio of 3.5 bytes per
/// token (the conservative end of the usual 3-5 range for English text) and
/// rounds up, so any non-empty input counts as at least one token.
fn estimate_tokens(text: &str) -> usize {
    const BYTES_PER_TOKEN: f64 = 3.5;

    let byte_len = text.len() as f64;
    (byte_len / BYTES_PER_TOKEN).ceil() as usize
}
46
47/// Estimate total tokens in a message
48fn estimate_message_tokens(message: &Message) -> usize {
49    let mut tokens = 4; // Role overhead
50
51    for part in &message.content {
52        tokens += match part {
53            ContentPart::Text { text } => estimate_tokens(text),
54            ContentPart::ToolCall {
55                id,
56                name,
57                arguments,
58            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
59            ContentPart::ToolResult {
60                tool_call_id,
61                content,
62            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
63            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
64        };
65    }
66
67    tokens
68}
69
70/// Estimate total tokens in all messages
71fn estimate_total_tokens(messages: &[Message]) -> usize {
72    messages.iter().map(estimate_message_tokens).sum()
73}
74
75/// Truncate messages to fit within context limit
76///
77/// Strategy:
78/// 1. First, aggressively truncate large tool results
79/// 2. Keep system message (first) and user message (second) always
80/// 3. Keep the most recent assistant + tool result pairs together
81/// 4. Drop oldest middle messages in matched pairs
82fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
83    let target_tokens =
84        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
85
86    let current_tokens = estimate_total_tokens(messages);
87    if current_tokens <= target_tokens {
88        return;
89    }
90
91    tracing::warn!(
92        current_tokens = current_tokens,
93        target_tokens = target_tokens,
94        context_limit = context_limit,
95        "Context approaching limit, truncating conversation history"
96    );
97
98    // FIRST: Aggressively truncate large tool results (this is the main culprit)
99    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
100
101    let after_tool_truncation = estimate_total_tokens(messages);
102    if after_tool_truncation <= target_tokens {
103        tracing::info!(
104            old_tokens = current_tokens,
105            new_tokens = after_tool_truncation,
106            "Truncated large tool results, context now within limits"
107        );
108        return;
109    }
110
111    // Minimum: keep first 2 (system + initial user) and last 4 messages
112    if messages.len() <= 6 {
113        tracing::warn!(
114            tokens = after_tool_truncation,
115            target = target_tokens,
116            "Cannot truncate further - conversation too short"
117        );
118        return;
119    }
120
121    // Remove messages from the middle, keeping first 2 and last 4
122    // But we need to remove in pairs to maintain tool_call_id references
123    let keep_start = 2;
124    let keep_end = 4;
125    let removable_count = messages.len() - keep_start - keep_end;
126
127    if removable_count == 0 {
128        return;
129    }
130
131    // Remove all middle messages
132    let removed_messages: Vec<_> = messages
133        .drain(keep_start..keep_start + removable_count)
134        .collect();
135    let summary = summarize_removed_messages(&removed_messages);
136
137    // Insert a summary message where we removed content
138    messages.insert(
139        keep_start,
140        Message {
141            role: Role::User,
142            content: vec![ContentPart::Text {
143                text: format!(
144                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
145                    removed_messages.len(),
146                    summary
147                ),
148            }],
149        },
150    );
151
152    let new_tokens = estimate_total_tokens(messages);
153    tracing::info!(
154        removed_messages = removed_messages.len(),
155        old_tokens = current_tokens,
156        new_tokens = new_tokens,
157        "Truncated conversation history"
158    );
159}
160
161/// Summarize removed messages for context preservation
162fn summarize_removed_messages(messages: &[Message]) -> String {
163    let mut summary = String::new();
164    let mut tool_calls: Vec<String> = Vec::new();
165
166    for msg in messages {
167        for part in &msg.content {
168            if let ContentPart::ToolCall { name, .. } = part {
169                if !tool_calls.contains(name) {
170                    tool_calls.push(name.clone());
171                }
172            }
173        }
174    }
175
176    if !tool_calls.is_empty() {
177        summary.push_str(&format!(
178            "Tools used in truncated history: {}",
179            tool_calls.join(", ")
180        ));
181    }
182
183    summary
184}
185
186/// Truncate large tool results that exceed a reasonable size
187fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
188    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
189    let mut truncated_count = 0;
190    let mut saved_tokens = 0usize;
191
192    for message in messages.iter_mut() {
193        for part in message.content.iter_mut() {
194            if let ContentPart::ToolResult { content, .. } = part {
195                let tokens = estimate_tokens(content);
196                if tokens > max_tokens_per_result {
197                    let old_len = content.len();
198                    *content = truncate_single_result(content, char_limit);
199                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
200                    if content.len() < old_len {
201                        truncated_count += 1;
202                    }
203                }
204            }
205        }
206    }
207
208    if truncated_count > 0 {
209        tracing::info!(
210            truncated_count = truncated_count,
211            saved_tokens = saved_tokens,
212            max_tokens_per_result = max_tokens_per_result,
213            "Truncated large tool results"
214        );
215    }
216}
217
218/// Truncate a single result string to a maximum character limit
219fn truncate_single_result(content: &str, max_chars: usize) -> String {
220    if content.len() <= max_chars {
221        return content.to_string();
222    }
223
224    // Find a valid char boundary at or before max_chars
225    let safe_limit = {
226        let mut limit = max_chars.min(content.len());
227        while limit > 0 && !content.is_char_boundary(limit) {
228            limit -= 1;
229        }
230        limit
231    };
232
233    // Try to find a good break point (newline) near the limit
234    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
235
236    let truncated = format!(
237        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
238        &content[..break_point],
239        content.len(),
240        break_point
241    );
242
243    tracing::debug!(
244        original_len = content.len(),
245        truncated_len = truncated.len(),
246        "Truncated large result"
247    );
248
249    truncated
250}
251
252/// Threshold for when to use RLM vs truncation (in characters)
253const RLM_THRESHOLD_CHARS: usize = 50_000;
254
255/// Max chars for simple truncation (below RLM threshold)
256const SIMPLE_TRUNCATE_CHARS: usize = 6000;
257
/// Why a sub-agent's loop stopped running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The loop finished on its own.
    Completed,
    /// The loop was stopped because it used up its step budget.
    MaxStepsReached,
    /// The loop was stopped because it ran out of time.
    TimedOut,
}
264
265/// Process a large tool result using RLM to intelligently summarize it
266async fn process_large_result_with_rlm(
267    content: &str,
268    tool_name: &str,
269    provider: Arc<dyn Provider>,
270    model: &str,
271) -> String {
272    if content.len() <= SIMPLE_TRUNCATE_CHARS {
273        return content.to_string();
274    }
275
276    // For medium-sized content, just truncate
277    if content.len() <= RLM_THRESHOLD_CHARS {
278        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
279    }
280
281    // For very large content, use RLM to intelligently summarize
282    tracing::info!(
283        tool = %tool_name,
284        content_len = content.len(),
285        "Using RLM to process large tool result"
286    );
287
288    let query = format!(
289        "Summarize the key information from this {} output. \
290         Focus on: errors, warnings, important findings, and actionable items. \
291         Be concise but thorough.",
292        tool_name
293    );
294
295    let mut executor =
296        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
297
298    match executor.analyze(&query).await {
299        Ok(result) => {
300            tracing::info!(
301                tool = %tool_name,
302                original_len = content.len(),
303                summary_len = result.answer.len(),
304                iterations = result.iterations,
305                "RLM summarized large result"
306            );
307
308            format!(
309                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
310                tool_name,
311                content.len(),
312                result.answer.len(),
313                result.answer
314            )
315        }
316        Err(e) => {
317            tracing::warn!(
318                tool = %tool_name,
319                error = %e,
320                "RLM analysis failed, falling back to truncation"
321            );
322            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
323        }
324    }
325}
326
327/// The swarm executor runs subtasks in parallel
328pub struct SwarmExecutor {
329    config: SwarmConfig,
330    /// Optional agent for handling swarm-level coordination (reserved for future use)
331    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
332    /// Optional event channel for TUI real-time updates
333    event_tx: Option<mpsc::Sender<SwarmEvent>>,
334}
335
336impl SwarmExecutor {
337    /// Create a new executor
338    pub fn new(config: SwarmConfig) -> Self {
339        Self {
340            config,
341            coordinator_agent: None,
342            event_tx: None,
343        }
344    }
345
346    /// Set an event channel for real-time TUI updates
347    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
348        self.event_tx = Some(tx);
349        self
350    }
351
352    /// Set a coordinator agent for swarm-level coordination
353    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
354        tracing::debug!("Setting coordinator agent for swarm execution");
355        self.coordinator_agent = Some(agent);
356        self
357    }
358
359    /// Get the coordinator agent if set
360    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
361        self.coordinator_agent.as_ref()
362    }
363
364    /// Send an event to the TUI if channel is connected (non-blocking)
365    fn try_send_event(&self, event: SwarmEvent) {
366        if let Some(ref tx) = self.event_tx {
367            let _ = tx.try_send(event);
368        }
369    }
370
371    /// Execute a task using the swarm
372    pub async fn execute(
373        &self,
374        task: &str,
375        strategy: DecompositionStrategy,
376    ) -> Result<SwarmResult> {
377        let start_time = Instant::now();
378
379        // Create orchestrator
380        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;
381
382        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");
383
384        // Decompose the task
385        let subtasks = orchestrator.decompose(task, strategy).await?;
386
387        if subtasks.is_empty() {
388            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
389            return Ok(SwarmResult {
390                success: false,
391                result: String::new(),
392                subtask_results: Vec::new(),
393                stats: SwarmStats::default(),
394                artifacts: Vec::new(),
395                error: Some("No subtasks generated".to_string()),
396            });
397        }
398
399        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());
400
401        self.try_send_event(SwarmEvent::Started {
402            task: task.to_string(),
403            total_subtasks: subtasks.len(),
404        });
405
406        // Emit decomposition event for TUI
407        self.try_send_event(SwarmEvent::Decomposed {
408            subtasks: subtasks
409                .iter()
410                .map(|s| SubTaskInfo {
411                    id: s.id.clone(),
412                    name: s.name.clone(),
413                    status: SubTaskStatus::Pending,
414                    stage: s.stage,
415                    dependencies: s.dependencies.clone(),
416                    agent_name: s.specialty.clone(),
417                    current_tool: None,
418                    steps: 0,
419                    max_steps: self.config.max_steps_per_subagent,
420                    tool_call_history: Vec::new(),
421                    messages: Vec::new(),
422                    output: None,
423                    error: None,
424                })
425                .collect(),
426        });
427
428        // Execute stages in order
429        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
430        let mut all_results: Vec<SubTaskResult> = Vec::new();
431        let artifacts: Vec<SwarmArtifact> = Vec::new();
432
433        // Shared state for completed results
434        let completed_results: Arc<RwLock<HashMap<String, String>>> =
435            Arc::new(RwLock::new(HashMap::new()));
436
437        for stage in 0..=max_stage {
438            let stage_start = Instant::now();
439
440            let stage_subtasks: Vec<SubTask> = orchestrator
441                .subtasks_for_stage(stage)
442                .into_iter()
443                .cloned()
444                .collect();
445
446            tracing::debug!(
447                "Stage {} has {} subtasks (max_stage={})",
448                stage,
449                stage_subtasks.len(),
450                max_stage
451            );
452
453            if stage_subtasks.is_empty() {
454                continue;
455            }
456
457            tracing::info!(
458                provider_name = %orchestrator.provider(),
459                "Executing stage {} with {} subtasks",
460                stage,
461                stage_subtasks.len()
462            );
463
464            // Execute all subtasks in this stage in parallel
465            let stage_results = self
466                .execute_stage(&orchestrator, stage_subtasks, completed_results.clone())
467                .await?;
468
469            // Update completed results for next stage
470            {
471                let mut completed = completed_results.write().await;
472                for result in &stage_results {
473                    completed.insert(result.subtask_id.clone(), result.result.clone());
474                }
475            }
476
477            // Update orchestrator stats
478            let stage_time = stage_start.elapsed().as_millis() as u64;
479            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
480            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();
481
482            orchestrator.stats_mut().stages.push(StageStats {
483                stage,
484                subagent_count: stage_results.len(),
485                max_steps,
486                total_steps,
487                execution_time_ms: stage_time,
488            });
489
490            // Mark subtasks as completed
491            for result in &stage_results {
492                orchestrator.complete_subtask(&result.subtask_id, result.clone());
493            }
494
495            // Emit stage complete event
496            let stage_completed = stage_results.iter().filter(|r| r.success).count();
497            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
498            self.try_send_event(SwarmEvent::StageComplete {
499                stage,
500                completed: stage_completed,
501                failed: stage_failed,
502            });
503
504            all_results.extend(stage_results);
505        }
506
507        // Get provider name before mutable borrow
508        let provider_name = orchestrator.provider().to_string();
509
510        // Calculate final stats
511        let stats = orchestrator.stats_mut();
512        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
513        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
514        stats.calculate_critical_path();
515        stats.calculate_speedup();
516
517        // Aggregate results
518        let success = all_results.iter().all(|r| r.success);
519        let result = self.aggregate_results(&all_results).await?;
520
521        tracing::info!(
522            provider_name = %provider_name,
523            "Swarm execution complete: {} subtasks, {:.1}x speedup",
524            all_results.len(),
525            stats.speedup_factor
526        );
527
528        let final_stats = orchestrator.stats().clone();
529        self.try_send_event(SwarmEvent::Complete {
530            success,
531            stats: final_stats.clone(),
532        });
533
534        Ok(SwarmResult {
535            success,
536            result,
537            subtask_results: all_results,
538            stats: final_stats,
539            artifacts,
540            error: None,
541        })
542    }
543
544    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
545    async fn execute_stage(
546        &self,
547        orchestrator: &Orchestrator,
548        subtasks: Vec<SubTask>,
549        completed_results: Arc<RwLock<HashMap<String, String>>>,
550    ) -> Result<Vec<SubTaskResult>> {
551        let mut handles: Vec<(
552            String,
553            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
554        )> = Vec::new();
555
556        // Rate limiting: semaphore for max concurrent requests
557        let semaphore = Arc::new(tokio::sync::Semaphore::new(
558            self.config.max_concurrent_requests,
559        ));
560        let delay_ms = self.config.request_delay_ms;
561
562        // Get provider info for tool registry
563        let model = orchestrator.model().to_string();
564        let provider_name = orchestrator.provider().to_string();
565        let providers = orchestrator.providers();
566        let provider = providers
567            .get(&provider_name)
568            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
569
570        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
571
572        // Create shared tool registry with provider for ralph and batch tool
573        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
574        // Filter out 'question' tool - sub-agents must be autonomous, not interactive
575        let tool_definitions: Vec<_> = tool_registry
576            .definitions()
577            .into_iter()
578            .filter(|t| t.name != "question")
579            .collect();
580
581        // Create worktree manager if enabled
582        let worktree_manager = if self.config.worktree_enabled {
583            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
584                std::env::current_dir()
585                    .map(|p| p.to_string_lossy().to_string())
586                    .unwrap_or_else(|_| ".".to_string())
587            });
588
589            match WorktreeManager::new(&working_dir) {
590                Ok(mgr) => {
591                    tracing::info!(
592                        working_dir = %working_dir,
593                        "Worktree isolation enabled for parallel sub-agents"
594                    );
595                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
596                }
597                Err(e) => {
598                    tracing::warn!(
599                        error = %e,
600                        "Failed to create worktree manager, falling back to shared directory"
601                    );
602                    None
603                }
604            }
605        } else {
606            None
607        };
608
609        for (idx, subtask) in subtasks.into_iter().enumerate() {
610            let model = model.clone();
611            let _provider_name = provider_name.clone();
612            let provider = Arc::clone(&provider);
613
614            // Get context from dependencies
615            let context = {
616                let completed = completed_results.read().await;
617                let mut dep_context = String::new();
618                for dep_id in &subtask.dependencies {
619                    if let Some(result) = completed.get(dep_id) {
620                        dep_context.push_str(&format!(
621                            "\n--- Result from dependency {} ---\n{}\n",
622                            dep_id, result
623                        ));
624                    }
625                }
626                dep_context
627            };
628
629            let instruction = subtask.instruction.clone();
630            let subtask_name = subtask.name.clone();
631            let specialty = subtask.specialty.clone().unwrap_or_default();
632            let subtask_id = subtask.id.clone();
633            let subtask_id_for_handle = subtask_id.clone();
634            let max_steps = self.config.max_steps_per_subagent;
635            let timeout_secs = self.config.subagent_timeout_secs;
636
637            // Clone for the async block
638            let tools = tool_definitions.clone();
639            let registry = Arc::clone(&tool_registry);
640            let sem = Arc::clone(&semaphore);
641            let stagger_delay = delay_ms * idx as u64; // Stagger start times
642            let worktree_mgr = worktree_manager.clone();
643            let event_tx = self.event_tx.clone();
644
645            // Spawn the subtask execution with agentic tool loop
646            let handle = tokio::spawn(async move {
647                // Rate limiting: stagger start and acquire semaphore
648                if stagger_delay > 0 {
649                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
650                }
651                let _permit = sem
652                    .acquire()
653                    .await
654                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;
655
656                let start = Instant::now();
657
658                // Create worktree for this sub-agent if enabled
659                let worktree_info = if let Some(ref mgr) = worktree_mgr {
660                    let task_slug = subtask_id.replace("-", "_");
661                    match mgr.create(&task_slug) {
662                        Ok(wt) => {
663                            tracing::info!(
664                                subtask_id = %subtask_id,
665                                worktree_path = %wt.path.display(),
666                                worktree_branch = %wt.branch,
667                                "Created worktree for sub-agent"
668                            );
669                            Some(wt)
670                        }
671                        Err(e) => {
672                            tracing::warn!(
673                                subtask_id = %subtask_id,
674                                error = %e,
675                                "Failed to create worktree, using shared directory"
676                            );
677                            None
678                        }
679                    }
680                } else {
681                    None
682                };
683
684                // Determine working directory
685                let working_dir = worktree_info
686                    .as_ref()
687                    .map(|wt| wt.path.display().to_string())
688                    .unwrap_or_else(|| ".".to_string());
689
690                // Load AGENTS.md from working directory
691                let working_path = std::path::Path::new(&working_dir);
692                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
693                    .map(|(content, _)| {
694                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
695                    })
696                    .unwrap_or_default();
697
698                // Build the system prompt for this sub-agent
699                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
700                let system_prompt = format!(
701                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
702
703WORKING DIRECTORY: {}
704All file operations should be relative to this directory.
705
706IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
707
708Available tools:
709- read: Read file contents
710- write: Write/create files  
711- edit: Edit existing files (search and replace)
712- multiedit: Make multiple edits at once
713- glob: Find files by pattern
714- grep: Search file contents
715- bash: Run shell commands (use cwd: \"{}\" parameter)
716- webfetch: Fetch web pages
717- prd: Generate structured PRD for complex tasks
718- ralph: Run autonomous agent loop on a PRD
719
720COMPLEX TASKS:
721If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
7221. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
7232. Break down into user stories with acceptance criteria
7243. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
7254. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
726
727NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
728
729When done, provide a brief summary of what you accomplished.{agents_md_content}",
730                    specialty,
731                    subtask_id,
732                    working_dir,
733                    working_dir,
734                    prd_filename,
735                    prd_filename,
736                    prd_filename
737                );
738
739                let user_prompt = if context.is_empty() {
740                    format!("Complete this task:\n\n{}", instruction)
741                } else {
742                    format!(
743                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
744                        instruction, context
745                    )
746                };
747
748                // Emit AgentStarted event
749                if let Some(ref tx) = event_tx {
750                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
751                        id: subtask_id.clone(),
752                        name: subtask_name.clone(),
753                        status: SubTaskStatus::Running,
754                        agent_name: Some(format!("agent-{}", subtask_id)),
755                    });
756                    let _ = tx.try_send(SwarmEvent::AgentStarted {
757                        subtask_id: subtask_id.clone(),
758                        agent_name: format!("agent-{}", subtask_id),
759                        specialty: specialty.clone(),
760                    });
761                }
762
763                // Run the agentic loop
764                let result = run_agent_loop(
765                    provider,
766                    &model,
767                    &system_prompt,
768                    &user_prompt,
769                    tools,
770                    registry,
771                    max_steps,
772                    timeout_secs,
773                    event_tx.clone(),
774                    subtask_id.clone(),
775                )
776                .await;
777
778                match result {
779                    Ok((output, steps, tool_calls, exit_reason)) => {
780                        let (success, status, error) = match exit_reason {
781                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
782                            AgentLoopExit::MaxStepsReached => (
783                                false,
784                                SubTaskStatus::Failed,
785                                Some(format!("Sub-agent hit max steps ({max_steps})")),
786                            ),
787                            AgentLoopExit::TimedOut => (
788                                false,
789                                SubTaskStatus::TimedOut,
790                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
791                            ),
792                        };
793
794                        // Emit completion events
795                        if let Some(ref tx) = event_tx {
796                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
797                                id: subtask_id.clone(),
798                                name: subtask_name.clone(),
799                                status,
800                                agent_name: Some(format!("agent-{}", subtask_id)),
801                            });
802                            if let Some(ref message) = error {
803                                let _ = tx.try_send(SwarmEvent::AgentError {
804                                    subtask_id: subtask_id.clone(),
805                                    error: message.clone(),
806                                });
807                            }
808                            let _ = tx.try_send(SwarmEvent::AgentOutput {
809                                subtask_id: subtask_id.clone(),
810                                output: output.clone(),
811                            });
812                            let _ = tx.try_send(SwarmEvent::AgentComplete {
813                                subtask_id: subtask_id.clone(),
814                                success,
815                                steps,
816                            });
817                        }
818                        Ok((
819                            SubTaskResult {
820                                subtask_id: subtask_id.clone(),
821                                subagent_id: format!("agent-{}", subtask_id),
822                                success,
823                                result: output,
824                                steps,
825                                tool_calls,
826                                execution_time_ms: start.elapsed().as_millis() as u64,
827                                error,
828                                artifacts: Vec::new(),
829                            },
830                            worktree_info,
831                        ))
832                    }
833                    Err(e) => {
834                        // Emit error events
835                        if let Some(ref tx) = event_tx {
836                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
837                                id: subtask_id.clone(),
838                                name: subtask_name.clone(),
839                                status: SubTaskStatus::Failed,
840                                agent_name: Some(format!("agent-{}", subtask_id)),
841                            });
842                            let _ = tx.try_send(SwarmEvent::AgentError {
843                                subtask_id: subtask_id.clone(),
844                                error: e.to_string(),
845                            });
846                            let _ = tx.try_send(SwarmEvent::AgentComplete {
847                                subtask_id: subtask_id.clone(),
848                                success: false,
849                                steps: 0,
850                            });
851                        }
852                        Ok((
853                            SubTaskResult {
854                                subtask_id: subtask_id.clone(),
855                                subagent_id: format!("agent-{}", subtask_id),
856                                success: false,
857                                result: String::new(),
858                                steps: 0,
859                                tool_calls: 0,
860                                execution_time_ms: start.elapsed().as_millis() as u64,
861                                error: Some(e.to_string()),
862                                artifacts: Vec::new(),
863                            },
864                            worktree_info,
865                        ))
866                    }
867                }
868            });
869
870            handles.push((subtask_id_for_handle, handle));
871        }
872
873        // Wait for all handles and handle worktree merging
874        let mut results = Vec::new();
875        let auto_merge = self.config.worktree_auto_merge;
876
877        for (subtask_id, handle) in handles {
878            match handle.await {
879                Ok(Ok((mut result, worktree_info))) => {
880                    // Handle worktree merge if applicable
881                    if let Some(wt) = worktree_info {
882                        if result.success && auto_merge {
883                            if let Some(ref mgr) = worktree_manager {
884                                match mgr.merge(&wt) {
885                                    Ok(merge_result) => {
886                                        if merge_result.success {
887                                            tracing::info!(
888                                                subtask_id = %result.subtask_id,
889                                                files_changed = merge_result.files_changed,
890                                                "Merged worktree changes successfully"
891                                            );
892                                            result.result.push_str(&format!(
893                                                "\n\n--- Merge Result ---\n{}",
894                                                merge_result.summary
895                                            ));
896                                        } else if merge_result.aborted {
897                                            // Merge was aborted due to non-conflict failure
898                                            tracing::warn!(
899                                                subtask_id = %result.subtask_id,
900                                                summary = %merge_result.summary,
901                                                "Merge was aborted"
902                                            );
903                                            result.result.push_str(&format!(
904                                                "\n\n--- Merge Aborted ---\n{}",
905                                                merge_result.summary
906                                            ));
907                                        } else {
908                                            tracing::warn!(
909                                                subtask_id = %result.subtask_id,
910                                                conflicts = ?merge_result.conflicts,
911                                                "Merge had conflicts"
912                                            );
913                                            result.result.push_str(&format!(
914                                                "\n\n--- Merge Conflicts ---\n{}",
915                                                merge_result.summary
916                                            ));
917                                        }
918
919                                        // Cleanup worktree after merge
920                                        if let Err(e) = mgr.cleanup(&wt) {
921                                            tracing::warn!(
922                                                error = %e,
923                                                "Failed to cleanup worktree"
924                                            );
925                                        }
926                                    }
927                                    Err(e) => {
928                                        tracing::error!(
929                                            subtask_id = %result.subtask_id,
930                                            error = %e,
931                                            "Failed to merge worktree"
932                                        );
933                                    }
934                                }
935                            }
936                        } else if !result.success {
937                            // Keep worktree for debugging on failure
938                            tracing::info!(
939                                subtask_id = %result.subtask_id,
940                                worktree_path = %wt.path.display(),
941                                "Keeping worktree for debugging (task failed)"
942                            );
943                        }
944                    }
945
946                    results.push(result);
947                }
948                Ok(Err(e)) => {
949                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
950                    if let Some(ref tx) = self.event_tx {
951                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
952                            id: subtask_id.clone(),
953                            name: subtask_id.clone(),
954                            status: SubTaskStatus::Failed,
955                            agent_name: Some(format!("agent-{}", subtask_id)),
956                        });
957                        let _ = tx.try_send(SwarmEvent::AgentError {
958                            subtask_id: subtask_id.clone(),
959                            error: e.to_string(),
960                        });
961                        let _ = tx.try_send(SwarmEvent::AgentComplete {
962                            subtask_id: subtask_id.clone(),
963                            success: false,
964                            steps: 0,
965                        });
966                    }
967                    results.push(SubTaskResult {
968                        subtask_id: subtask_id.clone(),
969                        subagent_id: format!("agent-{}", subtask_id),
970                        success: false,
971                        result: String::new(),
972                        steps: 0,
973                        tool_calls: 0,
974                        execution_time_ms: 0,
975                        error: Some(e.to_string()),
976                        artifacts: Vec::new(),
977                    });
978                }
979                Err(e) => {
980                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
981                    if let Some(ref tx) = self.event_tx {
982                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
983                            id: subtask_id.clone(),
984                            name: subtask_id.clone(),
985                            status: SubTaskStatus::Failed,
986                            agent_name: Some(format!("agent-{}", subtask_id)),
987                        });
988                        let _ = tx.try_send(SwarmEvent::AgentError {
989                            subtask_id: subtask_id.clone(),
990                            error: format!("Task join error: {}", e),
991                        });
992                        let _ = tx.try_send(SwarmEvent::AgentComplete {
993                            subtask_id: subtask_id.clone(),
994                            success: false,
995                            steps: 0,
996                        });
997                    }
998                    results.push(SubTaskResult {
999                        subtask_id: subtask_id.clone(),
1000                        subagent_id: format!("agent-{}", subtask_id),
1001                        success: false,
1002                        result: String::new(),
1003                        steps: 0,
1004                        tool_calls: 0,
1005                        execution_time_ms: 0,
1006                        error: Some(format!("Task join error: {}", e)),
1007                        artifacts: Vec::new(),
1008                    });
1009                }
1010            }
1011        }
1012
1013        Ok(results)
1014    }
1015
1016    /// Aggregate results from all subtasks into a final result
1017    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
1018        let mut aggregated = String::new();
1019
1020        for (i, result) in results.iter().enumerate() {
1021            if result.success {
1022                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
1023            } else {
1024                aggregated.push_str(&format!(
1025                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
1026                    i + 1,
1027                    result.error.as_deref().unwrap_or("Unknown error")
1028                ));
1029            }
1030        }
1031
1032        Ok(aggregated)
1033    }
1034
1035    /// Execute a single task without decomposition (for simple cases)
1036    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
1037        self.execute(task, DecompositionStrategy::None).await
1038    }
1039}
1040
1041/// Builder for swarm execution
1042pub struct SwarmExecutorBuilder {
1043    config: SwarmConfig,
1044}
1045
1046impl SwarmExecutorBuilder {
1047    pub fn new() -> Self {
1048        Self {
1049            config: SwarmConfig::default(),
1050        }
1051    }
1052
1053    pub fn max_subagents(mut self, max: usize) -> Self {
1054        self.config.max_subagents = max;
1055        self
1056    }
1057
1058    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
1059        self.config.max_steps_per_subagent = max;
1060        self
1061    }
1062
1063    pub fn max_total_steps(mut self, max: usize) -> Self {
1064        self.config.max_total_steps = max;
1065        self
1066    }
1067
1068    pub fn timeout_secs(mut self, secs: u64) -> Self {
1069        self.config.subagent_timeout_secs = secs;
1070        self
1071    }
1072
1073    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
1074        self.config.parallel_enabled = enabled;
1075        self
1076    }
1077
1078    pub fn build(self) -> SwarmExecutor {
1079        SwarmExecutor::new(self.config)
1080    }
1081}
1082
1083impl Default for SwarmExecutorBuilder {
1084    fn default() -> Self {
1085        Self::new()
1086    }
1087}
1088
1089/// Run the agentic loop for a sub-agent with tool execution
1090#[allow(clippy::too_many_arguments)]
1091/// Run an agentic loop with tools - reusable for Ralph and swarm sub-agents
1092pub async fn run_agent_loop(
1093    provider: Arc<dyn Provider>,
1094    model: &str,
1095    system_prompt: &str,
1096    user_prompt: &str,
1097    tools: Vec<crate::provider::ToolDefinition>,
1098    registry: Arc<ToolRegistry>,
1099    max_steps: usize,
1100    timeout_secs: u64,
1101    event_tx: Option<mpsc::Sender<SwarmEvent>>,
1102    subtask_id: String,
1103) -> Result<(String, usize, usize, AgentLoopExit)> {
1104    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
1105    let temperature = 0.7;
1106
1107    tracing::info!(
1108        model = %model,
1109        max_steps = max_steps,
1110        timeout_secs = timeout_secs,
1111        "Sub-agent starting agentic loop"
1112    );
1113    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
1114    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
1115
1116    // Initialize conversation with system and user messages
1117    let mut messages = vec![
1118        Message {
1119            role: Role::System,
1120            content: vec![ContentPart::Text {
1121                text: system_prompt.to_string(),
1122            }],
1123        },
1124        Message {
1125            role: Role::User,
1126            content: vec![ContentPart::Text {
1127                text: user_prompt.to_string(),
1128            }],
1129        },
1130    ];
1131
1132    let mut steps = 0;
1133    let mut total_tool_calls = 0;
1134    let mut final_output = String::new();
1135
1136    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
1137
1138    let exit_reason = loop {
1139        if steps >= max_steps {
1140            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
1141            break AgentLoopExit::MaxStepsReached;
1142        }
1143
1144        if Instant::now() > deadline {
1145            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
1146            break AgentLoopExit::TimedOut;
1147        }
1148
1149        steps += 1;
1150        tracing::info!(step = steps, "Sub-agent step starting");
1151
1152        // Check context size and truncate if approaching limit
1153        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
1154
1155        let request = CompletionRequest {
1156            messages: messages.clone(),
1157            tools: tools.clone(),
1158            model: model.to_string(),
1159            temperature: Some(temperature),
1160            top_p: None,
1161            max_tokens: Some(8192),
1162            stop: Vec::new(),
1163        };
1164
1165        let step_start = Instant::now();
1166        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
1167        let step_duration = step_start.elapsed();
1168
1169        tracing::info!(
1170            step = steps,
1171            duration_ms = step_duration.as_millis() as u64,
1172            finish_reason = ?response.finish_reason,
1173            prompt_tokens = response.usage.prompt_tokens,
1174            completion_tokens = response.usage.completion_tokens,
1175            "Sub-agent step completed LLM call"
1176        );
1177
1178        // Extract text from response
1179        let mut text_parts = Vec::new();
1180        let mut tool_calls = Vec::new();
1181
1182        for part in &response.message.content {
1183            match part {
1184                ContentPart::Text { text } => {
1185                    text_parts.push(text.clone());
1186                }
1187                ContentPart::ToolCall {
1188                    id,
1189                    name,
1190                    arguments,
1191                } => {
1192                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
1193                }
1194                _ => {}
1195            }
1196        }
1197
1198        // Log assistant output
1199        if !text_parts.is_empty() {
1200            final_output = text_parts.join("\n");
1201            tracing::info!(
1202                step = steps,
1203                output_len = final_output.len(),
1204                "Sub-agent text output"
1205            );
1206            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
1207
1208            // Emit assistant message event for TUI detail view
1209            if let Some(ref tx) = event_tx {
1210                let preview = if final_output.len() > 500 {
1211                    let mut end = 500;
1212                    while end > 0 && !final_output.is_char_boundary(end) {
1213                        end -= 1;
1214                    }
1215                    format!("{}...", &final_output[..end])
1216                } else {
1217                    final_output.clone()
1218                };
1219                let _ = tx.try_send(SwarmEvent::AgentMessage {
1220                    subtask_id: subtask_id.clone(),
1221                    entry: AgentMessageEntry {
1222                        role: "assistant".to_string(),
1223                        content: preview,
1224                        is_tool_call: false,
1225                    },
1226                });
1227            }
1228        }
1229
1230        // Log tool calls
1231        if !tool_calls.is_empty() {
1232            tracing::info!(
1233                step = steps,
1234                num_tool_calls = tool_calls.len(),
1235                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
1236                "Sub-agent requesting tool calls"
1237            );
1238        }
1239
1240        // Add assistant message to history
1241        messages.push(response.message.clone());
1242
1243        // If no tool calls or stop, we're done
1244        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
1245            tracing::info!(
1246                steps = steps,
1247                total_tool_calls = total_tool_calls,
1248                "Sub-agent finished"
1249            );
1250            break AgentLoopExit::Completed;
1251        }
1252
1253        // Execute tool calls
1254        let mut tool_results = Vec::new();
1255
1256        for (call_id, tool_name, arguments) in tool_calls {
1257            total_tool_calls += 1;
1258
1259            // Emit tool call event
1260            if let Some(ref tx) = event_tx {
1261                let _ = tx.try_send(SwarmEvent::AgentToolCall {
1262                    subtask_id: subtask_id.clone(),
1263                    tool_name: tool_name.clone(),
1264                });
1265            }
1266
1267            tracing::info!(
1268                step = steps,
1269                tool_call_id = %call_id,
1270                tool = %tool_name,
1271                "Executing tool"
1272            );
1273            tracing::debug!(
1274                tool = %tool_name,
1275                arguments = %arguments,
1276                "Tool call arguments"
1277            );
1278
1279            let tool_start = Instant::now();
1280            let mut tool_success = true;
1281            let result = if let Some(tool) = registry.get(&tool_name) {
1282                // Parse arguments as JSON
1283                let args: serde_json::Value =
1284                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
1285                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
1286                        serde_json::json!({})
1287                    });
1288
1289                match tool.execute(args).await {
1290                    Ok(r) => {
1291                        if r.success {
1292                            tracing::info!(
1293                                tool = %tool_name,
1294                                duration_ms = tool_start.elapsed().as_millis() as u64,
1295                                success = true,
1296                                "Tool execution completed"
1297                            );
1298                            r.output
1299                        } else {
1300                            tool_success = false;
1301                            tracing::warn!(
1302                                tool = %tool_name,
1303                                error = %r.output,
1304                                "Tool returned error"
1305                            );
1306                            format!("Tool error: {}", r.output)
1307                        }
1308                    }
1309                    Err(e) => {
1310                        tool_success = false;
1311                        tracing::error!(
1312                            tool = %tool_name,
1313                            error = %e,
1314                            "Tool execution failed"
1315                        );
1316                        format!("Tool execution failed: {}", e)
1317                    }
1318                }
1319            } else {
1320                tool_success = false;
1321                tracing::error!(tool = %tool_name, "Unknown tool requested");
1322                format!("Unknown tool: {}", tool_name)
1323            };
1324
1325            // Emit detailed tool call event
1326            if let Some(ref tx) = event_tx {
1327                let input_preview = if arguments.len() > 200 {
1328                    let mut end = 200;
1329                    while end > 0 && !arguments.is_char_boundary(end) {
1330                        end -= 1;
1331                    }
1332                    format!("{}...", &arguments[..end])
1333                } else {
1334                    arguments.clone()
1335                };
1336                let output_preview = if result.len() > 500 {
1337                    let mut end = 500;
1338                    while end > 0 && !result.is_char_boundary(end) {
1339                        end -= 1;
1340                    }
1341                    format!("{}...", &result[..end])
1342                } else {
1343                    result.clone()
1344                };
1345                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
1346                    subtask_id: subtask_id.clone(),
1347                    detail: AgentToolCallDetail {
1348                        tool_name: tool_name.clone(),
1349                        input_preview,
1350                        output_preview,
1351                        success: tool_success,
1352                    },
1353                });
1354            }
1355
1356            tracing::debug!(
1357                tool = %tool_name,
1358                result_len = result.len(),
1359                "Tool result"
1360            );
1361
1362            // Process large results with RLM or truncate smaller ones
1363            let result = if result.len() > RLM_THRESHOLD_CHARS {
1364                // Use RLM for very large results
1365                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
1366                    .await
1367            } else {
1368                // Simple truncation for medium results
1369                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
1370            };
1371
1372            tool_results.push((call_id, tool_name, result));
1373        }
1374
1375        // Add tool results to conversation
1376        for (call_id, _tool_name, result) in tool_results {
1377            messages.push(Message {
1378                role: Role::Tool,
1379                content: vec![ContentPart::ToolResult {
1380                    tool_call_id: call_id,
1381                    content: result,
1382                }],
1383            });
1384        }
1385
1386        // Reset deadline after each successful step — agent is making progress
1387        deadline = Instant::now() + Duration::from_secs(timeout_secs);
1388    };
1389
1390    Ok((final_output, steps, total_tool_calls, exit_reason))
1391}