Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
8    orchestrator::Orchestrator,
9    subtask::{SubTask, SubTaskResult},
10};
11
12// Re-export swarm types for convenience
13pub use super::{Actor, ActorStatus, Handler, SwarmMessage};
14use crate::{
15    agent::Agent,
16    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
17    rlm::RlmExecutor,
18    swarm::{SwarmArtifact, SwarmStats},
19    tool::ToolRegistry,
20    worktree::{WorktreeInfo, WorktreeManager},
21};
22use anyhow::Result;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::Instant;
26use tokio::sync::RwLock;
27use tokio::time::{Duration, timeout};
28
/// Default context limit (256k tokens - conservative for most models)
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens held back from the prompt budget so the model has room to respond
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context limit that may be used before we start truncating
/// (85% of the limit)
const TRUNCATION_THRESHOLD: f64 = 0.85;
37
/// Roughly estimate how many tokens `text` will occupy.
///
/// Real tokenizers differ per model; English text usually lands somewhere
/// between 3 and 5 characters per token. We divide the byte length by 3.5 and
/// round up, which errs on the side of over-counting.
fn estimate_tokens(text: &str) -> usize {
    const CHARS_PER_TOKEN: f64 = 3.5;

    let byte_len = text.len() as f64;
    let estimate = (byte_len / CHARS_PER_TOKEN).ceil();
    estimate as usize
}
45
46/// Estimate total tokens in a message
47fn estimate_message_tokens(message: &Message) -> usize {
48    let mut tokens = 4; // Role overhead
49
50    for part in &message.content {
51        tokens += match part {
52            ContentPart::Text { text } => estimate_tokens(text),
53            ContentPart::ToolCall {
54                id,
55                name,
56                arguments,
57            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
58            ContentPart::ToolResult {
59                tool_call_id,
60                content,
61            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
62            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
63        };
64    }
65
66    tokens
67}
68
69/// Estimate total tokens in all messages
70fn estimate_total_tokens(messages: &[Message]) -> usize {
71    messages.iter().map(estimate_message_tokens).sum()
72}
73
74/// Truncate messages to fit within context limit
75///
76/// Strategy:
77/// 1. First, aggressively truncate large tool results
78/// 2. Keep system message (first) and user message (second) always
79/// 3. Keep the most recent assistant + tool result pairs together
80/// 4. Drop oldest middle messages in matched pairs
81fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
82    let target_tokens =
83        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
84
85    let current_tokens = estimate_total_tokens(messages);
86    if current_tokens <= target_tokens {
87        return;
88    }
89
90    tracing::warn!(
91        current_tokens = current_tokens,
92        target_tokens = target_tokens,
93        context_limit = context_limit,
94        "Context approaching limit, truncating conversation history"
95    );
96
97    // FIRST: Aggressively truncate large tool results (this is the main culprit)
98    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
99
100    let after_tool_truncation = estimate_total_tokens(messages);
101    if after_tool_truncation <= target_tokens {
102        tracing::info!(
103            old_tokens = current_tokens,
104            new_tokens = after_tool_truncation,
105            "Truncated large tool results, context now within limits"
106        );
107        return;
108    }
109
110    // Minimum: keep first 2 (system + initial user) and last 4 messages
111    if messages.len() <= 6 {
112        tracing::warn!(
113            tokens = after_tool_truncation,
114            target = target_tokens,
115            "Cannot truncate further - conversation too short"
116        );
117        return;
118    }
119
120    // Remove messages from the middle, keeping first 2 and last 4
121    // But we need to remove in pairs to maintain tool_call_id references
122    let keep_start = 2;
123    let keep_end = 4;
124    let removable_count = messages.len() - keep_start - keep_end;
125
126    if removable_count == 0 {
127        return;
128    }
129
130    // Remove all middle messages
131    let removed_messages: Vec<_> = messages
132        .drain(keep_start..keep_start + removable_count)
133        .collect();
134    let summary = summarize_removed_messages(&removed_messages);
135
136    // Insert a summary message where we removed content
137    messages.insert(
138        keep_start,
139        Message {
140            role: Role::User,
141            content: vec![ContentPart::Text {
142                text: format!(
143                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
144                    removed_messages.len(),
145                    summary
146                ),
147            }],
148        },
149    );
150
151    let new_tokens = estimate_total_tokens(messages);
152    tracing::info!(
153        removed_messages = removed_messages.len(),
154        old_tokens = current_tokens,
155        new_tokens = new_tokens,
156        "Truncated conversation history"
157    );
158}
159
160/// Summarize removed messages for context preservation
161fn summarize_removed_messages(messages: &[Message]) -> String {
162    let mut summary = String::new();
163    let mut tool_calls: Vec<String> = Vec::new();
164
165    for msg in messages {
166        for part in &msg.content {
167            if let ContentPart::ToolCall { name, .. } = part {
168                if !tool_calls.contains(name) {
169                    tool_calls.push(name.clone());
170                }
171            }
172        }
173    }
174
175    if !tool_calls.is_empty() {
176        summary.push_str(&format!(
177            "Tools used in truncated history: {}",
178            tool_calls.join(", ")
179        ));
180    }
181
182    summary
183}
184
185/// Truncate large tool results that exceed a reasonable size
186fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
187    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
188    let mut truncated_count = 0;
189    let mut saved_tokens = 0usize;
190
191    for message in messages.iter_mut() {
192        for part in message.content.iter_mut() {
193            if let ContentPart::ToolResult { content, .. } = part {
194                let tokens = estimate_tokens(content);
195                if tokens > max_tokens_per_result {
196                    let old_len = content.len();
197                    *content = truncate_single_result(content, char_limit);
198                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
199                    if content.len() < old_len {
200                        truncated_count += 1;
201                    }
202                }
203            }
204        }
205    }
206
207    if truncated_count > 0 {
208        tracing::info!(
209            truncated_count = truncated_count,
210            saved_tokens = saved_tokens,
211            max_tokens_per_result = max_tokens_per_result,
212            "Truncated large tool results"
213        );
214    }
215}
216
217/// Truncate a single result string to a maximum character limit
218fn truncate_single_result(content: &str, max_chars: usize) -> String {
219    if content.len() <= max_chars {
220        return content.to_string();
221    }
222
223    // Try to find a good break point (newline) near the limit
224    let break_point = content[..max_chars.min(content.len())]
225        .rfind('\n')
226        .unwrap_or(max_chars.min(content.len()));
227
228    let truncated = format!(
229        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
230        &content[..break_point],
231        content.len(),
232        break_point
233    );
234
235    tracing::debug!(
236        original_len = content.len(),
237        truncated_len = truncated.len(),
238        "Truncated large result"
239    );
240
241    truncated
242}
243
/// Results longer than this are summarized with RLM instead of being cut.
/// Compared against `content.len()`, i.e. a length in bytes.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Size that results between the two thresholds are cut down to; results at
/// or below this size are passed through untouched.
const SIMPLE_TRUNCATE_CHARS: usize = 6_000;
249
250/// Process a large tool result using RLM to intelligently summarize it
251async fn process_large_result_with_rlm(
252    content: &str,
253    tool_name: &str,
254    provider: Arc<dyn Provider>,
255    model: &str,
256) -> String {
257    if content.len() <= SIMPLE_TRUNCATE_CHARS {
258        return content.to_string();
259    }
260
261    // For medium-sized content, just truncate
262    if content.len() <= RLM_THRESHOLD_CHARS {
263        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
264    }
265
266    // For very large content, use RLM to intelligently summarize
267    tracing::info!(
268        tool = %tool_name,
269        content_len = content.len(),
270        "Using RLM to process large tool result"
271    );
272
273    let query = format!(
274        "Summarize the key information from this {} output. \
275         Focus on: errors, warnings, important findings, and actionable items. \
276         Be concise but thorough.",
277        tool_name
278    );
279
280    let mut executor =
281        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
282
283    match executor.analyze(&query).await {
284        Ok(result) => {
285            tracing::info!(
286                tool = %tool_name,
287                original_len = content.len(),
288                summary_len = result.answer.len(),
289                iterations = result.iterations,
290                "RLM summarized large result"
291            );
292
293            format!(
294                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
295                tool_name,
296                content.len(),
297                result.answer.len(),
298                result.answer
299            )
300        }
301        Err(e) => {
302            tracing::warn!(
303                tool = %tool_name,
304                error = %e,
305                "RLM analysis failed, falling back to truncation"
306            );
307            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
308        }
309    }
310}
311
312/// The swarm executor runs subtasks in parallel
313pub struct SwarmExecutor {
314    config: SwarmConfig,
315    /// Optional agent for handling swarm-level coordination (reserved for future use)
316    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
317}
318
319impl SwarmExecutor {
320    /// Create a new executor
321    pub fn new(config: SwarmConfig) -> Self {
322        Self {
323            config,
324            coordinator_agent: None,
325        }
326    }
327
328    /// Set a coordinator agent for swarm-level coordination
329    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
330        tracing::debug!("Setting coordinator agent for swarm execution");
331        self.coordinator_agent = Some(agent);
332        self
333    }
334
335    /// Get the coordinator agent if set
336    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
337        self.coordinator_agent.as_ref()
338    }
339
340    /// Execute a task using the swarm
341    pub async fn execute(
342        &self,
343        task: &str,
344        strategy: DecompositionStrategy,
345    ) -> Result<SwarmResult> {
346        let start_time = Instant::now();
347
348        // Create orchestrator
349        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;
350
351        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");
352
353        // Decompose the task
354        let subtasks = orchestrator.decompose(task, strategy).await?;
355
356        if subtasks.is_empty() {
357            return Ok(SwarmResult {
358                success: false,
359                result: String::new(),
360                subtask_results: Vec::new(),
361                stats: SwarmStats::default(),
362                artifacts: Vec::new(),
363                error: Some("No subtasks generated".to_string()),
364            });
365        }
366
367        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());
368
369        // Execute stages in order
370        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
371        let mut all_results: Vec<SubTaskResult> = Vec::new();
372        let artifacts: Vec<SwarmArtifact> = Vec::new();
373
374        // Shared state for completed results
375        let completed_results: Arc<RwLock<HashMap<String, String>>> =
376            Arc::new(RwLock::new(HashMap::new()));
377
378        for stage in 0..=max_stage {
379            let stage_start = Instant::now();
380
381            let stage_subtasks: Vec<SubTask> = orchestrator
382                .subtasks_for_stage(stage)
383                .into_iter()
384                .cloned()
385                .collect();
386
387            tracing::debug!(
388                "Stage {} has {} subtasks (max_stage={})",
389                stage,
390                stage_subtasks.len(),
391                max_stage
392            );
393
394            if stage_subtasks.is_empty() {
395                continue;
396            }
397
398            tracing::info!(
399                provider_name = %orchestrator.provider(),
400                "Executing stage {} with {} subtasks",
401                stage,
402                stage_subtasks.len()
403            );
404
405            // Execute all subtasks in this stage in parallel
406            let stage_results = self
407                .execute_stage(&orchestrator, stage_subtasks, completed_results.clone())
408                .await?;
409
410            // Update completed results for next stage
411            {
412                let mut completed = completed_results.write().await;
413                for result in &stage_results {
414                    completed.insert(result.subtask_id.clone(), result.result.clone());
415                }
416            }
417
418            // Update orchestrator stats
419            let stage_time = stage_start.elapsed().as_millis() as u64;
420            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
421            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();
422
423            orchestrator.stats_mut().stages.push(StageStats {
424                stage,
425                subagent_count: stage_results.len(),
426                max_steps,
427                total_steps,
428                execution_time_ms: stage_time,
429            });
430
431            // Mark subtasks as completed
432            for result in &stage_results {
433                orchestrator.complete_subtask(&result.subtask_id, result.clone());
434            }
435
436            all_results.extend(stage_results);
437        }
438
439        // Get provider name before mutable borrow
440        let provider_name = orchestrator.provider().to_string();
441
442        // Calculate final stats
443        let stats = orchestrator.stats_mut();
444        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
445        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
446        stats.calculate_critical_path();
447        stats.calculate_speedup();
448
449        // Aggregate results
450        let success = all_results.iter().all(|r| r.success);
451        let result = self.aggregate_results(&all_results).await?;
452
453        tracing::info!(
454            provider_name = %provider_name,
455            "Swarm execution complete: {} subtasks, {:.1}x speedup",
456            all_results.len(),
457            stats.speedup_factor
458        );
459
460        Ok(SwarmResult {
461            success,
462            result,
463            subtask_results: all_results,
464            stats: orchestrator.stats().clone(),
465            artifacts,
466            error: None,
467        })
468    }
469
470    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
471    async fn execute_stage(
472        &self,
473        orchestrator: &Orchestrator,
474        subtasks: Vec<SubTask>,
475        completed_results: Arc<RwLock<HashMap<String, String>>>,
476    ) -> Result<Vec<SubTaskResult>> {
477        let mut handles: Vec<
478            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
479        > = Vec::new();
480
481        // Rate limiting: semaphore for max concurrent requests
482        let semaphore = Arc::new(tokio::sync::Semaphore::new(
483            self.config.max_concurrent_requests,
484        ));
485        let delay_ms = self.config.request_delay_ms;
486
487        // Get provider info for tool registry
488        let model = orchestrator.model().to_string();
489        let provider_name = orchestrator.provider().to_string();
490        let providers = orchestrator.providers();
491        let provider = providers
492            .get(&provider_name)
493            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
494
495        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
496
497        // Create shared tool registry with provider for ralph and batch tool
498        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
499        // Filter out 'question' tool - sub-agents must be autonomous, not interactive
500        let tool_definitions: Vec<_> = tool_registry
501            .definitions()
502            .into_iter()
503            .filter(|t| t.name != "question")
504            .collect();
505
506        // Create worktree manager if enabled
507        let worktree_manager = if self.config.worktree_enabled {
508            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
509                std::env::current_dir()
510                    .map(|p| p.to_string_lossy().to_string())
511                    .unwrap_or_else(|_| ".".to_string())
512            });
513
514            match WorktreeManager::new(&working_dir) {
515                Ok(mgr) => {
516                    tracing::info!(
517                        working_dir = %working_dir,
518                        "Worktree isolation enabled for parallel sub-agents"
519                    );
520                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
521                }
522                Err(e) => {
523                    tracing::warn!(
524                        error = %e,
525                        "Failed to create worktree manager, falling back to shared directory"
526                    );
527                    None
528                }
529            }
530        } else {
531            None
532        };
533
534        for (idx, subtask) in subtasks.into_iter().enumerate() {
535            let model = model.clone();
536            let _provider_name = provider_name.clone();
537            let provider = Arc::clone(&provider);
538
539            // Get context from dependencies
540            let context = {
541                let completed = completed_results.read().await;
542                let mut dep_context = String::new();
543                for dep_id in &subtask.dependencies {
544                    if let Some(result) = completed.get(dep_id) {
545                        dep_context.push_str(&format!(
546                            "\n--- Result from dependency {} ---\n{}\n",
547                            dep_id, result
548                        ));
549                    }
550                }
551                dep_context
552            };
553
554            let instruction = subtask.instruction.clone();
555            let specialty = subtask.specialty.clone().unwrap_or_default();
556            let subtask_id = subtask.id.clone();
557            let max_steps = self.config.max_steps_per_subagent;
558            let timeout_secs = self.config.subagent_timeout_secs;
559
560            // Clone for the async block
561            let tools = tool_definitions.clone();
562            let registry = Arc::clone(&tool_registry);
563            let sem = Arc::clone(&semaphore);
564            let stagger_delay = delay_ms * idx as u64; // Stagger start times
565            let worktree_mgr = worktree_manager.clone();
566
567            // Spawn the subtask execution with agentic tool loop
568            let handle = tokio::spawn(async move {
569                // Rate limiting: stagger start and acquire semaphore
570                if stagger_delay > 0 {
571                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
572                }
573                let _permit = sem.acquire().await.expect("semaphore closed");
574
575                let start = Instant::now();
576
577                // Create worktree for this sub-agent if enabled
578                let worktree_info = if let Some(ref mgr) = worktree_mgr {
579                    let task_slug = subtask_id.replace("-", "_");
580                    match mgr.create(&task_slug) {
581                        Ok(wt) => {
582                            tracing::info!(
583                                subtask_id = %subtask_id,
584                                worktree_path = %wt.path.display(),
585                                worktree_branch = %wt.branch,
586                                "Created worktree for sub-agent"
587                            );
588                            Some(wt)
589                        }
590                        Err(e) => {
591                            tracing::warn!(
592                                subtask_id = %subtask_id,
593                                error = %e,
594                                "Failed to create worktree, using shared directory"
595                            );
596                            None
597                        }
598                    }
599                } else {
600                    None
601                };
602
603                // Determine working directory
604                let working_dir = worktree_info
605                    .as_ref()
606                    .map(|wt| wt.path.display().to_string())
607                    .unwrap_or_else(|| ".".to_string());
608
609                // Load AGENTS.md from working directory
610                let working_path = std::path::Path::new(&working_dir);
611                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
612                    .map(|(content, _)| {
613                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
614                    })
615                    .unwrap_or_default();
616
617                // Build the system prompt for this sub-agent
618                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
619                let system_prompt = format!(
620                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
621
622WORKING DIRECTORY: {}
623All file operations should be relative to this directory.
624
625IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
626
627Available tools:
628- read: Read file contents
629- write: Write/create files  
630- edit: Edit existing files (search and replace)
631- multiedit: Make multiple edits at once
632- glob: Find files by pattern
633- grep: Search file contents
634- bash: Run shell commands (use cwd: \"{}\" parameter)
635- webfetch: Fetch web pages
636- prd: Generate structured PRD for complex tasks
637- ralph: Run autonomous agent loop on a PRD
638
639COMPLEX TASKS:
640If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
6411. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
6422. Break down into user stories with acceptance criteria
6433. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
6444. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
645
646NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
647
648When done, provide a brief summary of what you accomplished.{agents_md_content}",
649                    specialty,
650                    subtask_id,
651                    working_dir,
652                    working_dir,
653                    prd_filename,
654                    prd_filename,
655                    prd_filename
656                );
657
658                let user_prompt = if context.is_empty() {
659                    format!("Complete this task:\n\n{}", instruction)
660                } else {
661                    format!(
662                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
663                        instruction, context
664                    )
665                };
666
667                // Run the agentic loop
668                let result = run_agent_loop(
669                    provider,
670                    &model,
671                    &system_prompt,
672                    &user_prompt,
673                    tools,
674                    registry,
675                    max_steps,
676                    timeout_secs,
677                )
678                .await;
679
680                match result {
681                    Ok((output, steps, tool_calls)) => Ok((
682                        SubTaskResult {
683                            subtask_id: subtask_id.clone(),
684                            subagent_id: format!("agent-{}", subtask_id),
685                            success: true,
686                            result: output,
687                            steps,
688                            tool_calls,
689                            execution_time_ms: start.elapsed().as_millis() as u64,
690                            error: None,
691                            artifacts: Vec::new(),
692                        },
693                        worktree_info,
694                    )),
695                    Err(e) => Ok((
696                        SubTaskResult {
697                            subtask_id: subtask_id.clone(),
698                            subagent_id: format!("agent-{}", subtask_id),
699                            success: false,
700                            result: String::new(),
701                            steps: 0,
702                            tool_calls: 0,
703                            execution_time_ms: start.elapsed().as_millis() as u64,
704                            error: Some(e.to_string()),
705                            artifacts: Vec::new(),
706                        },
707                        worktree_info,
708                    )),
709                }
710            });
711
712            handles.push(handle);
713        }
714
715        // Wait for all handles and handle worktree merging
716        let mut results = Vec::new();
717        let auto_merge = self.config.worktree_auto_merge;
718
719        for handle in handles {
720            match handle.await {
721                Ok(Ok((mut result, worktree_info))) => {
722                    // Handle worktree merge if applicable
723                    if let Some(wt) = worktree_info {
724                        if result.success && auto_merge {
725                            if let Some(ref mgr) = worktree_manager {
726                                match mgr.merge(&wt) {
727                                    Ok(merge_result) => {
728                                        if merge_result.success {
729                                            tracing::info!(
730                                                subtask_id = %result.subtask_id,
731                                                files_changed = merge_result.files_changed,
732                                                "Merged worktree changes successfully"
733                                            );
734                                            result.result.push_str(&format!(
735                                                "\n\n--- Merge Result ---\n{}",
736                                                merge_result.summary
737                                            ));
738                                        } else if merge_result.aborted {
739                                            // Merge was aborted due to non-conflict failure
740                                            tracing::warn!(
741                                                subtask_id = %result.subtask_id,
742                                                summary = %merge_result.summary,
743                                                "Merge was aborted"
744                                            );
745                                            result.result.push_str(&format!(
746                                                "\n\n--- Merge Aborted ---\n{}",
747                                                merge_result.summary
748                                            ));
749                                        } else {
750                                            tracing::warn!(
751                                                subtask_id = %result.subtask_id,
752                                                conflicts = ?merge_result.conflicts,
753                                                "Merge had conflicts"
754                                            );
755                                            result.result.push_str(&format!(
756                                                "\n\n--- Merge Conflicts ---\n{}",
757                                                merge_result.summary
758                                            ));
759                                        }
760
761                                        // Cleanup worktree after merge
762                                        if let Err(e) = mgr.cleanup(&wt) {
763                                            tracing::warn!(
764                                                error = %e,
765                                                "Failed to cleanup worktree"
766                                            );
767                                        }
768                                    }
769                                    Err(e) => {
770                                        tracing::error!(
771                                            subtask_id = %result.subtask_id,
772                                            error = %e,
773                                            "Failed to merge worktree"
774                                        );
775                                    }
776                                }
777                            }
778                        } else if !result.success {
779                            // Keep worktree for debugging on failure
780                            tracing::info!(
781                                subtask_id = %result.subtask_id,
782                                worktree_path = %wt.path.display(),
783                                "Keeping worktree for debugging (task failed)"
784                            );
785                        }
786                    }
787
788                    results.push(result);
789                }
790                Ok(Err(e)) => {
791                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
792                }
793                Err(e) => {
794                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
795                }
796            }
797        }
798
799        Ok(results)
800    }
801
802    /// Aggregate results from all subtasks into a final result
803    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
804        let mut aggregated = String::new();
805
806        for (i, result) in results.iter().enumerate() {
807            if result.success {
808                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
809            } else {
810                aggregated.push_str(&format!(
811                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
812                    i + 1,
813                    result.error.as_deref().unwrap_or("Unknown error")
814                ));
815            }
816        }
817
818        Ok(aggregated)
819    }
820
821    /// Execute a single task without decomposition (for simple cases)
822    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
823        self.execute(task, DecompositionStrategy::None).await
824    }
825}
826
827/// Builder for swarm execution
828pub struct SwarmExecutorBuilder {
829    config: SwarmConfig,
830}
831
832impl SwarmExecutorBuilder {
833    pub fn new() -> Self {
834        Self {
835            config: SwarmConfig::default(),
836        }
837    }
838
839    pub fn max_subagents(mut self, max: usize) -> Self {
840        self.config.max_subagents = max;
841        self
842    }
843
844    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
845        self.config.max_steps_per_subagent = max;
846        self
847    }
848
849    pub fn max_total_steps(mut self, max: usize) -> Self {
850        self.config.max_total_steps = max;
851        self
852    }
853
854    pub fn timeout_secs(mut self, secs: u64) -> Self {
855        self.config.subagent_timeout_secs = secs;
856        self
857    }
858
859    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
860        self.config.parallel_enabled = enabled;
861        self
862    }
863
864    pub fn build(self) -> SwarmExecutor {
865        SwarmExecutor::new(self.config)
866    }
867}
868
869impl Default for SwarmExecutorBuilder {
870    fn default() -> Self {
871        Self::new()
872    }
873}
874
875/// Run the agentic loop for a sub-agent with tool execution
876#[allow(clippy::too_many_arguments)]
877/// Run an agentic loop with tools - reusable for Ralph and swarm sub-agents
878pub async fn run_agent_loop(
879    provider: Arc<dyn Provider>,
880    model: &str,
881    system_prompt: &str,
882    user_prompt: &str,
883    tools: Vec<crate::provider::ToolDefinition>,
884    registry: Arc<ToolRegistry>,
885    max_steps: usize,
886    timeout_secs: u64,
887) -> Result<(String, usize, usize)> {
888    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
889    let temperature = 0.7;
890
891    tracing::info!(
892        model = %model,
893        max_steps = max_steps,
894        timeout_secs = timeout_secs,
895        "Sub-agent starting agentic loop"
896    );
897    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
898    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
899
900    // Initialize conversation with system and user messages
901    let mut messages = vec![
902        Message {
903            role: Role::System,
904            content: vec![ContentPart::Text {
905                text: system_prompt.to_string(),
906            }],
907        },
908        Message {
909            role: Role::User,
910            content: vec![ContentPart::Text {
911                text: user_prompt.to_string(),
912            }],
913        },
914    ];
915
916    let mut steps = 0;
917    let mut total_tool_calls = 0;
918    let mut final_output = String::new();
919
920    let deadline = Instant::now() + Duration::from_secs(timeout_secs);
921
922    loop {
923        if steps >= max_steps {
924            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
925            break;
926        }
927
928        if Instant::now() > deadline {
929            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
930            break;
931        }
932
933        steps += 1;
934        tracing::info!(step = steps, "Sub-agent step starting");
935
936        // Check context size and truncate if approaching limit
937        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
938
939        let request = CompletionRequest {
940            messages: messages.clone(),
941            tools: tools.clone(),
942            model: model.to_string(),
943            temperature: Some(temperature),
944            top_p: None,
945            max_tokens: Some(8192),
946            stop: Vec::new(),
947        };
948
949        let step_start = Instant::now();
950        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
951        let step_duration = step_start.elapsed();
952
953        tracing::info!(
954            step = steps,
955            duration_ms = step_duration.as_millis() as u64,
956            finish_reason = ?response.finish_reason,
957            prompt_tokens = response.usage.prompt_tokens,
958            completion_tokens = response.usage.completion_tokens,
959            "Sub-agent step completed LLM call"
960        );
961
962        // Extract text from response
963        let mut text_parts = Vec::new();
964        let mut tool_calls = Vec::new();
965
966        for part in &response.message.content {
967            match part {
968                ContentPart::Text { text } => {
969                    text_parts.push(text.clone());
970                }
971                ContentPart::ToolCall {
972                    id,
973                    name,
974                    arguments,
975                } => {
976                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
977                }
978                _ => {}
979            }
980        }
981
982        // Log assistant output
983        if !text_parts.is_empty() {
984            final_output = text_parts.join("\n");
985            tracing::info!(
986                step = steps,
987                output_len = final_output.len(),
988                "Sub-agent text output"
989            );
990            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
991        }
992
993        // Log tool calls
994        if !tool_calls.is_empty() {
995            tracing::info!(
996                step = steps,
997                num_tool_calls = tool_calls.len(),
998                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
999                "Sub-agent requesting tool calls"
1000            );
1001        }
1002
1003        // Add assistant message to history
1004        messages.push(response.message.clone());
1005
1006        // If no tool calls or stop, we're done
1007        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
1008            tracing::info!(
1009                steps = steps,
1010                total_tool_calls = total_tool_calls,
1011                "Sub-agent finished"
1012            );
1013            break;
1014        }
1015
1016        // Execute tool calls
1017        let mut tool_results = Vec::new();
1018
1019        for (call_id, tool_name, arguments) in tool_calls {
1020            total_tool_calls += 1;
1021
1022            tracing::info!(
1023                step = steps,
1024                tool_call_id = %call_id,
1025                tool = %tool_name,
1026                "Executing tool"
1027            );
1028            tracing::debug!(
1029                tool = %tool_name,
1030                arguments = %arguments,
1031                "Tool call arguments"
1032            );
1033
1034            let tool_start = Instant::now();
1035            let result = if let Some(tool) = registry.get(&tool_name) {
1036                // Parse arguments as JSON
1037                let args: serde_json::Value =
1038                    serde_json::from_str(&arguments).unwrap_or_else(|_| serde_json::json!({}));
1039
1040                match tool.execute(args).await {
1041                    Ok(r) => {
1042                        if r.success {
1043                            tracing::info!(
1044                                tool = %tool_name,
1045                                duration_ms = tool_start.elapsed().as_millis() as u64,
1046                                success = true,
1047                                "Tool execution completed"
1048                            );
1049                            r.output
1050                        } else {
1051                            tracing::warn!(
1052                                tool = %tool_name,
1053                                error = %r.output,
1054                                "Tool returned error"
1055                            );
1056                            format!("Tool error: {}", r.output)
1057                        }
1058                    }
1059                    Err(e) => {
1060                        tracing::error!(
1061                            tool = %tool_name,
1062                            error = %e,
1063                            "Tool execution failed"
1064                        );
1065                        format!("Tool execution failed: {}", e)
1066                    }
1067                }
1068            } else {
1069                tracing::error!(tool = %tool_name, "Unknown tool requested");
1070                format!("Unknown tool: {}", tool_name)
1071            };
1072
1073            tracing::debug!(
1074                tool = %tool_name,
1075                result_len = result.len(),
1076                "Tool result"
1077            );
1078
1079            // Process large results with RLM or truncate smaller ones
1080            let result = if result.len() > RLM_THRESHOLD_CHARS {
1081                // Use RLM for very large results
1082                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
1083                    .await
1084            } else {
1085                // Simple truncation for medium results
1086                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
1087            };
1088
1089            tool_results.push((call_id, tool_name, result));
1090        }
1091
1092        // Add tool results to conversation
1093        for (call_id, _tool_name, result) in tool_results {
1094            messages.push(Message {
1095                role: Role::Tool,
1096                content: vec![ContentPart::ToolResult {
1097                    tool_call_id: call_id,
1098                    content: result,
1099                }],
1100            });
1101        }
1102    }
1103
1104    Ok((final_output, steps, total_tool_calls))
1105}