Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    BranchObservation, BranchRuntimeState, CacheConfig, CacheStats, CollapseController,
8    CollapsePolicy, DecompositionStrategy, ExecutionMode, StageStats, SwarmCache, SwarmConfig,
9    SwarmResult, k8s_result,
10    kubernetes_executor::{
11        RemoteSubtaskPayload, SWARM_SUBTASK_PAYLOAD_ENV, encode_payload, latest_probe_from_logs,
12        probe_changed_files_set,
13    },
14    orchestrator::Orchestrator,
15    result_store::ResultStore,
16    subtask::{SubTask, SubTaskResult, SubTaskStatus},
17    tool_policy,
18};
19use crate::bus::{AgentBus, BusMessage};
20use crate::k8s::{K8sManager, SubagentPodSpec, SubagentPodState};
21use crate::session::delegation::DelegationState;
22use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
23use std::sync::Mutex as StdMutex;
24
25// Re-export swarm types for convenience
26pub use super::SwarmMessage;
27use crate::{
28    agent::Agent,
29    provenance::{ExecutionOrigin, ExecutionProvenance},
30    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
31    rlm::RlmExecutor,
32    session::helper::runtime::enrich_tool_input_with_runtime_context,
33    swarm::{SwarmArtifact, SwarmStats},
34    telemetry::SwarmTelemetryCollector,
35    tool::ToolRegistry,
36    worktree::{WorktreeInfo, WorktreeManager},
37};
38use anyhow::Result;
39use futures::stream::{self, FuturesUnordered, StreamExt};
40use std::collections::{HashMap, VecDeque};
41use std::sync::Arc;
42use std::time::Instant;
43use tokio::sync::{RwLock, mpsc};
44use tokio::task::AbortHandle;
45use tokio::time::{Duration, MissedTickBehavior, timeout};
46#[path = "path_guard/mod.rs"]
47mod path_guard;
48
/// Default context limit (256k tokens - conservative for most models)
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Reserve tokens for the response generation
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context limit we allow before truncating (85% of limit)
const TRUNCATION_THRESHOLD: f64 = 0.85;
57
/// Roughly estimate how many tokens `text` will occupy.
///
/// Real tokenization varies by model; English text usually averages 3-5
/// characters per token. The divisor used here is 3.5, applied to the UTF-8
/// byte length and rounded up, so the estimate leans high rather than low.
fn estimate_tokens(text: &str) -> usize {
    const BYTES_PER_TOKEN: f64 = 3.5;

    let byte_len = text.len() as f64;
    (byte_len / BYTES_PER_TOKEN).ceil() as usize
}
65
66/// Estimate total tokens in a message
67fn estimate_message_tokens(message: &Message) -> usize {
68    let mut tokens = 4; // Role overhead
69
70    for part in &message.content {
71        tokens += match part {
72            ContentPart::Text { text } => estimate_tokens(text),
73            ContentPart::ToolCall {
74                id,
75                name,
76                arguments,
77                ..
78            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
79            ContentPart::ToolResult {
80                tool_call_id,
81                content,
82            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
83            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
84            ContentPart::Thinking { text } => estimate_tokens(text),
85        };
86    }
87
88    tokens
89}
90
91/// Estimate total tokens in all messages
92fn estimate_total_tokens(messages: &[Message]) -> usize {
93    messages.iter().map(estimate_message_tokens).sum()
94}
95
96/// Truncate messages to fit within context limit
97///
98/// Strategy:
99/// 1. First, aggressively truncate large tool results
100/// 2. Keep system message (first) and user message (second) always
101/// 3. Keep the most recent assistant + tool result pairs together
102/// 4. Drop oldest middle messages in matched pairs
103fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
104    let target_tokens =
105        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
106
107    let current_tokens = estimate_total_tokens(messages);
108    if current_tokens <= target_tokens {
109        return;
110    }
111
112    tracing::warn!(
113        current_tokens = current_tokens,
114        target_tokens = target_tokens,
115        context_limit = context_limit,
116        "Context approaching limit, truncating conversation history"
117    );
118
119    // FIRST: Aggressively truncate large tool results (this is the main culprit)
120    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
121
122    let after_tool_truncation = estimate_total_tokens(messages);
123    if after_tool_truncation <= target_tokens {
124        tracing::info!(
125            old_tokens = current_tokens,
126            new_tokens = after_tool_truncation,
127            "Truncated large tool results, context now within limits"
128        );
129        return;
130    }
131
132    // Minimum: keep first 2 (system + initial user) and last 4 messages
133    if messages.len() <= 6 {
134        tracing::warn!(
135            tokens = after_tool_truncation,
136            target = target_tokens,
137            "Cannot truncate further - conversation too short"
138        );
139        return;
140    }
141
142    // Remove messages from the middle, keeping first 2 and last 4
143    // But we need to remove in pairs to maintain tool_call_id references
144    let keep_start = 2;
145    let keep_end = 4;
146    let removable_count = messages.len() - keep_start - keep_end;
147
148    if removable_count == 0 {
149        return;
150    }
151
152    // Remove all middle messages
153    let removed_messages: Vec<_> = messages
154        .drain(keep_start..keep_start + removable_count)
155        .collect();
156    let summary = summarize_removed_messages(&removed_messages);
157
158    // Insert a summary message where we removed content
159    messages.insert(
160        keep_start,
161        Message {
162            role: Role::User,
163            content: vec![ContentPart::Text {
164                text: format!(
165                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
166                    removed_messages.len(),
167                    summary
168                ),
169            }],
170        },
171    );
172
173    let new_tokens = estimate_total_tokens(messages);
174    tracing::info!(
175        removed_messages = removed_messages.len(),
176        old_tokens = current_tokens,
177        new_tokens = new_tokens,
178        "Truncated conversation history"
179    );
180}
181
182/// Summarize removed messages for context preservation
183fn summarize_removed_messages(messages: &[Message]) -> String {
184    let mut summary = String::new();
185    let mut tool_calls: Vec<String> = Vec::new();
186
187    for msg in messages {
188        for part in &msg.content {
189            if let ContentPart::ToolCall { name, .. } = part
190                && !tool_calls.contains(name)
191            {
192                tool_calls.push(name.clone());
193            }
194        }
195    }
196
197    if !tool_calls.is_empty() {
198        summary.push_str(&format!(
199            "Tools used in truncated history: {}",
200            tool_calls.join(", ")
201        ));
202    }
203
204    summary
205}
206
207/// Truncate large tool results that exceed a reasonable size
208fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
209    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
210    let mut truncated_count = 0;
211    let mut saved_tokens = 0usize;
212
213    for message in messages.iter_mut() {
214        for part in message.content.iter_mut() {
215            if let ContentPart::ToolResult { content, .. } = part {
216                let tokens = estimate_tokens(content);
217                if tokens > max_tokens_per_result {
218                    let old_len = content.len();
219                    *content = truncate_single_result(content, char_limit);
220                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
221                    if content.len() < old_len {
222                        truncated_count += 1;
223                    }
224                }
225            }
226        }
227    }
228
229    if truncated_count > 0 {
230        tracing::info!(
231            truncated_count = truncated_count,
232            saved_tokens = saved_tokens,
233            max_tokens_per_result = max_tokens_per_result,
234            "Truncated large tool results"
235        );
236    }
237}
238
239/// Truncate a single result string to a maximum character limit
240fn truncate_single_result(content: &str, max_chars: usize) -> String {
241    if content.len() <= max_chars {
242        return content.to_string();
243    }
244
245    // Find a valid char boundary at or before max_chars
246    let safe_limit = {
247        let mut limit = max_chars.min(content.len());
248        while limit > 0 && !content.is_char_boundary(limit) {
249            limit -= 1;
250        }
251        limit
252    };
253
254    // Try to find a good break point (newline) near the limit
255    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
256
257    let truncated = format!(
258        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
259        &content[..break_point],
260        content.len(),
261        break_point
262    );
263
264    tracing::debug!(
265        original_len = content.len(),
266        truncated_len = truncated.len(),
267        "Truncated large result"
268    );
269
270    truncated
271}
272
/// Threshold for when to use RLM vs truncation (in characters).
/// Content longer than this is summarized with RLM; anything up to it is
/// only truncated.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Max chars for simple truncation (below RLM threshold).
/// Content at or below this length is passed through unchanged.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// Collapse controller sampling interval for branch health.
const COLLAPSE_SAMPLE_SECS: u64 = 5;

/// Names of environment variables relevant to Kubernetes execution.
///
/// NOTE(review): the name suggests these are passed through from this
/// process into sub-agent pods; the code that reads this list is not in this
/// part of the file - confirm before relying on that.
const K8S_PASSTHROUGH_ENV_VARS: &[&str] = &[
    "VAULT_ADDR",
    "VAULT_TOKEN",
    "VAULT_MOUNT",
    "VAULT_SECRETS_PATH",
    "VAULT_NAMESPACE",
    "CODETETHER_AUTH_TOKEN",
];
289
/// Bookkeeping record for one in-flight Kubernetes branch.
///
/// NOTE(review): the code that creates and reads these records is outside
/// this part of the file; the field notes below are inferred from names.
#[derive(Debug, Clone)]
struct ActiveK8sBranch {
    /// Branch name (presumably the git branch the sub-agent works on - confirm).
    branch: String,
    /// Captured instant, presumably when the branch's work was started.
    started_at: Instant,
}
295
/// Reason an agent loop stopped running.
///
/// NOTE(review): the loop that produces these values is not visible in this
/// part of the file; variant meanings are inferred from their names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The loop finished on its own.
    Completed,
    /// The loop stopped because its step budget was used up.
    MaxStepsReached,
    /// The loop stopped because it ran out of time.
    TimedOut,
}
302
/// Calculate the delay for exponential backoff
///
/// Uses the formula: min(initial_delay * multiplier^attempt, max_delay)
///
/// * `attempt` - zero-based retry attempt; attempt 0 yields `initial_delay_ms`
/// * `initial_delay_ms` - delay for the first attempt, in milliseconds
/// * `max_delay_ms` - upper bound on the returned delay, in milliseconds
/// * `multiplier` - growth factor applied once per attempt
///
/// The computation is done in `f64`; the final conversion to whole
/// milliseconds truncates the fraction and saturates negative values to zero.
/// A NaN intermediate (for example `0 * inf`) resolves to `max_delay_ms`,
/// because `f64::min` returns the non-NaN operand.
fn calculate_backoff_delay(
    attempt: u32,
    initial_delay_ms: u64,
    max_delay_ms: u64,
    multiplier: f64,
) -> Duration {
    // A bare `attempt as i32` wraps to a negative exponent once `attempt`
    // exceeds `i32::MAX`, which would shrink the delay instead of capping it.
    let exponent = attempt.min(i32::MAX as u32) as i32;

    let uncapped_ms = initial_delay_ms as f64 * multiplier.powi(exponent);
    let delay_ms = uncapped_ms.min(max_delay_ms as f64);
    Duration::from_millis(delay_ms as u64)
}
316
317fn compute_resource_health(pod_state: Option<&SubagentPodState>) -> (f32, u32) {
318    let Some(pod_state) = pod_state else {
319        return (0.2, 1);
320    };
321
322    let reason = pod_state
323        .reason
324        .as_deref()
325        .unwrap_or_default()
326        .to_ascii_lowercase();
327    let phase = pod_state.phase.to_ascii_lowercase();
328
329    if reason.contains("oomkilled") {
330        return (0.0, 3);
331    }
332    if reason.contains("imagepullbackoff") || reason.contains("errimagepull") {
333        return (0.0, 3);
334    }
335    if reason.contains("crashloopbackoff") {
336        return (0.1, 2);
337    }
338    if phase == "failed" {
339        return (0.1, 2);
340    }
341
342    let mut score = 1.0f32;
343    let mut unhealthy_signals = 0u32;
344
345    if !pod_state.ready {
346        score -= 0.2;
347    }
348    if !reason.is_empty() {
349        score -= 0.3;
350        unhealthy_signals += 1;
351    }
352    if pod_state.restart_count > 0 {
353        score -= (pod_state.restart_count.min(3) as f32) * 0.2;
354        unhealthy_signals += 1;
355    }
356
357    (score.clamp(0.0, 1.0), unhealthy_signals)
358}
359
360/// Process a large tool result using RLM to intelligently summarize it
361async fn process_large_result_with_rlm(
362    content: &str,
363    tool_name: &str,
364    provider: Arc<dyn Provider>,
365    model: &str,
366) -> String {
367    if content.len() <= SIMPLE_TRUNCATE_CHARS {
368        return content.to_string();
369    }
370
371    // For medium-sized content, just truncate
372    if content.len() <= RLM_THRESHOLD_CHARS {
373        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
374    }
375
376    // For very large content, use RLM to intelligently summarize
377    tracing::info!(
378        tool = %tool_name,
379        content_len = content.len(),
380        "Using RLM to process large tool result"
381    );
382
383    let query = format!(
384        "Summarize the key information from this {} output. \
385         Focus on: errors, warnings, important findings, and actionable items. \
386         Be concise but thorough.",
387        tool_name
388    );
389
390    let mut executor =
391        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
392
393    match executor.analyze(&query).await {
394        Ok(result) => {
395            let bounded_answer = truncate_single_result(&result.answer, SIMPLE_TRUNCATE_CHARS * 2);
396            tracing::info!(
397                tool = %tool_name,
398                original_len = content.len(),
399                summary_len = bounded_answer.len(),
400                iterations = result.iterations,
401                "RLM summarized large result"
402            );
403
404            format!(
405                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
406                tool_name,
407                content.len(),
408                bounded_answer.len(),
409                bounded_answer
410            )
411        }
412        Err(e) => {
413            tracing::warn!(
414                tool = %tool_name,
415                error = %e,
416                "RLM analysis failed, falling back to truncation"
417            );
418            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
419        }
420    }
421}
422
/// The swarm executor runs subtasks in parallel
///
/// Construct it with [`SwarmExecutor::new`] or [`SwarmExecutor::with_cache`]
/// and attach optional collaborators through the `with_*` builder methods.
pub struct SwarmExecutor {
    /// Swarm configuration. Within this file it supplies the retry settings,
    /// execution mode, concurrency/rate limits, per-agent step limit and
    /// worktree options.
    config: SwarmConfig,
    /// Optional agent for handling swarm-level coordination (reserved for future use)
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Optional event channel for TUI real-time updates
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    /// Telemetry collector for swarm execution metrics
    telemetry: Arc<SwarmTelemetryCollector>,
    /// Cache for avoiding duplicate subtask execution
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Shared result store for sub-agent result sharing
    result_store: Arc<ResultStore>,
    /// Optional agent bus for inter-agent communication
    bus: Option<Arc<AgentBus>>,
    /// Optional CADMAS-CTX delegation state for LCB provider selection.
    ///
    /// Wrapped in `Arc<Mutex<...>>` so subtask outcome updates (Phase C
    /// step 28) can flow back into the same posterior the next dispatch
    /// reads from, even when the caller only owns `&self`.
    delegation: Option<Arc<StdMutex<DelegationState>>>,
}
445
446impl SwarmExecutor {
447    /// Create a new executor
448    pub fn new(config: SwarmConfig) -> Self {
449        Self {
450            config,
451            coordinator_agent: None,
452            event_tx: None,
453            telemetry: Arc::new(SwarmTelemetryCollector::default()),
454            cache: None,
455            result_store: ResultStore::new_arc(),
456            bus: None,
457            delegation: None,
458        }
459    }
460
461    /// Create a new executor with caching enabled
462    pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
463        let cache = SwarmCache::new(cache_config).await?;
464        Ok(Self {
465            config,
466            coordinator_agent: None,
467            event_tx: None,
468            telemetry: Arc::new(SwarmTelemetryCollector::default()),
469            cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
470            result_store: ResultStore::new_arc(),
471            bus: None,
472            delegation: None,
473        })
474    }
475
    /// Set a pre-initialized cache
    ///
    /// Builder-style: takes the executor by value and returns it with
    /// `cache` installed, replacing any cache that was set before.
    pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
        self.cache = Some(cache);
        self
    }
481
    /// Set an agent bus for inter-agent communication
    ///
    /// Builder-style: takes the executor by value and returns it with `bus`
    /// installed. Once set, `try_send_event` also publishes swarm start and
    /// completion on this bus.
    pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
        self.bus = Some(bus);
        self
    }
487
488    /// Set CADMAS-CTX delegation state for LCB provider selection.
489    pub fn with_delegation(mut self, state: DelegationState) -> Self {
490        self.delegation = Some(Arc::new(StdMutex::new(state)));
491        self
492    }
493
    /// Get the agent bus if set
    ///
    /// Returns `None` when no bus was attached with `with_bus`.
    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
        self.bus.as_ref()
    }
498
    /// Set an event channel for real-time TUI updates
    ///
    /// Builder-style: takes the executor by value and returns it with `tx`
    /// installed. Events are pushed with `try_send`, so a full or closed
    /// channel drops the event instead of blocking.
    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
        self.event_tx = Some(tx);
        self
    }
504
    /// Set a coordinator agent for swarm-level coordination
    ///
    /// Builder-style: logs at debug level, stores `agent`, and returns the
    /// executor.
    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
        tracing::debug!("Setting coordinator agent for swarm execution");
        self.coordinator_agent = Some(agent);
        self
    }
511
    /// Set a telemetry collector for swarm execution metrics
    ///
    /// Builder-style: replaces the default collector created by the
    /// constructor and returns the executor.
    pub fn with_telemetry(mut self, telemetry: Arc<SwarmTelemetryCollector>) -> Self {
        self.telemetry = telemetry;
        self
    }
517
    /// Get the telemetry collector as an Arc
    ///
    /// Returns a new handle to the same collector (reference-count bump, no
    /// copy of the collector itself).
    pub fn telemetry_arc(&self) -> Arc<SwarmTelemetryCollector> {
        Arc::clone(&self.telemetry)
    }
522    /// Get the coordinator agent if set
523    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
524        tracing::debug!(
525            has_coordinator = self.coordinator_agent.is_some(),
526            "Getting coordinator agent"
527        );
528        self.coordinator_agent.as_ref()
529    }
530
    /// Get the shared result store
    ///
    /// This is the store `execute` publishes each subtask result to.
    pub fn result_store(&self) -> &Arc<ResultStore> {
        &self.result_store
    }
535
536    /// Get cache statistics if caching is enabled
537    pub async fn cache_stats(&self) -> Option<CacheStats> {
538        if let Some(ref cache) = self.cache {
539            let cache_guard = cache.lock().await;
540            Some(cache_guard.stats().clone())
541        } else {
542            None
543        }
544    }
545
546    /// Clear the cache if enabled
547    pub async fn clear_cache(&self) -> Result<()> {
548        if let Some(ref cache) = self.cache {
549            let mut cache_guard = cache.lock().await;
550            cache_guard.clear().await?;
551        }
552        Ok(())
553    }
554
555    /// Get the retry configuration
556    pub fn retry_config(&self) -> (u32, u64, u64, f64) {
557        (
558            self.config.max_retries,
559            self.config.base_delay_ms,
560            self.config.max_delay_ms,
561            2.0, // backoff multiplier
562        )
563    }
564
565    /// Check if retries are enabled
566    pub fn retries_enabled(&self) -> bool {
567        self.config.max_retries > 0
568    }
569
570    /// Send an event to the TUI if channel is connected (non-blocking)
571    fn try_send_event(&self, event: SwarmEvent) {
572        // Also emit on the agent bus if connected
573        if let Some(ref bus) = self.bus {
574            let handle = bus.handle("swarm-executor");
575            match &event {
576                SwarmEvent::Started { task, .. } => {
577                    handle.send(
578                        "broadcast",
579                        BusMessage::AgentReady {
580                            agent_id: "swarm-executor".to_string(),
581                            capabilities: vec![format!("executing:{task}")],
582                        },
583                    );
584                }
585                SwarmEvent::Complete { success, .. } => {
586                    let state = if *success {
587                        crate::a2a::types::TaskState::Completed
588                    } else {
589                        crate::a2a::types::TaskState::Failed
590                    };
591                    handle.send_task_update("swarm", state, None);
592                }
593                _ => {} // Other events are TUI-specific
594            }
595        }
596
597        if let Some(ref tx) = self.event_tx {
598            let _ = tx.try_send(event);
599        }
600    }
601
    /// Execute a task using the swarm
    ///
    /// Flow:
    /// 1. Build an `Orchestrator` from the executor's config and decompose
    ///    `task` into subtasks using `strategy`.
    /// 2. Run the subtasks stage by stage (stage `0` up to the highest stage
    ///    number found), each stage through `execute_stage`.
    /// 3. After every stage, record its results in the shared map handed to
    ///    later stages, publish them to the result store, and update the
    ///    orchestrator's statistics.
    /// 4. Aggregate all subtask results into the final `SwarmResult`.
    ///
    /// Progress is reported through `try_send_event` (`Started`,
    /// `Decomposed`, `StageComplete`, `Complete`, or `Error`).
    ///
    /// An empty decomposition is not an `Err`: it returns `Ok` with
    /// `success: false` and an error message set.
    ///
    /// # Errors
    ///
    /// Propagates failures from orchestrator creation, task decomposition,
    /// stage execution and result aggregation.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        // Create orchestrator
        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        // Decompose the task
        let subtasks = orchestrator.decompose(task, strategy).await?;

        // Nothing to run: report the problem to listeners and return a failed
        // (but `Ok`) result. This happens before telemetry is started.
        if subtasks.is_empty() {
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Emit decomposition event for TUI
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        // Execute stages in order
        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        // Never populated in this function; returned empty in the final result.
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        // Initialize telemetry for this swarm execution
        let swarm_id = uuid::Uuid::new_v4().to_string();
        let strategy_str = format!("{:?}", strategy);
        self.telemetry
            .start_swarm(&swarm_id, subtasks.len(), &strategy_str)
            .await;

        // Shared state for completed results
        // (subtask id -> result text, passed to every stage)
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            // Stage numbers may have gaps; skip stages with no work.
            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            // Execute all subtasks in this stage in parallel
            // NOTE(review): an error here returns early via `?`, after
            // `start_swarm` but without reaching `complete_swarm` below.
            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            // Update completed results for next stage
            // (the write lock is held for the whole block, including the
            // awaited publishes to the result store)
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                    // Also publish to shared ResultStore for richer querying
                    let tags = vec![
                        format!("stage:{stage}"),
                        format!("subtask:{}", result.subtask_id),
                    ];
                    // The outcome of publishing is deliberately discarded.
                    let _ = self
                        .result_store
                        .publish(
                            &result.subtask_id,
                            &result.subagent_id,
                            &result.result,
                            tags,
                            None,
                        )
                        .await;
                }
            }

            // Update orchestrator stats
            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            // Mark subtasks as completed
            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            // Emit stage complete event
            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        // Get provider name before mutable borrow
        let provider_name = orchestrator.provider().to_string();

        // Record overall execution latency
        self.telemetry
            .record_swarm_latency("total_execution", start_time.elapsed())
            .await;

        // Calculate final stats
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        // Sum of per-subtask times = what a fully sequential run would cost.
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        // Aggregate results
        // The swarm succeeds only if every subtask succeeded.
        let success = all_results.iter().all(|r| r.success);

        // Complete telemetry collection (interior-mutable, no outer lock to hold)
        let _telemetry_metrics = self.telemetry.complete_swarm(success).await;
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }
809
810    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
811    async fn execute_stage(
812        &self,
813        orchestrator: &Orchestrator,
814        subtasks: Vec<SubTask>,
815        completed_results: Arc<RwLock<HashMap<String, String>>>,
816        swarm_id: &str,
817    ) -> Result<Vec<SubTaskResult>> {
818        if self.config.execution_mode == ExecutionMode::KubernetesPod {
819            return self
820                .execute_stage_kubernetes(orchestrator, subtasks, completed_results, swarm_id)
821                .await;
822        }
823
824        let mut handles: FuturesUnordered<
825            tokio::task::JoinHandle<(String, Result<SubTaskResult, anyhow::Error>)>,
826        > = FuturesUnordered::new();
827        let mut abort_handles: HashMap<String, AbortHandle> = HashMap::new();
828        let mut task_ids: HashMap<tokio::task::Id, String> = HashMap::new();
829        let mut active_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
830        let mut all_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
831        let mut cached_results: Vec<SubTaskResult> = Vec::new();
832        let mut completed_entries: Vec<(SubTaskResult, Option<WorktreeInfo>)> = Vec::new();
833        // Phase C step 28: per-subtask LCB dispatch records so the
834        // posterior gets the outcome update on completion.
835        let mut subtask_assignments: HashMap<String, (String, String)> = HashMap::new();
836        let mut kill_reasons: HashMap<String, String> = HashMap::new();
837        let mut promoted_subtask_id: Option<String> = None;
838
839        // Rate limiting: semaphore for max concurrent requests
840        let semaphore = Arc::new(tokio::sync::Semaphore::new(
841            self.config.max_concurrent_requests,
842        ));
843        let delay_ms = self.config.request_delay_ms;
844
845        // Get provider info for tool registry
846        let model = orchestrator.model().to_string();
847        let provider_name = orchestrator.provider().to_string();
848        let providers = orchestrator.providers();
849        let provider = providers
850            .get(&provider_name)
851            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
852
853        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
854
855        // Create base tool registry with provider for ralph and batch tool
856        let base_tool_registry =
857            ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
858        let tool_definitions = base_tool_registry.definitions();
859
860        // Clone the result store for sub-agent sharing
861        let result_store = Arc::clone(&self.result_store);
862
863        // Create worktree manager if enabled
864        let worktree_manager = if self.config.worktree_enabled {
865            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
866                std::env::current_dir()
867                    .map(|p| p.to_string_lossy().to_string())
868                    .unwrap_or_else(|_| ".".to_string())
869            });
870
871            let mgr = WorktreeManager::new(&working_dir);
872            tracing::info!(
873                working_dir = %working_dir,
874                "Worktree isolation enabled for parallel sub-agents"
875            );
876            Some(Arc::new(mgr) as Arc<WorktreeManager>)
877        } else {
878            None
879        };
880
881        // PRE-PASS 1 (cache): split subtasks into cached results and
882        // pending work. Cached ones short-circuit so we don't even
883        // consider them for worktree creation.
884        let mut pending_subtasks: Vec<SubTask> = Vec::with_capacity(subtasks.len());
885        for subtask in subtasks.into_iter() {
886            if let Some(ref cache) = self.cache {
887                let mut cache_guard = cache.lock().await;
888                if let Some(cached_result) = cache_guard.get(&subtask).await {
889                    tracing::info!(
890                        subtask_id = %subtask.id,
891                        task_name = %subtask.name,
892                        "Cache hit for subtask, skipping execution"
893                    );
894                    self.try_send_event(SwarmEvent::SubTaskUpdate {
895                        id: subtask.id.clone(),
896                        name: subtask.name.clone(),
897                        status: SubTaskStatus::Completed,
898                        agent_name: Some("cached".to_string()),
899                    });
900                    cached_results.push(cached_result);
901                    continue;
902                }
903            }
904            pending_subtasks.push(subtask);
905        }
906
907        // PRE-PASS 2 (worktrees): create worktrees for mutating non-cached
908        // subtasks concurrently. Mutating subtasks fail closed when their
909        // required worktree cannot be created; read-only subtasks continue in
910        // the shared directory.
911        let mut worktrees_by_id: HashMap<String, WorktreeInfo> = HashMap::new();
912        if let Some(ref mgr) = worktree_manager {
913            // Only provision worktrees for subtasks that actually
914            // need isolation. Read-only research/review/fact-check
915            // agents run against the shared working directory —
916            // saves git setup cost and avoids `.git/worktrees`
917            // lock contention at high fan-out.
918            let ids: Vec<String> = pending_subtasks
919                .iter()
920                .filter(|s| s.needs_worktree())
921                .map(|s| s.id.clone())
922                .collect();
923            let skipped = pending_subtasks.len().saturating_sub(ids.len());
924            if skipped > 0 {
925                tracing::info!(
926                    skipped,
927                    total = pending_subtasks.len(),
928                    "Skipping worktree creation for read-only sub-agents"
929                );
930            }
931            worktrees_by_id = precreate_worktrees(Arc::clone(mgr), ids).await;
932            for (sid, wt) in &worktrees_by_id {
933                active_worktrees.insert(sid.clone(), wt.clone());
934                all_worktrees.insert(sid.clone(), wt.clone());
935            }
936            let mut runnable = Vec::with_capacity(pending_subtasks.len());
937            for subtask in pending_subtasks.into_iter() {
938                if subtask.needs_worktree() && !worktrees_by_id.contains_key(&subtask.id) {
939                    cached_results.push(required_worktree_failure(&subtask));
940                } else {
941                    runnable.push(subtask);
942                }
943            }
944            pending_subtasks = runnable;
945        }
946
947        for (idx, subtask) in pending_subtasks.into_iter().enumerate() {
948            // LCB delegation: pick best provider for this subtask's specialty.
949            let (chosen_provider, chosen_model) = if let Some(ref state) = self.delegation {
950                match state.lock() {
951                    Ok(guard) => super::delegation::choose_provider_for_subtask(
952                        &providers,
953                        &guard,
954                        subtask.specialty.as_deref().unwrap_or("General"),
955                        &provider_name,
956                        &model,
957                    ),
958                    Err(e) => {
959                        tracing::warn!("delegation state mutex poisoned; recovering");
960                        let guard = e.into_inner();
961                        super::delegation::choose_provider_for_subtask(
962                            &providers,
963                            &guard,
964                            subtask.specialty.as_deref().unwrap_or("General"),
965                            &provider_name,
966                            &model,
967                        )
968                    }
969                }
970            } else {
971                (provider_name.clone(), model.clone())
972            };
973            let subtask_provider = providers
974                .get(&chosen_provider)
975                .unwrap_or_else(|| Arc::clone(&provider));
976            let model = chosen_model;
977            subtask_assignments.insert(
978                subtask.id.clone(),
979                (
980                    chosen_provider.clone(),
981                    subtask
982                        .specialty
983                        .clone()
984                        .unwrap_or_else(|| "General".into()),
985                ),
986            );
987            let _provider_name = chosen_provider;
988            let provider = subtask_provider;
989
990            // Get context from dependencies
991            let context = {
992                let completed = completed_results.read().await;
993                let mut dep_context = String::new();
994                for dep_id in &subtask.dependencies {
995                    if let Some(result) = completed.get(dep_id) {
996                        dep_context.push_str(&format!(
997                            "\n--- Result from dependency {} ---\n{}\n",
998                            dep_id, result
999                        ));
1000                    }
1001                }
1002                dep_context
1003            };
1004
1005            let instruction = subtask.instruction.clone();
1006            let subtask_name = subtask.name.clone();
1007            let specialty = subtask.specialty.clone().unwrap_or_default();
1008            let read_only_task = tool_policy::is_read_only_task(&subtask);
1009            let subtask_id = subtask.id.clone();
1010            let subtask_id_for_handle = subtask_id.clone();
1011            let max_steps = self.config.max_steps_per_subagent;
1012            let timeout_secs = self.config.subagent_timeout_secs;
1013
1014            // Retry configuration
1015            let max_retries = self.config.max_retries;
1016            let base_delay_ms = self.config.base_delay_ms;
1017            let max_delay_ms = self.config.max_delay_ms;
1018
1019            // Look up the pre-created worktree. `None` means worktrees are
1020            // disabled or this is a read-only subtask that does not need one.
1021            let worktree_info = worktrees_by_id.remove(&subtask_id);
1022
1023            let working_dir = worktree_info
1024                .as_ref()
1025                .map(|wt| wt.path.display().to_string())
1026                .unwrap_or_else(|| ".".to_string());
1027            let working_dir_path = worktree_info.as_ref().map(|wt| wt.path.clone());
1028
1029            // Clone for the async block
1030            let tools = tool_policy::definitions(&tool_definitions, read_only_task);
1031            let _base_registry = Arc::clone(&base_tool_registry);
1032            let agent_result_store = Arc::clone(&result_store);
1033            let sem = Arc::clone(&semaphore);
1034            let stagger_delay = delay_ms * idx as u64; // Stagger start times
1035            let event_tx = self.event_tx.clone();
1036
1037            // Generate sub-agent execution ID
1038            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
1039
1040            // Log telemetry for this sub-agent
1041            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
1042
1043            // Spawn the subtask execution with agentic tool loop
1044            let handle = tokio::spawn(async move {
1045                // Rate limiting: stagger start and acquire semaphore
1046                if stagger_delay > 0 {
1047                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
1048                }
1049                let _permit = match sem.acquire().await {
1050                    Ok(permit) => permit,
1051                    Err(_) => {
1052                        return (
1053                            subtask_id.clone(),
1054                            Err(anyhow::anyhow!("Swarm execution cancelled")),
1055                        );
1056                    }
1057                };
1058
1059                let _agent_start = Instant::now();
1060
1061                let start = Instant::now();
1062
1063                // Load AGENTS.md from working directory
1064                let working_path = std::path::Path::new(&working_dir);
1065                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
1066                    .map(|(content, _)| {
1067                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
1068                    })
1069                    .unwrap_or_default();
1070
1071                // Build the system prompt for this sub-agent
1072                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
1073                let system_prompt = tool_policy::system_prompt(tool_policy::SystemPromptInput {
1074                    specialty: &specialty,
1075                    subtask_id: &subtask_id,
1076                    working_dir: &working_dir,
1077                    model: &model,
1078                    prd_filename: &prd_filename,
1079                    agents_md: &agents_md_content,
1080                    read_only: read_only_task,
1081                });
1082
1083                let user_prompt = if context.is_empty() {
1084                    format!("Complete this task:\n\n{}", instruction)
1085                } else {
1086                    format!(
1087                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
1088                        instruction, context
1089                    )
1090                };
1091
1092                // Emit AgentStarted event
1093                if let Some(ref tx) = event_tx {
1094                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1095                        id: subtask_id.clone(),
1096                        name: subtask_name.clone(),
1097                        status: SubTaskStatus::Running,
1098                        agent_name: Some(format!("agent-{}", subtask_id)),
1099                    });
1100                    let _ = tx.try_send(SwarmEvent::AgentStarted {
1101                        subtask_id: subtask_id.clone(),
1102                        agent_name: format!("agent-{}", subtask_id),
1103                        specialty: specialty.clone(),
1104                    });
1105                }
1106
1107                // Run the agentic loop
1108                // Create per-agent registry with SwarmShareTool for this subtask
1109                let mut agent_registry =
1110                    ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
1111                agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
1112                    Arc::clone(&agent_result_store),
1113                    subtask_id.clone(),
1114                )));
1115                let batch_tool = Arc::new(crate::tool::batch::BatchTool::new());
1116                agent_registry.register(batch_tool.clone());
1117                let mut search_providers = crate::provider::ProviderRegistry::new();
1118                search_providers.register(Arc::clone(&provider));
1119                agent_registry.register_search(Arc::new(search_providers));
1120                tool_policy::restrict_registry(&mut agent_registry, read_only_task);
1121                let registry = Arc::new(agent_registry);
1122                batch_tool.set_registry(Arc::downgrade(&registry));
1123
1124                // Execute with exponential backoff retry
1125                let mut attempt = 0u32;
1126                let mut result: Result<(String, usize, usize, AgentLoopExit), anyhow::Error> =
1127                    Err(anyhow::anyhow!("Not executed"));
1128
1129                while attempt <= max_retries {
1130                    let _attempt_start = Instant::now();
1131
1132                    // Run the agent loop
1133                    result = run_agent_loop(
1134                        Arc::clone(&provider),
1135                        &model,
1136                        &system_prompt,
1137                        &user_prompt,
1138                        tools.clone(),
1139                        Arc::clone(&registry),
1140                        max_steps,
1141                        timeout_secs,
1142                        event_tx.clone(),
1143                        subtask_id.clone(),
1144                        None,
1145                        working_dir_path.clone(),
1146                    )
1147                    .await;
1148
1149                    // Check if the attempt succeeded
1150                    match &result {
1151                        Ok((_, _, _, exit_reason)) => {
1152                            if *exit_reason == AgentLoopExit::Completed {
1153                                // Success - no need to retry
1154                                tracing::info!(
1155                                    subtask_id = %subtask_id,
1156                                    attempt = attempt + 1,
1157                                    "Sub-agent completed successfully on first attempt"
1158                                );
1159                                break;
1160                            }
1161                            // Failed but not an error - check if we should retry
1162                            let should_retry = attempt < max_retries;
1163                            if should_retry {
1164                                let delay = calculate_backoff_delay(
1165                                    attempt,
1166                                    base_delay_ms,
1167                                    max_delay_ms,
1168                                    2.0,
1169                                );
1170                                tracing::warn!(
1171                                    subtask_id = %subtask_id,
1172                                    attempt = attempt + 1,
1173                                    max_retries = max_retries,
1174                                    exit_reason = ?exit_reason,
1175                                    delay_ms = delay.as_millis(),
1176                                    "Sub-agent did not complete, retrying with backoff"
1177                                );
1178                                tokio::time::sleep(delay).await;
1179                            } else {
1180                                tracing::warn!(
1181                                    subtask_id = %subtask_id,
1182                                    attempt = attempt + 1,
1183                                    max_retries = max_retries,
1184                                    exit_reason = ?exit_reason,
1185                                    "Sub-agent did not complete, retries exhausted"
1186                                );
1187                            }
1188                        }
1189                        Err(e) => {
1190                            // Error occurred - retry with backoff if we have retries left
1191                            let should_retry = attempt < max_retries;
1192                            if should_retry {
1193                                let delay = calculate_backoff_delay(
1194                                    attempt,
1195                                    base_delay_ms,
1196                                    max_delay_ms,
1197                                    2.0,
1198                                );
1199                                tracing::warn!(
1200                                    subtask_id = %subtask_id,
1201                                    attempt = attempt + 1,
1202                                    max_retries = max_retries,
1203                                    error = %e,
1204                                    delay_ms = delay.as_millis(),
1205                                    "Sub-agent error, retrying with backoff"
1206                                );
1207                                tokio::time::sleep(delay).await;
1208                            } else {
1209                                tracing::error!(
1210                                    subtask_id = %subtask_id,
1211                                    attempt = attempt + 1,
1212                                    max_retries = max_retries,
1213                                    error = %e,
1214                                    "Sub-agent error, retries exhausted"
1215                                );
1216                            }
1217                        }
1218                    }
1219
1220                    attempt += 1;
1221                }
1222
1223                let result = result;
1224
1225                let task_result = match result {
1226                    Ok((output, steps, tool_calls, exit_reason)) => {
1227                        let (success, status, error) = match exit_reason {
1228                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
1229                            AgentLoopExit::MaxStepsReached => (
1230                                false,
1231                                SubTaskStatus::Failed,
1232                                Some(format!("Sub-agent hit max steps ({max_steps})")),
1233                            ),
1234                            AgentLoopExit::TimedOut => (
1235                                false,
1236                                SubTaskStatus::TimedOut,
1237                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
1238                            ),
1239                        };
1240
1241                        // Calculate actual retry info - attempt is 0-indexed, so attempt=0 means 1 attempt, attempt=1 means 2 attempts, etc.
1242                        let total_attempts = attempt + 1;
1243                        let actual_retry_attempts = if total_attempts > 1 {
1244                            total_attempts - 1
1245                        } else {
1246                            0
1247                        };
1248
1249                        // Emit completion events
1250                        if let Some(ref tx) = event_tx {
1251                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1252                                id: subtask_id.clone(),
1253                                name: subtask_name.clone(),
1254                                status,
1255                                agent_name: Some(format!("agent-{}", subtask_id)),
1256                            });
1257                            if let Some(ref message) = error {
1258                                let _ = tx.try_send(SwarmEvent::AgentError {
1259                                    subtask_id: subtask_id.clone(),
1260                                    error: message.clone(),
1261                                });
1262                            }
1263                            let _ = tx.try_send(SwarmEvent::AgentOutput {
1264                                subtask_id: subtask_id.clone(),
1265                                output: output.clone(),
1266                            });
1267                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1268                                subtask_id: subtask_id.clone(),
1269                                success,
1270                                steps,
1271                            });
1272                        }
1273                        Ok(SubTaskResult {
1274                            subtask_id: subtask_id.clone(),
1275                            subagent_id: format!("agent-{}", subtask_id),
1276                            success,
1277                            result: output,
1278                            steps,
1279                            tool_calls,
1280                            execution_time_ms: start.elapsed().as_millis() as u64,
1281                            error,
1282                            artifacts: Vec::new(),
1283                            retry_count: actual_retry_attempts,
1284                        })
1285                    }
1286                    Err(e) => {
1287                        // Calculate actual retry info - attempt is 0-indexed
1288                        let total_attempts = attempt + 1;
1289                        let actual_retry_attempts = if total_attempts > 1 {
1290                            total_attempts - 1
1291                        } else {
1292                            0
1293                        };
1294
1295                        // Emit error events
1296                        if let Some(ref tx) = event_tx {
1297                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1298                                id: subtask_id.clone(),
1299                                name: subtask_name.clone(),
1300                                status: SubTaskStatus::Failed,
1301                                agent_name: Some(format!("agent-{}", subtask_id)),
1302                            });
1303                            let _ = tx.try_send(SwarmEvent::AgentError {
1304                                subtask_id: subtask_id.clone(),
1305                                error: e.to_string(),
1306                            });
1307                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1308                                subtask_id: subtask_id.clone(),
1309                                success: false,
1310                                steps: 0,
1311                            });
1312                        }
1313                        Ok(SubTaskResult {
1314                            subtask_id: subtask_id.clone(),
1315                            subagent_id: format!("agent-{}", subtask_id),
1316                            success: false,
1317                            result: String::new(),
1318                            steps: 0,
1319                            tool_calls: 0,
1320                            execution_time_ms: start.elapsed().as_millis() as u64,
1321                            error: Some(e.to_string()),
1322                            artifacts: Vec::new(),
1323                            retry_count: actual_retry_attempts,
1324                        })
1325                    }
1326                };
1327
1328                (subtask_id.clone(), task_result)
1329            });
1330
1331            let abort_handle = handle.abort_handle();
1332            abort_handles.insert(subtask_id_for_handle.clone(), abort_handle);
1333            task_ids.insert(handle.id(), subtask_id_for_handle.clone());
1334            handles.push(handle);
1335        }
1336
1337        // Deterministic collapse loop while branches are still running.
1338        let mut collapse_controller = if worktree_manager.is_some() && active_worktrees.len() > 1 {
1339            Some(CollapseController::new(CollapsePolicy::default()))
1340        } else {
1341            None
1342        };
1343        let mut collapse_tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1344        collapse_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1345        // Consume the immediate first tick so sampling starts after the first interval.
1346        let _ = collapse_tick.tick().await;
1347
1348        while !handles.is_empty() {
1349            tokio::select! {
1350                maybe_join = handles.next() => {
1351                    let Some(joined) = maybe_join else {
1352                        continue;
1353                    };
1354                    match joined {
1355                        Ok((subtask_id, Ok(result))) => {
1356                            abort_handles.remove(&subtask_id);
1357                            let wt = active_worktrees.remove(&subtask_id).or_else(|| all_worktrees.get(&subtask_id).cloned());
1358                            if let (Some(state), Some((prov, spec))) =
1359                                (self.delegation.as_ref(), subtask_assignments.get(&subtask_id))
1360                            {
1361                                super::delegation_outcome::record_subtask_outcome(
1362                                    state,
1363                                    prov,
1364                                    spec,
1365                                    result.success,
1366                                );
1367                            }
1368                            completed_entries.push((result, wt));
1369                        }
1370                        Ok((subtask_id, Err(e))) => {
1371                            abort_handles.remove(&subtask_id);
1372                            active_worktrees.remove(&subtask_id);
1373                            let wt = all_worktrees.get(&subtask_id).cloned();
1374                            if let (Some(state), Some((prov, spec))) =
1375                                (self.delegation.as_ref(), subtask_assignments.get(&subtask_id))
1376                            {
1377                                super::delegation_outcome::record_subtask_outcome(
1378                                    state, prov, spec, false,
1379                                );
1380                            }
1381                            completed_entries.push((
1382                                SubTaskResult {
1383                                    subtask_id: subtask_id.clone(),
1384                                    subagent_id: format!("agent-{subtask_id}"),
1385                                    success: false,
1386                                    result: String::new(),
1387                                    steps: 0,
1388                                    tool_calls: 0,
1389                                    execution_time_ms: 0,
1390                                    error: Some(e.to_string()),
1391                                    artifacts: Vec::new(),
1392                                    retry_count: 0,
1393                                },
1394                                wt,
1395                            ));
1396                        }
1397                        Err(e) => {
1398                            let subtask_id = task_ids
1399                                .remove(&e.id())
1400                                .unwrap_or_else(|| "unknown".to_string());
1401                            abort_handles.remove(&subtask_id);
1402                            active_worktrees.remove(&subtask_id);
1403                            let wt = all_worktrees.get(&subtask_id).cloned();
1404                            if let (Some(state), Some((prov, spec))) =
1405                                (self.delegation.as_ref(), subtask_assignments.get(&subtask_id))
1406                            {
1407                                super::delegation_outcome::record_subtask_outcome(
1408                                    state, prov, spec, false,
1409                                );
1410                            }
1411                            completed_entries.push((
1412                                SubTaskResult {
1413                                    subtask_id: subtask_id.clone(),
1414                                    subagent_id: format!("agent-{subtask_id}"),
1415                                    success: false,
1416                                    result: String::new(),
1417                                    steps: 0,
1418                                    tool_calls: 0,
1419                                    execution_time_ms: 0,
1420                                    error: Some(format!("Task join error: {e}")),
1421                                    artifacts: Vec::new(),
1422                                    retry_count: 0,
1423                                },
1424                                wt,
1425                            ));
1426                        }
1427                    }
1428                }
1429                _ = collapse_tick.tick(), if collapse_controller.is_some() && !active_worktrees.is_empty() => {
1430                    let branches: Vec<BranchRuntimeState> = active_worktrees
1431                        .iter()
1432                        .map(|(subtask_id, wt)| BranchRuntimeState {
1433                            subtask_id: subtask_id.clone(),
1434                            branch: wt.branch.clone(),
1435                            worktree_path: wt.path.clone(),
1436                        })
1437                        .collect();
1438
1439                    if let Some(controller) = collapse_controller.as_mut() {
1440                        match controller.sample(&branches) {
1441                            Ok(tick) => {
1442                                if promoted_subtask_id != tick.promoted_subtask_id {
1443                                    promoted_subtask_id = tick.promoted_subtask_id.clone();
1444                                    if let Some(ref promoted) = promoted_subtask_id {
1445                                        tracing::info!(
1446                                            subtask_id = %promoted,
1447                                            "Collapse controller promoted branch"
1448                                        );
1449                                        if let Some(audit) = crate::audit::try_audit_log() {
1450                                            audit.log_with_correlation(
1451                                                crate::audit::AuditCategory::Swarm,
1452                                                "collapse_promote_branch",
1453                                                crate::audit::AuditOutcome::Success,
1454                                                Some("collapse-controller".to_string()),
1455                                                Some(serde_json::json!({
1456                                                    "swarm_id": swarm_id,
1457                                                    "subtask_id": promoted,
1458                                                })),
1459                                                None,
1460                                                None,
1461                                                Some(swarm_id.to_string()),
1462                                                None,
1463                                            ).await;
1464                                        }
1465                                    }
1466                                }
1467
1468                                for kill in tick.kills {
1469                                    if kill_reasons.contains_key(&kill.subtask_id) {
1470                                        continue;
1471                                    }
1472                                    let Some(abort_handle) = abort_handles.get(&kill.subtask_id) else {
1473                                        continue;
1474                                    };
1475
1476                                    abort_handle.abort();
1477                                    abort_handles.remove(&kill.subtask_id);
1478                                    active_worktrees.remove(&kill.subtask_id);
1479                                    kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
1480
1481                                    tracing::warn!(
1482                                        subtask_id = %kill.subtask_id,
1483                                        branch = %kill.branch,
1484                                        reason = %kill.reason,
1485                                        "Collapse controller killed branch"
1486                                    );
1487
1488                                    if let Some(ref tx) = self.event_tx {
1489                                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1490                                            id: kill.subtask_id.clone(),
1491                                            name: kill.subtask_id.clone(),
1492                                            status: SubTaskStatus::Cancelled,
1493                                            agent_name: Some(format!("agent-{}", kill.subtask_id)),
1494                                        });
1495                                        let _ = tx.try_send(SwarmEvent::AgentError {
1496                                            subtask_id: kill.subtask_id.clone(),
1497                                            error: format!("Cancelled by collapse controller: {}", kill.reason),
1498                                        });
1499                                    }
1500
1501                                    if let Some(audit) = crate::audit::try_audit_log() {
1502                                        audit.log_with_correlation(
1503                                            crate::audit::AuditCategory::Swarm,
1504                                            "collapse_kill_branch",
1505                                            crate::audit::AuditOutcome::Success,
1506                                            Some("collapse-controller".to_string()),
1507                                            Some(serde_json::json!({
1508                                                "swarm_id": swarm_id,
1509                                                "subtask_id": kill.subtask_id,
1510                                                "branch": kill.branch,
1511                                                "reason": kill.reason,
1512                                            })),
1513                                            None,
1514                                            None,
1515                                            Some(swarm_id.to_string()),
1516                                            None,
1517                                        ).await;
1518                                    }
1519                                }
1520                            }
1521                            Err(e) => {
1522                                tracing::warn!(error = %e, "Collapse controller sampling failed");
1523                            }
1524                        }
1525                    }
1526                }
1527            }
1528        }
1529
1530        // Merge in deterministic order: promoted branch first when present.
1531        if let Some(ref promoted) = promoted_subtask_id {
1532            completed_entries.sort_by_key(|(result, _)| {
1533                if &result.subtask_id == promoted {
1534                    0usize
1535                } else {
1536                    1usize
1537                }
1538            });
1539        }
1540
1541        let mut results = cached_results;
1542        let auto_merge = self.config.worktree_auto_merge;
1543
1544        for (mut result, worktree_info) in completed_entries {
1545            // Handle worktree merge/cleanup if applicable
1546            if let Some(wt) = worktree_info {
1547                if let Some(reason) = kill_reasons.get(&result.subtask_id) {
1548                    result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1549                    result.result.push_str(&format!(
1550                        "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1551                    ));
1552                    if let Some(ref mgr) = worktree_manager
1553                        && let Err(e) = mgr.cleanup(&wt.name).await
1554                    {
1555                        tracing::warn!(error = %e, "Failed to cleanup killed worktree");
1556                    }
1557                } else if result.success && auto_merge {
1558                    if let Some(ref mgr) = worktree_manager {
1559                        match mgr.merge(&wt.name).await {
1560                            Ok(merge_result) => {
1561                                if merge_result.success {
1562                                    tracing::info!(
1563                                        subtask_id = %result.subtask_id,
1564                                        files_changed = merge_result.files_changed,
1565                                        "Merged worktree changes successfully"
1566                                    );
1567                                    result.result.push_str(&format!(
1568                                        "\n\n--- Merge Result ---\n{}",
1569                                        merge_result.summary
1570                                    ));
1571                                } else if merge_result.aborted {
1572                                    tracing::warn!(
1573                                        subtask_id = %result.subtask_id,
1574                                        summary = %merge_result.summary,
1575                                        "Merge was aborted"
1576                                    );
1577                                    result.result.push_str(&format!(
1578                                        "\n\n--- Merge Aborted ---\n{}",
1579                                        merge_result.summary
1580                                    ));
1581                                } else {
1582                                    tracing::warn!(
1583                                        subtask_id = %result.subtask_id,
1584                                        conflicts = ?merge_result.conflicts,
1585                                        "Merge had conflicts"
1586                                    );
1587                                    result.result.push_str(&format!(
1588                                        "\n\n--- Merge Conflicts ---\n{}",
1589                                        merge_result.summary
1590                                    ));
1591                                }
1592                                if let Err(e) = mgr.cleanup(&wt.name).await {
1593                                    tracing::warn!(error = %e, "Failed to cleanup worktree");
1594                                }
1595                            }
1596                            Err(e) => {
1597                                tracing::error!(
1598                                    subtask_id = %result.subtask_id,
1599                                    error = %e,
1600                                    "Failed to merge worktree"
1601                                );
1602                            }
1603                        }
1604                    }
1605                } else if !result.success {
1606                    tracing::info!(
1607                        subtask_id = %result.subtask_id,
1608                        worktree_path = %wt.path.display(),
1609                        "Keeping worktree for debugging (task failed)"
1610                    );
1611                }
1612            }
1613
1614            // Cache successful result
1615            if result.success
1616                && let Some(ref cache_arc) = self.cache
1617            {
1618                let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
1619                    cache_arc.lock().await;
1620                let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1621                if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
1622                    tracing::warn!(
1623                        subtask_id = %result.subtask_id,
1624                        error = %e,
1625                        "Failed to cache subtask result"
1626                    );
1627                }
1628            }
1629
1630            results.push(result);
1631        }
1632
1633        Ok(results)
1634    }
1635
1636    async fn execute_stage_kubernetes(
1637        &self,
1638        orchestrator: &Orchestrator,
1639        subtasks: Vec<SubTask>,
1640        completed_results: Arc<RwLock<HashMap<String, String>>>,
1641        swarm_id: &str,
1642    ) -> Result<Vec<SubTaskResult>> {
1643        let k8s = K8sManager::new().await;
1644        if !k8s.is_available() {
1645            anyhow::bail!(
1646                "Kubernetes execution mode requested but K8s client is unavailable in this environment"
1647            );
1648        }
1649
1650        let provider_name = orchestrator.provider().to_string();
1651        let model = orchestrator.model().to_string();
1652        let pod_budget = self.config.k8s_pod_budget.max(1);
1653        let mut pending: VecDeque<SubTask> = subtasks.into_iter().collect();
1654        let mut active: HashMap<String, ActiveK8sBranch> = HashMap::new();
1655        let mut subtask_names: HashMap<String, String> = HashMap::new();
1656        let mut results: Vec<SubTaskResult> = Vec::new();
1657        let mut kill_reasons: HashMap<String, String> = HashMap::new();
1658        let mut promoted_subtask_id: Option<String> = None;
1659        let mut collapse_controller = CollapseController::new(CollapsePolicy::default());
1660
1661        let mut tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1662        tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1663        let _ = tick.tick().await;
1664
1665        loop {
1666            while active.len() < pod_budget {
1667                let Some(subtask) = pending.pop_front() else {
1668                    break;
1669                };
1670
1671                if let Some(ref cache) = self.cache {
1672                    let mut cache_guard = cache.lock().await;
1673                    if let Some(cached_result) = cache_guard.get(&subtask).await {
1674                        tracing::info!(
1675                            subtask_id = %subtask.id,
1676                            task_name = %subtask.name,
1677                            "Cache hit for subtask, skipping Kubernetes execution"
1678                        );
1679                        self.try_send_event(SwarmEvent::SubTaskUpdate {
1680                            id: subtask.id.clone(),
1681                            name: subtask.name.clone(),
1682                            status: SubTaskStatus::Completed,
1683                            agent_name: Some("cached".to_string()),
1684                        });
1685                        results.push(cached_result);
1686                        continue;
1687                    }
1688                }
1689
1690                let context = {
1691                    let completed = completed_results.read().await;
1692                    let mut dep_context = String::new();
1693                    for dep_id in &subtask.dependencies {
1694                        if let Some(result) = completed.get(dep_id) {
1695                            dep_context.push_str(&format!(
1696                                "\n--- Result from dependency {} ---\n{}\n",
1697                                dep_id, result
1698                            ));
1699                        }
1700                    }
1701                    dep_context
1702                };
1703
1704                let payload = RemoteSubtaskPayload {
1705                    swarm_id: swarm_id.to_string(),
1706                    subtask_id: subtask.id.clone(),
1707                    subtask_name: subtask.name.clone(),
1708                    specialty: subtask.specialty.clone().unwrap_or_default(),
1709                    instruction: subtask.instruction.clone(),
1710                    context: context.clone(),
1711                    provider: provider_name.clone(),
1712                    model: model.clone(),
1713                    max_steps: self.config.max_steps_per_subagent,
1714                    timeout_secs: self.config.subagent_timeout_secs,
1715                    working_dir: self.config.working_dir.clone(),
1716                    read_only: tool_policy::is_read_only_task(&subtask),
1717                    probe_interval_secs: COLLAPSE_SAMPLE_SECS,
1718                };
1719                let payload_b64 = match encode_payload(&payload) {
1720                    Ok(payload) => payload,
1721                    Err(error) => {
1722                        let error_text = format!("Failed to encode remote payload: {error}");
1723                        tracing::error!(subtask_id = %subtask.id, error = %error, "K8s payload encoding failed");
1724                        self.try_send_event(SwarmEvent::SubTaskUpdate {
1725                            id: subtask.id.clone(),
1726                            name: subtask.name.clone(),
1727                            status: SubTaskStatus::Failed,
1728                            agent_name: Some("k8s-encoder".to_string()),
1729                        });
1730                        self.try_send_event(SwarmEvent::AgentError {
1731                            subtask_id: subtask.id.clone(),
1732                            error: error_text.clone(),
1733                        });
1734                        results.push(SubTaskResult {
1735                            subtask_id: subtask.id.clone(),
1736                            subagent_id: format!("agent-{}", subtask.id),
1737                            success: false,
1738                            result: String::new(),
1739                            steps: 0,
1740                            tool_calls: 0,
1741                            execution_time_ms: 0,
1742                            error: Some(error_text),
1743                            artifacts: Vec::new(),
1744                            retry_count: 0,
1745                        });
1746                        continue;
1747                    }
1748                };
1749
1750                let mut env_vars = HashMap::new();
1751                env_vars.insert(SWARM_SUBTASK_PAYLOAD_ENV.to_string(), payload_b64);
1752                for key in K8S_PASSTHROUGH_ENV_VARS {
1753                    if let Ok(value) = std::env::var(key)
1754                        && !value.trim().is_empty()
1755                    {
1756                        env_vars.insert((*key).to_string(), value);
1757                    }
1758                }
1759                let mut labels = HashMap::new();
1760                labels.insert("codetether.run/swarm-id".to_string(), swarm_id.to_string());
1761                labels.insert(
1762                    "codetether.run/stage".to_string(),
1763                    subtask.stage.to_string(),
1764                );
1765
1766                let spec = SubagentPodSpec {
1767                    image: self.config.k8s_subagent_image.clone(),
1768                    env_vars,
1769                    labels,
1770                    command: Some(vec!["sh".to_string(), "-lc".to_string()]),
1771                    args: Some(vec![format!(
1772                        "exec codetether swarm-subagent --payload-env {payload_env}",
1773                        payload_env = SWARM_SUBTASK_PAYLOAD_ENV,
1774                    )]),
1775                };
1776
1777                if let Err(error) = k8s.spawn_subagent_pod_with_spec(&subtask.id, spec).await {
1778                    let error_text = format!("Failed to spawn Kubernetes pod: {error}");
1779                    tracing::error!(subtask_id = %subtask.id, error = %error, "K8s sub-agent pod spawn failed");
1780                    self.try_send_event(SwarmEvent::SubTaskUpdate {
1781                        id: subtask.id.clone(),
1782                        name: subtask.name.clone(),
1783                        status: SubTaskStatus::Failed,
1784                        agent_name: Some("k8s-spawn".to_string()),
1785                    });
1786                    self.try_send_event(SwarmEvent::AgentError {
1787                        subtask_id: subtask.id.clone(),
1788                        error: error_text.clone(),
1789                    });
1790                    results.push(SubTaskResult {
1791                        subtask_id: subtask.id.clone(),
1792                        subagent_id: format!("agent-{}", subtask.id),
1793                        success: false,
1794                        result: String::new(),
1795                        steps: 0,
1796                        tool_calls: 0,
1797                        execution_time_ms: 0,
1798                        error: Some(error_text),
1799                        artifacts: Vec::new(),
1800                        retry_count: 0,
1801                    });
1802                    continue;
1803                }
1804
1805                let branch = K8sManager::subagent_pod_name(&subtask.id);
1806                subtask_names.insert(subtask.id.clone(), subtask.name.clone());
1807                active.insert(
1808                    subtask.id.clone(),
1809                    ActiveK8sBranch {
1810                        branch: branch.clone(),
1811                        started_at: Instant::now(),
1812                    },
1813                );
1814
1815                self.try_send_event(SwarmEvent::SubTaskUpdate {
1816                    id: subtask.id.clone(),
1817                    name: subtask.name.clone(),
1818                    status: SubTaskStatus::Running,
1819                    agent_name: Some(format!("k8s-{branch}")),
1820                });
1821                self.try_send_event(SwarmEvent::AgentStarted {
1822                    subtask_id: subtask.id.clone(),
1823                    agent_name: format!("k8s-{branch}"),
1824                    specialty: subtask
1825                        .specialty
1826                        .clone()
1827                        .unwrap_or_else(|| "generalist".to_string()),
1828                });
1829
1830                tracing::info!(
1831                    subtask_id = %subtask.id,
1832                    pod = %branch,
1833                    "Spawned Kubernetes sub-agent pod"
1834                );
1835            }
1836
1837            if pending.is_empty() && active.is_empty() {
1838                break;
1839            }
1840
1841            tick.tick().await;
1842
1843            let active_ids: Vec<String> = active.keys().cloned().collect();
1844            let mut finished_results: Vec<SubTaskResult> = Vec::new();
1845            for subtask_id in active_ids {
1846                let Some(active_state) = active.get(&subtask_id).cloned() else {
1847                    continue;
1848                };
1849
1850                if active_state.started_at.elapsed()
1851                    > Duration::from_secs(self.config.subagent_timeout_secs)
1852                {
1853                    let reason = format!(
1854                        "Timed out after {}s in Kubernetes pod",
1855                        self.config.subagent_timeout_secs
1856                    );
1857                    kill_reasons.insert(subtask_id.clone(), reason.clone());
1858                    if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
1859                        tracing::warn!(
1860                            subtask_id = %subtask_id,
1861                            error = %error,
1862                            "Failed deleting timed-out Kubernetes pod"
1863                        );
1864                    }
1865                    active.remove(&subtask_id);
1866                    finished_results.push(SubTaskResult {
1867                        subtask_id: subtask_id.clone(),
1868                        subagent_id: format!("agent-{subtask_id}"),
1869                        success: false,
1870                        result: String::new(),
1871                        steps: 0,
1872                        tool_calls: 0,
1873                        execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1874                        error: Some(reason),
1875                        artifacts: Vec::new(),
1876                        retry_count: 0,
1877                    });
1878                    continue;
1879                }
1880
1881                let pod_state = match k8s.get_subagent_pod_state(&subtask_id).await {
1882                    Ok(state) => state,
1883                    Err(error) => {
1884                        tracing::warn!(
1885                            subtask_id = %subtask_id,
1886                            error = %error,
1887                            "Failed to query Kubernetes pod state for sub-agent"
1888                        );
1889                        continue;
1890                    }
1891                };
1892                let Some(pod_state) = pod_state else {
1893                    active.remove(&subtask_id);
1894                    finished_results.push(SubTaskResult {
1895                        subtask_id: subtask_id.clone(),
1896                        subagent_id: format!("agent-{subtask_id}"),
1897                        success: false,
1898                        result: String::new(),
1899                        steps: 0,
1900                        tool_calls: 0,
1901                        execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1902                        error: Some("Sub-agent pod disappeared".to_string()),
1903                        artifacts: Vec::new(),
1904                        retry_count: 0,
1905                    });
1906                    continue;
1907                };
1908
1909                let phase = pod_state.phase.to_ascii_lowercase();
1910                let finished = pod_state.terminated || phase == "succeeded" || phase == "failed";
1911                if !finished {
1912                    continue;
1913                }
1914
1915                let logs = k8s
1916                    .subagent_logs(&subtask_id, 10_000)
1917                    .await
1918                    .unwrap_or_default();
1919                let elapsed_ms = active_state.started_at.elapsed().as_millis() as u64;
1920                let mut result = k8s_result::final_result(
1921                    &subtask_id,
1922                    elapsed_ms,
1923                    pod_state.exit_code,
1924                    pod_state.reason.as_deref(),
1925                    &logs,
1926                );
1927
1928                if let Some(reason) = kill_reasons.get(&subtask_id) {
1929                    result.success = false;
1930                    result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1931                    result.result.push_str(&format!(
1932                        "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1933                    ));
1934                }
1935
1936                active.remove(&subtask_id);
1937                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
1938                    tracing::warn!(
1939                        subtask_id = %subtask_id,
1940                        error = %error,
1941                        "Failed deleting completed Kubernetes pod"
1942                    );
1943                }
1944                finished_results.push(result);
1945            }
1946
1947            for result in finished_results {
1948                if result.success {
1949                    completed_results
1950                        .write()
1951                        .await
1952                        .insert(result.subtask_id.clone(), result.result.clone());
1953                }
1954                if result.success
1955                    && let Some(ref cache_arc) = self.cache
1956                {
1957                    let mut cache_guard = cache_arc.lock().await;
1958                    let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1959                    let _ = cache_guard.put(&cache_subtask, &result).await;
1960                }
1961
1962                self.try_send_event(SwarmEvent::SubTaskUpdate {
1963                    id: result.subtask_id.clone(),
1964                    name: subtask_names
1965                        .get(&result.subtask_id)
1966                        .cloned()
1967                        .unwrap_or_else(|| result.subtask_id.clone()),
1968                    status: if result.success {
1969                        SubTaskStatus::Completed
1970                    } else {
1971                        SubTaskStatus::Failed
1972                    },
1973                    agent_name: Some(format!("k8s-{}", result.subtask_id)),
1974                });
1975                if let Some(ref error) = result.error {
1976                    self.try_send_event(SwarmEvent::AgentError {
1977                        subtask_id: result.subtask_id.clone(),
1978                        error: error.clone(),
1979                    });
1980                }
1981                self.try_send_event(SwarmEvent::AgentOutput {
1982                    subtask_id: result.subtask_id.clone(),
1983                    output: result.result.clone(),
1984                });
1985                self.try_send_event(SwarmEvent::AgentComplete {
1986                    subtask_id: result.subtask_id.clone(),
1987                    success: result.success,
1988                    steps: result.steps,
1989                });
1990                results.push(result);
1991            }
1992
1993            if active.len() > 1 {
1994                let mut observations = Vec::with_capacity(active.len());
1995                for (subtask_id, state) in &active {
1996                    let pod_state = match k8s.get_subagent_pod_state(subtask_id).await {
1997                        Ok(state) => state,
1998                        Err(error) => {
1999                            tracing::warn!(
2000                                subtask_id = %subtask_id,
2001                                error = %error,
2002                                "Failed to query pod state while sampling branch observation"
2003                            );
2004                            None
2005                        }
2006                    };
2007                    let (resource_health_score, infra_unhealthy_signals) =
2008                        compute_resource_health(pod_state.as_ref());
2009
2010                    let logs = k8s.subagent_logs(subtask_id, 500).await.unwrap_or_default();
2011                    if let Some(probe) = latest_probe_from_logs(&logs) {
2012                        let compile_ok = pod_state
2013                            .as_ref()
2014                            .map(|p| probe.compile_ok && !p.phase.eq_ignore_ascii_case("failed"))
2015                            .unwrap_or(probe.compile_ok);
2016                        observations.push(BranchObservation {
2017                            subtask_id: subtask_id.clone(),
2018                            branch: state.branch.clone(),
2019                            compile_ok,
2020                            changed_files: probe_changed_files_set(&probe),
2021                            changed_lines: probe.changed_lines,
2022                            resource_health_score,
2023                            infra_unhealthy_signals,
2024                        });
2025                        continue;
2026                    }
2027                    let compile_ok = pod_state
2028                        .as_ref()
2029                        .map(|p| !p.phase.eq_ignore_ascii_case("failed"))
2030                        .unwrap_or(false);
2031                    observations.push(BranchObservation {
2032                        subtask_id: subtask_id.clone(),
2033                        branch: state.branch.clone(),
2034                        compile_ok,
2035                        changed_files: std::collections::HashSet::new(),
2036                        changed_lines: 0,
2037                        resource_health_score,
2038                        infra_unhealthy_signals,
2039                    });
2040                }
2041
2042                let tick = collapse_controller.sample_observations(&observations);
2043                if promoted_subtask_id != tick.promoted_subtask_id {
2044                    promoted_subtask_id = tick.promoted_subtask_id.clone();
2045                    if let Some(ref promoted) = promoted_subtask_id {
2046                        tracing::info!(subtask_id = %promoted, "Collapse controller promoted branch");
2047                        if let Some(audit) = crate::audit::try_audit_log() {
2048                            audit
2049                                .log_with_correlation(
2050                                    crate::audit::AuditCategory::Swarm,
2051                                    "collapse_promote_branch",
2052                                    crate::audit::AuditOutcome::Success,
2053                                    Some("collapse-controller".to_string()),
2054                                    Some(serde_json::json!({
2055                                        "swarm_id": swarm_id,
2056                                        "subtask_id": promoted,
2057                                        "execution_mode": "kubernetes_pod",
2058                                    })),
2059                                    None,
2060                                    None,
2061                                    Some(swarm_id.to_string()),
2062                                    None,
2063                                )
2064                                .await;
2065                        }
2066                    }
2067                }
2068
2069                for kill in tick.kills {
2070                    if kill_reasons.contains_key(&kill.subtask_id) {
2071                        continue;
2072                    }
2073                    if !active.contains_key(&kill.subtask_id) {
2074                        continue;
2075                    }
2076
2077                    if let Err(error) = k8s.delete_subagent_pod(&kill.subtask_id).await {
2078                        tracing::warn!(
2079                            subtask_id = %kill.subtask_id,
2080                            error = %error,
2081                            "Failed deleting Kubernetes pod after collapse kill"
2082                        );
2083                    }
2084                    kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
2085                    let elapsed_ms = active
2086                        .remove(&kill.subtask_id)
2087                        .map(|s| s.started_at.elapsed().as_millis() as u64)
2088                        .unwrap_or(0);
2089
2090                    tracing::warn!(
2091                        subtask_id = %kill.subtask_id,
2092                        branch = %kill.branch,
2093                        reason = %kill.reason,
2094                        "Collapse controller killed Kubernetes branch"
2095                    );
2096
2097                    if let Some(audit) = crate::audit::try_audit_log() {
2098                        audit
2099                            .log_with_correlation(
2100                                crate::audit::AuditCategory::Swarm,
2101                                "collapse_kill_branch",
2102                                crate::audit::AuditOutcome::Success,
2103                                Some("collapse-controller".to_string()),
2104                                Some(serde_json::json!({
2105                                    "swarm_id": swarm_id,
2106                                    "subtask_id": kill.subtask_id.clone(),
2107                                    "branch": kill.branch.clone(),
2108                                    "reason": kill.reason.clone(),
2109                                    "execution_mode": "kubernetes_pod",
2110                                })),
2111                                None,
2112                                None,
2113                                Some(swarm_id.to_string()),
2114                                None,
2115                            )
2116                            .await;
2117                    }
2118
2119                    self.try_send_event(SwarmEvent::SubTaskUpdate {
2120                        id: kill.subtask_id.clone(),
2121                        name: subtask_names
2122                            .get(&kill.subtask_id)
2123                            .cloned()
2124                            .unwrap_or_else(|| kill.subtask_id.clone()),
2125                        status: SubTaskStatus::Cancelled,
2126                        agent_name: Some(format!("agent-{}", kill.subtask_id)),
2127                    });
2128                    self.try_send_event(SwarmEvent::AgentError {
2129                        subtask_id: kill.subtask_id.clone(),
2130                        error: format!("Cancelled by collapse controller: {}", kill.reason),
2131                    });
2132
2133                    results.push(SubTaskResult {
2134                        subtask_id: kill.subtask_id.clone(),
2135                        subagent_id: format!("agent-{}", kill.subtask_id),
2136                        success: false,
2137                        result: format!(
2138                            "\n\n--- Collapse Controller ---\nBranch terminated: {}",
2139                            kill.reason
2140                        ),
2141                        steps: 0,
2142                        tool_calls: 0,
2143                        execution_time_ms: elapsed_ms,
2144                        error: Some(format!("Cancelled by collapse controller: {}", kill.reason)),
2145                        artifacts: Vec::new(),
2146                        retry_count: 0,
2147                    });
2148                }
2149            }
2150        }
2151
2152        if let Some(ref promoted) = promoted_subtask_id {
2153            results.sort_by_key(|result| {
2154                if &result.subtask_id == promoted {
2155                    0usize
2156                } else {
2157                    1usize
2158                }
2159            });
2160        }
2161
2162        if !active.is_empty() {
2163            let residual_ids: Vec<String> = active.keys().cloned().collect();
2164            for subtask_id in residual_ids {
2165                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
2166                    tracing::warn!(
2167                        subtask_id = %subtask_id,
2168                        error = %error,
2169                        "Failed deleting residual Kubernetes pod at stage end"
2170                    );
2171                }
2172            }
2173        }
2174
2175        Ok(results)
2176    }
2177
2178    /// Aggregate results from all subtasks into a final result
2179    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
2180        let mut aggregated = String::new();
2181
2182        for (i, result) in results.iter().enumerate() {
2183            if result.success {
2184                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
2185            } else {
2186                aggregated.push_str(&format!(
2187                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
2188                    i + 1,
2189                    result.error.as_deref().unwrap_or("Unknown error")
2190                ));
2191            }
2192        }
2193
2194        Ok(aggregated)
2195    }
2196
2197    /// Execute a single task without decomposition (for simple cases)
2198    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
2199        self.execute(task, DecompositionStrategy::None).await
2200    }
2201}
2202
/// Builder for swarm execution.
///
/// Accumulates a [`SwarmConfig`] through chained setter calls and hands it
/// to `SwarmExecutor::new` when `build` is called.
pub struct SwarmExecutorBuilder {
    /// Configuration being assembled; passed to the executor by `build`.
    config: SwarmConfig,
}
2207
2208impl SwarmExecutorBuilder {
2209    pub fn new() -> Self {
2210        Self {
2211            config: SwarmConfig::default(),
2212        }
2213    }
2214
2215    pub fn max_subagents(mut self, max: usize) -> Self {
2216        self.config.max_subagents = max;
2217        self
2218    }
2219
2220    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
2221        self.config.max_steps_per_subagent = max;
2222        self
2223    }
2224
2225    pub fn max_total_steps(mut self, max: usize) -> Self {
2226        self.config.max_total_steps = max;
2227        self
2228    }
2229
2230    pub fn timeout_secs(mut self, secs: u64) -> Self {
2231        self.config.subagent_timeout_secs = secs;
2232        self
2233    }
2234
2235    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
2236        self.config.parallel_enabled = enabled;
2237        self
2238    }
2239
2240    pub fn build(self) -> SwarmExecutor {
2241        SwarmExecutor::new(self.config)
2242    }
2243}
2244
2245impl Default for SwarmExecutorBuilder {
2246    fn default() -> Self {
2247        Self::new()
2248    }
2249}
2250
/// Bounded-concurrency parallelism for `git worktree add`: the maximum
/// number of worktree creations `precreate_worktrees` keeps in flight at
/// once (it is the argument to `buffer_unordered`).
///
/// Higher values reduce wall time at the cost of contention on the
/// `.git/worktrees` lock. 8 is empirically a sweet spot — past 8,
/// throughput plateaus and flaky lock errors increase.
const WORKTREE_CREATE_PARALLELISM: usize = 8;
2256
2257fn required_worktree_failure(subtask: &SubTask) -> SubTaskResult {
2258    let error = "Failed to create required worktree for mutating subtask".to_string();
2259    SubTaskResult {
2260        subtask_id: subtask.id.clone(),
2261        subagent_id: "worktree-preflight".to_string(),
2262        success: false,
2263        result: error.clone(),
2264        steps: 0,
2265        tool_calls: 0,
2266        execution_time_ms: 0,
2267        error: Some(error),
2268        artifacts: Vec::new(),
2269        retry_count: 0,
2270    }
2271}
2272
2273/// Create worktrees for a batch of subtask IDs in parallel.
2274///
2275/// Each creation is bounded-concurrent via [`WORKTREE_CREATE_PARALLELISM`].
2276/// Failures are logged and dropped from the returned map — callers
2277/// fall back to the shared working directory for those subtasks.
2278///
2279/// This replaces the previous serial `for … mgr.create(slug).await` loop
2280/// inside `execute_stage`, which blocked all subsequent agents from
2281/// starting until every worktree was provisioned. For an N=26 swarm on
2282/// a mid-size repo that alone saved roughly 30–40 seconds of startup.
2283async fn precreate_worktrees(
2284    mgr: Arc<WorktreeManager>,
2285    subtask_ids: Vec<String>,
2286) -> HashMap<String, WorktreeInfo> {
2287    stream::iter(subtask_ids.into_iter().map(|sid| {
2288        let mgr = Arc::clone(&mgr);
2289        async move {
2290            let slug = sid.replace("-", "_");
2291            match mgr.create(&slug).await {
2292                Ok(wt) => {
2293                    if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
2294                        tracing::warn!(
2295                            subtask_id = %sid,
2296                            error = %e,
2297                            "Failed to inject workspace stub into worktree"
2298                        );
2299                    }
2300                    tracing::info!(
2301                        subtask_id = %sid,
2302                        worktree_path = %wt.path.display(),
2303                        worktree_branch = %wt.branch,
2304                        "Created worktree for sub-agent"
2305                    );
2306                    Some((sid, wt))
2307                }
2308                Err(e) => {
2309                    tracing::warn!(
2310                        subtask_id = %sid,
2311                        error = %e,
2312                        "Failed to create worktree, sub-agent will use shared directory"
2313                    );
2314                    None
2315                }
2316            }
2317        }
2318    }))
2319    .buffer_unordered(WORKTREE_CREATE_PARALLELISM)
2320    .filter_map(|x| async move { x })
2321    .collect()
2322    .await
2323}
2324
2325/// Run the agentic loop for a sub-agent with tool execution
2326#[allow(clippy::too_many_arguments)]
2327/// Run an agentic loop with tools - reusable for Ralph and swarm sub-agents
2328
2329pub async fn run_agent_loop(
2330    provider: Arc<dyn Provider>,
2331    model: &str,
2332    system_prompt: &str,
2333    user_prompt: &str,
2334    tools: Vec<crate::provider::ToolDefinition>,
2335    registry: Arc<ToolRegistry>,
2336    max_steps: usize,
2337    timeout_secs: u64,
2338    event_tx: Option<mpsc::Sender<SwarmEvent>>,
2339    subtask_id: String,
2340    bus: Option<Arc<AgentBus>>,
2341    working_dir: Option<std::path::PathBuf>,
2342) -> Result<(String, usize, usize, AgentLoopExit)> {
2343    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
2344    let temperature = 0.7;
2345
2346    tracing::info!(
2347        model = %model,
2348        max_steps = max_steps,
2349        timeout_secs = timeout_secs,
2350        "Sub-agent starting agentic loop"
2351    );
2352    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
2353    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
2354
2355    // Initialize conversation with system and user messages
2356    let mut messages = vec![
2357        Message {
2358            role: Role::System,
2359            content: vec![ContentPart::Text {
2360                text: system_prompt.to_string(),
2361            }],
2362        },
2363        Message {
2364            role: Role::User,
2365            content: vec![ContentPart::Text {
2366                text: user_prompt.to_string(),
2367            }],
2368        },
2369    ];
2370
2371    let mut steps = 0;
2372    let mut total_tool_calls = 0;
2373    let mut final_output = String::new();
2374
2375    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
2376
2377    let exit_reason = loop {
2378        if steps >= max_steps {
2379            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
2380            break AgentLoopExit::MaxStepsReached;
2381        }
2382
2383        if Instant::now() > deadline {
2384            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
2385            break AgentLoopExit::TimedOut;
2386        }
2387
2388        steps += 1;
2389        tracing::info!(step = steps, "Sub-agent step starting");
2390
2391        // Check context size and truncate if approaching limit
2392        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
2393
2394        let request = CompletionRequest {
2395            messages: messages.clone(),
2396            tools: tools.clone(),
2397            model: model.to_string(),
2398            temperature: Some(temperature),
2399            top_p: None,
2400            max_tokens: Some(8192),
2401            stop: Vec::new(),
2402        };
2403
2404        let step_start = Instant::now();
2405        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
2406        let step_duration = step_start.elapsed();
2407
2408        tracing::info!(
2409            step = steps,
2410            duration_ms = step_duration.as_millis() as u64,
2411            finish_reason = ?response.finish_reason,
2412            prompt_tokens = response.usage.prompt_tokens,
2413            completion_tokens = response.usage.completion_tokens,
2414            "Sub-agent step completed LLM call"
2415        );
2416
2417        // Extract text from response
2418        let mut text_parts = Vec::new();
2419        let mut thinking_parts = Vec::new();
2420        let mut tool_calls = Vec::new();
2421
2422        for part in &response.message.content {
2423            match part {
2424                ContentPart::Text { text } => {
2425                    text_parts.push(text.clone());
2426                }
2427                ContentPart::Thinking { text } if !text.is_empty() => {
2428                    thinking_parts.push(text.clone());
2429                }
2430                ContentPart::ToolCall {
2431                    id,
2432                    name,
2433                    arguments,
2434                    ..
2435                } => {
2436                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
2437                }
2438                _ => {}
2439            }
2440        }
2441
2442        // Publish thinking/reasoning to bus for training data capture
2443        if !thinking_parts.is_empty()
2444            && let Some(ref bus) = bus
2445        {
2446            let thinking_text = thinking_parts.join("\n");
2447            let handle = bus.handle(&subtask_id);
2448            handle.send(
2449                format!("agent.{subtask_id}.thinking"),
2450                BusMessage::AgentThinking {
2451                    agent_id: subtask_id.clone(),
2452                    thinking: crate::swarm::live_bus::thinking(&thinking_text),
2453                    step: steps,
2454                },
2455            );
2456        }
2457
2458        // Log assistant output
2459        if !text_parts.is_empty() {
2460            let step_output = text_parts.join("\n");
2461            if !final_output.is_empty() {
2462                final_output.push('\n');
2463            }
2464            final_output.push_str(&step_output);
2465            tracing::info!(
2466                step = steps,
2467                output_len = final_output.len(),
2468                "Sub-agent text output"
2469            );
2470            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
2471
2472            // Emit assistant message event for TUI detail view
2473            if let Some(ref tx) = event_tx {
2474                let preview = if step_output.len() > 500 {
2475                    let mut end = 500;
2476                    while end > 0 && !step_output.is_char_boundary(end) {
2477                        end -= 1;
2478                    }
2479                    format!("{}...", &step_output[..end])
2480                } else {
2481                    step_output.clone()
2482                };
2483                let _ = tx.try_send(SwarmEvent::AgentMessage {
2484                    subtask_id: subtask_id.clone(),
2485                    entry: AgentMessageEntry {
2486                        role: "assistant".to_string(),
2487                        content: preview,
2488                        is_tool_call: false,
2489                    },
2490                });
2491            }
2492        }
2493
2494        // Log tool calls
2495        if !tool_calls.is_empty() {
2496            tracing::info!(
2497                step = steps,
2498                num_tool_calls = tool_calls.len(),
2499                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
2500                "Sub-agent requesting tool calls"
2501            );
2502        }
2503
2504        // Add assistant message to history
2505        messages.push(response.message.clone());
2506
2507        // If no tool calls or stop, we're done
2508        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
2509            tracing::info!(
2510                steps = steps,
2511                total_tool_calls = total_tool_calls,
2512                "Sub-agent finished"
2513            );
2514            break AgentLoopExit::Completed;
2515        }
2516
2517        // Execute tool calls
2518        let mut tool_results = Vec::new();
2519
2520        for (call_id, tool_name, arguments) in tool_calls {
2521            total_tool_calls += 1;
2522
2523            // Emit tool call event
2524            if let Some(ref tx) = event_tx {
2525                let _ = tx.try_send(SwarmEvent::AgentToolCall {
2526                    subtask_id: subtask_id.clone(),
2527                    tool_name: tool_name.clone(),
2528                });
2529            }
2530
2531            tracing::info!(
2532                step = steps,
2533                tool_call_id = %call_id,
2534                tool = %tool_name,
2535                "Executing tool"
2536            );
2537            tracing::debug!(
2538                tool = %tool_name,
2539                arguments = %arguments,
2540                "Tool call arguments"
2541            );
2542
2543            let tool_start = Instant::now();
2544            let mut tool_success = true;
2545            let result = if let Some(tool) = registry.get(&tool_name) {
2546                // Parse arguments as JSON
2547                let mut args: serde_json::Value =
2548                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
2549                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
2550                        serde_json::json!({})
2551                    });
2552
2553                // Normalize and enforce worktree filesystem paths before tool execution.
2554                let path_denial = working_dir.as_ref().and_then(|wd| {
2555                    path_guard::normalize_tool_args(&tool_name, &mut args, wd).err()
2556                });
2557                if let Some(e) = path_denial {
2558                    tool_success = false;
2559                    tracing::warn!(tool = %tool_name, error = %e, "Tool path policy denied");
2560                    format!("Tool path policy denied: {e}")
2561                } else {
2562                    let agent_name = format!("agent-{subtask_id}");
2563                    let provenance =
2564                        ExecutionProvenance::for_operation(&agent_name, ExecutionOrigin::Swarm);
2565                    args = enrich_tool_input_with_runtime_context(
2566                        &args,
2567                        working_dir
2568                            .as_deref()
2569                            .unwrap_or_else(|| std::path::Path::new(".")),
2570                        Some(model),
2571                        &subtask_id,
2572                        &agent_name,
2573                        Some(&provenance),
2574                    );
2575
2576                    match tool.execute(args).await {
2577                        Ok(r) => {
2578                            if r.success {
2579                                tracing::info!(
2580                                    tool = %tool_name,
2581                                    duration_ms = tool_start.elapsed().as_millis() as u64,
2582                                    success = true,
2583                                    "Tool execution completed"
2584                                );
2585                                r.output
2586                            } else {
2587                                tool_success = false;
2588                                tracing::warn!(
2589                                    tool = %tool_name,
2590                                    error = %r.output,
2591                                    "Tool returned error"
2592                                );
2593                                format!("Tool error: {}", r.output)
2594                            }
2595                        }
2596                        Err(e) => {
2597                            tool_success = false;
2598                            tracing::error!(
2599                                tool = %tool_name,
2600                                error = %e,
2601                                "Tool execution failed"
2602                            );
2603                            format!("Tool execution failed: {}", e)
2604                        }
2605                    }
2606                }
2607            } else {
2608                tool_success = false;
2609                tracing::error!(tool = %tool_name, "Unknown tool requested");
2610                format!("Unknown tool: {}", tool_name)
2611            };
2612
2613            // Emit detailed tool call event
2614            if let Some(ref tx) = event_tx {
2615                let input_preview = if arguments.len() > 200 {
2616                    let mut end = 200;
2617                    while end > 0 && !arguments.is_char_boundary(end) {
2618                        end -= 1;
2619                    }
2620                    format!("{}...", &arguments[..end])
2621                } else {
2622                    arguments.clone()
2623                };
2624                let output_preview = if result.len() > 500 {
2625                    let mut end = 500;
2626                    while end > 0 && !result.is_char_boundary(end) {
2627                        end -= 1;
2628                    }
2629                    format!("{}...", &result[..end])
2630                } else {
2631                    result.clone()
2632                };
2633                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
2634                    subtask_id: subtask_id.clone(),
2635                    detail: AgentToolCallDetail {
2636                        tool_name: tool_name.clone(),
2637                        input_preview,
2638                        output_preview,
2639                        success: tool_success,
2640                    },
2641                });
2642            }
2643
2644            tracing::debug!(
2645                tool = %tool_name,
2646                result_len = result.len(),
2647                "Tool result"
2648            );
2649
2650            // Publish full, untruncated tool output to the bus so other
2651            // agents and the bus log see the complete result.
2652            if let Some(ref bus) = bus {
2653                let handle = bus.handle(&subtask_id);
2654                handle.send(
2655                    format!("tools.{tool_name}"),
2656                    BusMessage::ToolOutputFull {
2657                        agent_id: subtask_id.clone(),
2658                        tool_name: tool_name.clone(),
2659                        output: crate::swarm::live_bus::tool_output(&result),
2660                        success: tool_success,
2661                        step: steps,
2662                    },
2663                );
2664            }
2665
2666            // Process large results with RLM or truncate smaller ones
2667            let result = if result.len() > RLM_THRESHOLD_CHARS {
2668                // Use RLM for very large results
2669                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
2670                    .await
2671            } else {
2672                // Simple truncation for medium results
2673                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
2674            };
2675
2676            tool_results.push((call_id, tool_name, result));
2677        }
2678
2679        // Add tool results to conversation
2680        for (call_id, _tool_name, result) in tool_results {
2681            messages.push(Message {
2682                role: Role::Tool,
2683                content: vec![ContentPart::ToolResult {
2684                    tool_call_id: call_id,
2685                    content: result,
2686                }],
2687            });
2688        }
2689
2690        // Reset deadline after each successful step — agent is making progress
2691        deadline = Instant::now() + Duration::from_secs(timeout_secs);
2692    };
2693
2694    Ok((final_output, steps, total_tool_calls, exit_reason))
2695}