Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
8    orchestrator::Orchestrator,
9    subtask::{SubTask, SubTaskResult, SubTaskStatus},
10};
11use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
12
13// Re-export swarm types for convenience
14pub use super::{Actor, ActorStatus, Handler, SwarmMessage};
15use crate::{
16    agent::Agent,
17    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
18    rlm::RlmExecutor,
19    swarm::{SwarmArtifact, SwarmStats},
20    telemetry::SwarmTelemetryCollector,
21    tool::ToolRegistry,
22    worktree::{WorktreeInfo, WorktreeManager},
23};
24use anyhow::Result;
25use std::collections::HashMap;
26use std::sync::Arc;
27use std::time::Instant;
28use tokio::sync::{RwLock, mpsc};
29use tokio::time::{Duration, timeout};
30
/// Default context limit (256k tokens - conservative for most models)
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Reserve tokens for the response generation
///
/// Subtracted from the scaled context limit when the truncation budget is computed.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Safety margin before we start truncating (85% of limit)
///
/// The context limit is multiplied by this factor before the response reserve is subtracted.
const TRUNCATION_THRESHOLD: f64 = 0.85;
39
/// Roughly estimate how many tokens `text` occupies.
///
/// Real tokenizers differ per model; English text usually lands somewhere
/// between 3 and 5 bytes per token. The estimate divides the byte length by
/// 3.5 and rounds up, which deliberately errs on the high side.
fn estimate_tokens(text: &str) -> usize {
    /// Assumed average number of bytes per token.
    const BYTES_PER_TOKEN: f64 = 3.5;

    let byte_len = text.len() as f64;
    let fractional_tokens = byte_len / BYTES_PER_TOKEN;
    fractional_tokens.ceil() as usize
}
47
48/// Estimate total tokens in a message
49fn estimate_message_tokens(message: &Message) -> usize {
50    let mut tokens = 4; // Role overhead
51
52    for part in &message.content {
53        tokens += match part {
54            ContentPart::Text { text } => estimate_tokens(text),
55            ContentPart::ToolCall {
56                id,
57                name,
58                arguments,
59            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
60            ContentPart::ToolResult {
61                tool_call_id,
62                content,
63            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
64            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
65        };
66    }
67
68    tokens
69}
70
71/// Estimate total tokens in all messages
72fn estimate_total_tokens(messages: &[Message]) -> usize {
73    messages.iter().map(estimate_message_tokens).sum()
74}
75
76/// Truncate messages to fit within context limit
77///
78/// Strategy:
79/// 1. First, aggressively truncate large tool results
80/// 2. Keep system message (first) and user message (second) always
81/// 3. Keep the most recent assistant + tool result pairs together
82/// 4. Drop oldest middle messages in matched pairs
83fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
84    let target_tokens =
85        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
86
87    let current_tokens = estimate_total_tokens(messages);
88    if current_tokens <= target_tokens {
89        return;
90    }
91
92    tracing::warn!(
93        current_tokens = current_tokens,
94        target_tokens = target_tokens,
95        context_limit = context_limit,
96        "Context approaching limit, truncating conversation history"
97    );
98
99    // FIRST: Aggressively truncate large tool results (this is the main culprit)
100    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
101
102    let after_tool_truncation = estimate_total_tokens(messages);
103    if after_tool_truncation <= target_tokens {
104        tracing::info!(
105            old_tokens = current_tokens,
106            new_tokens = after_tool_truncation,
107            "Truncated large tool results, context now within limits"
108        );
109        return;
110    }
111
112    // Minimum: keep first 2 (system + initial user) and last 4 messages
113    if messages.len() <= 6 {
114        tracing::warn!(
115            tokens = after_tool_truncation,
116            target = target_tokens,
117            "Cannot truncate further - conversation too short"
118        );
119        return;
120    }
121
122    // Remove messages from the middle, keeping first 2 and last 4
123    // But we need to remove in pairs to maintain tool_call_id references
124    let keep_start = 2;
125    let keep_end = 4;
126    let removable_count = messages.len() - keep_start - keep_end;
127
128    if removable_count == 0 {
129        return;
130    }
131
132    // Remove all middle messages
133    let removed_messages: Vec<_> = messages
134        .drain(keep_start..keep_start + removable_count)
135        .collect();
136    let summary = summarize_removed_messages(&removed_messages);
137
138    // Insert a summary message where we removed content
139    messages.insert(
140        keep_start,
141        Message {
142            role: Role::User,
143            content: vec![ContentPart::Text {
144                text: format!(
145                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
146                    removed_messages.len(),
147                    summary
148                ),
149            }],
150        },
151    );
152
153    let new_tokens = estimate_total_tokens(messages);
154    tracing::info!(
155        removed_messages = removed_messages.len(),
156        old_tokens = current_tokens,
157        new_tokens = new_tokens,
158        "Truncated conversation history"
159    );
160}
161
162/// Summarize removed messages for context preservation
163fn summarize_removed_messages(messages: &[Message]) -> String {
164    let mut summary = String::new();
165    let mut tool_calls: Vec<String> = Vec::new();
166
167    for msg in messages {
168        for part in &msg.content {
169            if let ContentPart::ToolCall { name, .. } = part {
170                if !tool_calls.contains(name) {
171                    tool_calls.push(name.clone());
172                }
173            }
174        }
175    }
176
177    if !tool_calls.is_empty() {
178        summary.push_str(&format!(
179            "Tools used in truncated history: {}",
180            tool_calls.join(", ")
181        ));
182    }
183
184    summary
185}
186
187/// Truncate large tool results that exceed a reasonable size
188fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
189    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
190    let mut truncated_count = 0;
191    let mut saved_tokens = 0usize;
192
193    for message in messages.iter_mut() {
194        for part in message.content.iter_mut() {
195            if let ContentPart::ToolResult { content, .. } = part {
196                let tokens = estimate_tokens(content);
197                if tokens > max_tokens_per_result {
198                    let old_len = content.len();
199                    *content = truncate_single_result(content, char_limit);
200                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
201                    if content.len() < old_len {
202                        truncated_count += 1;
203                    }
204                }
205            }
206        }
207    }
208
209    if truncated_count > 0 {
210        tracing::info!(
211            truncated_count = truncated_count,
212            saved_tokens = saved_tokens,
213            max_tokens_per_result = max_tokens_per_result,
214            "Truncated large tool results"
215        );
216    }
217}
218
219/// Truncate a single result string to a maximum character limit
220fn truncate_single_result(content: &str, max_chars: usize) -> String {
221    if content.len() <= max_chars {
222        return content.to_string();
223    }
224
225    // Find a valid char boundary at or before max_chars
226    let safe_limit = {
227        let mut limit = max_chars.min(content.len());
228        while limit > 0 && !content.is_char_boundary(limit) {
229            limit -= 1;
230        }
231        limit
232    };
233
234    // Try to find a good break point (newline) near the limit
235    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
236
237    let truncated = format!(
238        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
239        &content[..break_point],
240        content.len(),
241        break_point
242    );
243
244    tracing::debug!(
245        original_len = content.len(),
246        truncated_len = truncated.len(),
247        "Truncated large result"
248    );
249
250    truncated
251}
252
/// Threshold for when to use RLM vs truncation (in characters)
///
/// Compared against `str::len()`, so the unit is UTF-8 bytes. Tool results
/// longer than this are summarized with `RlmExecutor`; results at or below it
/// are only truncated.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Max chars for simple truncation (below RLM threshold)
///
/// Results at or below this size are passed through untouched; it is also
/// the size limit handed to `truncate_single_result`.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;
258
/// Why a sub-agent's loop stopped.
///
/// The stage executor maps this to the subtask's success flag and status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The loop finished on its own; reported as a successful, completed subtask.
    Completed,
    /// The step budget ran out; reported as a failed subtask.
    MaxStepsReached,
    /// The time budget ran out; reported as a timed-out (unsuccessful) subtask.
    TimedOut,
}
265
266/// Process a large tool result using RLM to intelligently summarize it
267async fn process_large_result_with_rlm(
268    content: &str,
269    tool_name: &str,
270    provider: Arc<dyn Provider>,
271    model: &str,
272) -> String {
273    if content.len() <= SIMPLE_TRUNCATE_CHARS {
274        return content.to_string();
275    }
276
277    // For medium-sized content, just truncate
278    if content.len() <= RLM_THRESHOLD_CHARS {
279        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
280    }
281
282    // For very large content, use RLM to intelligently summarize
283    tracing::info!(
284        tool = %tool_name,
285        content_len = content.len(),
286        "Using RLM to process large tool result"
287    );
288
289    let query = format!(
290        "Summarize the key information from this {} output. \
291         Focus on: errors, warnings, important findings, and actionable items. \
292         Be concise but thorough.",
293        tool_name
294    );
295
296    let mut executor =
297        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
298
299    match executor.analyze(&query).await {
300        Ok(result) => {
301            tracing::info!(
302                tool = %tool_name,
303                original_len = content.len(),
304                summary_len = result.answer.len(),
305                iterations = result.iterations,
306                "RLM summarized large result"
307            );
308
309            format!(
310                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
311                tool_name,
312                content.len(),
313                result.answer.len(),
314                result.answer
315            )
316        }
317        Err(e) => {
318            tracing::warn!(
319                tool = %tool_name,
320                error = %e,
321                "RLM analysis failed, falling back to truncation"
322            );
323            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
324        }
325    }
326}
327
/// The swarm executor runs subtasks in parallel
///
/// Built with [`SwarmExecutor::new`] and optionally customized through the
/// `with_*` builder methods.
pub struct SwarmExecutor {
    /// Swarm settings. This file reads `max_steps_per_subagent`,
    /// `max_concurrent_requests`, `request_delay_ms`, `worktree_enabled`,
    /// `working_dir` and `subagent_timeout_secs` from it, and clones it into
    /// the orchestrator.
    config: SwarmConfig,
    /// Optional agent for handling swarm-level coordination (reserved for future use)
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Optional event channel for TUI real-time updates
    ///
    /// Events are sent with `try_send` and send errors are ignored.
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    /// Telemetry collector for swarm execution metrics
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
}
338
339impl SwarmExecutor {
340    /// Create a new executor
341    pub fn new(config: SwarmConfig) -> Self {
342        Self {
343            config,
344            coordinator_agent: None,
345            event_tx: None,
346            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
347        }
348    }
349
350    /// Set an event channel for real-time TUI updates
351    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
352        self.event_tx = Some(tx);
353        self
354    }
355
356    /// Set a coordinator agent for swarm-level coordination
357    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
358        tracing::debug!("Setting coordinator agent for swarm execution");
359        self.coordinator_agent = Some(agent);
360        self
361    }
362
363    /// Set a telemetry collector for swarm execution metrics
364    pub fn with_telemetry(
365        mut self,
366        telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
367    ) -> Self {
368        self.telemetry = telemetry;
369        self
370    }
371
372    /// Get the telemetry collector as an Arc
373    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
374        Arc::clone(&self.telemetry)
375    }
376    /// Get the coordinator agent if set
377    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
378        self.coordinator_agent.as_ref()
379    }
380
381    /// Send an event to the TUI if channel is connected (non-blocking)
382    fn try_send_event(&self, event: SwarmEvent) {
383        if let Some(ref tx) = self.event_tx {
384            let _ = tx.try_send(event);
385        }
386    }
387
    /// Execute a task using the swarm
    ///
    /// Decomposes `task` into subtasks with the given `strategy`, runs the
    /// subtasks stage by stage (stage `0` first), feeds each stage's results
    /// to later stages, and finally aggregates everything into a
    /// [`SwarmResult`]. Progress is reported through `SwarmEvent`s and the
    /// telemetry collector along the way.
    ///
    /// If decomposition yields no subtasks this returns `Ok` with
    /// `success: false` and an error message rather than an `Err`.
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the orchestrator, decomposing the
    /// task, executing a stage, and aggregating the results. On those paths
    /// the function returns early, so later stages, the `Complete` event and
    /// the telemetry completion call are skipped.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        // Create orchestrator
        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        // Decompose the task
        let subtasks = orchestrator.decompose(task, strategy).await?;

        // Nothing to run: report it as an unsuccessful result, not as an Err.
        if subtasks.is_empty() {
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Emit decomposition event for TUI
        // Every subtask starts out Pending with empty history and no output.
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        // Execute stages in order
        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        // Never populated in this method; the returned result always carries
        // an empty artifact list.
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        // Initialize telemetry for this swarm execution
        let swarm_id = uuid::Uuid::new_v4().to_string();
        self.telemetry.lock().await.start_swarm(
            swarm_id.clone(),
            subtasks.len(),
            &format!("{:?}", strategy),
        );

        // Shared state for completed results
        // Maps subtask id -> result text; read by later stages as dependency context.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            // Stage numbers may have gaps; skip stages with nothing to run.
            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            // Execute all subtasks in this stage in parallel
            // An Err here aborts the whole swarm run via `?`.
            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            // Update completed results for next stage
            // Scoped so the write lock is released before the next stage starts.
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                }
            }

            // Update orchestrator stats
            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            // Mark subtasks as completed
            // Called for every result of the stage, successful or not.
            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            // Emit stage complete event
            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        // Get provider name before mutable borrow
        let provider_name = orchestrator.provider().to_string();

        // Record overall execution latency
        self.telemetry
            .lock()
            .await
            .record_swarm_latency("total_execution", start_time.elapsed());

        // Calculate final stats
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        // Sum of every subtask's own execution time, regardless of overlap.
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        // Aggregate results
        // The run counts as successful only if every subtask succeeded
        // (`all` is also true when there are no results at all).
        let success = all_results.iter().all(|r| r.success);

        // Complete telemetry collection
        // The returned metrics are currently unused.
        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        // Snapshot the stats once; the same values go to the event and the result.
        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }
582
583    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
584    async fn execute_stage(
585        &self,
586        orchestrator: &Orchestrator,
587        subtasks: Vec<SubTask>,
588        completed_results: Arc<RwLock<HashMap<String, String>>>,
589        swarm_id: &str,
590    ) -> Result<Vec<SubTaskResult>> {
591        let mut handles: Vec<(
592            String,
593            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
594        )> = Vec::new();
595
596        // Rate limiting: semaphore for max concurrent requests
597        let semaphore = Arc::new(tokio::sync::Semaphore::new(
598            self.config.max_concurrent_requests,
599        ));
600        let delay_ms = self.config.request_delay_ms;
601
602        // Get provider info for tool registry
603        let model = orchestrator.model().to_string();
604        let provider_name = orchestrator.provider().to_string();
605        let providers = orchestrator.providers();
606        let provider = providers
607            .get(&provider_name)
608            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
609
610        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
611
612        // Create shared tool registry with provider for ralph and batch tool
613        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
614        // Filter out 'question' tool - sub-agents must be autonomous, not interactive
615        let tool_definitions: Vec<_> = tool_registry
616            .definitions()
617            .into_iter()
618            .filter(|t| t.name != "question")
619            .collect();
620
621        // Create worktree manager if enabled
622        let worktree_manager = if self.config.worktree_enabled {
623            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
624                std::env::current_dir()
625                    .map(|p| p.to_string_lossy().to_string())
626                    .unwrap_or_else(|_| ".".to_string())
627            });
628
629            match WorktreeManager::new(&working_dir) {
630                Ok(mgr) => {
631                    tracing::info!(
632                        working_dir = %working_dir,
633                        "Worktree isolation enabled for parallel sub-agents"
634                    );
635                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
636                }
637                Err(e) => {
638                    tracing::warn!(
639                        error = %e,
640                        "Failed to create worktree manager, falling back to shared directory"
641                    );
642                    None
643                }
644            }
645        } else {
646            None
647        };
648
649        for (idx, subtask) in subtasks.into_iter().enumerate() {
650            let model = model.clone();
651            let _provider_name = provider_name.clone();
652            let provider = Arc::clone(&provider);
653
654            // Get context from dependencies
655            let context = {
656                let completed = completed_results.read().await;
657                let mut dep_context = String::new();
658                for dep_id in &subtask.dependencies {
659                    if let Some(result) = completed.get(dep_id) {
660                        dep_context.push_str(&format!(
661                            "\n--- Result from dependency {} ---\n{}\n",
662                            dep_id, result
663                        ));
664                    }
665                }
666                dep_context
667            };
668
669            let instruction = subtask.instruction.clone();
670            let subtask_name = subtask.name.clone();
671            let specialty = subtask.specialty.clone().unwrap_or_default();
672            let subtask_id = subtask.id.clone();
673            let subtask_id_for_handle = subtask_id.clone();
674            let max_steps = self.config.max_steps_per_subagent;
675            let timeout_secs = self.config.subagent_timeout_secs;
676
677            // Clone for the async block
678            let tools = tool_definitions.clone();
679            let registry = Arc::clone(&tool_registry);
680            let sem = Arc::clone(&semaphore);
681            let stagger_delay = delay_ms * idx as u64; // Stagger start times
682            let worktree_mgr = worktree_manager.clone();
683            let event_tx = self.event_tx.clone();
684
685            // Generate sub-agent execution ID
686            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
687
688            // Log telemetry for this sub-agent
689            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
690
691            // Spawn the subtask execution with agentic tool loop
692            let handle = tokio::spawn(async move {
693                // Rate limiting: stagger start and acquire semaphore
694                if stagger_delay > 0 {
695                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
696                }
697                let _permit = sem
698                    .acquire()
699                    .await
700                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;
701
702                let _agent_start = Instant::now();
703
704                let start = Instant::now();
705
706                // Create worktree for this sub-agent if enabled
707                let worktree_info = if let Some(ref mgr) = worktree_mgr {
708                    let task_slug = subtask_id.replace("-", "_");
709                    match mgr.create(&task_slug) {
710                        Ok(wt) => {
711                            tracing::info!(
712                                subtask_id = %subtask_id,
713                                worktree_path = %wt.path.display(),
714                                worktree_branch = %wt.branch,
715                                "Created worktree for sub-agent"
716                            );
717                            Some(wt)
718                        }
719                        Err(e) => {
720                            tracing::warn!(
721                                subtask_id = %subtask_id,
722                                error = %e,
723                                "Failed to create worktree, using shared directory"
724                            );
725                            None
726                        }
727                    }
728                } else {
729                    None
730                };
731
732                // Determine working directory
733                let working_dir = worktree_info
734                    .as_ref()
735                    .map(|wt| wt.path.display().to_string())
736                    .unwrap_or_else(|| ".".to_string());
737
738                // Load AGENTS.md from working directory
739                let working_path = std::path::Path::new(&working_dir);
740                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
741                    .map(|(content, _)| {
742                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
743                    })
744                    .unwrap_or_default();
745
746                // Build the system prompt for this sub-agent
747                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
748                let system_prompt = format!(
749                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
750
751WORKING DIRECTORY: {}
752All file operations should be relative to this directory.
753
754IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
755
756Available tools:
757- read: Read file contents
758- write: Write/create files  
759- edit: Edit existing files (search and replace)
760- multiedit: Make multiple edits at once
761- glob: Find files by pattern
762- grep: Search file contents
763- bash: Run shell commands (use cwd: \"{}\" parameter)
764- webfetch: Fetch web pages
765- prd: Generate structured PRD for complex tasks
766- ralph: Run autonomous agent loop on a PRD
767
768COMPLEX TASKS:
769If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
7701. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
7712. Break down into user stories with acceptance criteria
7723. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
7734. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
774
775NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
776
777When done, provide a brief summary of what you accomplished.{agents_md_content}",
778                    specialty,
779                    subtask_id,
780                    working_dir,
781                    working_dir,
782                    prd_filename,
783                    prd_filename,
784                    prd_filename
785                );
786
787                let user_prompt = if context.is_empty() {
788                    format!("Complete this task:\n\n{}", instruction)
789                } else {
790                    format!(
791                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
792                        instruction, context
793                    )
794                };
795
796                // Emit AgentStarted event
797                if let Some(ref tx) = event_tx {
798                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
799                        id: subtask_id.clone(),
800                        name: subtask_name.clone(),
801                        status: SubTaskStatus::Running,
802                        agent_name: Some(format!("agent-{}", subtask_id)),
803                    });
804                    let _ = tx.try_send(SwarmEvent::AgentStarted {
805                        subtask_id: subtask_id.clone(),
806                        agent_name: format!("agent-{}", subtask_id),
807                        specialty: specialty.clone(),
808                    });
809                }
810
811                // Run the agentic loop
812                let result = run_agent_loop(
813                    provider,
814                    &model,
815                    &system_prompt,
816                    &user_prompt,
817                    tools,
818                    registry,
819                    max_steps,
820                    timeout_secs,
821                    event_tx.clone(),
822                    subtask_id.clone(),
823                )
824                .await;
825
826                match result {
827                    Ok((output, steps, tool_calls, exit_reason)) => {
828                        let (success, status, error) = match exit_reason {
829                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
830                            AgentLoopExit::MaxStepsReached => (
831                                false,
832                                SubTaskStatus::Failed,
833                                Some(format!("Sub-agent hit max steps ({max_steps})")),
834                            ),
835                            AgentLoopExit::TimedOut => (
836                                false,
837                                SubTaskStatus::TimedOut,
838                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
839                            ),
840                        };
841
842                        // Emit completion events
843                        if let Some(ref tx) = event_tx {
844                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
845                                id: subtask_id.clone(),
846                                name: subtask_name.clone(),
847                                status,
848                                agent_name: Some(format!("agent-{}", subtask_id)),
849                            });
850                            if let Some(ref message) = error {
851                                let _ = tx.try_send(SwarmEvent::AgentError {
852                                    subtask_id: subtask_id.clone(),
853                                    error: message.clone(),
854                                });
855                            }
856                            let _ = tx.try_send(SwarmEvent::AgentOutput {
857                                subtask_id: subtask_id.clone(),
858                                output: output.clone(),
859                            });
860                            let _ = tx.try_send(SwarmEvent::AgentComplete {
861                                subtask_id: subtask_id.clone(),
862                                success,
863                                steps,
864                            });
865                        }
866                        Ok((
867                            SubTaskResult {
868                                subtask_id: subtask_id.clone(),
869                                subagent_id: format!("agent-{}", subtask_id),
870                                success,
871                                result: output,
872                                steps,
873                                tool_calls,
874                                execution_time_ms: start.elapsed().as_millis() as u64,
875                                error,
876                                artifacts: Vec::new(),
877                            },
878                            worktree_info,
879                        ))
880                    }
881                    Err(e) => {
882                        // Emit error events
883                        if let Some(ref tx) = event_tx {
884                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
885                                id: subtask_id.clone(),
886                                name: subtask_name.clone(),
887                                status: SubTaskStatus::Failed,
888                                agent_name: Some(format!("agent-{}", subtask_id)),
889                            });
890                            let _ = tx.try_send(SwarmEvent::AgentError {
891                                subtask_id: subtask_id.clone(),
892                                error: e.to_string(),
893                            });
894                            let _ = tx.try_send(SwarmEvent::AgentComplete {
895                                subtask_id: subtask_id.clone(),
896                                success: false,
897                                steps: 0,
898                            });
899                        }
900                        Ok((
901                            SubTaskResult {
902                                subtask_id: subtask_id.clone(),
903                                subagent_id: format!("agent-{}", subtask_id),
904                                success: false,
905                                result: String::new(),
906                                steps: 0,
907                                tool_calls: 0,
908                                execution_time_ms: start.elapsed().as_millis() as u64,
909                                error: Some(e.to_string()),
910                                artifacts: Vec::new(),
911                            },
912                            worktree_info,
913                        ))
914                    }
915                }
916            });
917
918            handles.push((subtask_id_for_handle, handle));
919        }
920
921        // Wait for all handles and handle worktree merging
922        let mut results = Vec::new();
923        let auto_merge = self.config.worktree_auto_merge;
924
925        for (subtask_id, handle) in handles {
926            match handle.await {
927                Ok(Ok((mut result, worktree_info))) => {
928                    // Handle worktree merge if applicable
929                    if let Some(wt) = worktree_info {
930                        if result.success && auto_merge {
931                            if let Some(ref mgr) = worktree_manager {
932                                match mgr.merge(&wt) {
933                                    Ok(merge_result) => {
934                                        if merge_result.success {
935                                            tracing::info!(
936                                                subtask_id = %result.subtask_id,
937                                                files_changed = merge_result.files_changed,
938                                                "Merged worktree changes successfully"
939                                            );
940                                            result.result.push_str(&format!(
941                                                "\n\n--- Merge Result ---\n{}",
942                                                merge_result.summary
943                                            ));
944                                        } else if merge_result.aborted {
945                                            // Merge was aborted due to non-conflict failure
946                                            tracing::warn!(
947                                                subtask_id = %result.subtask_id,
948                                                summary = %merge_result.summary,
949                                                "Merge was aborted"
950                                            );
951                                            result.result.push_str(&format!(
952                                                "\n\n--- Merge Aborted ---\n{}",
953                                                merge_result.summary
954                                            ));
955                                        } else {
956                                            tracing::warn!(
957                                                subtask_id = %result.subtask_id,
958                                                conflicts = ?merge_result.conflicts,
959                                                "Merge had conflicts"
960                                            );
961                                            result.result.push_str(&format!(
962                                                "\n\n--- Merge Conflicts ---\n{}",
963                                                merge_result.summary
964                                            ));
965                                        }
966
967                                        // Cleanup worktree after merge
968                                        if let Err(e) = mgr.cleanup(&wt) {
969                                            tracing::warn!(
970                                                error = %e,
971                                                "Failed to cleanup worktree"
972                                            );
973                                        }
974                                    }
975                                    Err(e) => {
976                                        tracing::error!(
977                                            subtask_id = %result.subtask_id,
978                                            error = %e,
979                                            "Failed to merge worktree"
980                                        );
981                                    }
982                                }
983                            }
984                        } else if !result.success {
985                            // Keep worktree for debugging on failure
986                            tracing::info!(
987                                subtask_id = %result.subtask_id,
988                                worktree_path = %wt.path.display(),
989                                "Keeping worktree for debugging (task failed)"
990                            );
991                        }
992                    }
993
994                    results.push(result);
995                }
996                Ok(Err(e)) => {
997                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
998                    if let Some(ref tx) = self.event_tx {
999                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1000                            id: subtask_id.clone(),
1001                            name: subtask_id.clone(),
1002                            status: SubTaskStatus::Failed,
1003                            agent_name: Some(format!("agent-{}", subtask_id)),
1004                        });
1005                        let _ = tx.try_send(SwarmEvent::AgentError {
1006                            subtask_id: subtask_id.clone(),
1007                            error: e.to_string(),
1008                        });
1009                        let _ = tx.try_send(SwarmEvent::AgentComplete {
1010                            subtask_id: subtask_id.clone(),
1011                            success: false,
1012                            steps: 0,
1013                        });
1014                    }
1015                    results.push(SubTaskResult {
1016                        subtask_id: subtask_id.clone(),
1017                        subagent_id: format!("agent-{}", subtask_id),
1018                        success: false,
1019                        result: String::new(),
1020                        steps: 0,
1021                        tool_calls: 0,
1022                        execution_time_ms: 0,
1023                        error: Some(e.to_string()),
1024                        artifacts: Vec::new(),
1025                    });
1026                }
1027                Err(e) => {
1028                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
1029                    if let Some(ref tx) = self.event_tx {
1030                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1031                            id: subtask_id.clone(),
1032                            name: subtask_id.clone(),
1033                            status: SubTaskStatus::Failed,
1034                            agent_name: Some(format!("agent-{}", subtask_id)),
1035                        });
1036                        let _ = tx.try_send(SwarmEvent::AgentError {
1037                            subtask_id: subtask_id.clone(),
1038                            error: format!("Task join error: {}", e),
1039                        });
1040                        let _ = tx.try_send(SwarmEvent::AgentComplete {
1041                            subtask_id: subtask_id.clone(),
1042                            success: false,
1043                            steps: 0,
1044                        });
1045                    }
1046                    results.push(SubTaskResult {
1047                        subtask_id: subtask_id.clone(),
1048                        subagent_id: format!("agent-{}", subtask_id),
1049                        success: false,
1050                        result: String::new(),
1051                        steps: 0,
1052                        tool_calls: 0,
1053                        execution_time_ms: 0,
1054                        error: Some(format!("Task join error: {}", e)),
1055                        artifacts: Vec::new(),
1056                    });
1057                }
1058            }
1059        }
1060
1061        Ok(results)
1062    }
1063
1064    /// Aggregate results from all subtasks into a final result
1065    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
1066        let mut aggregated = String::new();
1067
1068        for (i, result) in results.iter().enumerate() {
1069            if result.success {
1070                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
1071            } else {
1072                aggregated.push_str(&format!(
1073                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
1074                    i + 1,
1075                    result.error.as_deref().unwrap_or("Unknown error")
1076                ));
1077            }
1078        }
1079
1080        Ok(aggregated)
1081    }
1082
1083    /// Execute a single task without decomposition (for simple cases)
1084    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
1085        self.execute(task, DecompositionStrategy::None).await
1086    }
1087}
1088
1089/// Builder for swarm execution
1090pub struct SwarmExecutorBuilder {
1091    config: SwarmConfig,
1092}
1093
1094impl SwarmExecutorBuilder {
1095    pub fn new() -> Self {
1096        Self {
1097            config: SwarmConfig::default(),
1098        }
1099    }
1100
1101    pub fn max_subagents(mut self, max: usize) -> Self {
1102        self.config.max_subagents = max;
1103        self
1104    }
1105
1106    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
1107        self.config.max_steps_per_subagent = max;
1108        self
1109    }
1110
1111    pub fn max_total_steps(mut self, max: usize) -> Self {
1112        self.config.max_total_steps = max;
1113        self
1114    }
1115
1116    pub fn timeout_secs(mut self, secs: u64) -> Self {
1117        self.config.subagent_timeout_secs = secs;
1118        self
1119    }
1120
1121    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
1122        self.config.parallel_enabled = enabled;
1123        self
1124    }
1125
1126    pub fn build(self) -> SwarmExecutor {
1127        SwarmExecutor::new(self.config)
1128    }
1129}
1130
1131impl Default for SwarmExecutorBuilder {
1132    fn default() -> Self {
1133        Self::new()
1134    }
1135}
1136
1137/// Run the agentic loop for a sub-agent with tool execution
1138#[allow(clippy::too_many_arguments)]
1139/// Run an agentic loop with tools - reusable for Ralph and swarm sub-agents
1140pub async fn run_agent_loop(
1141    provider: Arc<dyn Provider>,
1142    model: &str,
1143    system_prompt: &str,
1144    user_prompt: &str,
1145    tools: Vec<crate::provider::ToolDefinition>,
1146    registry: Arc<ToolRegistry>,
1147    max_steps: usize,
1148    timeout_secs: u64,
1149    event_tx: Option<mpsc::Sender<SwarmEvent>>,
1150    subtask_id: String,
1151) -> Result<(String, usize, usize, AgentLoopExit)> {
1152    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
1153    let temperature = 0.7;
1154
1155    tracing::info!(
1156        model = %model,
1157        max_steps = max_steps,
1158        timeout_secs = timeout_secs,
1159        "Sub-agent starting agentic loop"
1160    );
1161    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
1162    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
1163
1164    // Initialize conversation with system and user messages
1165    let mut messages = vec![
1166        Message {
1167            role: Role::System,
1168            content: vec![ContentPart::Text {
1169                text: system_prompt.to_string(),
1170            }],
1171        },
1172        Message {
1173            role: Role::User,
1174            content: vec![ContentPart::Text {
1175                text: user_prompt.to_string(),
1176            }],
1177        },
1178    ];
1179
1180    let mut steps = 0;
1181    let mut total_tool_calls = 0;
1182    let mut final_output = String::new();
1183
1184    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
1185
1186    let exit_reason = loop {
1187        if steps >= max_steps {
1188            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
1189            break AgentLoopExit::MaxStepsReached;
1190        }
1191
1192        if Instant::now() > deadline {
1193            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
1194            break AgentLoopExit::TimedOut;
1195        }
1196
1197        steps += 1;
1198        tracing::info!(step = steps, "Sub-agent step starting");
1199
1200        // Check context size and truncate if approaching limit
1201        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
1202
1203        let request = CompletionRequest {
1204            messages: messages.clone(),
1205            tools: tools.clone(),
1206            model: model.to_string(),
1207            temperature: Some(temperature),
1208            top_p: None,
1209            max_tokens: Some(8192),
1210            stop: Vec::new(),
1211        };
1212
1213        let step_start = Instant::now();
1214        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
1215        let step_duration = step_start.elapsed();
1216
1217        tracing::info!(
1218            step = steps,
1219            duration_ms = step_duration.as_millis() as u64,
1220            finish_reason = ?response.finish_reason,
1221            prompt_tokens = response.usage.prompt_tokens,
1222            completion_tokens = response.usage.completion_tokens,
1223            "Sub-agent step completed LLM call"
1224        );
1225
1226        // Extract text from response
1227        let mut text_parts = Vec::new();
1228        let mut tool_calls = Vec::new();
1229
1230        for part in &response.message.content {
1231            match part {
1232                ContentPart::Text { text } => {
1233                    text_parts.push(text.clone());
1234                }
1235                ContentPart::ToolCall {
1236                    id,
1237                    name,
1238                    arguments,
1239                } => {
1240                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
1241                }
1242                _ => {}
1243            }
1244        }
1245
1246        // Log assistant output
1247        if !text_parts.is_empty() {
1248            let step_output = text_parts.join("\n");
1249            if !final_output.is_empty() {
1250                final_output.push('\n');
1251            }
1252            final_output.push_str(&step_output);
1253            tracing::info!(
1254                step = steps,
1255                output_len = final_output.len(),
1256                "Sub-agent text output"
1257            );
1258            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
1259
1260            // Emit assistant message event for TUI detail view
1261            if let Some(ref tx) = event_tx {
1262                let preview = if step_output.len() > 500 {
1263                    let mut end = 500;
1264                    while end > 0 && !step_output.is_char_boundary(end) {
1265                        end -= 1;
1266                    }
1267                    format!("{}...", &step_output[..end])
1268                } else {
1269                    step_output.clone()
1270                };
1271                let _ = tx.try_send(SwarmEvent::AgentMessage {
1272                    subtask_id: subtask_id.clone(),
1273                    entry: AgentMessageEntry {
1274                        role: "assistant".to_string(),
1275                        content: preview,
1276                        is_tool_call: false,
1277                    },
1278                });
1279            }
1280        }
1281
1282        // Log tool calls
1283        if !tool_calls.is_empty() {
1284            tracing::info!(
1285                step = steps,
1286                num_tool_calls = tool_calls.len(),
1287                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
1288                "Sub-agent requesting tool calls"
1289            );
1290        }
1291
1292        // Add assistant message to history
1293        messages.push(response.message.clone());
1294
1295        // If no tool calls or stop, we're done
1296        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
1297            tracing::info!(
1298                steps = steps,
1299                total_tool_calls = total_tool_calls,
1300                "Sub-agent finished"
1301            );
1302            break AgentLoopExit::Completed;
1303        }
1304
1305        // Execute tool calls
1306        let mut tool_results = Vec::new();
1307
1308        for (call_id, tool_name, arguments) in tool_calls {
1309            total_tool_calls += 1;
1310
1311            // Emit tool call event
1312            if let Some(ref tx) = event_tx {
1313                let _ = tx.try_send(SwarmEvent::AgentToolCall {
1314                    subtask_id: subtask_id.clone(),
1315                    tool_name: tool_name.clone(),
1316                });
1317            }
1318
1319            tracing::info!(
1320                step = steps,
1321                tool_call_id = %call_id,
1322                tool = %tool_name,
1323                "Executing tool"
1324            );
1325            tracing::debug!(
1326                tool = %tool_name,
1327                arguments = %arguments,
1328                "Tool call arguments"
1329            );
1330
1331            let tool_start = Instant::now();
1332            let mut tool_success = true;
1333            let result = if let Some(tool) = registry.get(&tool_name) {
1334                // Parse arguments as JSON
1335                let args: serde_json::Value =
1336                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
1337                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
1338                        serde_json::json!({})
1339                    });
1340
1341                match tool.execute(args).await {
1342                    Ok(r) => {
1343                        if r.success {
1344                            tracing::info!(
1345                                tool = %tool_name,
1346                                duration_ms = tool_start.elapsed().as_millis() as u64,
1347                                success = true,
1348                                "Tool execution completed"
1349                            );
1350                            r.output
1351                        } else {
1352                            tool_success = false;
1353                            tracing::warn!(
1354                                tool = %tool_name,
1355                                error = %r.output,
1356                                "Tool returned error"
1357                            );
1358                            format!("Tool error: {}", r.output)
1359                        }
1360                    }
1361                    Err(e) => {
1362                        tool_success = false;
1363                        tracing::error!(
1364                            tool = %tool_name,
1365                            error = %e,
1366                            "Tool execution failed"
1367                        );
1368                        format!("Tool execution failed: {}", e)
1369                    }
1370                }
1371            } else {
1372                tool_success = false;
1373                tracing::error!(tool = %tool_name, "Unknown tool requested");
1374                format!("Unknown tool: {}", tool_name)
1375            };
1376
1377            // Emit detailed tool call event
1378            if let Some(ref tx) = event_tx {
1379                let input_preview = if arguments.len() > 200 {
1380                    let mut end = 200;
1381                    while end > 0 && !arguments.is_char_boundary(end) {
1382                        end -= 1;
1383                    }
1384                    format!("{}...", &arguments[..end])
1385                } else {
1386                    arguments.clone()
1387                };
1388                let output_preview = if result.len() > 500 {
1389                    let mut end = 500;
1390                    while end > 0 && !result.is_char_boundary(end) {
1391                        end -= 1;
1392                    }
1393                    format!("{}...", &result[..end])
1394                } else {
1395                    result.clone()
1396                };
1397                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
1398                    subtask_id: subtask_id.clone(),
1399                    detail: AgentToolCallDetail {
1400                        tool_name: tool_name.clone(),
1401                        input_preview,
1402                        output_preview,
1403                        success: tool_success,
1404                    },
1405                });
1406            }
1407
1408            tracing::debug!(
1409                tool = %tool_name,
1410                result_len = result.len(),
1411                "Tool result"
1412            );
1413
1414            // Process large results with RLM or truncate smaller ones
1415            let result = if result.len() > RLM_THRESHOLD_CHARS {
1416                // Use RLM for very large results
1417                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
1418                    .await
1419            } else {
1420                // Simple truncation for medium results
1421                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
1422            };
1423
1424            tool_results.push((call_id, tool_name, result));
1425        }
1426
1427        // Add tool results to conversation
1428        for (call_id, _tool_name, result) in tool_results {
1429            messages.push(Message {
1430                role: Role::Tool,
1431                content: vec![ContentPart::ToolResult {
1432                    tool_call_id: call_id,
1433                    content: result,
1434                }],
1435            });
1436        }
1437
1438        // Reset deadline after each successful step — agent is making progress
1439        deadline = Instant::now() + Duration::from_secs(timeout_secs);
1440    };
1441
1442    Ok((final_output, steps, total_tool_calls, exit_reason))
1443}