Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    BranchObservation, BranchRuntimeState, CacheConfig, CacheStats, CollapseController,
8    CollapsePolicy, DecompositionStrategy, ExecutionMode, StageStats, SwarmCache, SwarmConfig,
9    SwarmResult,
10    kubernetes_executor::{
11        RemoteSubtaskPayload, SWARM_SUBTASK_PAYLOAD_ENV, encode_payload, latest_probe_from_logs,
12        probe_changed_files_set, result_from_logs,
13    },
14    orchestrator::Orchestrator,
15    result_store::ResultStore,
16    subtask::{SubTask, SubTaskResult, SubTaskStatus},
17};
18use crate::bus::{AgentBus, BusMessage};
19use crate::k8s::{K8sManager, SubagentPodSpec, SubagentPodState};
20use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
21
22// Re-export swarm types for convenience
23pub use super::SwarmMessage;
24use crate::{
25    agent::Agent,
26    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
27    rlm::RlmExecutor,
28    swarm::{SwarmArtifact, SwarmStats},
29    telemetry::SwarmTelemetryCollector,
30    tool::ToolRegistry,
31    worktree::{WorktreeInfo, WorktreeManager},
32};
33use anyhow::Result;
34use futures::stream::{FuturesUnordered, StreamExt};
35use std::collections::{HashMap, VecDeque};
36use std::sync::Arc;
37use std::time::Instant;
38use tokio::sync::{RwLock, mpsc};
39use tokio::task::AbortHandle;
40use tokio::time::{Duration, MissedTickBehavior, timeout};
41
/// Default context limit (256k tokens - conservative for most models)
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Reserve tokens for the response generation
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Safety margin before we start truncating (85% of limit)
const TRUNCATION_THRESHOLD: f64 = 0.85;
50
/// Estimate token count from text (rough heuristic: ~3.5 chars per token)
fn estimate_tokens(text: &str) -> usize {
    // Actual tokenization is model-specific; English text typically averages
    // 3-5 characters per token, so 3.5 keeps this estimate conservative.
    let approximate = text.len() as f64 / 3.5;
    approximate.ceil() as usize
}
58
59/// Estimate total tokens in a message
60fn estimate_message_tokens(message: &Message) -> usize {
61    let mut tokens = 4; // Role overhead
62
63    for part in &message.content {
64        tokens += match part {
65            ContentPart::Text { text } => estimate_tokens(text),
66            ContentPart::ToolCall {
67                id,
68                name,
69                arguments,
70                ..
71            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
72            ContentPart::ToolResult {
73                tool_call_id,
74                content,
75            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
76            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
77            ContentPart::Thinking { text } => estimate_tokens(text),
78        };
79    }
80
81    tokens
82}
83
84/// Estimate total tokens in all messages
85fn estimate_total_tokens(messages: &[Message]) -> usize {
86    messages.iter().map(estimate_message_tokens).sum()
87}
88
89/// Truncate messages to fit within context limit
90///
91/// Strategy:
92/// 1. First, aggressively truncate large tool results
93/// 2. Keep system message (first) and user message (second) always
94/// 3. Keep the most recent assistant + tool result pairs together
95/// 4. Drop oldest middle messages in matched pairs
96fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
97    let target_tokens =
98        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
99
100    let current_tokens = estimate_total_tokens(messages);
101    if current_tokens <= target_tokens {
102        return;
103    }
104
105    tracing::warn!(
106        current_tokens = current_tokens,
107        target_tokens = target_tokens,
108        context_limit = context_limit,
109        "Context approaching limit, truncating conversation history"
110    );
111
112    // FIRST: Aggressively truncate large tool results (this is the main culprit)
113    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
114
115    let after_tool_truncation = estimate_total_tokens(messages);
116    if after_tool_truncation <= target_tokens {
117        tracing::info!(
118            old_tokens = current_tokens,
119            new_tokens = after_tool_truncation,
120            "Truncated large tool results, context now within limits"
121        );
122        return;
123    }
124
125    // Minimum: keep first 2 (system + initial user) and last 4 messages
126    if messages.len() <= 6 {
127        tracing::warn!(
128            tokens = after_tool_truncation,
129            target = target_tokens,
130            "Cannot truncate further - conversation too short"
131        );
132        return;
133    }
134
135    // Remove messages from the middle, keeping first 2 and last 4
136    // But we need to remove in pairs to maintain tool_call_id references
137    let keep_start = 2;
138    let keep_end = 4;
139    let removable_count = messages.len() - keep_start - keep_end;
140
141    if removable_count == 0 {
142        return;
143    }
144
145    // Remove all middle messages
146    let removed_messages: Vec<_> = messages
147        .drain(keep_start..keep_start + removable_count)
148        .collect();
149    let summary = summarize_removed_messages(&removed_messages);
150
151    // Insert a summary message where we removed content
152    messages.insert(
153        keep_start,
154        Message {
155            role: Role::User,
156            content: vec![ContentPart::Text {
157                text: format!(
158                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
159                    removed_messages.len(),
160                    summary
161                ),
162            }],
163        },
164    );
165
166    let new_tokens = estimate_total_tokens(messages);
167    tracing::info!(
168        removed_messages = removed_messages.len(),
169        old_tokens = current_tokens,
170        new_tokens = new_tokens,
171        "Truncated conversation history"
172    );
173}
174
175/// Summarize removed messages for context preservation
176fn summarize_removed_messages(messages: &[Message]) -> String {
177    let mut summary = String::new();
178    let mut tool_calls: Vec<String> = Vec::new();
179
180    for msg in messages {
181        for part in &msg.content {
182            if let ContentPart::ToolCall { name, .. } = part {
183                if !tool_calls.contains(name) {
184                    tool_calls.push(name.clone());
185                }
186            }
187        }
188    }
189
190    if !tool_calls.is_empty() {
191        summary.push_str(&format!(
192            "Tools used in truncated history: {}",
193            tool_calls.join(", ")
194        ));
195    }
196
197    summary
198}
199
200/// Truncate large tool results that exceed a reasonable size
201fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
202    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
203    let mut truncated_count = 0;
204    let mut saved_tokens = 0usize;
205
206    for message in messages.iter_mut() {
207        for part in message.content.iter_mut() {
208            if let ContentPart::ToolResult { content, .. } = part {
209                let tokens = estimate_tokens(content);
210                if tokens > max_tokens_per_result {
211                    let old_len = content.len();
212                    *content = truncate_single_result(content, char_limit);
213                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
214                    if content.len() < old_len {
215                        truncated_count += 1;
216                    }
217                }
218            }
219        }
220    }
221
222    if truncated_count > 0 {
223        tracing::info!(
224            truncated_count = truncated_count,
225            saved_tokens = saved_tokens,
226            max_tokens_per_result = max_tokens_per_result,
227            "Truncated large tool results"
228        );
229    }
230}
231
232/// Truncate a single result string to a maximum character limit
233fn truncate_single_result(content: &str, max_chars: usize) -> String {
234    if content.len() <= max_chars {
235        return content.to_string();
236    }
237
238    // Find a valid char boundary at or before max_chars
239    let safe_limit = {
240        let mut limit = max_chars.min(content.len());
241        while limit > 0 && !content.is_char_boundary(limit) {
242            limit -= 1;
243        }
244        limit
245    };
246
247    // Try to find a good break point (newline) near the limit
248    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
249
250    let truncated = format!(
251        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
252        &content[..break_point],
253        content.len(),
254        break_point
255    );
256
257    tracing::debug!(
258        original_len = content.len(),
259        truncated_len = truncated.len(),
260        "Truncated large result"
261    );
262
263    truncated
264}
265
/// Threshold for when to use RLM vs truncation (in characters)
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Max chars for simple truncation (below RLM threshold)
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// Collapse controller sampling interval for branch health.
const COLLAPSE_SAMPLE_SECS: u64 = 5;
/// Env var naming an override prompt for swarm fallback execution.
// NOTE(review): the read site is outside this chunk — confirm semantics there.
const SWARM_FALLBACK_PROMPT_ENV: &str = "CODETETHER_SWARM_FALLBACK_PROMPT";
/// Env var naming an override model for swarm fallback execution.
// NOTE(review): the read site is outside this chunk — confirm semantics there.
const SWARM_FALLBACK_MODEL_ENV: &str = "CODETETHER_SWARM_FALLBACK_MODEL";
/// Environment variables forwarded to Kubernetes sub-agent pods
// (presumably copied verbatim into the pod spec — verify in the K8s path).
const K8S_PASSTHROUGH_ENV_VARS: &[&str] = &[
    "VAULT_ADDR",
    "VAULT_TOKEN",
    "VAULT_MOUNT",
    "VAULT_SECRETS_PATH",
    "VAULT_NAMESPACE",
    "CODETETHER_AUTH_TOKEN",
];
284
/// A branch currently being executed by a Kubernetes sub-agent pod.
// NOTE(review): consumed by the kubernetes execution path outside this chunk —
// confirm field semantics against `execute_stage_kubernetes`.
#[derive(Debug, Clone)]
struct ActiveK8sBranch {
    // Branch identifier — presumably a git/worktree branch name.
    branch: String,
    // Launch time, used for elapsed-time / health sampling.
    started_at: Instant,
}
290
/// Why an agent's execution loop stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The loop finished normally.
    Completed,
    /// The step budget was exhausted before completion.
    MaxStepsReached,
    /// The allotted time elapsed before completion.
    TimedOut,
}
297
298fn compute_resource_health(pod_state: Option<&SubagentPodState>) -> (f32, u32) {
299    let Some(pod_state) = pod_state else {
300        return (0.2, 1);
301    };
302
303    let reason = pod_state
304        .reason
305        .as_deref()
306        .unwrap_or_default()
307        .to_ascii_lowercase();
308    let phase = pod_state.phase.to_ascii_lowercase();
309
310    if reason.contains("oomkilled") {
311        return (0.0, 3);
312    }
313    if reason.contains("imagepullbackoff") || reason.contains("errimagepull") {
314        return (0.0, 3);
315    }
316    if reason.contains("crashloopbackoff") {
317        return (0.1, 2);
318    }
319    if phase == "failed" {
320        return (0.1, 2);
321    }
322
323    let mut score = 1.0f32;
324    let mut unhealthy_signals = 0u32;
325
326    if !pod_state.ready {
327        score -= 0.2;
328    }
329    if !reason.is_empty() {
330        score -= 0.3;
331        unhealthy_signals += 1;
332    }
333    if pod_state.restart_count > 0 {
334        score -= (pod_state.restart_count.min(3) as f32) * 0.2;
335        unhealthy_signals += 1;
336    }
337
338    (score.clamp(0.0, 1.0), unhealthy_signals)
339}
340
341/// Process a large tool result using RLM to intelligently summarize it
342async fn process_large_result_with_rlm(
343    content: &str,
344    tool_name: &str,
345    provider: Arc<dyn Provider>,
346    model: &str,
347) -> String {
348    if content.len() <= SIMPLE_TRUNCATE_CHARS {
349        return content.to_string();
350    }
351
352    // For medium-sized content, just truncate
353    if content.len() <= RLM_THRESHOLD_CHARS {
354        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
355    }
356
357    // For very large content, use RLM to intelligently summarize
358    tracing::info!(
359        tool = %tool_name,
360        content_len = content.len(),
361        "Using RLM to process large tool result"
362    );
363
364    let query = format!(
365        "Summarize the key information from this {} output. \
366         Focus on: errors, warnings, important findings, and actionable items. \
367         Be concise but thorough.",
368        tool_name
369    );
370
371    let mut executor =
372        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
373
374    match executor.analyze(&query).await {
375        Ok(result) => {
376            tracing::info!(
377                tool = %tool_name,
378                original_len = content.len(),
379                summary_len = result.answer.len(),
380                iterations = result.iterations,
381                "RLM summarized large result"
382            );
383
384            format!(
385                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
386                tool_name,
387                content.len(),
388                result.answer.len(),
389                result.answer
390            )
391        }
392        Err(e) => {
393            tracing::warn!(
394                tool = %tool_name,
395                error = %e,
396                "RLM analysis failed, falling back to truncation"
397            );
398            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
399        }
400    }
401}
402
/// The swarm executor runs subtasks in parallel
///
/// Built via `new`/`with_cache`, then configured through the `with_*`
/// builder methods that attach optional collaborators.
pub struct SwarmExecutor {
    config: SwarmConfig,
    /// Optional agent for handling swarm-level coordination (reserved for future use)
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Optional event channel for TUI real-time updates
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    /// Telemetry collector for swarm execution metrics
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    /// Cache for avoiding duplicate subtask execution (`None` = caching disabled)
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Shared result store for sub-agent result sharing
    result_store: Arc<ResultStore>,
    /// Optional agent bus for inter-agent communication
    bus: Option<Arc<AgentBus>>,
}
419
420impl SwarmExecutor {
421    /// Create a new executor
422    pub fn new(config: SwarmConfig) -> Self {
423        Self {
424            config,
425            coordinator_agent: None,
426            event_tx: None,
427            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
428            cache: None,
429            result_store: ResultStore::new_arc(),
430            bus: None,
431        }
432    }
433
434    /// Create a new executor with caching enabled
435    pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
436        let cache = SwarmCache::new(cache_config).await?;
437        Ok(Self {
438            config,
439            coordinator_agent: None,
440            event_tx: None,
441            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
442            cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
443            result_store: ResultStore::new_arc(),
444            bus: None,
445        })
446    }
447
448    /// Set a pre-initialized cache
449    pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
450        self.cache = Some(cache);
451        self
452    }
453
454    /// Set an agent bus for inter-agent communication
455    pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
456        self.bus = Some(bus);
457        self
458    }
459
    /// Get the agent bus if set
    ///
    /// Returns `None` unless a bus was attached via `with_bus`.
    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
        self.bus.as_ref()
    }
464
465    /// Set an event channel for real-time TUI updates
466    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
467        self.event_tx = Some(tx);
468        self
469    }
470
471    /// Set a coordinator agent for swarm-level coordination
472    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
473        tracing::debug!("Setting coordinator agent for swarm execution");
474        self.coordinator_agent = Some(agent);
475        self
476    }
477
478    /// Set a telemetry collector for swarm execution metrics
479    pub fn with_telemetry(
480        mut self,
481        telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
482    ) -> Self {
483        self.telemetry = telemetry;
484        self
485    }
486
    /// Get the telemetry collector as an Arc
    ///
    /// Cheap: clones only the `Arc` handle, not the collector itself.
    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
        Arc::clone(&self.telemetry)
    }
    /// Get the coordinator agent if set
    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        // Debug-level trace to help diagnose missing-coordinator situations.
        tracing::debug!(
            has_coordinator = self.coordinator_agent.is_some(),
            "Getting coordinator agent"
        );
        self.coordinator_agent.as_ref()
    }
499
    /// Get the shared result store
    ///
    /// Stage results are published here (see `execute`) so sub-agents can
    /// query each other's outputs.
    pub fn result_store(&self) -> &Arc<ResultStore> {
        &self.result_store
    }
504
505    /// Get cache statistics if caching is enabled
506    pub async fn cache_stats(&self) -> Option<CacheStats> {
507        if let Some(ref cache) = self.cache {
508            let cache_guard = cache.lock().await;
509            Some(cache_guard.stats().clone())
510        } else {
511            None
512        }
513    }
514
515    /// Clear the cache if enabled
516    pub async fn clear_cache(&self) -> Result<()> {
517        if let Some(ref cache) = self.cache {
518            let mut cache_guard = cache.lock().await;
519            cache_guard.clear().await?;
520        }
521        Ok(())
522    }
523
524    /// Send an event to the TUI if channel is connected (non-blocking)
525    fn try_send_event(&self, event: SwarmEvent) {
526        // Also emit on the agent bus if connected
527        if let Some(ref bus) = self.bus {
528            let handle = bus.handle("swarm-executor");
529            match &event {
530                SwarmEvent::Started { task, .. } => {
531                    handle.send(
532                        "broadcast",
533                        BusMessage::AgentReady {
534                            agent_id: "swarm-executor".to_string(),
535                            capabilities: vec![format!("executing:{task}")],
536                        },
537                    );
538                }
539                SwarmEvent::Complete { success, .. } => {
540                    let state = if *success {
541                        crate::a2a::types::TaskState::Completed
542                    } else {
543                        crate::a2a::types::TaskState::Failed
544                    };
545                    handle.send_task_update("swarm", state, None);
546                }
547                _ => {} // Other events are TUI-specific
548            }
549        }
550
551        if let Some(ref tx) = self.event_tx {
552            let _ = tx.try_send(event);
553        }
554    }
555
556    /// Execute a task using the swarm
557    pub async fn execute(
558        &self,
559        task: &str,
560        strategy: DecompositionStrategy,
561    ) -> Result<SwarmResult> {
562        let start_time = Instant::now();
563
564        // Create orchestrator
565        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;
566
567        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");
568
569        // Decompose the task
570        let subtasks = orchestrator.decompose(task, strategy).await?;
571
572        if subtasks.is_empty() {
573            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
574            return Ok(SwarmResult {
575                success: false,
576                result: String::new(),
577                subtask_results: Vec::new(),
578                stats: SwarmStats::default(),
579                artifacts: Vec::new(),
580                error: Some("No subtasks generated".to_string()),
581            });
582        }
583
584        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());
585
586        self.try_send_event(SwarmEvent::Started {
587            task: task.to_string(),
588            total_subtasks: subtasks.len(),
589        });
590
591        // Emit decomposition event for TUI
592        self.try_send_event(SwarmEvent::Decomposed {
593            subtasks: subtasks
594                .iter()
595                .map(|s| SubTaskInfo {
596                    id: s.id.clone(),
597                    name: s.name.clone(),
598                    status: SubTaskStatus::Pending,
599                    stage: s.stage,
600                    dependencies: s.dependencies.clone(),
601                    agent_name: s.specialty.clone(),
602                    current_tool: None,
603                    steps: 0,
604                    max_steps: self.config.max_steps_per_subagent,
605                    tool_call_history: Vec::new(),
606                    messages: Vec::new(),
607                    output: None,
608                    error: None,
609                })
610                .collect(),
611        });
612
613        // Execute stages in order
614        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
615        let mut all_results: Vec<SubTaskResult> = Vec::new();
616        let artifacts: Vec<SwarmArtifact> = Vec::new();
617
618        // Initialize telemetry for this swarm execution
619        let swarm_id = uuid::Uuid::new_v4().to_string();
620        let strategy_str = format!("{:?}", strategy);
621        self.telemetry.lock().await.start_swarm(
622            &swarm_id,
623            subtasks.len(),
624            &strategy_str,
625        );
626
627        // Shared state for completed results
628        let completed_results: Arc<RwLock<HashMap<String, String>>> =
629            Arc::new(RwLock::new(HashMap::new()));
630
631        for stage in 0..=max_stage {
632            let stage_start = Instant::now();
633
634            let stage_subtasks: Vec<SubTask> = orchestrator
635                .subtasks_for_stage(stage)
636                .into_iter()
637                .cloned()
638                .collect();
639
640            tracing::debug!(
641                "Stage {} has {} subtasks (max_stage={})",
642                stage,
643                stage_subtasks.len(),
644                max_stage
645            );
646
647            if stage_subtasks.is_empty() {
648                continue;
649            }
650
651            tracing::info!(
652                provider_name = %orchestrator.provider(),
653                "Executing stage {} with {} subtasks",
654                stage,
655                stage_subtasks.len()
656            );
657
658            // Execute all subtasks in this stage in parallel
659            let stage_results = self
660                .execute_stage(
661                    &orchestrator,
662                    stage_subtasks,
663                    completed_results.clone(),
664                    &swarm_id,
665                )
666                .await?;
667
668            // Update completed results for next stage
669            {
670                let mut completed = completed_results.write().await;
671                for result in &stage_results {
672                    completed.insert(result.subtask_id.clone(), result.result.clone());
673                    // Also publish to shared ResultStore for richer querying
674                    let tags = vec![
675                        format!("stage:{stage}"),
676                        format!("subtask:{}", result.subtask_id),
677                    ];
678                    let _ = self
679                        .result_store
680                        .publish(
681                            &result.subtask_id,
682                            &result.subagent_id,
683                            &result.result,
684                            tags,
685                            None,
686                        )
687                        .await;
688                }
689            }
690
691            // Update orchestrator stats
692            let stage_time = stage_start.elapsed().as_millis() as u64;
693            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
694            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();
695
696            orchestrator.stats_mut().stages.push(StageStats {
697                stage,
698                subagent_count: stage_results.len(),
699                max_steps,
700                total_steps,
701                execution_time_ms: stage_time,
702            });
703
704            // Mark subtasks as completed
705            for result in &stage_results {
706                orchestrator.complete_subtask(&result.subtask_id, result.clone());
707            }
708
709            // Emit stage complete event
710            let stage_completed = stage_results.iter().filter(|r| r.success).count();
711            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
712            self.try_send_event(SwarmEvent::StageComplete {
713                stage,
714                completed: stage_completed,
715                failed: stage_failed,
716            });
717
718            all_results.extend(stage_results);
719        }
720
721        // Get provider name before mutable borrow
722        let provider_name = orchestrator.provider().to_string();
723
724        // Record overall execution latency
725        self.telemetry
726            .lock()
727            .await
728            .record_swarm_latency("total_execution", start_time.elapsed());
729
730        // Calculate final stats
731        let stats = orchestrator.stats_mut();
732        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
733        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
734        stats.calculate_critical_path();
735        stats.calculate_speedup();
736
737        // Aggregate results
738        let success = all_results.iter().all(|r| r.success);
739
740        // Complete telemetry collection
741        let telemetry_metrics = self.telemetry.lock().await.complete_swarm(success).await;
742        let result = self.aggregate_results(&all_results).await?;
743
744        tracing::info!(
745            provider_name = %provider_name,
746            "Swarm execution complete: {} subtasks, {:.1}x speedup",
747            all_results.len(),
748            stats.speedup_factor
749        );
750
751        let final_stats = orchestrator.stats().clone();
752        self.try_send_event(SwarmEvent::Complete {
753            success,
754            stats: final_stats.clone(),
755        });
756
757        Ok(SwarmResult {
758            success,
759            result,
760            subtask_results: all_results,
761            stats: final_stats,
762            artifacts,
763            error: None,
764        })
765    }
766
767    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
768    async fn execute_stage(
769        &self,
770        orchestrator: &Orchestrator,
771        subtasks: Vec<SubTask>,
772        completed_results: Arc<RwLock<HashMap<String, String>>>,
773        swarm_id: &str,
774    ) -> Result<Vec<SubTaskResult>> {
775        if self.config.execution_mode == ExecutionMode::KubernetesPod {
776            return self
777                .execute_stage_kubernetes(orchestrator, subtasks, completed_results, swarm_id)
778                .await;
779        }
780
781        let mut handles: FuturesUnordered<
782            tokio::task::JoinHandle<(String, Result<SubTaskResult, anyhow::Error>)>,
783        > = FuturesUnordered::new();
784        let mut abort_handles: HashMap<String, AbortHandle> = HashMap::new();
785        let mut task_ids: HashMap<tokio::task::Id, String> = HashMap::new();
786        let mut active_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
787        let mut all_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
788        let mut cached_results: Vec<SubTaskResult> = Vec::new();
789        let mut completed_entries: Vec<(SubTaskResult, Option<WorktreeInfo>)> = Vec::new();
790        let mut kill_reasons: HashMap<String, String> = HashMap::new();
791        let mut promoted_subtask_id: Option<String> = None;
792
793        // Rate limiting: semaphore for max concurrent requests
794        let semaphore = Arc::new(tokio::sync::Semaphore::new(
795            self.config.max_concurrent_requests,
796        ));
797        let delay_ms = self.config.request_delay_ms;
798
799        // Get provider info for tool registry
800        let model = orchestrator.model().to_string();
801        let provider_name = orchestrator.provider().to_string();
802        let providers = orchestrator.providers();
803        let provider = providers
804            .get(&provider_name)
805            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
806
807        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
808
809        // Create base tool registry with provider for ralph and batch tool
810        let base_tool_registry =
811            ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
812        // Filter out 'question' tool - sub-agents must be autonomous, not interactive
813        // Include 'swarm_share' definition so LLMs know about it (registered per-agent below)
814        let mut tool_definitions: Vec<_> = base_tool_registry
815            .definitions()
816            .into_iter()
817            .filter(|t| t.name != "question")
818            .collect();
819
820        // Add swarm_share tool definition so LLMs know it's available
821        let swarm_share_def = crate::provider::ToolDefinition {
822            name: "swarm_share".to_string(),
823            description: "Share results with other sub-agents in the swarm. Actions: publish \
824                          (share a result), get (retrieve a result by key), query_tags (find \
825                          results by tags), query_prefix (find results by key prefix), list \
826                          (show all shared results)."
827                .to_string(),
828            parameters: serde_json::json!({
829                "type": "object",
830                "properties": {
831                    "action": {
832                        "type": "string",
833                        "enum": ["publish", "get", "query_tags", "query_prefix", "list"],
834                        "description": "Action to perform"
835                    },
836                    "key": {
837                        "type": "string",
838                        "description": "Result key (for publish/get)"
839                    },
840                    "value": {
841                        "description": "Result value to publish (any JSON value)"
842                    },
843                    "tags": {
844                        "type": "array",
845                        "items": {"type": "string"},
846                        "description": "Tags for publish or query_tags"
847                    },
848                    "prefix": {
849                        "type": "string",
850                        "description": "Key prefix for query_prefix"
851                    }
852                },
853                "required": ["action"]
854            }),
855        };
856        tool_definitions.push(swarm_share_def);
857
858        // Clone the result store for sub-agent sharing
859        let result_store = Arc::clone(&self.result_store);
860
861        // Create worktree manager if enabled
862        let worktree_manager = if self.config.worktree_enabled {
863            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
864                std::env::current_dir()
865                    .map(|p| p.to_string_lossy().to_string())
866                    .unwrap_or_else(|_| ".".to_string())
867            });
868
869            let mgr = WorktreeManager::new(&working_dir);
870            tracing::info!(
871                working_dir = %working_dir,
872                "Worktree isolation enabled for parallel sub-agents"
873            );
874            Some(Arc::new(mgr) as Arc<WorktreeManager>)
875        } else {
876            None
877        };
878
879        for (idx, subtask) in subtasks.into_iter().enumerate() {
880            let model = model.clone();
881            let _provider_name = provider_name.clone();
882            let provider = Arc::clone(&provider);
883
884            // Check cache first
885            if let Some(ref cache) = self.cache {
886                let mut cache_guard = cache.lock().await;
887                if let Some(cached_result) = cache_guard.get(&subtask).await {
888                    tracing::info!(
889                        subtask_id = %subtask.id,
890                        task_name = %subtask.name,
891                        "Cache hit for subtask, skipping execution"
892                    );
893                    self.try_send_event(SwarmEvent::SubTaskUpdate {
894                        id: subtask.id.clone(),
895                        name: subtask.name.clone(),
896                        status: SubTaskStatus::Completed,
897                        agent_name: Some("cached".to_string()),
898                    });
899                    cached_results.push(cached_result);
900                    continue;
901                }
902            }
903
904            // Get context from dependencies
905            let context = {
906                let completed = completed_results.read().await;
907                let mut dep_context = String::new();
908                for dep_id in &subtask.dependencies {
909                    if let Some(result) = completed.get(dep_id) {
910                        dep_context.push_str(&format!(
911                            "\n--- Result from dependency {} ---\n{}\n",
912                            dep_id, result
913                        ));
914                    }
915                }
916                dep_context
917            };
918
919            let instruction = subtask.instruction.clone();
920            let subtask_name = subtask.name.clone();
921            let specialty = subtask.specialty.clone().unwrap_or_default();
922            let subtask_id = subtask.id.clone();
923            let subtask_id_for_handle = subtask_id.clone();
924            let max_steps = self.config.max_steps_per_subagent;
925            let timeout_secs = self.config.subagent_timeout_secs;
926
927            // Create worktree for this sub-agent before spawning so the collapse controller
928            // can monitor live branches while execution is in-flight.
929            let worktree_info = if let Some(ref mgr) = worktree_manager {
930                let task_slug = subtask_id.replace("-", "_");
931                match mgr.create(&task_slug).await {
932                    Ok(wt) => {
933                        if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
934                            tracing::warn!(
935                                subtask_id = %subtask_id,
936                                error = %e,
937                                "Failed to inject workspace stub into worktree"
938                            );
939                        }
940                        tracing::info!(
941                            subtask_id = %subtask_id,
942                            worktree_path = %wt.path.display(),
943                            worktree_branch = %wt.branch,
944                            "Created worktree for sub-agent"
945                        );
946                        active_worktrees.insert(subtask_id.clone(), wt.clone());
947                        all_worktrees.insert(subtask_id.clone(), wt.clone());
948                        Some(wt)
949                    }
950                    Err(e) => {
951                        tracing::warn!(
952                            subtask_id = %subtask_id,
953                            error = %e,
954                            "Failed to create worktree, using shared directory"
955                        );
956                        None
957                    }
958                }
959            } else {
960                None
961            };
962
963            let working_dir = worktree_info
964                .as_ref()
965                .map(|wt| wt.path.display().to_string())
966                .unwrap_or_else(|| ".".to_string());
967            let working_dir_path = worktree_info.as_ref().map(|wt| wt.path.clone());
968
969            // Clone for the async block
970            let tools = tool_definitions.clone();
971            let _base_registry = Arc::clone(&base_tool_registry);
972            let agent_result_store = Arc::clone(&result_store);
973            let sem = Arc::clone(&semaphore);
974            let stagger_delay = delay_ms * idx as u64; // Stagger start times
975            let event_tx = self.event_tx.clone();
976
977            // Generate sub-agent execution ID
978            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
979
980            // Log telemetry for this sub-agent
981            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
982
983            // Spawn the subtask execution with agentic tool loop
984            let handle = tokio::spawn(async move {
985                // Rate limiting: stagger start and acquire semaphore
986                if stagger_delay > 0 {
987                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
988                }
989                let _permit = match sem.acquire().await {
990                    Ok(permit) => permit,
991                    Err(_) => {
992                        return (
993                            subtask_id.clone(),
994                            Err(anyhow::anyhow!("Swarm execution cancelled")),
995                        );
996                    }
997                };
998
999                let _agent_start = Instant::now();
1000
1001                let start = Instant::now();
1002
1003                // Load AGENTS.md from working directory
1004                let working_path = std::path::Path::new(&working_dir);
1005                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
1006                    .map(|(content, _)| {
1007                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
1008                    })
1009                    .unwrap_or_default();
1010
1011                // Build the system prompt for this sub-agent
1012                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
1013                let system_prompt = format!(
1014                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
1015
1016WORKING DIRECTORY: {}
1017All file operations should be relative to this directory.
1018
1019IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
1020
1021Available tools:
1022- read: Read file contents
1023- write: Write/create files
1024- edit: Edit existing files (search and replace)
1025- multiedit: Make multiple edits at once
1026- glob: Find files by pattern
1027- grep: Search file contents
1028- bash: Run shell commands (use cwd: \"{}\" parameter)
1029- webfetch: Fetch web pages
1030- prd: Generate structured PRD for complex tasks
1031- ralph: Run autonomous agent loop on a PRD
1032- swarm_share: Share results with other sub-agents running in parallel
1033
1034SHARING RESULTS:
1035Use swarm_share to collaborate with other sub-agents:
1036- swarm_share({{action: 'publish', key: 'my-finding', value: '...', tags: ['research']}}) to share a result
1037- swarm_share({{action: 'get', key: 'some-key'}}) to retrieve a result from another agent
1038- swarm_share({{action: 'list'}}) to see all shared results
1039- swarm_share({{action: 'query_tags', tags: ['research']}}) to find results by tag
1040
1041COMPLEX TASKS:
1042If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
10431. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
10442. Break down into user stories with acceptance criteria
10453. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
10464. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
1047
1048NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
1049
1050When done, provide a brief summary of what you accomplished.{agents_md_content}",
1051                    specialty,
1052                    subtask_id,
1053                    working_dir,
1054                    working_dir,
1055                    prd_filename,
1056                    prd_filename,
1057                    prd_filename
1058                );
1059
1060                let user_prompt = if context.is_empty() {
1061                    format!("Complete this task:\n\n{}", instruction)
1062                } else {
1063                    format!(
1064                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
1065                        instruction, context
1066                    )
1067                };
1068
1069                // Emit AgentStarted event
1070                if let Some(ref tx) = event_tx {
1071                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1072                        id: subtask_id.clone(),
1073                        name: subtask_name.clone(),
1074                        status: SubTaskStatus::Running,
1075                        agent_name: Some(format!("agent-{}", subtask_id)),
1076                    });
1077                    let _ = tx.try_send(SwarmEvent::AgentStarted {
1078                        subtask_id: subtask_id.clone(),
1079                        agent_name: format!("agent-{}", subtask_id),
1080                        specialty: specialty.clone(),
1081                    });
1082                }
1083
1084                // Run the agentic loop
1085                // Create per-agent registry with SwarmShareTool for this subtask
1086                let mut agent_registry =
1087                    ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
1088                agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
1089                    Arc::clone(&agent_result_store),
1090                    subtask_id.clone(),
1091                )));
1092                let registry = Arc::new(agent_registry);
1093
1094                let result = run_agent_loop(
1095                    provider,
1096                    &model,
1097                    &system_prompt,
1098                    &user_prompt,
1099                    tools,
1100                    registry,
1101                    max_steps,
1102                    timeout_secs,
1103                    event_tx.clone(),
1104                    subtask_id.clone(),
1105                    None,
1106                    working_dir_path.clone(),
1107                )
1108                .await;
1109
1110                let task_result = match result {
1111                    Ok((output, steps, tool_calls, exit_reason)) => {
1112                        let (success, status, error) = match exit_reason {
1113                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
1114                            AgentLoopExit::MaxStepsReached => (
1115                                false,
1116                                SubTaskStatus::Failed,
1117                                Some(format!("Sub-agent hit max steps ({max_steps})")),
1118                            ),
1119                            AgentLoopExit::TimedOut => (
1120                                false,
1121                                SubTaskStatus::TimedOut,
1122                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
1123                            ),
1124                        };
1125
1126                        // Emit completion events
1127                        if let Some(ref tx) = event_tx {
1128                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1129                                id: subtask_id.clone(),
1130                                name: subtask_name.clone(),
1131                                status,
1132                                agent_name: Some(format!("agent-{}", subtask_id)),
1133                            });
1134                            if let Some(ref message) = error {
1135                                let _ = tx.try_send(SwarmEvent::AgentError {
1136                                    subtask_id: subtask_id.clone(),
1137                                    error: message.clone(),
1138                                });
1139                            }
1140                            let _ = tx.try_send(SwarmEvent::AgentOutput {
1141                                subtask_id: subtask_id.clone(),
1142                                output: output.clone(),
1143                            });
1144                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1145                                subtask_id: subtask_id.clone(),
1146                                success,
1147                                steps,
1148                            });
1149                        }
1150                        Ok(SubTaskResult {
1151                            subtask_id: subtask_id.clone(),
1152                            subagent_id: format!("agent-{}", subtask_id),
1153                            success,
1154                            result: output,
1155                            steps,
1156                            tool_calls,
1157                            execution_time_ms: start.elapsed().as_millis() as u64,
1158                            error,
1159                            artifacts: Vec::new(),
1160                        })
1161                    }
1162                    Err(e) => {
1163                        // Emit error events
1164                        if let Some(ref tx) = event_tx {
1165                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1166                                id: subtask_id.clone(),
1167                                name: subtask_name.clone(),
1168                                status: SubTaskStatus::Failed,
1169                                agent_name: Some(format!("agent-{}", subtask_id)),
1170                            });
1171                            let _ = tx.try_send(SwarmEvent::AgentError {
1172                                subtask_id: subtask_id.clone(),
1173                                error: e.to_string(),
1174                            });
1175                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1176                                subtask_id: subtask_id.clone(),
1177                                success: false,
1178                                steps: 0,
1179                            });
1180                        }
1181                        Ok(SubTaskResult {
1182                            subtask_id: subtask_id.clone(),
1183                            subagent_id: format!("agent-{}", subtask_id),
1184                            success: false,
1185                            result: String::new(),
1186                            steps: 0,
1187                            tool_calls: 0,
1188                            execution_time_ms: start.elapsed().as_millis() as u64,
1189                            error: Some(e.to_string()),
1190                            artifacts: Vec::new(),
1191                        })
1192                    }
1193                };
1194
1195                (subtask_id.clone(), task_result)
1196            });
1197
1198            let abort_handle = handle.abort_handle();
1199            abort_handles.insert(subtask_id_for_handle.clone(), abort_handle);
1200            task_ids.insert(handle.id(), subtask_id_for_handle.clone());
1201            handles.push(handle);
1202        }
1203
1204        // Deterministic collapse loop while branches are still running.
1205        let mut collapse_controller = if worktree_manager.is_some() && active_worktrees.len() > 1 {
1206            Some(CollapseController::new(CollapsePolicy::default()))
1207        } else {
1208            None
1209        };
1210        let mut collapse_tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1211        collapse_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1212        // Consume the immediate first tick so sampling starts after the first interval.
1213        let _ = collapse_tick.tick().await;
1214
1215        while !handles.is_empty() {
1216            tokio::select! {
1217                maybe_join = handles.next() => {
1218                    let Some(joined) = maybe_join else {
1219                        continue;
1220                    };
1221                    match joined {
1222                        Ok((subtask_id, Ok(result))) => {
1223                            abort_handles.remove(&subtask_id);
1224                            let wt = active_worktrees.remove(&subtask_id).or_else(|| all_worktrees.get(&subtask_id).cloned());
1225                            completed_entries.push((result, wt));
1226                        }
1227                        Ok((subtask_id, Err(e))) => {
1228                            abort_handles.remove(&subtask_id);
1229                            active_worktrees.remove(&subtask_id);
1230                            let wt = all_worktrees.get(&subtask_id).cloned();
1231                            completed_entries.push((
1232                                SubTaskResult {
1233                                    subtask_id: subtask_id.clone(),
1234                                    subagent_id: format!("agent-{subtask_id}"),
1235                                    success: false,
1236                                    result: String::new(),
1237                                    steps: 0,
1238                                    tool_calls: 0,
1239                                    execution_time_ms: 0,
1240                                    error: Some(e.to_string()),
1241                                    artifacts: Vec::new(),
1242                                },
1243                                wt,
1244                            ));
1245                        }
1246                        Err(e) => {
1247                            let subtask_id = task_ids
1248                                .remove(&e.id())
1249                                .unwrap_or_else(|| "unknown".to_string());
1250                            abort_handles.remove(&subtask_id);
1251                            active_worktrees.remove(&subtask_id);
1252                            let wt = all_worktrees.get(&subtask_id).cloned();
1253                            completed_entries.push((
1254                                SubTaskResult {
1255                                    subtask_id: subtask_id.clone(),
1256                                    subagent_id: format!("agent-{subtask_id}"),
1257                                    success: false,
1258                                    result: String::new(),
1259                                    steps: 0,
1260                                    tool_calls: 0,
1261                                    execution_time_ms: 0,
1262                                    error: Some(format!("Task join error: {e}")),
1263                                    artifacts: Vec::new(),
1264                                },
1265                                wt,
1266                            ));
1267                        }
1268                    }
1269                }
1270                _ = collapse_tick.tick(), if collapse_controller.is_some() && !active_worktrees.is_empty() => {
1271                    let branches: Vec<BranchRuntimeState> = active_worktrees
1272                        .iter()
1273                        .map(|(subtask_id, wt)| BranchRuntimeState {
1274                            subtask_id: subtask_id.clone(),
1275                            branch: wt.branch.clone(),
1276                            worktree_path: wt.path.clone(),
1277                        })
1278                        .collect();
1279
1280                    if let Some(controller) = collapse_controller.as_mut() {
1281                        match controller.sample(&branches) {
1282                            Ok(tick) => {
1283                                if promoted_subtask_id != tick.promoted_subtask_id {
1284                                    promoted_subtask_id = tick.promoted_subtask_id.clone();
1285                                    if let Some(ref promoted) = promoted_subtask_id {
1286                                        tracing::info!(
1287                                            subtask_id = %promoted,
1288                                            "Collapse controller promoted branch"
1289                                        );
1290                                        if let Some(audit) = crate::audit::try_audit_log() {
1291                                            audit.log_with_correlation(
1292                                                crate::audit::AuditCategory::Swarm,
1293                                                "collapse_promote_branch",
1294                                                crate::audit::AuditOutcome::Success,
1295                                                Some("collapse-controller".to_string()),
1296                                                Some(serde_json::json!({
1297                                                    "swarm_id": swarm_id,
1298                                                    "subtask_id": promoted,
1299                                                })),
1300                                                None,
1301                                                None,
1302                                                Some(swarm_id.to_string()),
1303                                                None,
1304                                            ).await;
1305                                        }
1306                                    }
1307                                }
1308
1309                                for kill in tick.kills {
1310                                    if kill_reasons.contains_key(&kill.subtask_id) {
1311                                        continue;
1312                                    }
1313                                    let Some(abort_handle) = abort_handles.get(&kill.subtask_id) else {
1314                                        continue;
1315                                    };
1316
1317                                    abort_handle.abort();
1318                                    abort_handles.remove(&kill.subtask_id);
1319                                    active_worktrees.remove(&kill.subtask_id);
1320                                    kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
1321
1322                                    tracing::warn!(
1323                                        subtask_id = %kill.subtask_id,
1324                                        branch = %kill.branch,
1325                                        reason = %kill.reason,
1326                                        "Collapse controller killed branch"
1327                                    );
1328
1329                                    if let Some(ref tx) = self.event_tx {
1330                                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1331                                            id: kill.subtask_id.clone(),
1332                                            name: kill.subtask_id.clone(),
1333                                            status: SubTaskStatus::Cancelled,
1334                                            agent_name: Some(format!("agent-{}", kill.subtask_id)),
1335                                        });
1336                                        let _ = tx.try_send(SwarmEvent::AgentError {
1337                                            subtask_id: kill.subtask_id.clone(),
1338                                            error: format!("Cancelled by collapse controller: {}", kill.reason),
1339                                        });
1340                                    }
1341
1342                                    if let Some(audit) = crate::audit::try_audit_log() {
1343                                        audit.log_with_correlation(
1344                                            crate::audit::AuditCategory::Swarm,
1345                                            "collapse_kill_branch",
1346                                            crate::audit::AuditOutcome::Success,
1347                                            Some("collapse-controller".to_string()),
1348                                            Some(serde_json::json!({
1349                                                "swarm_id": swarm_id,
1350                                                "subtask_id": kill.subtask_id,
1351                                                "branch": kill.branch,
1352                                                "reason": kill.reason,
1353                                            })),
1354                                            None,
1355                                            None,
1356                                            Some(swarm_id.to_string()),
1357                                            None,
1358                                        ).await;
1359                                    }
1360                                }
1361                            }
1362                            Err(e) => {
1363                                tracing::warn!(error = %e, "Collapse controller sampling failed");
1364                            }
1365                        }
1366                    }
1367                }
1368            }
1369        }
1370
1371        // Merge in deterministic order: promoted branch first when present.
1372        if let Some(ref promoted) = promoted_subtask_id {
1373            completed_entries.sort_by_key(|(result, _)| {
1374                if &result.subtask_id == promoted {
1375                    0usize
1376                } else {
1377                    1usize
1378                }
1379            });
1380        }
1381
1382        let mut results = cached_results;
1383        let auto_merge = self.config.worktree_auto_merge;
1384
1385        for (mut result, worktree_info) in completed_entries {
1386            // Handle worktree merge/cleanup if applicable
1387            if let Some(wt) = worktree_info {
1388                if let Some(reason) = kill_reasons.get(&result.subtask_id) {
1389                    result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1390                    result.result.push_str(&format!(
1391                        "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1392                    ));
1393                    if let Some(ref mgr) = worktree_manager {
1394                        if let Err(e) = mgr.cleanup(&wt.name).await {
1395                            tracing::warn!(error = %e, "Failed to cleanup killed worktree");
1396                        }
1397                    }
1398                } else if result.success && auto_merge {
1399                    if let Some(ref mgr) = worktree_manager {
1400                        match mgr.merge(&wt.name).await {
1401                            Ok(merge_result) => {
1402                                if merge_result.success {
1403                                    tracing::info!(
1404                                        subtask_id = %result.subtask_id,
1405                                        files_changed = merge_result.files_changed,
1406                                        "Merged worktree changes successfully"
1407                                    );
1408                                    result.result.push_str(&format!(
1409                                        "\n\n--- Merge Result ---\n{}",
1410                                        merge_result.summary
1411                                    ));
1412                                } else if merge_result.aborted {
1413                                    tracing::warn!(
1414                                        subtask_id = %result.subtask_id,
1415                                        summary = %merge_result.summary,
1416                                        "Merge was aborted"
1417                                    );
1418                                    result.result.push_str(&format!(
1419                                        "\n\n--- Merge Aborted ---\n{}",
1420                                        merge_result.summary
1421                                    ));
1422                                } else {
1423                                    tracing::warn!(
1424                                        subtask_id = %result.subtask_id,
1425                                        conflicts = ?merge_result.conflicts,
1426                                        "Merge had conflicts"
1427                                    );
1428                                    result.result.push_str(&format!(
1429                                        "\n\n--- Merge Conflicts ---\n{}",
1430                                        merge_result.summary
1431                                    ));
1432                                }
1433                                if let Err(e) = mgr.cleanup(&wt.name).await {
1434                                    tracing::warn!(error = %e, "Failed to cleanup worktree");
1435                                }
1436                            }
1437                            Err(e) => {
1438                                tracing::error!(
1439                                    subtask_id = %result.subtask_id,
1440                                    error = %e,
1441                                    "Failed to merge worktree"
1442                                );
1443                            }
1444                        }
1445                    }
1446                } else if !result.success {
1447                    tracing::info!(
1448                        subtask_id = %result.subtask_id,
1449                        worktree_path = %wt.path.display(),
1450                        "Keeping worktree for debugging (task failed)"
1451                    );
1452                }
1453            }
1454
1455            // Cache successful result
1456            if result.success {
1457                if let Some(ref cache_arc) = self.cache {
1458                    let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
1459                        cache_arc.lock().await;
1460                    let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1461                    if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
1462                        tracing::warn!(
1463                            subtask_id = %result.subtask_id,
1464                            error = %e,
1465                            "Failed to cache subtask result"
1466                        );
1467                    }
1468                }
1469            }
1470
1471            results.push(result);
1472        }
1473
1474        Ok(results)
1475    }
1476
    /// Execute one stage's subtasks as Kubernetes sub-agent pods.
    ///
    /// Spawns up to `k8s_pod_budget` pods concurrently, polls them on the
    /// `COLLAPSE_SAMPLE_SECS` cadence, harvests results from pod logs, and —
    /// when more than one branch is live — feeds per-branch observations to a
    /// `CollapseController` that may promote one branch or kill others.
    /// Cache hits bypass pod execution entirely.
    ///
    /// Returns one `SubTaskResult` per input subtask, including failures,
    /// timeouts, and collapse-controller cancellations. If a branch was
    /// promoted, its result is sorted to the front of the returned vector.
    async fn execute_stage_kubernetes(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
        swarm_id: &str,
    ) -> Result<Vec<SubTaskResult>> {
        let k8s = K8sManager::new().await;
        if !k8s.is_available() {
            anyhow::bail!(
                "Kubernetes execution mode requested but K8s client is unavailable in this environment"
            );
        }

        let provider_name = orchestrator.provider().to_string();
        let model = orchestrator.model().to_string();
        // Never allow a zero budget, which would deadlock the refill loop.
        let pod_budget = self.config.k8s_pod_budget.max(1);
        // Bookkeeping: queued work, in-flight pods keyed by subtask id,
        // display names for UI events, collected results, and the reasons for
        // any branch we deliberately terminated (timeout or collapse kill).
        let mut pending: VecDeque<SubTask> = subtasks.into_iter().collect();
        let mut active: HashMap<String, ActiveK8sBranch> = HashMap::new();
        let mut subtask_names: HashMap<String, String> = HashMap::new();
        let mut results: Vec<SubTaskResult> = Vec::new();
        let mut kill_reasons: HashMap<String, String> = HashMap::new();
        let mut promoted_subtask_id: Option<String> = None;
        let mut collapse_controller = CollapseController::new(CollapsePolicy::default());

        // Poll on the collapse-sampling cadence. `Skip` avoids a burst of
        // catch-up ticks after a slow iteration; the first tick of a tokio
        // interval completes immediately, so consume it here so that the
        // in-loop `tick()` always waits a full period.
        let mut tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
        tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
        let _ = tick.tick().await;

        loop {
            // Refill phase: launch pods until the budget is reached or the
            // queue drains.
            while active.len() < pod_budget {
                let Some(subtask) = pending.pop_front() else {
                    break;
                };

                // Reuse a cached result instead of scheduling a pod.
                if let Some(ref cache) = self.cache {
                    let mut cache_guard = cache.lock().await;
                    if let Some(cached_result) = cache_guard.get(&subtask).await {
                        tracing::info!(
                            subtask_id = %subtask.id,
                            task_name = %subtask.name,
                            "Cache hit for subtask, skipping Kubernetes execution"
                        );
                        self.try_send_event(SwarmEvent::SubTaskUpdate {
                            id: subtask.id.clone(),
                            name: subtask.name.clone(),
                            status: SubTaskStatus::Completed,
                            agent_name: Some("cached".to_string()),
                        });
                        results.push(cached_result);
                        continue;
                    }
                }

                // Inline the outputs of already-completed dependencies so the
                // remote agent sees them as prompt context.
                let context = {
                    let completed = completed_results.read().await;
                    let mut dep_context = String::new();
                    for dep_id in &subtask.dependencies {
                        if let Some(result) = completed.get(dep_id) {
                            dep_context.push_str(&format!(
                                "\n--- Result from dependency {} ---\n{}\n",
                                dep_id, result
                            ));
                        }
                    }
                    dep_context
                };

                // Everything the remote sub-agent needs, shipped to the pod
                // as a base64-encoded env var.
                let payload = RemoteSubtaskPayload {
                    swarm_id: swarm_id.to_string(),
                    subtask_id: subtask.id.clone(),
                    subtask_name: subtask.name.clone(),
                    specialty: subtask.specialty.clone().unwrap_or_default(),
                    instruction: subtask.instruction.clone(),
                    context: context.clone(),
                    provider: provider_name.clone(),
                    model: model.clone(),
                    max_steps: self.config.max_steps_per_subagent,
                    timeout_secs: self.config.subagent_timeout_secs,
                    working_dir: self.config.working_dir.clone(),
                    probe_interval_secs: COLLAPSE_SAMPLE_SECS,
                };
                let payload_b64 = match encode_payload(&payload) {
                    Ok(payload) => payload,
                    // Encoding failure: record a failed result, notify the
                    // UI, and move on to the next queued subtask.
                    Err(error) => {
                        let error_text = format!("Failed to encode remote payload: {error}");
                        tracing::error!(subtask_id = %subtask.id, error = %error, "K8s payload encoding failed");
                        self.try_send_event(SwarmEvent::SubTaskUpdate {
                            id: subtask.id.clone(),
                            name: subtask.name.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some("k8s-encoder".to_string()),
                        });
                        self.try_send_event(SwarmEvent::AgentError {
                            subtask_id: subtask.id.clone(),
                            error: error_text.clone(),
                        });
                        results.push(SubTaskResult {
                            subtask_id: subtask.id.clone(),
                            subagent_id: format!("agent-{}", subtask.id),
                            success: false,
                            result: String::new(),
                            steps: 0,
                            tool_calls: 0,
                            execution_time_ms: 0,
                            error: Some(error_text),
                            artifacts: Vec::new(),
                        });
                        continue;
                    }
                };

                // Forward the payload plus a whitelist of non-empty host env
                // vars (credentials etc.) into the pod environment.
                let mut env_vars = HashMap::new();
                env_vars.insert(SWARM_SUBTASK_PAYLOAD_ENV.to_string(), payload_b64);
                for key in K8S_PASSTHROUGH_ENV_VARS {
                    if let Ok(value) = std::env::var(key)
                        && !value.trim().is_empty()
                    {
                        env_vars.insert((*key).to_string(), value);
                    }
                }
                // Fallback prompt/model for images whose `codetether` binary
                // predates the `swarm-subagent` subcommand (see pod args
                // below). Continuation lines start at column 0 on purpose: a
                // trailing `\` in a Rust string keeps any leading whitespace.
                let fallback_prompt = if context.trim().is_empty() {
                    format!(
                        "You are executing swarm subtask '{}'.\n\nTask:\n{}\n\n\
Return only the final subtask answer.",
                        subtask.id, subtask.instruction
                    )
                } else {
                    format!(
                        "You are executing swarm subtask '{}'.\n\nTask:\n{}\n\n\
Dependency context:\n{}\n\nReturn only the final subtask answer.",
                        subtask.id, subtask.instruction, context
                    )
                };
                env_vars.insert(SWARM_FALLBACK_PROMPT_ENV.to_string(), fallback_prompt);
                env_vars.insert(SWARM_FALLBACK_MODEL_ENV.to_string(), model.clone());

                // Labels let operators (and residual-cleanup tooling) find
                // the pods belonging to this swarm/stage.
                let mut labels = HashMap::new();
                labels.insert("codetether.run/swarm-id".to_string(), swarm_id.to_string());
                labels.insert(
                    "codetether.run/stage".to_string(),
                    subtask.stage.to_string(),
                );

                // The pod runs a small shell probe: prefer the dedicated
                // `swarm-subagent` entry point, otherwise fall back to a
                // plain `codetether run` with the prompt from the env.
                // NOTE(review): `$` needs no escaping in `format!`; the
                // `$$` + `.replace("$$", "$")` round-trip looks like a
                // leftover from a double-formatting scheme — confirm before
                // simplifying.
                let spec = SubagentPodSpec {
                    image: self.config.k8s_subagent_image.clone(),
                    env_vars,
                    labels,
                    command: Some(vec!["sh".to_string(), "-lc".to_string()]),
                    args: Some(vec![
                        format!(
                            "if codetether help swarm-subagent >/dev/null 2>&1; then \
exec codetether swarm-subagent --payload-env {payload_env}; \
else \
exec codetether run \"$${fallback_prompt_env}\" --model \"$${fallback_model_env}\"; \
fi",
                            payload_env = SWARM_SUBTASK_PAYLOAD_ENV,
                            fallback_prompt_env = SWARM_FALLBACK_PROMPT_ENV,
                            fallback_model_env = SWARM_FALLBACK_MODEL_ENV,
                        )
                        .replace("$$", "$"),
                    ]),
                };

                // Spawn failure: record a failed result and keep scheduling
                // the rest of the queue.
                if let Err(error) = k8s.spawn_subagent_pod_with_spec(&subtask.id, spec).await {
                    let error_text = format!("Failed to spawn Kubernetes pod: {error}");
                    tracing::error!(subtask_id = %subtask.id, error = %error, "K8s sub-agent pod spawn failed");
                    self.try_send_event(SwarmEvent::SubTaskUpdate {
                        id: subtask.id.clone(),
                        name: subtask.name.clone(),
                        status: SubTaskStatus::Failed,
                        agent_name: Some("k8s-spawn".to_string()),
                    });
                    self.try_send_event(SwarmEvent::AgentError {
                        subtask_id: subtask.id.clone(),
                        error: error_text.clone(),
                    });
                    results.push(SubTaskResult {
                        subtask_id: subtask.id.clone(),
                        subagent_id: format!("agent-{}", subtask.id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(error_text),
                        artifacts: Vec::new(),
                    });
                    continue;
                }

                // Register the in-flight branch and announce it to the UI.
                let branch = K8sManager::subagent_pod_name(&subtask.id);
                subtask_names.insert(subtask.id.clone(), subtask.name.clone());
                active.insert(
                    subtask.id.clone(),
                    ActiveK8sBranch {
                        branch: branch.clone(),
                        started_at: Instant::now(),
                    },
                );

                self.try_send_event(SwarmEvent::SubTaskUpdate {
                    id: subtask.id.clone(),
                    name: subtask.name.clone(),
                    status: SubTaskStatus::Running,
                    agent_name: Some(format!("k8s-{branch}")),
                });
                self.try_send_event(SwarmEvent::AgentStarted {
                    subtask_id: subtask.id.clone(),
                    agent_name: format!("k8s-{branch}"),
                    specialty: subtask
                        .specialty
                        .clone()
                        .unwrap_or_else(|| "generalist".to_string()),
                });

                tracing::info!(
                    subtask_id = %subtask.id,
                    pod = %branch,
                    "Spawned Kubernetes sub-agent pod"
                );
            }

            // All work launched and all pods accounted for: stage is done.
            if pending.is_empty() && active.is_empty() {
                break;
            }

            tick.tick().await;

            // Harvest phase: check every in-flight pod for timeout,
            // disappearance, or completion.
            let active_ids: Vec<String> = active.keys().cloned().collect();
            let mut finished_results: Vec<SubTaskResult> = Vec::new();
            for subtask_id in active_ids {
                let Some(active_state) = active.get(&subtask_id).cloned() else {
                    continue;
                };

                // Enforce the per-sub-agent wall-clock timeout: delete the
                // pod and record a failed result.
                if active_state.started_at.elapsed()
                    > Duration::from_secs(self.config.subagent_timeout_secs)
                {
                    let reason = format!(
                        "Timed out after {}s in Kubernetes pod",
                        self.config.subagent_timeout_secs
                    );
                    kill_reasons.insert(subtask_id.clone(), reason.clone());
                    if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
                        tracing::warn!(
                            subtask_id = %subtask_id,
                            error = %error,
                            "Failed deleting timed-out Kubernetes pod"
                        );
                    }
                    active.remove(&subtask_id);
                    finished_results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{subtask_id}"),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
                        error: Some(reason),
                        artifacts: Vec::new(),
                    });
                    continue;
                }

                // Transient API error: leave the branch active and retry on
                // the next tick.
                let pod_state = match k8s.get_subagent_pod_state(&subtask_id).await {
                    Ok(state) => state,
                    Err(error) => {
                        tracing::warn!(
                            subtask_id = %subtask_id,
                            error = %error,
                            "Failed to query Kubernetes pod state for sub-agent"
                        );
                        continue;
                    }
                };
                // Pod vanished (evicted/deleted out-of-band): treat as a
                // failure rather than waiting forever.
                let Some(pod_state) = pod_state else {
                    active.remove(&subtask_id);
                    finished_results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{subtask_id}"),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
                        error: Some("Sub-agent pod disappeared".to_string()),
                        artifacts: Vec::new(),
                    });
                    continue;
                };

                let phase = pod_state.phase.to_ascii_lowercase();
                let finished = pod_state.terminated || phase == "succeeded" || phase == "failed";
                if !finished {
                    continue;
                }

                // Prefer a structured result embedded in the pod logs;
                // otherwise synthesize one from the raw logs and exit code.
                let logs = k8s
                    .subagent_logs(&subtask_id, 10_000)
                    .await
                    .unwrap_or_default();
                let mut result = result_from_logs(&logs).unwrap_or_else(|| SubTaskResult {
                    subtask_id: subtask_id.clone(),
                    subagent_id: format!("agent-{subtask_id}"),
                    success: pod_state.exit_code.unwrap_or(1) == 0,
                    result: logs,
                    steps: 0,
                    tool_calls: 0,
                    execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
                    error: if pod_state.exit_code.unwrap_or(1) == 0 {
                        None
                    } else {
                        Some(
                            pod_state
                                .reason
                                .clone()
                                .unwrap_or_else(|| "Remote sub-agent failed".to_string()),
                        )
                    },
                    artifacts: Vec::new(),
                });

                // Defensive: branches flagged for termination are normally
                // already removed from `active`, but if one slips through,
                // force it into the cancelled state.
                if let Some(reason) = kill_reasons.get(&subtask_id) {
                    result.success = false;
                    result.error = Some(format!("Cancelled by collapse controller: {reason}"));
                    result.result.push_str(&format!(
                        "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
                    ));
                }

                active.remove(&subtask_id);
                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
                    tracing::warn!(
                        subtask_id = %subtask_id,
                        error = %error,
                        "Failed deleting completed Kubernetes pod"
                    );
                }
                finished_results.push(result);
            }

            // Publish phase: share successful outputs with dependents, write
            // to the cache, and emit UI events for every finished branch.
            for result in finished_results {
                if result.success {
                    completed_results
                        .write()
                        .await
                        .insert(result.subtask_id.clone(), result.result.clone());
                }
                // NOTE(review): this repeats the `result.success` check
                // immediately above; the two blocks could be merged.
                if result.success {
                    if let Some(ref cache_arc) = self.cache {
                        let mut cache_guard = cache_arc.lock().await;
                        let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
                        let _ = cache_guard.put(&cache_subtask, &result).await;
                    }
                }

                self.try_send_event(SwarmEvent::SubTaskUpdate {
                    id: result.subtask_id.clone(),
                    name: subtask_names
                        .get(&result.subtask_id)
                        .cloned()
                        .unwrap_or_else(|| result.subtask_id.clone()),
                    status: if result.success {
                        SubTaskStatus::Completed
                    } else {
                        SubTaskStatus::Failed
                    },
                    agent_name: Some(format!("k8s-{}", result.subtask_id)),
                });
                if let Some(ref error) = result.error {
                    self.try_send_event(SwarmEvent::AgentError {
                        subtask_id: result.subtask_id.clone(),
                        error: error.clone(),
                    });
                }
                self.try_send_event(SwarmEvent::AgentOutput {
                    subtask_id: result.subtask_id.clone(),
                    output: result.result.clone(),
                });
                self.try_send_event(SwarmEvent::AgentComplete {
                    subtask_id: result.subtask_id.clone(),
                    success: result.success,
                    steps: result.steps,
                });
                results.push(result);
            }

            // Collapse phase: with multiple live branches, sample one
            // observation per branch and let the controller decide whether
            // to promote one and/or kill others.
            if active.len() > 1 {
                let mut observations = Vec::with_capacity(active.len());
                for (subtask_id, state) in &active {
                    let pod_state = match k8s.get_subagent_pod_state(subtask_id).await {
                        Ok(state) => state,
                        Err(error) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %error,
                                "Failed to query pod state while sampling branch observation"
                            );
                            None
                        }
                    };
                    // Derive a health score and infra-failure signal count
                    // from the pod state (see `compute_resource_health`).
                    let (resource_health_score, infra_unhealthy_signals) =
                        compute_resource_health(pod_state.as_ref());

                    // Prefer the agent's own progress probe from the logs; a
                    // "failed" pod phase overrides its compile_ok claim.
                    let logs = k8s.subagent_logs(subtask_id, 500).await.unwrap_or_default();
                    if let Some(probe) = latest_probe_from_logs(&logs) {
                        let compile_ok = pod_state
                            .as_ref()
                            .map(|p| probe.compile_ok && p.phase.to_ascii_lowercase() != "failed")
                            .unwrap_or(probe.compile_ok);
                        observations.push(BranchObservation {
                            subtask_id: subtask_id.clone(),
                            branch: state.branch.clone(),
                            compile_ok,
                            changed_files: probe_changed_files_set(&probe),
                            changed_lines: probe.changed_lines,
                            resource_health_score,
                            infra_unhealthy_signals,
                        });
                        continue;
                    }
                    // No probe yet: fall back to pod phase only, with an
                    // empty change set.
                    let compile_ok = pod_state
                        .as_ref()
                        .map(|p| p.phase.to_ascii_lowercase() != "failed")
                        .unwrap_or(false);
                    observations.push(BranchObservation {
                        subtask_id: subtask_id.clone(),
                        branch: state.branch.clone(),
                        compile_ok,
                        changed_files: std::collections::HashSet::new(),
                        changed_lines: 0,
                        resource_health_score,
                        infra_unhealthy_signals,
                    });
                }

                // Log + audit a promotion only when the promoted branch
                // actually changes.
                let tick = collapse_controller.sample_observations(&observations);
                if promoted_subtask_id != tick.promoted_subtask_id {
                    promoted_subtask_id = tick.promoted_subtask_id.clone();
                    if let Some(ref promoted) = promoted_subtask_id {
                        tracing::info!(subtask_id = %promoted, "Collapse controller promoted branch");
                        if let Some(audit) = crate::audit::try_audit_log() {
                            audit
                                .log_with_correlation(
                                    crate::audit::AuditCategory::Swarm,
                                    "collapse_promote_branch",
                                    crate::audit::AuditOutcome::Success,
                                    Some("collapse-controller".to_string()),
                                    Some(serde_json::json!({
                                        "swarm_id": swarm_id,
                                        "subtask_id": promoted,
                                        "execution_mode": "kubernetes_pod",
                                    })),
                                    None,
                                    None,
                                    Some(swarm_id.to_string()),
                                    None,
                                )
                                .await;
                        }
                    }
                }

                // Apply controller-requested kills: delete the pod, record
                // the reason, and push a cancelled result.
                for kill in tick.kills {
                    if kill_reasons.contains_key(&kill.subtask_id) {
                        continue;
                    }
                    if !active.contains_key(&kill.subtask_id) {
                        continue;
                    }

                    if let Err(error) = k8s.delete_subagent_pod(&kill.subtask_id).await {
                        tracing::warn!(
                            subtask_id = %kill.subtask_id,
                            error = %error,
                            "Failed deleting Kubernetes pod after collapse kill"
                        );
                    }
                    kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
                    let elapsed_ms = active
                        .remove(&kill.subtask_id)
                        .map(|s| s.started_at.elapsed().as_millis() as u64)
                        .unwrap_or(0);

                    tracing::warn!(
                        subtask_id = %kill.subtask_id,
                        branch = %kill.branch,
                        reason = %kill.reason,
                        "Collapse controller killed Kubernetes branch"
                    );

                    if let Some(audit) = crate::audit::try_audit_log() {
                        audit
                            .log_with_correlation(
                                crate::audit::AuditCategory::Swarm,
                                "collapse_kill_branch",
                                crate::audit::AuditOutcome::Success,
                                Some("collapse-controller".to_string()),
                                Some(serde_json::json!({
                                    "swarm_id": swarm_id,
                                    "subtask_id": kill.subtask_id.clone(),
                                    "branch": kill.branch.clone(),
                                    "reason": kill.reason.clone(),
                                    "execution_mode": "kubernetes_pod",
                                })),
                                None,
                                None,
                                Some(swarm_id.to_string()),
                                None,
                            )
                            .await;
                    }

                    self.try_send_event(SwarmEvent::SubTaskUpdate {
                        id: kill.subtask_id.clone(),
                        name: subtask_names
                            .get(&kill.subtask_id)
                            .cloned()
                            .unwrap_or_else(|| kill.subtask_id.clone()),
                        status: SubTaskStatus::Cancelled,
                        agent_name: Some(format!("agent-{}", kill.subtask_id)),
                    });
                    self.try_send_event(SwarmEvent::AgentError {
                        subtask_id: kill.subtask_id.clone(),
                        error: format!("Cancelled by collapse controller: {}", kill.reason),
                    });

                    results.push(SubTaskResult {
                        subtask_id: kill.subtask_id.clone(),
                        subagent_id: format!("agent-{}", kill.subtask_id),
                        success: false,
                        result: format!(
                            "\n\n--- Collapse Controller ---\nBranch terminated: {}",
                            kill.reason
                        ),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: elapsed_ms,
                        error: Some(format!("Cancelled by collapse controller: {}", kill.reason)),
                        artifacts: Vec::new(),
                    });
                }
            }
        }

        // Surface the promoted branch first; `sort_by_key` is stable, so the
        // relative order of everything else is preserved.
        if let Some(ref promoted) = promoted_subtask_id {
            results.sort_by_key(|result| {
                if &result.subtask_id == promoted {
                    0usize
                } else {
                    1usize
                }
            });
        }

        // Best-effort cleanup of any pods still registered at stage end
        // (should be empty because the loop only exits when `active` is
        // drained, but guard against future refactors).
        if !active.is_empty() {
            let residual_ids: Vec<String> = active.keys().cloned().collect();
            for subtask_id in residual_ids {
                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
                    tracing::warn!(
                        subtask_id = %subtask_id,
                        error = %error,
                        "Failed deleting residual Kubernetes pod at stage end"
                    );
                }
            }
        }

        Ok(results)
    }
2049
2050    /// Aggregate results from all subtasks into a final result
2051    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
2052        let mut aggregated = String::new();
2053
2054        for (i, result) in results.iter().enumerate() {
2055            if result.success {
2056                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
2057            } else {
2058                aggregated.push_str(&format!(
2059                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
2060                    i + 1,
2061                    result.error.as_deref().unwrap_or("Unknown error")
2062                ));
2063            }
2064        }
2065
2066        Ok(aggregated)
2067    }
2068
    /// Execute a single task without decomposition (for simple cases)
    ///
    /// Equivalent to calling [`Self::execute`] with
    /// `DecompositionStrategy::None`, so the task runs as one unit instead
    /// of being split into parallel subtasks.
    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
2073}
2074
/// Builder for swarm execution
pub struct SwarmExecutorBuilder {
    // Accumulated configuration: starts from `SwarmConfig::default()` and is
    // refined by the chained setter methods before `build()` consumes it.
    config: SwarmConfig,
}
2079
2080impl SwarmExecutorBuilder {
2081    pub fn new() -> Self {
2082        Self {
2083            config: SwarmConfig::default(),
2084        }
2085    }
2086
2087    pub fn max_subagents(mut self, max: usize) -> Self {
2088        self.config.max_subagents = max;
2089        self
2090    }
2091
2092    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
2093        self.config.max_steps_per_subagent = max;
2094        self
2095    }
2096
2097    pub fn max_total_steps(mut self, max: usize) -> Self {
2098        self.config.max_total_steps = max;
2099        self
2100    }
2101
2102    pub fn timeout_secs(mut self, secs: u64) -> Self {
2103        self.config.subagent_timeout_secs = secs;
2104        self
2105    }
2106
2107    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
2108        self.config.parallel_enabled = enabled;
2109        self
2110    }
2111
2112    pub fn build(self) -> SwarmExecutor {
2113        SwarmExecutor::new(self.config)
2114    }
2115}
2116
2117impl Default for SwarmExecutorBuilder {
2118    fn default() -> Self {
2119        Self::new()
2120    }
2121}
2122
// NOTE(review): the two doc lines that used to sit here ("Run the agentic
// loop for a sub-agent with tool execution" / "Run an agentic loop with
// tools - reusable for Ralph and swarm sub-agents") described a removed or
// relocated agentic-loop helper, not `resolve_tool_paths` below; they are
// kept only as this note. The lint allow below likewise targeted that
// helper — `resolve_tool_paths` takes only three arguments.
#[allow(clippy::too_many_arguments)]
2126/// Resolve relative file paths in tool call arguments to use the given working directory.
2127///
2128/// When sub-agents run in worktrees, the file tools (read, write, edit, etc.) resolve
2129/// relative paths from the process CWD (the main repo), not the worktree. This function
2130/// rewrites relative paths to absolute paths within the worktree before tool execution.
2131fn resolve_tool_paths(
2132    tool_name: &str,
2133    args: &mut serde_json::Value,
2134    working_dir: &std::path::Path,
2135) {
2136    match tool_name {
2137        "read" | "write" | "list" | "grep" | "codesearch" => {
2138            if let Some(path) = args.get("path").and_then(|v| v.as_str()).map(String::from) {
2139                if !std::path::Path::new(&path).is_absolute() {
2140                    args["path"] = serde_json::json!(working_dir.join(&path).display().to_string());
2141                }
2142            }
2143        }
2144        "edit" => {
2145            if let Some(path) = args
2146                .get("filePath")
2147                .and_then(|v| v.as_str())
2148                .map(String::from)
2149            {
2150                if !std::path::Path::new(&path).is_absolute() {
2151                    args["filePath"] =
2152                        serde_json::json!(working_dir.join(&path).display().to_string());
2153                }
2154            }
2155        }
2156        "glob" => {
2157            if let Some(pattern) = args
2158                .get("pattern")
2159                .and_then(|v| v.as_str())
2160                .map(String::from)
2161            {
2162                if !std::path::Path::new(&pattern).is_absolute() && !pattern.starts_with("*") {
2163                    args["pattern"] =
2164                        serde_json::json!(working_dir.join(&pattern).display().to_string());
2165                }
2166            }
2167        }
2168        "multiedit" => {
2169            if let Some(edits) = args.get_mut("edits").and_then(|v| v.as_array_mut()) {
2170                for edit in edits.iter_mut() {
2171                    if let Some(file) = edit.get("file").and_then(|v| v.as_str()).map(String::from)
2172                    {
2173                        if !std::path::Path::new(&file).is_absolute() {
2174                            edit["file"] =
2175                                serde_json::json!(working_dir.join(&file).display().to_string());
2176                        }
2177                    }
2178                }
2179            }
2180        }
2181        "patch" => {
2182            if let Some(path) = args.get("file").and_then(|v| v.as_str()).map(String::from) {
2183                if !std::path::Path::new(&path).is_absolute() {
2184                    args["file"] = serde_json::json!(working_dir.join(&path).display().to_string());
2185                }
2186            }
2187        }
2188        "bash" => {
2189            // If bash has no cwd, set it to the working directory
2190            if args.get("cwd").and_then(|v| v.as_str()).is_none() {
2191                args["cwd"] = serde_json::json!(working_dir.display().to_string());
2192            }
2193        }
2194        _ => {}
2195    }
2196}
2197
2198pub async fn run_agent_loop(
2199    provider: Arc<dyn Provider>,
2200    model: &str,
2201    system_prompt: &str,
2202    user_prompt: &str,
2203    tools: Vec<crate::provider::ToolDefinition>,
2204    registry: Arc<ToolRegistry>,
2205    max_steps: usize,
2206    timeout_secs: u64,
2207    event_tx: Option<mpsc::Sender<SwarmEvent>>,
2208    subtask_id: String,
2209    bus: Option<Arc<AgentBus>>,
2210    working_dir: Option<std::path::PathBuf>,
2211) -> Result<(String, usize, usize, AgentLoopExit)> {
2212    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
2213    let temperature = 0.7;
2214
2215    tracing::info!(
2216        model = %model,
2217        max_steps = max_steps,
2218        timeout_secs = timeout_secs,
2219        "Sub-agent starting agentic loop"
2220    );
2221    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
2222    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
2223
2224    // Initialize conversation with system and user messages
2225    let mut messages = vec![
2226        Message {
2227            role: Role::System,
2228            content: vec![ContentPart::Text {
2229                text: system_prompt.to_string(),
2230            }],
2231        },
2232        Message {
2233            role: Role::User,
2234            content: vec![ContentPart::Text {
2235                text: user_prompt.to_string(),
2236            }],
2237        },
2238    ];
2239
2240    let mut steps = 0;
2241    let mut total_tool_calls = 0;
2242    let mut final_output = String::new();
2243
2244    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
2245
2246    let exit_reason = loop {
2247        if steps >= max_steps {
2248            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
2249            break AgentLoopExit::MaxStepsReached;
2250        }
2251
2252        if Instant::now() > deadline {
2253            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
2254            break AgentLoopExit::TimedOut;
2255        }
2256
2257        steps += 1;
2258        tracing::info!(step = steps, "Sub-agent step starting");
2259
2260        // Check context size and truncate if approaching limit
2261        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
2262
2263        let request = CompletionRequest {
2264            messages: messages.clone(),
2265            tools: tools.clone(),
2266            model: model.to_string(),
2267            temperature: Some(temperature),
2268            top_p: None,
2269            max_tokens: Some(8192),
2270            stop: Vec::new(),
2271        };
2272
2273        let step_start = Instant::now();
2274        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
2275        let step_duration = step_start.elapsed();
2276
2277        tracing::info!(
2278            step = steps,
2279            duration_ms = step_duration.as_millis() as u64,
2280            finish_reason = ?response.finish_reason,
2281            prompt_tokens = response.usage.prompt_tokens,
2282            completion_tokens = response.usage.completion_tokens,
2283            "Sub-agent step completed LLM call"
2284        );
2285
2286        // Extract text from response
2287        let mut text_parts = Vec::new();
2288        let mut thinking_parts = Vec::new();
2289        let mut tool_calls = Vec::new();
2290
2291        for part in &response.message.content {
2292            match part {
2293                ContentPart::Text { text } => {
2294                    text_parts.push(text.clone());
2295                }
2296                ContentPart::Thinking { text } if !text.is_empty() => {
2297                    thinking_parts.push(text.clone());
2298                }
2299                ContentPart::ToolCall {
2300                    id,
2301                    name,
2302                    arguments,
2303                    ..
2304                } => {
2305                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
2306                }
2307                _ => {}
2308            }
2309        }
2310
2311        // Publish thinking/reasoning to bus for training data capture
2312        if !thinking_parts.is_empty() {
2313            if let Some(ref bus) = bus {
2314                let thinking_text = thinking_parts.join("\n");
2315                let handle = bus.handle(&subtask_id);
2316                handle.send(
2317                    format!("agent.{subtask_id}.thinking"),
2318                    BusMessage::AgentThinking {
2319                        agent_id: subtask_id.clone(),
2320                        thinking: thinking_text,
2321                        step: steps,
2322                    },
2323                );
2324            }
2325        }
2326
2327        // Log assistant output
2328        if !text_parts.is_empty() {
2329            let step_output = text_parts.join("\n");
2330            if !final_output.is_empty() {
2331                final_output.push('\n');
2332            }
2333            final_output.push_str(&step_output);
2334            tracing::info!(
2335                step = steps,
2336                output_len = final_output.len(),
2337                "Sub-agent text output"
2338            );
2339            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
2340
2341            // Emit assistant message event for TUI detail view
2342            if let Some(ref tx) = event_tx {
2343                let preview = if step_output.len() > 500 {
2344                    let mut end = 500;
2345                    while end > 0 && !step_output.is_char_boundary(end) {
2346                        end -= 1;
2347                    }
2348                    format!("{}...", &step_output[..end])
2349                } else {
2350                    step_output.clone()
2351                };
2352                let _ = tx.try_send(SwarmEvent::AgentMessage {
2353                    subtask_id: subtask_id.clone(),
2354                    entry: AgentMessageEntry {
2355                        role: "assistant".to_string(),
2356                        content: preview,
2357                        is_tool_call: false,
2358                    },
2359                });
2360            }
2361        }
2362
2363        // Log tool calls
2364        if !tool_calls.is_empty() {
2365            tracing::info!(
2366                step = steps,
2367                num_tool_calls = tool_calls.len(),
2368                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
2369                "Sub-agent requesting tool calls"
2370            );
2371        }
2372
2373        // Add assistant message to history
2374        messages.push(response.message.clone());
2375
2376        // If no tool calls or stop, we're done
2377        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
2378            tracing::info!(
2379                steps = steps,
2380                total_tool_calls = total_tool_calls,
2381                "Sub-agent finished"
2382            );
2383            break AgentLoopExit::Completed;
2384        }
2385
2386        // Execute tool calls
2387        let mut tool_results = Vec::new();
2388
2389        for (call_id, tool_name, arguments) in tool_calls {
2390            total_tool_calls += 1;
2391
2392            // Emit tool call event
2393            if let Some(ref tx) = event_tx {
2394                let _ = tx.try_send(SwarmEvent::AgentToolCall {
2395                    subtask_id: subtask_id.clone(),
2396                    tool_name: tool_name.clone(),
2397                });
2398            }
2399
2400            tracing::info!(
2401                step = steps,
2402                tool_call_id = %call_id,
2403                tool = %tool_name,
2404                "Executing tool"
2405            );
2406            tracing::debug!(
2407                tool = %tool_name,
2408                arguments = %arguments,
2409                "Tool call arguments"
2410            );
2411
2412            let tool_start = Instant::now();
2413            let mut tool_success = true;
2414            let result = if let Some(tool) = registry.get(&tool_name) {
2415                // Parse arguments as JSON
2416                let mut args: serde_json::Value =
2417                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
2418                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
2419                        serde_json::json!({})
2420                    });
2421
2422                // Resolve relative file paths to the working directory (critical for worktree isolation)
2423                if let Some(ref wd) = working_dir {
2424                    resolve_tool_paths(&tool_name, &mut args, wd);
2425                }
2426
2427                match tool.execute(args).await {
2428                    Ok(r) => {
2429                        if r.success {
2430                            tracing::info!(
2431                                tool = %tool_name,
2432                                duration_ms = tool_start.elapsed().as_millis() as u64,
2433                                success = true,
2434                                "Tool execution completed"
2435                            );
2436                            r.output
2437                        } else {
2438                            tool_success = false;
2439                            tracing::warn!(
2440                                tool = %tool_name,
2441                                error = %r.output,
2442                                "Tool returned error"
2443                            );
2444                            format!("Tool error: {}", r.output)
2445                        }
2446                    }
2447                    Err(e) => {
2448                        tool_success = false;
2449                        tracing::error!(
2450                            tool = %tool_name,
2451                            error = %e,
2452                            "Tool execution failed"
2453                        );
2454                        format!("Tool execution failed: {}", e)
2455                    }
2456                }
2457            } else {
2458                tool_success = false;
2459                tracing::error!(tool = %tool_name, "Unknown tool requested");
2460                format!("Unknown tool: {}", tool_name)
2461            };
2462
2463            // Emit detailed tool call event
2464            if let Some(ref tx) = event_tx {
2465                let input_preview = if arguments.len() > 200 {
2466                    let mut end = 200;
2467                    while end > 0 && !arguments.is_char_boundary(end) {
2468                        end -= 1;
2469                    }
2470                    format!("{}...", &arguments[..end])
2471                } else {
2472                    arguments.clone()
2473                };
2474                let output_preview = if result.len() > 500 {
2475                    let mut end = 500;
2476                    while end > 0 && !result.is_char_boundary(end) {
2477                        end -= 1;
2478                    }
2479                    format!("{}...", &result[..end])
2480                } else {
2481                    result.clone()
2482                };
2483                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
2484                    subtask_id: subtask_id.clone(),
2485                    detail: AgentToolCallDetail {
2486                        tool_name: tool_name.clone(),
2487                        input_preview,
2488                        output_preview,
2489                        success: tool_success,
2490                    },
2491                });
2492            }
2493
2494            tracing::debug!(
2495                tool = %tool_name,
2496                result_len = result.len(),
2497                "Tool result"
2498            );
2499
2500            // Publish full, untruncated tool output to the bus so other
2501            // agents and the bus log see the complete result.
2502            if let Some(ref bus) = bus {
2503                let handle = bus.handle(&subtask_id);
2504                handle.send(
2505                    format!("tools.{tool_name}"),
2506                    BusMessage::ToolOutputFull {
2507                        agent_id: subtask_id.clone(),
2508                        tool_name: tool_name.clone(),
2509                        output: result.clone(),
2510                        success: tool_success,
2511                        step: steps,
2512                    },
2513                );
2514            }
2515
2516            // Process large results with RLM or truncate smaller ones
2517            let result = if result.len() > RLM_THRESHOLD_CHARS {
2518                // Use RLM for very large results
2519                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
2520                    .await
2521            } else {
2522                // Simple truncation for medium results
2523                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
2524            };
2525
2526            tool_results.push((call_id, tool_name, result));
2527        }
2528
2529        // Add tool results to conversation
2530        for (call_id, _tool_name, result) in tool_results {
2531            messages.push(Message {
2532                role: Role::Tool,
2533                content: vec![ContentPart::ToolResult {
2534                    tool_call_id: call_id,
2535                    content: result,
2536                }],
2537            });
2538        }
2539
2540        // Reset deadline after each successful step — agent is making progress
2541        deadline = Instant::now() + Duration::from_secs(timeout_secs);
2542    };
2543
2544    Ok((final_output, steps, total_tool_calls, exit_reason))
2545}