Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
8    orchestrator::Orchestrator,
9    subtask::{SubTask, SubTaskResult, SubTaskStatus},
10};
11use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
12
13// Re-export swarm types for convenience
14pub use super::{Actor, ActorStatus, Handler, SwarmMessage};
15use crate::{
16    agent::Agent,
17    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
18    rlm::RlmExecutor,
19    telemetry::SwarmTelemetryCollector,
20    swarm::{SwarmArtifact, SwarmStats},
21    tool::ToolRegistry,
22    worktree::{WorktreeInfo, WorktreeManager},
23};
24use anyhow::Result;
25use std::collections::HashMap;
26use std::sync::Arc;
27use std::time::Instant;
28use tokio::sync::{RwLock, mpsc};
29use tokio::time::{Duration, timeout};
30
/// Fallback context window size in tokens (256k - conservative for most models).
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens held back from the budget so the model has room to generate its response.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context limit at which truncation starts (85% of the limit).
const TRUNCATION_THRESHOLD: f64 = 0.85;
39
/// Estimate the token count of `text` with a rough heuristic (~3.5 bytes per token).
///
/// Actual tokenization varies by model; most tokenizers average 3-5 characters per
/// token for English text, so dividing by 3.5 errs on the conservative (higher) side.
/// The input length is the UTF-8 byte length of `text`, and the quotient is rounded
/// up to a whole token.
fn estimate_tokens(text: &str) -> usize {
    const CHARS_PER_TOKEN: f64 = 3.5;

    let byte_len = text.len() as f64;
    (byte_len / CHARS_PER_TOKEN).ceil() as usize
}
47
48/// Estimate total tokens in a message
49fn estimate_message_tokens(message: &Message) -> usize {
50    let mut tokens = 4; // Role overhead
51
52    for part in &message.content {
53        tokens += match part {
54            ContentPart::Text { text } => estimate_tokens(text),
55            ContentPart::ToolCall {
56                id,
57                name,
58                arguments,
59            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
60            ContentPart::ToolResult {
61                tool_call_id,
62                content,
63            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
64            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
65        };
66    }
67
68    tokens
69}
70
71/// Estimate total tokens in all messages
72fn estimate_total_tokens(messages: &[Message]) -> usize {
73    messages.iter().map(estimate_message_tokens).sum()
74}
75
76/// Truncate messages to fit within context limit
77///
78/// Strategy:
79/// 1. First, aggressively truncate large tool results
80/// 2. Keep system message (first) and user message (second) always
81/// 3. Keep the most recent assistant + tool result pairs together
82/// 4. Drop oldest middle messages in matched pairs
83fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
84    let target_tokens =
85        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
86
87    let current_tokens = estimate_total_tokens(messages);
88    if current_tokens <= target_tokens {
89        return;
90    }
91
92    tracing::warn!(
93        current_tokens = current_tokens,
94        target_tokens = target_tokens,
95        context_limit = context_limit,
96        "Context approaching limit, truncating conversation history"
97    );
98
99    // FIRST: Aggressively truncate large tool results (this is the main culprit)
100    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
101
102    let after_tool_truncation = estimate_total_tokens(messages);
103    if after_tool_truncation <= target_tokens {
104        tracing::info!(
105            old_tokens = current_tokens,
106            new_tokens = after_tool_truncation,
107            "Truncated large tool results, context now within limits"
108        );
109        return;
110    }
111
112    // Minimum: keep first 2 (system + initial user) and last 4 messages
113    if messages.len() <= 6 {
114        tracing::warn!(
115            tokens = after_tool_truncation,
116            target = target_tokens,
117            "Cannot truncate further - conversation too short"
118        );
119        return;
120    }
121
122    // Remove messages from the middle, keeping first 2 and last 4
123    // But we need to remove in pairs to maintain tool_call_id references
124    let keep_start = 2;
125    let keep_end = 4;
126    let removable_count = messages.len() - keep_start - keep_end;
127
128    if removable_count == 0 {
129        return;
130    }
131
132    // Remove all middle messages
133    let removed_messages: Vec<_> = messages
134        .drain(keep_start..keep_start + removable_count)
135        .collect();
136    let summary = summarize_removed_messages(&removed_messages);
137
138    // Insert a summary message where we removed content
139    messages.insert(
140        keep_start,
141        Message {
142            role: Role::User,
143            content: vec![ContentPart::Text {
144                text: format!(
145                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
146                    removed_messages.len(),
147                    summary
148                ),
149            }],
150        },
151    );
152
153    let new_tokens = estimate_total_tokens(messages);
154    tracing::info!(
155        removed_messages = removed_messages.len(),
156        old_tokens = current_tokens,
157        new_tokens = new_tokens,
158        "Truncated conversation history"
159    );
160}
161
162/// Summarize removed messages for context preservation
163fn summarize_removed_messages(messages: &[Message]) -> String {
164    let mut summary = String::new();
165    let mut tool_calls: Vec<String> = Vec::new();
166
167    for msg in messages {
168        for part in &msg.content {
169            if let ContentPart::ToolCall { name, .. } = part {
170                if !tool_calls.contains(name) {
171                    tool_calls.push(name.clone());
172                }
173            }
174        }
175    }
176
177    if !tool_calls.is_empty() {
178        summary.push_str(&format!(
179            "Tools used in truncated history: {}",
180            tool_calls.join(", ")
181        ));
182    }
183
184    summary
185}
186
187/// Truncate large tool results that exceed a reasonable size
188fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
189    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
190    let mut truncated_count = 0;
191    let mut saved_tokens = 0usize;
192
193    for message in messages.iter_mut() {
194        for part in message.content.iter_mut() {
195            if let ContentPart::ToolResult { content, .. } = part {
196                let tokens = estimate_tokens(content);
197                if tokens > max_tokens_per_result {
198                    let old_len = content.len();
199                    *content = truncate_single_result(content, char_limit);
200                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
201                    if content.len() < old_len {
202                        truncated_count += 1;
203                    }
204                }
205            }
206        }
207    }
208
209    if truncated_count > 0 {
210        tracing::info!(
211            truncated_count = truncated_count,
212            saved_tokens = saved_tokens,
213            max_tokens_per_result = max_tokens_per_result,
214            "Truncated large tool results"
215        );
216    }
217}
218
219/// Truncate a single result string to a maximum character limit
220fn truncate_single_result(content: &str, max_chars: usize) -> String {
221    if content.len() <= max_chars {
222        return content.to_string();
223    }
224
225    // Find a valid char boundary at or before max_chars
226    let safe_limit = {
227        let mut limit = max_chars.min(content.len());
228        while limit > 0 && !content.is_char_boundary(limit) {
229            limit -= 1;
230        }
231        limit
232    };
233
234    // Try to find a good break point (newline) near the limit
235    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
236
237    let truncated = format!(
238        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
239        &content[..break_point],
240        content.len(),
241        break_point
242    );
243
244    tracing::debug!(
245        original_len = content.len(),
246        truncated_len = truncated.len(),
247        "Truncated large result"
248    );
249
250    truncated
251}
252
/// Content longer than this many characters is summarized with RLM rather than truncated.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Character budget used for plain truncation of content at or below the RLM threshold.
const SIMPLE_TRUNCATE_CHARS: usize = 6_000;
258
/// Reason a sub-agent's agentic loop stopped.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The loop finished on its own; the subtask is reported as completed.
    Completed,
    /// The loop stopped because the sub-agent hit its maximum number of steps.
    MaxStepsReached,
    /// The loop stopped because the sub-agent ran out of time.
    TimedOut,
}
265
266/// Process a large tool result using RLM to intelligently summarize it
267async fn process_large_result_with_rlm(
268    content: &str,
269    tool_name: &str,
270    provider: Arc<dyn Provider>,
271    model: &str,
272) -> String {
273    if content.len() <= SIMPLE_TRUNCATE_CHARS {
274        return content.to_string();
275    }
276
277    // For medium-sized content, just truncate
278    if content.len() <= RLM_THRESHOLD_CHARS {
279        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
280    }
281
282    // For very large content, use RLM to intelligently summarize
283    tracing::info!(
284        tool = %tool_name,
285        content_len = content.len(),
286        "Using RLM to process large tool result"
287    );
288
289    let query = format!(
290        "Summarize the key information from this {} output. \
291         Focus on: errors, warnings, important findings, and actionable items. \
292         Be concise but thorough.",
293        tool_name
294    );
295
296    let mut executor =
297        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
298
299    match executor.analyze(&query).await {
300        Ok(result) => {
301            tracing::info!(
302                tool = %tool_name,
303                original_len = content.len(),
304                summary_len = result.answer.len(),
305                iterations = result.iterations,
306                "RLM summarized large result"
307            );
308
309            format!(
310                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
311                tool_name,
312                content.len(),
313                result.answer.len(),
314                result.answer
315            )
316        }
317        Err(e) => {
318            tracing::warn!(
319                tool = %tool_name,
320                error = %e,
321                "RLM analysis failed, falling back to truncation"
322            );
323            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
324        }
325    }
326}
327
328/// The swarm executor runs subtasks in parallel
329pub struct SwarmExecutor {
330    config: SwarmConfig,
331    /// Optional agent for handling swarm-level coordination (reserved for future use)
332    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
333    /// Optional event channel for TUI real-time updates
334    event_tx: Option<mpsc::Sender<SwarmEvent>>,
335    /// Telemetry collector for swarm execution metrics
336    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
337}
338
339impl SwarmExecutor {
340    /// Create a new executor
341    pub fn new(config: SwarmConfig) -> Self {
342        Self {
343            config,
344            coordinator_agent: None,
345            event_tx: None,
346            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
347        }
348    }
349
350    /// Set an event channel for real-time TUI updates
351    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
352        self.event_tx = Some(tx);
353        self
354    }
355
356    /// Set a coordinator agent for swarm-level coordination
357    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
358        tracing::debug!("Setting coordinator agent for swarm execution");
359        self.coordinator_agent = Some(agent);
360        self
361    }
362
363    /// Set a telemetry collector for swarm execution metrics
364    pub fn with_telemetry(mut self, telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>) -> Self {
365        self.telemetry = telemetry;
366        self
367    }
368
369    /// Get the telemetry collector as an Arc
370    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
371        Arc::clone(&self.telemetry)
372    }
373    /// Get the coordinator agent if set
374    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
375        self.coordinator_agent.as_ref()
376    }
377
378    /// Send an event to the TUI if channel is connected (non-blocking)
379    fn try_send_event(&self, event: SwarmEvent) {
380        if let Some(ref tx) = self.event_tx {
381            let _ = tx.try_send(event);
382        }
383    }
384
385    /// Execute a task using the swarm
386    pub async fn execute(
387        &self,
388        task: &str,
389        strategy: DecompositionStrategy,
390    ) -> Result<SwarmResult> {
391        let start_time = Instant::now();
392
393        // Create orchestrator
394        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;
395
396        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");
397
398        // Decompose the task
399        let subtasks = orchestrator.decompose(task, strategy).await?;
400
401        if subtasks.is_empty() {
402            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
403            return Ok(SwarmResult {
404                success: false,
405                result: String::new(),
406                subtask_results: Vec::new(),
407                stats: SwarmStats::default(),
408                artifacts: Vec::new(),
409                error: Some("No subtasks generated".to_string()),
410            });
411        }
412
413        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());
414
415        self.try_send_event(SwarmEvent::Started {
416            task: task.to_string(),
417            total_subtasks: subtasks.len(),
418        });
419
420        // Emit decomposition event for TUI
421        self.try_send_event(SwarmEvent::Decomposed {
422            subtasks: subtasks
423                .iter()
424                .map(|s| SubTaskInfo {
425                    id: s.id.clone(),
426                    name: s.name.clone(),
427                    status: SubTaskStatus::Pending,
428                    stage: s.stage,
429                    dependencies: s.dependencies.clone(),
430                    agent_name: s.specialty.clone(),
431                    current_tool: None,
432                    steps: 0,
433                    max_steps: self.config.max_steps_per_subagent,
434                    tool_call_history: Vec::new(),
435                    messages: Vec::new(),
436                    output: None,
437                    error: None,
438                })
439                .collect(),
440        });
441
442        // Execute stages in order
443        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
444        let mut all_results: Vec<SubTaskResult> = Vec::new();
445        let artifacts: Vec<SwarmArtifact> = Vec::new();
446
447        // Initialize telemetry for this swarm execution
448        let swarm_id = uuid::Uuid::new_v4().to_string();
449        self.telemetry.lock().await.start_swarm(swarm_id.clone(), subtasks.len(), &format!("{:?}", strategy));
450
451
452        // Shared state for completed results
453        let completed_results: Arc<RwLock<HashMap<String, String>>> =
454            Arc::new(RwLock::new(HashMap::new()));
455
456        for stage in 0..=max_stage {
457            let stage_start = Instant::now();
458
459            let stage_subtasks: Vec<SubTask> = orchestrator
460                .subtasks_for_stage(stage)
461                .into_iter()
462                .cloned()
463                .collect();
464
465            tracing::debug!(
466                "Stage {} has {} subtasks (max_stage={})",
467                stage,
468                stage_subtasks.len(),
469                max_stage
470            );
471
472            if stage_subtasks.is_empty() {
473                continue;
474            }
475
476            tracing::info!(
477                provider_name = %orchestrator.provider(),
478                "Executing stage {} with {} subtasks",
479                stage,
480                stage_subtasks.len()
481            );
482
483            // Execute all subtasks in this stage in parallel
484            let stage_results = self
485                .execute_stage(&orchestrator, stage_subtasks, completed_results.clone(), &swarm_id)
486                .await?;
487
488            // Update completed results for next stage
489            {
490                let mut completed = completed_results.write().await;
491                for result in &stage_results {
492                    completed.insert(result.subtask_id.clone(), result.result.clone());
493                }
494            }
495
496            // Update orchestrator stats
497            let stage_time = stage_start.elapsed().as_millis() as u64;
498            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
499            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();
500
501            orchestrator.stats_mut().stages.push(StageStats {
502                stage,
503                subagent_count: stage_results.len(),
504                max_steps,
505                total_steps,
506                execution_time_ms: stage_time,
507            });
508
509            // Mark subtasks as completed
510            for result in &stage_results {
511                orchestrator.complete_subtask(&result.subtask_id, result.clone());
512            }
513
514            // Emit stage complete event
515            let stage_completed = stage_results.iter().filter(|r| r.success).count();
516            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
517            self.try_send_event(SwarmEvent::StageComplete {
518                stage,
519                completed: stage_completed,
520                failed: stage_failed,
521            });
522
523            all_results.extend(stage_results);
524        }
525
526        // Get provider name before mutable borrow
527        let provider_name = orchestrator.provider().to_string();
528
529        // Record overall execution latency
530        self.telemetry.lock().await.record_swarm_latency("total_execution", start_time.elapsed());
531
532        // Calculate final stats
533        let stats = orchestrator.stats_mut();
534        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
535        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
536        stats.calculate_critical_path();
537        stats.calculate_speedup();
538
539        // Aggregate results
540        let success = all_results.iter().all(|r| r.success);
541
542        // Complete telemetry collection
543        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success);
544        let result = self.aggregate_results(&all_results).await?;
545
546        tracing::info!(
547            provider_name = %provider_name,
548            "Swarm execution complete: {} subtasks, {:.1}x speedup",
549            all_results.len(),
550            stats.speedup_factor
551        );
552
553        let final_stats = orchestrator.stats().clone();
554        self.try_send_event(SwarmEvent::Complete {
555            success,
556            stats: final_stats.clone(),
557        });
558
559        Ok(SwarmResult {
560            success,
561            result,
562            subtask_results: all_results,
563            stats: final_stats,
564            artifacts,
565            error: None,
566        })
567    }
568
569    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
570    async fn execute_stage(
571        &self,
572        orchestrator: &Orchestrator,
573        subtasks: Vec<SubTask>,
574        completed_results: Arc<RwLock<HashMap<String, String>>>,
575        swarm_id: &str,
576    ) -> Result<Vec<SubTaskResult>> {
577        let mut handles: Vec<(
578            String,
579            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
580        )> = Vec::new();
581
582        // Rate limiting: semaphore for max concurrent requests
583        let semaphore = Arc::new(tokio::sync::Semaphore::new(
584            self.config.max_concurrent_requests,
585        ));
586        let delay_ms = self.config.request_delay_ms;
587
588        // Get provider info for tool registry
589        let model = orchestrator.model().to_string();
590        let provider_name = orchestrator.provider().to_string();
591        let providers = orchestrator.providers();
592        let provider = providers
593            .get(&provider_name)
594            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
595
596        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
597
598        // Create shared tool registry with provider for ralph and batch tool
599        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
600        // Filter out 'question' tool - sub-agents must be autonomous, not interactive
601        let tool_definitions: Vec<_> = tool_registry
602            .definitions()
603            .into_iter()
604            .filter(|t| t.name != "question")
605            .collect();
606
607        // Create worktree manager if enabled
608        let worktree_manager = if self.config.worktree_enabled {
609            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
610                std::env::current_dir()
611                    .map(|p| p.to_string_lossy().to_string())
612                    .unwrap_or_else(|_| ".".to_string())
613            });
614
615            match WorktreeManager::new(&working_dir) {
616                Ok(mgr) => {
617                    tracing::info!(
618                        working_dir = %working_dir,
619                        "Worktree isolation enabled for parallel sub-agents"
620                    );
621                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
622                }
623                Err(e) => {
624                    tracing::warn!(
625                        error = %e,
626                        "Failed to create worktree manager, falling back to shared directory"
627                    );
628                    None
629                }
630            }
631        } else {
632            None
633        };
634
635        for (idx, subtask) in subtasks.into_iter().enumerate() {
636            let model = model.clone();
637            let _provider_name = provider_name.clone();
638            let provider = Arc::clone(&provider);
639
640            // Get context from dependencies
641            let context = {
642                let completed = completed_results.read().await;
643                let mut dep_context = String::new();
644                for dep_id in &subtask.dependencies {
645                    if let Some(result) = completed.get(dep_id) {
646                        dep_context.push_str(&format!(
647                            "\n--- Result from dependency {} ---\n{}\n",
648                            dep_id, result
649                        ));
650                    }
651                }
652                dep_context
653            };
654
655            let instruction = subtask.instruction.clone();
656            let subtask_name = subtask.name.clone();
657            let specialty = subtask.specialty.clone().unwrap_or_default();
658            let subtask_id = subtask.id.clone();
659            let subtask_id_for_handle = subtask_id.clone();
660            let max_steps = self.config.max_steps_per_subagent;
661            let timeout_secs = self.config.subagent_timeout_secs;
662
663            // Clone for the async block
664            let tools = tool_definitions.clone();
665            let registry = Arc::clone(&tool_registry);
666            let sem = Arc::clone(&semaphore);
667            let stagger_delay = delay_ms * idx as u64; // Stagger start times
668            let worktree_mgr = worktree_manager.clone();
669            let event_tx = self.event_tx.clone();
670
671            // Generate sub-agent execution ID
672            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
673
674            // Log telemetry for this sub-agent
675            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
676
677            // Spawn the subtask execution with agentic tool loop
678            let handle = tokio::spawn(async move {
679                // Rate limiting: stagger start and acquire semaphore
680                if stagger_delay > 0 {
681                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
682                }
683                let _permit = sem
684                    .acquire()
685                    .await
686                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;
687
688                let agent_start = Instant::now();
689
690
691                let start = Instant::now();
692
693                // Create worktree for this sub-agent if enabled
694                let worktree_info = if let Some(ref mgr) = worktree_mgr {
695                    let task_slug = subtask_id.replace("-", "_");
696                    match mgr.create(&task_slug) {
697                        Ok(wt) => {
698                            tracing::info!(
699                                subtask_id = %subtask_id,
700                                worktree_path = %wt.path.display(),
701                                worktree_branch = %wt.branch,
702                                "Created worktree for sub-agent"
703                            );
704                            Some(wt)
705                        }
706                        Err(e) => {
707                            tracing::warn!(
708                                subtask_id = %subtask_id,
709                                error = %e,
710                                "Failed to create worktree, using shared directory"
711                            );
712                            None
713                        }
714                    }
715                } else {
716                    None
717                };
718
719                // Determine working directory
720                let working_dir = worktree_info
721                    .as_ref()
722                    .map(|wt| wt.path.display().to_string())
723                    .unwrap_or_else(|| ".".to_string());
724
725                // Load AGENTS.md from working directory
726                let working_path = std::path::Path::new(&working_dir);
727                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
728                    .map(|(content, _)| {
729                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
730                    })
731                    .unwrap_or_default();
732
733                // Build the system prompt for this sub-agent
734                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
735                let system_prompt = format!(
736                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
737
738WORKING DIRECTORY: {}
739All file operations should be relative to this directory.
740
741IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
742
743Available tools:
744- read: Read file contents
745- write: Write/create files  
746- edit: Edit existing files (search and replace)
747- multiedit: Make multiple edits at once
748- glob: Find files by pattern
749- grep: Search file contents
750- bash: Run shell commands (use cwd: \"{}\" parameter)
751- webfetch: Fetch web pages
752- prd: Generate structured PRD for complex tasks
753- ralph: Run autonomous agent loop on a PRD
754
755COMPLEX TASKS:
756If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
7571. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
7582. Break down into user stories with acceptance criteria
7593. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
7604. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
761
762NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
763
764When done, provide a brief summary of what you accomplished.{agents_md_content}",
765                    specialty,
766                    subtask_id,
767                    working_dir,
768                    working_dir,
769                    prd_filename,
770                    prd_filename,
771                    prd_filename
772                );
773
774                let user_prompt = if context.is_empty() {
775                    format!("Complete this task:\n\n{}", instruction)
776                } else {
777                    format!(
778                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
779                        instruction, context
780                    )
781                };
782
783                // Emit AgentStarted event
784                if let Some(ref tx) = event_tx {
785                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
786                        id: subtask_id.clone(),
787                        name: subtask_name.clone(),
788                        status: SubTaskStatus::Running,
789                        agent_name: Some(format!("agent-{}", subtask_id)),
790                    });
791                    let _ = tx.try_send(SwarmEvent::AgentStarted {
792                        subtask_id: subtask_id.clone(),
793                        agent_name: format!("agent-{}", subtask_id),
794                        specialty: specialty.clone(),
795                    });
796                }
797
798                // Run the agentic loop
799                let result = run_agent_loop(
800                    provider,
801                    &model,
802                    &system_prompt,
803                    &user_prompt,
804                    tools,
805                    registry,
806                    max_steps,
807                    timeout_secs,
808                    event_tx.clone(),
809                    subtask_id.clone(),
810                )
811                .await;
812
813                match result {
814                    Ok((output, steps, tool_calls, exit_reason)) => {
815                        let (success, status, error) = match exit_reason {
816                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
817                            AgentLoopExit::MaxStepsReached => (
818                                false,
819                                SubTaskStatus::Failed,
820                                Some(format!("Sub-agent hit max steps ({max_steps})")),
821                            ),
822                            AgentLoopExit::TimedOut => (
823                                false,
824                                SubTaskStatus::TimedOut,
825                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
826                            ),
827                        };
828
829                        // Emit completion events
830                        if let Some(ref tx) = event_tx {
831                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
832                                id: subtask_id.clone(),
833                                name: subtask_name.clone(),
834                                status,
835                                agent_name: Some(format!("agent-{}", subtask_id)),
836                            });
837                            if let Some(ref message) = error {
838                                let _ = tx.try_send(SwarmEvent::AgentError {
839                                    subtask_id: subtask_id.clone(),
840                                    error: message.clone(),
841                                });
842                            }
843                            let _ = tx.try_send(SwarmEvent::AgentOutput {
844                                subtask_id: subtask_id.clone(),
845                                output: output.clone(),
846                            });
847                            let _ = tx.try_send(SwarmEvent::AgentComplete {
848                                subtask_id: subtask_id.clone(),
849                                success,
850                                steps,
851                            });
852                        }
853                        Ok((
854                            SubTaskResult {
855                                subtask_id: subtask_id.clone(),
856                                subagent_id: format!("agent-{}", subtask_id),
857                                success,
858                                result: output,
859                                steps,
860                                tool_calls,
861                                execution_time_ms: start.elapsed().as_millis() as u64,
862                                error,
863                                artifacts: Vec::new(),
864                            },
865                            worktree_info,
866                        ))
867                    }
868                    Err(e) => {
869                        // Emit error events
870                        if let Some(ref tx) = event_tx {
871                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
872                                id: subtask_id.clone(),
873                                name: subtask_name.clone(),
874                                status: SubTaskStatus::Failed,
875                                agent_name: Some(format!("agent-{}", subtask_id)),
876                            });
877                            let _ = tx.try_send(SwarmEvent::AgentError {
878                                subtask_id: subtask_id.clone(),
879                                error: e.to_string(),
880                            });
881                            let _ = tx.try_send(SwarmEvent::AgentComplete {
882                                subtask_id: subtask_id.clone(),
883                                success: false,
884                                steps: 0,
885                            });
886                        }
887                        Ok((
888                            SubTaskResult {
889                                subtask_id: subtask_id.clone(),
890                                subagent_id: format!("agent-{}", subtask_id),
891                                success: false,
892                                result: String::new(),
893                                steps: 0,
894                                tool_calls: 0,
895                                execution_time_ms: start.elapsed().as_millis() as u64,
896                                error: Some(e.to_string()),
897                                artifacts: Vec::new(),
898                            },
899                            worktree_info,
900                        ))
901                    }
902                }
903            });
904
905            handles.push((subtask_id_for_handle, handle));
906        }
907
908        // Wait for all handles and handle worktree merging
909        let mut results = Vec::new();
910        let auto_merge = self.config.worktree_auto_merge;
911
912        for (subtask_id, handle) in handles {
913            match handle.await {
914                Ok(Ok((mut result, worktree_info))) => {
915                    // Handle worktree merge if applicable
916                    if let Some(wt) = worktree_info {
917                        if result.success && auto_merge {
918                            if let Some(ref mgr) = worktree_manager {
919                                match mgr.merge(&wt) {
920                                    Ok(merge_result) => {
921                                        if merge_result.success {
922                                            tracing::info!(
923                                                subtask_id = %result.subtask_id,
924                                                files_changed = merge_result.files_changed,
925                                                "Merged worktree changes successfully"
926                                            );
927                                            result.result.push_str(&format!(
928                                                "\n\n--- Merge Result ---\n{}",
929                                                merge_result.summary
930                                            ));
931                                        } else if merge_result.aborted {
932                                            // Merge was aborted due to non-conflict failure
933                                            tracing::warn!(
934                                                subtask_id = %result.subtask_id,
935                                                summary = %merge_result.summary,
936                                                "Merge was aborted"
937                                            );
938                                            result.result.push_str(&format!(
939                                                "\n\n--- Merge Aborted ---\n{}",
940                                                merge_result.summary
941                                            ));
942                                        } else {
943                                            tracing::warn!(
944                                                subtask_id = %result.subtask_id,
945                                                conflicts = ?merge_result.conflicts,
946                                                "Merge had conflicts"
947                                            );
948                                            result.result.push_str(&format!(
949                                                "\n\n--- Merge Conflicts ---\n{}",
950                                                merge_result.summary
951                                            ));
952                                        }
953
954                                        // Cleanup worktree after merge
955                                        if let Err(e) = mgr.cleanup(&wt) {
956                                            tracing::warn!(
957                                                error = %e,
958                                                "Failed to cleanup worktree"
959                                            );
960                                        }
961                                    }
962                                    Err(e) => {
963                                        tracing::error!(
964                                            subtask_id = %result.subtask_id,
965                                            error = %e,
966                                            "Failed to merge worktree"
967                                        );
968                                    }
969                                }
970                            }
971                        } else if !result.success {
972                            // Keep worktree for debugging on failure
973                            tracing::info!(
974                                subtask_id = %result.subtask_id,
975                                worktree_path = %wt.path.display(),
976                                "Keeping worktree for debugging (task failed)"
977                            );
978                        }
979                    }
980
981                    results.push(result);
982                }
983                Ok(Err(e)) => {
984                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
985                    if let Some(ref tx) = self.event_tx {
986                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
987                            id: subtask_id.clone(),
988                            name: subtask_id.clone(),
989                            status: SubTaskStatus::Failed,
990                            agent_name: Some(format!("agent-{}", subtask_id)),
991                        });
992                        let _ = tx.try_send(SwarmEvent::AgentError {
993                            subtask_id: subtask_id.clone(),
994                            error: e.to_string(),
995                        });
996                        let _ = tx.try_send(SwarmEvent::AgentComplete {
997                            subtask_id: subtask_id.clone(),
998                            success: false,
999                            steps: 0,
1000                        });
1001                    }
1002                    results.push(SubTaskResult {
1003                        subtask_id: subtask_id.clone(),
1004                        subagent_id: format!("agent-{}", subtask_id),
1005                        success: false,
1006                        result: String::new(),
1007                        steps: 0,
1008                        tool_calls: 0,
1009                        execution_time_ms: 0,
1010                        error: Some(e.to_string()),
1011                        artifacts: Vec::new(),
1012                    });
1013                }
1014                Err(e) => {
1015                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
1016                    if let Some(ref tx) = self.event_tx {
1017                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1018                            id: subtask_id.clone(),
1019                            name: subtask_id.clone(),
1020                            status: SubTaskStatus::Failed,
1021                            agent_name: Some(format!("agent-{}", subtask_id)),
1022                        });
1023                        let _ = tx.try_send(SwarmEvent::AgentError {
1024                            subtask_id: subtask_id.clone(),
1025                            error: format!("Task join error: {}", e),
1026                        });
1027                        let _ = tx.try_send(SwarmEvent::AgentComplete {
1028                            subtask_id: subtask_id.clone(),
1029                            success: false,
1030                            steps: 0,
1031                        });
1032                    }
1033                    results.push(SubTaskResult {
1034                        subtask_id: subtask_id.clone(),
1035                        subagent_id: format!("agent-{}", subtask_id),
1036                        success: false,
1037                        result: String::new(),
1038                        steps: 0,
1039                        tool_calls: 0,
1040                        execution_time_ms: 0,
1041                        error: Some(format!("Task join error: {}", e)),
1042                        artifacts: Vec::new(),
1043                    });
1044                }
1045            }
1046        }
1047
1048        Ok(results)
1049    }
1050
1051    /// Aggregate results from all subtasks into a final result
1052    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
1053        let mut aggregated = String::new();
1054
1055        for (i, result) in results.iter().enumerate() {
1056            if result.success {
1057                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
1058            } else {
1059                aggregated.push_str(&format!(
1060                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
1061                    i + 1,
1062                    result.error.as_deref().unwrap_or("Unknown error")
1063                ));
1064            }
1065        }
1066
1067        Ok(aggregated)
1068    }
1069
1070    /// Execute a single task without decomposition (for simple cases)
1071    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
1072        self.execute(task, DecompositionStrategy::None).await
1073    }
1074}
1075
1076/// Builder for swarm execution
1077pub struct SwarmExecutorBuilder {
1078    config: SwarmConfig,
1079}
1080
1081impl SwarmExecutorBuilder {
1082    pub fn new() -> Self {
1083        Self {
1084            config: SwarmConfig::default(),
1085        }
1086    }
1087
1088    pub fn max_subagents(mut self, max: usize) -> Self {
1089        self.config.max_subagents = max;
1090        self
1091    }
1092
1093    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
1094        self.config.max_steps_per_subagent = max;
1095        self
1096    }
1097
1098    pub fn max_total_steps(mut self, max: usize) -> Self {
1099        self.config.max_total_steps = max;
1100        self
1101    }
1102
1103    pub fn timeout_secs(mut self, secs: u64) -> Self {
1104        self.config.subagent_timeout_secs = secs;
1105        self
1106    }
1107
1108    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
1109        self.config.parallel_enabled = enabled;
1110        self
1111    }
1112
1113    pub fn build(self) -> SwarmExecutor {
1114        SwarmExecutor::new(self.config)
1115    }
1116}
1117
1118impl Default for SwarmExecutorBuilder {
1119    fn default() -> Self {
1120        Self::new()
1121    }
1122}
1123
1124/// Run the agentic loop for a sub-agent with tool execution
1125#[allow(clippy::too_many_arguments)]
1126/// Run an agentic loop with tools - reusable for Ralph and swarm sub-agents
1127pub async fn run_agent_loop(
1128    provider: Arc<dyn Provider>,
1129    model: &str,
1130    system_prompt: &str,
1131    user_prompt: &str,
1132    tools: Vec<crate::provider::ToolDefinition>,
1133    registry: Arc<ToolRegistry>,
1134    max_steps: usize,
1135    timeout_secs: u64,
1136    event_tx: Option<mpsc::Sender<SwarmEvent>>,
1137    subtask_id: String,
1138) -> Result<(String, usize, usize, AgentLoopExit)> {
1139    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
1140    let temperature = 0.7;
1141
1142    tracing::info!(
1143        model = %model,
1144        max_steps = max_steps,
1145        timeout_secs = timeout_secs,
1146        "Sub-agent starting agentic loop"
1147    );
1148    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
1149    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
1150
1151    // Initialize conversation with system and user messages
1152    let mut messages = vec![
1153        Message {
1154            role: Role::System,
1155            content: vec![ContentPart::Text {
1156                text: system_prompt.to_string(),
1157            }],
1158        },
1159        Message {
1160            role: Role::User,
1161            content: vec![ContentPart::Text {
1162                text: user_prompt.to_string(),
1163            }],
1164        },
1165    ];
1166
1167    let mut steps = 0;
1168    let mut total_tool_calls = 0;
1169    let mut final_output = String::new();
1170
1171    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
1172
1173    let exit_reason = loop {
1174        if steps >= max_steps {
1175            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
1176            break AgentLoopExit::MaxStepsReached;
1177        }
1178
1179        if Instant::now() > deadline {
1180            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
1181            break AgentLoopExit::TimedOut;
1182        }
1183
1184        steps += 1;
1185        tracing::info!(step = steps, "Sub-agent step starting");
1186
1187        // Check context size and truncate if approaching limit
1188        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
1189
1190        let request = CompletionRequest {
1191            messages: messages.clone(),
1192            tools: tools.clone(),
1193            model: model.to_string(),
1194            temperature: Some(temperature),
1195            top_p: None,
1196            max_tokens: Some(8192),
1197            stop: Vec::new(),
1198        };
1199
1200        let step_start = Instant::now();
1201        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
1202        let step_duration = step_start.elapsed();
1203
1204        tracing::info!(
1205            step = steps,
1206            duration_ms = step_duration.as_millis() as u64,
1207            finish_reason = ?response.finish_reason,
1208            prompt_tokens = response.usage.prompt_tokens,
1209            completion_tokens = response.usage.completion_tokens,
1210            "Sub-agent step completed LLM call"
1211        );
1212
1213        // Extract text from response
1214        let mut text_parts = Vec::new();
1215        let mut tool_calls = Vec::new();
1216
1217        for part in &response.message.content {
1218            match part {
1219                ContentPart::Text { text } => {
1220                    text_parts.push(text.clone());
1221                }
1222                ContentPart::ToolCall {
1223                    id,
1224                    name,
1225                    arguments,
1226                } => {
1227                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
1228                }
1229                _ => {}
1230            }
1231        }
1232
1233        // Log assistant output
1234        if !text_parts.is_empty() {
1235            final_output = text_parts.join("\n");
1236            tracing::info!(
1237                step = steps,
1238                output_len = final_output.len(),
1239                "Sub-agent text output"
1240            );
1241            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
1242
1243            // Emit assistant message event for TUI detail view
1244            if let Some(ref tx) = event_tx {
1245                let preview = if final_output.len() > 500 {
1246                    let mut end = 500;
1247                    while end > 0 && !final_output.is_char_boundary(end) {
1248                        end -= 1;
1249                    }
1250                    format!("{}...", &final_output[..end])
1251                } else {
1252                    final_output.clone()
1253                };
1254                let _ = tx.try_send(SwarmEvent::AgentMessage {
1255                    subtask_id: subtask_id.clone(),
1256                    entry: AgentMessageEntry {
1257                        role: "assistant".to_string(),
1258                        content: preview,
1259                        is_tool_call: false,
1260                    },
1261                });
1262            }
1263        }
1264
1265        // Log tool calls
1266        if !tool_calls.is_empty() {
1267            tracing::info!(
1268                step = steps,
1269                num_tool_calls = tool_calls.len(),
1270                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
1271                "Sub-agent requesting tool calls"
1272            );
1273        }
1274
1275        // Add assistant message to history
1276        messages.push(response.message.clone());
1277
1278        // If no tool calls or stop, we're done
1279        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
1280            tracing::info!(
1281                steps = steps,
1282                total_tool_calls = total_tool_calls,
1283                "Sub-agent finished"
1284            );
1285            break AgentLoopExit::Completed;
1286        }
1287
1288        // Execute tool calls
1289        let mut tool_results = Vec::new();
1290
1291        for (call_id, tool_name, arguments) in tool_calls {
1292            total_tool_calls += 1;
1293
1294            // Emit tool call event
1295            if let Some(ref tx) = event_tx {
1296                let _ = tx.try_send(SwarmEvent::AgentToolCall {
1297                    subtask_id: subtask_id.clone(),
1298                    tool_name: tool_name.clone(),
1299                });
1300            }
1301
1302            tracing::info!(
1303                step = steps,
1304                tool_call_id = %call_id,
1305                tool = %tool_name,
1306                "Executing tool"
1307            );
1308            tracing::debug!(
1309                tool = %tool_name,
1310                arguments = %arguments,
1311                "Tool call arguments"
1312            );
1313
1314            let tool_start = Instant::now();
1315            let mut tool_success = true;
1316            let result = if let Some(tool) = registry.get(&tool_name) {
1317                // Parse arguments as JSON
1318                let args: serde_json::Value =
1319                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
1320                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
1321                        serde_json::json!({})
1322                    });
1323
1324                match tool.execute(args).await {
1325                    Ok(r) => {
1326                        if r.success {
1327                            tracing::info!(
1328                                tool = %tool_name,
1329                                duration_ms = tool_start.elapsed().as_millis() as u64,
1330                                success = true,
1331                                "Tool execution completed"
1332                            );
1333                            r.output
1334                        } else {
1335                            tool_success = false;
1336                            tracing::warn!(
1337                                tool = %tool_name,
1338                                error = %r.output,
1339                                "Tool returned error"
1340                            );
1341                            format!("Tool error: {}", r.output)
1342                        }
1343                    }
1344                    Err(e) => {
1345                        tool_success = false;
1346                        tracing::error!(
1347                            tool = %tool_name,
1348                            error = %e,
1349                            "Tool execution failed"
1350                        );
1351                        format!("Tool execution failed: {}", e)
1352                    }
1353                }
1354            } else {
1355                tool_success = false;
1356                tracing::error!(tool = %tool_name, "Unknown tool requested");
1357                format!("Unknown tool: {}", tool_name)
1358            };
1359
1360            // Emit detailed tool call event
1361            if let Some(ref tx) = event_tx {
1362                let input_preview = if arguments.len() > 200 {
1363                    let mut end = 200;
1364                    while end > 0 && !arguments.is_char_boundary(end) {
1365                        end -= 1;
1366                    }
1367                    format!("{}...", &arguments[..end])
1368                } else {
1369                    arguments.clone()
1370                };
1371                let output_preview = if result.len() > 500 {
1372                    let mut end = 500;
1373                    while end > 0 && !result.is_char_boundary(end) {
1374                        end -= 1;
1375                    }
1376                    format!("{}...", &result[..end])
1377                } else {
1378                    result.clone()
1379                };
1380                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
1381                    subtask_id: subtask_id.clone(),
1382                    detail: AgentToolCallDetail {
1383                        tool_name: tool_name.clone(),
1384                        input_preview,
1385                        output_preview,
1386                        success: tool_success,
1387                    },
1388                });
1389            }
1390
1391            tracing::debug!(
1392                tool = %tool_name,
1393                result_len = result.len(),
1394                "Tool result"
1395            );
1396
1397            // Process large results with RLM or truncate smaller ones
1398            let result = if result.len() > RLM_THRESHOLD_CHARS {
1399                // Use RLM for very large results
1400                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
1401                    .await
1402            } else {
1403                // Simple truncation for medium results
1404                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
1405            };
1406
1407            tool_results.push((call_id, tool_name, result));
1408        }
1409
1410        // Add tool results to conversation
1411        for (call_id, _tool_name, result) in tool_results {
1412            messages.push(Message {
1413                role: Role::Tool,
1414                content: vec![ContentPart::ToolResult {
1415                    tool_call_id: call_id,
1416                    content: result,
1417                }],
1418            });
1419        }
1420
1421        // Reset deadline after each successful step — agent is making progress
1422        deadline = Instant::now() + Duration::from_secs(timeout_secs);
1423    };
1424
1425    Ok((final_output, steps, total_tool_calls, exit_reason))
1426}