Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    BranchObservation, BranchRuntimeState, CacheConfig, CacheStats, CollapseController,
8    CollapsePolicy, DecompositionStrategy, ExecutionMode, StageStats, SwarmCache, SwarmConfig,
9    SwarmResult,
10    kubernetes_executor::{
11        RemoteSubtaskPayload, SWARM_SUBTASK_PAYLOAD_ENV, encode_payload, latest_probe_from_logs,
12        probe_changed_files_set, result_from_logs,
13    },
14    orchestrator::Orchestrator,
15    result_store::ResultStore,
16    subtask::{SubTask, SubTaskResult, SubTaskStatus},
17};
18use crate::bus::{AgentBus, BusMessage};
19use crate::k8s::{K8sManager, SubagentPodSpec, SubagentPodState};
20use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
21
22// Re-export swarm types for convenience
23pub use super::SwarmMessage;
24use crate::{
25    agent::Agent,
26    provenance::{ExecutionOrigin, ExecutionProvenance},
27    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
28    rlm::RlmExecutor,
29    session::helper::runtime::enrich_tool_input_with_runtime_context,
30    swarm::{SwarmArtifact, SwarmStats},
31    telemetry::SwarmTelemetryCollector,
32    tool::ToolRegistry,
33    worktree::{WorktreeInfo, WorktreeManager},
34};
35use anyhow::Result;
36use futures::stream::{self, FuturesUnordered, StreamExt};
37use std::collections::{HashMap, VecDeque};
38use std::sync::Arc;
39use std::time::Instant;
40use tokio::sync::{RwLock, mpsc};
41use tokio::task::AbortHandle;
42use tokio::time::{Duration, MissedTickBehavior, timeout};
43
/// Default context limit (256k tokens - conservative for most models)
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Reserve tokens for the response generation
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Safety margin before we start truncating (85% of limit)
const TRUNCATION_THRESHOLD: f64 = 0.85;
52
/// Estimate token count from text (rough heuristic: ~3.5 chars per token)
fn estimate_tokens(text: &str) -> usize {
    // Actual tokenization varies by model; English text usually lands
    // between 3 and 5 chars per token. Dividing by 3.5 keeps the estimate
    // on the conservative (over-counting) side.
    let chars = text.len() as f64;
    (chars / 3.5).ceil() as usize
}
60
61/// Estimate total tokens in a message
62fn estimate_message_tokens(message: &Message) -> usize {
63    let mut tokens = 4; // Role overhead
64
65    for part in &message.content {
66        tokens += match part {
67            ContentPart::Text { text } => estimate_tokens(text),
68            ContentPart::ToolCall {
69                id,
70                name,
71                arguments,
72                ..
73            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
74            ContentPart::ToolResult {
75                tool_call_id,
76                content,
77            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
78            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
79            ContentPart::Thinking { text } => estimate_tokens(text),
80        };
81    }
82
83    tokens
84}
85
86/// Estimate total tokens in all messages
87fn estimate_total_tokens(messages: &[Message]) -> usize {
88    messages.iter().map(estimate_message_tokens).sum()
89}
90
91/// Truncate messages to fit within context limit
92///
93/// Strategy:
94/// 1. First, aggressively truncate large tool results
95/// 2. Keep system message (first) and user message (second) always
96/// 3. Keep the most recent assistant + tool result pairs together
97/// 4. Drop oldest middle messages in matched pairs
98fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
99    let target_tokens =
100        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
101
102    let current_tokens = estimate_total_tokens(messages);
103    if current_tokens <= target_tokens {
104        return;
105    }
106
107    tracing::warn!(
108        current_tokens = current_tokens,
109        target_tokens = target_tokens,
110        context_limit = context_limit,
111        "Context approaching limit, truncating conversation history"
112    );
113
114    // FIRST: Aggressively truncate large tool results (this is the main culprit)
115    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
116
117    let after_tool_truncation = estimate_total_tokens(messages);
118    if after_tool_truncation <= target_tokens {
119        tracing::info!(
120            old_tokens = current_tokens,
121            new_tokens = after_tool_truncation,
122            "Truncated large tool results, context now within limits"
123        );
124        return;
125    }
126
127    // Minimum: keep first 2 (system + initial user) and last 4 messages
128    if messages.len() <= 6 {
129        tracing::warn!(
130            tokens = after_tool_truncation,
131            target = target_tokens,
132            "Cannot truncate further - conversation too short"
133        );
134        return;
135    }
136
137    // Remove messages from the middle, keeping first 2 and last 4
138    // But we need to remove in pairs to maintain tool_call_id references
139    let keep_start = 2;
140    let keep_end = 4;
141    let removable_count = messages.len() - keep_start - keep_end;
142
143    if removable_count == 0 {
144        return;
145    }
146
147    // Remove all middle messages
148    let removed_messages: Vec<_> = messages
149        .drain(keep_start..keep_start + removable_count)
150        .collect();
151    let summary = summarize_removed_messages(&removed_messages);
152
153    // Insert a summary message where we removed content
154    messages.insert(
155        keep_start,
156        Message {
157            role: Role::User,
158            content: vec![ContentPart::Text {
159                text: format!(
160                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
161                    removed_messages.len(),
162                    summary
163                ),
164            }],
165        },
166    );
167
168    let new_tokens = estimate_total_tokens(messages);
169    tracing::info!(
170        removed_messages = removed_messages.len(),
171        old_tokens = current_tokens,
172        new_tokens = new_tokens,
173        "Truncated conversation history"
174    );
175}
176
177/// Summarize removed messages for context preservation
178fn summarize_removed_messages(messages: &[Message]) -> String {
179    let mut summary = String::new();
180    let mut tool_calls: Vec<String> = Vec::new();
181
182    for msg in messages {
183        for part in &msg.content {
184            if let ContentPart::ToolCall { name, .. } = part
185                && !tool_calls.contains(name)
186            {
187                tool_calls.push(name.clone());
188            }
189        }
190    }
191
192    if !tool_calls.is_empty() {
193        summary.push_str(&format!(
194            "Tools used in truncated history: {}",
195            tool_calls.join(", ")
196        ));
197    }
198
199    summary
200}
201
202/// Truncate large tool results that exceed a reasonable size
203fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
204    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
205    let mut truncated_count = 0;
206    let mut saved_tokens = 0usize;
207
208    for message in messages.iter_mut() {
209        for part in message.content.iter_mut() {
210            if let ContentPart::ToolResult { content, .. } = part {
211                let tokens = estimate_tokens(content);
212                if tokens > max_tokens_per_result {
213                    let old_len = content.len();
214                    *content = truncate_single_result(content, char_limit);
215                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
216                    if content.len() < old_len {
217                        truncated_count += 1;
218                    }
219                }
220            }
221        }
222    }
223
224    if truncated_count > 0 {
225        tracing::info!(
226            truncated_count = truncated_count,
227            saved_tokens = saved_tokens,
228            max_tokens_per_result = max_tokens_per_result,
229            "Truncated large tool results"
230        );
231    }
232}
233
234/// Truncate a single result string to a maximum character limit
235fn truncate_single_result(content: &str, max_chars: usize) -> String {
236    if content.len() <= max_chars {
237        return content.to_string();
238    }
239
240    // Find a valid char boundary at or before max_chars
241    let safe_limit = {
242        let mut limit = max_chars.min(content.len());
243        while limit > 0 && !content.is_char_boundary(limit) {
244            limit -= 1;
245        }
246        limit
247    };
248
249    // Try to find a good break point (newline) near the limit
250    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
251
252    let truncated = format!(
253        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
254        &content[..break_point],
255        content.len(),
256        break_point
257    );
258
259    tracing::debug!(
260        original_len = content.len(),
261        truncated_len = truncated.len(),
262        "Truncated large result"
263    );
264
265    truncated
266}
267
/// Threshold for when to use RLM vs truncation (in characters)
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Max chars for simple truncation (below RLM threshold)
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// Collapse controller sampling interval for branch health.
const COLLAPSE_SAMPLE_SECS: u64 = 5;
/// Env var naming an override prompt for swarm fallback execution
/// (NOTE(review): reader is outside this chunk — confirm against consumer).
const SWARM_FALLBACK_PROMPT_ENV: &str = "CODETETHER_SWARM_FALLBACK_PROMPT";
/// Env var naming an override model for swarm fallback execution
/// (NOTE(review): reader is outside this chunk — confirm against consumer).
const SWARM_FALLBACK_MODEL_ENV: &str = "CODETETHER_SWARM_FALLBACK_MODEL";
/// Environment variables passed through to Kubernetes sub-agent pods
/// (Vault access plus the CodeTether auth token) — presumably consumed by
/// `execute_stage_kubernetes`; not visible in this chunk.
const K8S_PASSTHROUGH_ENV_VARS: &[&str] = &[
    "VAULT_ADDR",
    "VAULT_TOKEN",
    "VAULT_MOUNT",
    "VAULT_SECRETS_PATH",
    "VAULT_NAMESPACE",
    "CODETETHER_AUTH_TOKEN",
];
286
/// Bookkeeping for one sub-agent branch currently running in a Kubernetes pod.
/// Consumers live outside this chunk.
#[derive(Debug, Clone)]
struct ActiveK8sBranch {
    /// Name of the git branch this pod works on.
    branch: String,
    /// Launch time — presumably used for timeout/health sampling; confirm
    /// against the Kubernetes execution path.
    started_at: Instant,
}
292
/// Outcome of a sub-agent's run loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The loop finished its work normally.
    Completed,
    /// The loop stopped after exhausting its step budget.
    MaxStepsReached,
    /// The loop stopped after exceeding its time budget.
    TimedOut,
}
299
/// Calculate the delay for exponential backoff.
///
/// Uses the formula: `min(initial_delay * multiplier^attempt, max_delay)`.
fn calculate_backoff_delay(
    attempt: u32,
    initial_delay_ms: u64,
    max_delay_ms: u64,
    multiplier: f64,
) -> Duration {
    // Grow geometrically with the attempt number, then clamp to the ceiling.
    let scaled = initial_delay_ms as f64 * multiplier.powi(attempt as i32);
    let capped = scaled.min(max_delay_ms as f64);
    Duration::from_millis(capped as u64)
}
313
314fn compute_resource_health(pod_state: Option<&SubagentPodState>) -> (f32, u32) {
315    let Some(pod_state) = pod_state else {
316        return (0.2, 1);
317    };
318
319    let reason = pod_state
320        .reason
321        .as_deref()
322        .unwrap_or_default()
323        .to_ascii_lowercase();
324    let phase = pod_state.phase.to_ascii_lowercase();
325
326    if reason.contains("oomkilled") {
327        return (0.0, 3);
328    }
329    if reason.contains("imagepullbackoff") || reason.contains("errimagepull") {
330        return (0.0, 3);
331    }
332    if reason.contains("crashloopbackoff") {
333        return (0.1, 2);
334    }
335    if phase == "failed" {
336        return (0.1, 2);
337    }
338
339    let mut score = 1.0f32;
340    let mut unhealthy_signals = 0u32;
341
342    if !pod_state.ready {
343        score -= 0.2;
344    }
345    if !reason.is_empty() {
346        score -= 0.3;
347        unhealthy_signals += 1;
348    }
349    if pod_state.restart_count > 0 {
350        score -= (pod_state.restart_count.min(3) as f32) * 0.2;
351        unhealthy_signals += 1;
352    }
353
354    (score.clamp(0.0, 1.0), unhealthy_signals)
355}
356
357/// Process a large tool result using RLM to intelligently summarize it
358async fn process_large_result_with_rlm(
359    content: &str,
360    tool_name: &str,
361    provider: Arc<dyn Provider>,
362    model: &str,
363) -> String {
364    if content.len() <= SIMPLE_TRUNCATE_CHARS {
365        return content.to_string();
366    }
367
368    // For medium-sized content, just truncate
369    if content.len() <= RLM_THRESHOLD_CHARS {
370        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
371    }
372
373    // For very large content, use RLM to intelligently summarize
374    tracing::info!(
375        tool = %tool_name,
376        content_len = content.len(),
377        "Using RLM to process large tool result"
378    );
379
380    let query = format!(
381        "Summarize the key information from this {} output. \
382         Focus on: errors, warnings, important findings, and actionable items. \
383         Be concise but thorough.",
384        tool_name
385    );
386
387    let mut executor =
388        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
389
390    match executor.analyze(&query).await {
391        Ok(result) => {
392            let bounded_answer = truncate_single_result(&result.answer, SIMPLE_TRUNCATE_CHARS * 2);
393            tracing::info!(
394                tool = %tool_name,
395                original_len = content.len(),
396                summary_len = bounded_answer.len(),
397                iterations = result.iterations,
398                "RLM summarized large result"
399            );
400
401            format!(
402                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
403                tool_name,
404                content.len(),
405                bounded_answer.len(),
406                bounded_answer
407            )
408        }
409        Err(e) => {
410            tracing::warn!(
411                tool = %tool_name,
412                error = %e,
413                "RLM analysis failed, falling back to truncation"
414            );
415            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
416        }
417    }
418}
419
/// The swarm executor runs subtasks in parallel
///
/// Construct with [`SwarmExecutor::new`] (or `with_cache`), customize via the
/// `with_*` builder methods, then drive a run with `execute`.
pub struct SwarmExecutor {
    /// Swarm-wide configuration (concurrency, retries, worktrees, …).
    config: SwarmConfig,
    /// Optional agent for handling swarm-level coordination (reserved for future use)
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Optional event channel for TUI real-time updates
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    /// Telemetry collector for swarm execution metrics
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    /// Cache for avoiding duplicate subtask execution
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Shared result store for sub-agent result sharing
    result_store: Arc<ResultStore>,
    /// Optional agent bus for inter-agent communication
    bus: Option<Arc<AgentBus>>,
}
436
437impl SwarmExecutor {
438    /// Create a new executor
439    pub fn new(config: SwarmConfig) -> Self {
440        Self {
441            config,
442            coordinator_agent: None,
443            event_tx: None,
444            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
445            cache: None,
446            result_store: ResultStore::new_arc(),
447            bus: None,
448        }
449    }
450
451    /// Create a new executor with caching enabled
452    pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
453        let cache = SwarmCache::new(cache_config).await?;
454        Ok(Self {
455            config,
456            coordinator_agent: None,
457            event_tx: None,
458            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
459            cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
460            result_store: ResultStore::new_arc(),
461            bus: None,
462        })
463    }
464
465    /// Set a pre-initialized cache
466    pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
467        self.cache = Some(cache);
468        self
469    }
470
471    /// Set an agent bus for inter-agent communication
472    pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
473        self.bus = Some(bus);
474        self
475    }
476
477    /// Get the agent bus if set
478    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
479        self.bus.as_ref()
480    }
481
482    /// Set an event channel for real-time TUI updates
483    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
484        self.event_tx = Some(tx);
485        self
486    }
487
488    /// Set a coordinator agent for swarm-level coordination
489    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
490        tracing::debug!("Setting coordinator agent for swarm execution");
491        self.coordinator_agent = Some(agent);
492        self
493    }
494
495    /// Set a telemetry collector for swarm execution metrics
496    pub fn with_telemetry(
497        mut self,
498        telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
499    ) -> Self {
500        self.telemetry = telemetry;
501        self
502    }
503
504    /// Get the telemetry collector as an Arc
505    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
506        Arc::clone(&self.telemetry)
507    }
508    /// Get the coordinator agent if set
509    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
510        tracing::debug!(
511            has_coordinator = self.coordinator_agent.is_some(),
512            "Getting coordinator agent"
513        );
514        self.coordinator_agent.as_ref()
515    }
516
517    /// Get the shared result store
518    pub fn result_store(&self) -> &Arc<ResultStore> {
519        &self.result_store
520    }
521
522    /// Get cache statistics if caching is enabled
523    pub async fn cache_stats(&self) -> Option<CacheStats> {
524        if let Some(ref cache) = self.cache {
525            let cache_guard = cache.lock().await;
526            Some(cache_guard.stats().clone())
527        } else {
528            None
529        }
530    }
531
532    /// Clear the cache if enabled
533    pub async fn clear_cache(&self) -> Result<()> {
534        if let Some(ref cache) = self.cache {
535            let mut cache_guard = cache.lock().await;
536            cache_guard.clear().await?;
537        }
538        Ok(())
539    }
540
541    /// Get the retry configuration
542    pub fn retry_config(&self) -> (u32, u64, u64, f64) {
543        (
544            self.config.max_retries,
545            self.config.base_delay_ms,
546            self.config.max_delay_ms,
547            2.0, // backoff multiplier
548        )
549    }
550
551    /// Check if retries are enabled
552    pub fn retries_enabled(&self) -> bool {
553        self.config.max_retries > 0
554    }
555
556    /// Send an event to the TUI if channel is connected (non-blocking)
557    fn try_send_event(&self, event: SwarmEvent) {
558        // Also emit on the agent bus if connected
559        if let Some(ref bus) = self.bus {
560            let handle = bus.handle("swarm-executor");
561            match &event {
562                SwarmEvent::Started { task, .. } => {
563                    handle.send(
564                        "broadcast",
565                        BusMessage::AgentReady {
566                            agent_id: "swarm-executor".to_string(),
567                            capabilities: vec![format!("executing:{task}")],
568                        },
569                    );
570                }
571                SwarmEvent::Complete { success, .. } => {
572                    let state = if *success {
573                        crate::a2a::types::TaskState::Completed
574                    } else {
575                        crate::a2a::types::TaskState::Failed
576                    };
577                    handle.send_task_update("swarm", state, None);
578                }
579                _ => {} // Other events are TUI-specific
580            }
581        }
582
583        if let Some(ref tx) = self.event_tx {
584            let _ = tx.try_send(event);
585        }
586    }
587
    /// Execute a task using the swarm
    ///
    /// Pipeline: decompose `task` into subtasks via the [`Orchestrator`]
    /// (using `strategy`), then run each stage's subtasks in parallel with
    /// `execute_stage`, feeding completed results forward so later stages can
    /// reference them. Emits [`SwarmEvent`]s throughout for TUI/bus observers
    /// and records telemetry for the whole run.
    ///
    /// # Errors
    /// Returns `Err` if orchestrator construction, decomposition, stage
    /// execution, or result aggregation fails. An empty decomposition is NOT
    /// an `Err`: it yields an unsuccessful `SwarmResult` carrying an error
    /// string instead.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        // Create orchestrator
        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        // Decompose the task
        let subtasks = orchestrator.decompose(task, strategy).await?;

        if subtasks.is_empty() {
            // Nothing to do: report failure via event + result, not Err.
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Emit decomposition event for TUI
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        // Execute stages in order
        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        // Initialize telemetry for this swarm execution
        let swarm_id = uuid::Uuid::new_v4().to_string();
        let strategy_str = format!("{:?}", strategy);
        self.telemetry
            .lock()
            .await
            .start_swarm(&swarm_id, subtasks.len(), &strategy_str)
            .await;

        // Shared state for completed results (subtask_id -> result text),
        // handed to each stage so later subtasks can read earlier output.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            // Stage numbers may be sparse; skip empty ones.
            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            // Execute all subtasks in this stage in parallel
            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            // Update completed results for next stage
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                    // Also publish to shared ResultStore for richer querying
                    let tags = vec![
                        format!("stage:{stage}"),
                        format!("subtask:{}", result.subtask_id),
                    ];
                    // Best-effort: a failed publish must not fail the stage.
                    let _ = self
                        .result_store
                        .publish(
                            &result.subtask_id,
                            &result.subagent_id,
                            &result.result,
                            tags,
                            None,
                        )
                        .await;
                }
            }

            // Update orchestrator stats
            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            // Mark subtasks as completed
            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            // Emit stage complete event
            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        // Get provider name before mutable borrow
        let provider_name = orchestrator.provider().to_string();

        // Record overall execution latency
        self.telemetry
            .lock()
            .await
            .record_swarm_latency("total_execution", start_time.elapsed())
            .await;

        // Calculate final stats
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        // Aggregate results: overall success requires every subtask to succeed.
        let success = all_results.iter().all(|r| r.success);

        // Complete telemetry collection
        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success).await;
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }
799
800    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
801    async fn execute_stage(
802        &self,
803        orchestrator: &Orchestrator,
804        subtasks: Vec<SubTask>,
805        completed_results: Arc<RwLock<HashMap<String, String>>>,
806        swarm_id: &str,
807    ) -> Result<Vec<SubTaskResult>> {
808        if self.config.execution_mode == ExecutionMode::KubernetesPod {
809            return self
810                .execute_stage_kubernetes(orchestrator, subtasks, completed_results, swarm_id)
811                .await;
812        }
813
814        let mut handles: FuturesUnordered<
815            tokio::task::JoinHandle<(String, Result<SubTaskResult, anyhow::Error>)>,
816        > = FuturesUnordered::new();
817        let mut abort_handles: HashMap<String, AbortHandle> = HashMap::new();
818        let mut task_ids: HashMap<tokio::task::Id, String> = HashMap::new();
819        let mut active_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
820        let mut all_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
821        let mut cached_results: Vec<SubTaskResult> = Vec::new();
822        let mut completed_entries: Vec<(SubTaskResult, Option<WorktreeInfo>)> = Vec::new();
823        let mut kill_reasons: HashMap<String, String> = HashMap::new();
824        let mut promoted_subtask_id: Option<String> = None;
825
826        // Rate limiting: semaphore for max concurrent requests
827        let semaphore = Arc::new(tokio::sync::Semaphore::new(
828            self.config.max_concurrent_requests,
829        ));
830        let delay_ms = self.config.request_delay_ms;
831
832        // Get provider info for tool registry
833        let model = orchestrator.model().to_string();
834        let provider_name = orchestrator.provider().to_string();
835        let providers = orchestrator.providers();
836        let provider = providers
837            .get(&provider_name)
838            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
839
840        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
841
        // Create base tool registry with provider for ralph and batch tool
        let base_tool_registry =
            ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        // Filter out 'question' tool - sub-agents must be autonomous, not interactive
        // Include 'swarm_share' definition so LLMs know about it (registered per-agent below)
        let mut tool_definitions: Vec<_> = base_tool_registry
            .definitions()
            .into_iter()
            .filter(|t| t.name != "question")
            .collect();

        // Add swarm_share tool definition so LLMs know it's available.
        // Only the definition (LLM-facing schema) is added here; the actual
        // SwarmShareTool implementation is registered per-agent further down,
        // bound to that agent's subtask id.
        let swarm_share_def = crate::provider::ToolDefinition {
            name: "swarm_share".to_string(),
            description: "Share results with other sub-agents in the swarm. Actions: publish \
                          (share a result), get (retrieve a result by key), query_tags (find \
                          results by tags), query_prefix (find results by key prefix), list \
                          (show all shared results)."
                .to_string(),
            // JSON Schema shown to the model; only "action" is required —
            // the other fields apply to specific actions as described.
            parameters: serde_json::json!({
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": ["publish", "get", "query_tags", "query_prefix", "list"],
                        "description": "Action to perform"
                    },
                    "key": {
                        "type": "string",
                        "description": "Result key (for publish/get)"
                    },
                    "value": {
                        "description": "Result value to publish (any JSON value)"
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Tags for publish or query_tags"
                    },
                    "prefix": {
                        "type": "string",
                        "description": "Key prefix for query_prefix"
                    }
                },
                "required": ["action"]
            }),
        };
        tool_definitions.push(swarm_share_def);
890
        // Clone the result store for sub-agent sharing
        let result_store = Arc::clone(&self.result_store);

        // Create worktree manager if enabled. Falls back to the process's
        // current directory (or ".") when no working_dir is configured.
        let worktree_manager = if self.config.worktree_enabled {
            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| ".".to_string())
            });

            let mgr = WorktreeManager::new(&working_dir);
            tracing::info!(
                working_dir = %working_dir,
                "Worktree isolation enabled for parallel sub-agents"
            );
            Some(Arc::new(mgr) as Arc<WorktreeManager>)
        } else {
            None
        };

        // PRE-PASS 1 (cache): split subtasks into cached results and
        // pending work. Cached ones short-circuit so we don't even
        // consider them for worktree creation.
        let mut pending_subtasks: Vec<SubTask> = Vec::with_capacity(subtasks.len());
        for subtask in subtasks.into_iter() {
            if let Some(ref cache) = self.cache {
                // Cache lock is held only for this single lookup.
                let mut cache_guard = cache.lock().await;
                if let Some(cached_result) = cache_guard.get(&subtask).await {
                    tracing::info!(
                        subtask_id = %subtask.id,
                        task_name = %subtask.name,
                        "Cache hit for subtask, skipping execution"
                    );
                    // Surface the hit in the UI as an already-completed task
                    // attributed to the "cached" pseudo-agent.
                    self.try_send_event(SwarmEvent::SubTaskUpdate {
                        id: subtask.id.clone(),
                        name: subtask.name.clone(),
                        status: SubTaskStatus::Completed,
                        agent_name: Some("cached".to_string()),
                    });
                    cached_results.push(cached_result);
                    continue;
                }
            }
            pending_subtasks.push(subtask);
        }
937
        // PRE-PASS 2 (worktrees): create all worktrees for non-cached
        // subtasks concurrently (bounded to avoid git .git/worktrees
        // lock contention). Drops wall time from O(N·T_create) to
        // ~O(N·T_create / WORKTREE_PARALLELISM). Failures are swallowed
        // — the sub-agent falls back to the shared directory.
        let mut worktrees_by_id: HashMap<String, WorktreeInfo> = HashMap::new();
        if let Some(ref mgr) = worktree_manager {
            // Only provision worktrees for subtasks that actually
            // need isolation. Read-only research/review/fact-check
            // agents run against the shared working directory —
            // saves git setup cost and avoids `.git/worktrees`
            // lock contention at high fan-out.
            let ids: Vec<String> = pending_subtasks
                .iter()
                .filter(|s| s.needs_worktree())
                .map(|s| s.id.clone())
                .collect();
            let skipped = pending_subtasks.len().saturating_sub(ids.len());
            if skipped > 0 {
                tracing::info!(
                    skipped,
                    total = pending_subtasks.len(),
                    "Skipping worktree creation for read-only sub-agents"
                );
            }
            worktrees_by_id = precreate_worktrees(Arc::clone(mgr), ids).await;
            // Register each worktree in both the live set (sampled by the
            // collapse controller below) and the full set (used to pair
            // finished results with their worktree).
            for (sid, wt) in &worktrees_by_id {
                active_worktrees.insert(sid.clone(), wt.clone());
                all_worktrees.insert(sid.clone(), wt.clone());
            }
        }
969
        // Spawn one task per pending subtask. Everything the async block needs
        // is cloned here so the closure is fully `'static` and `move`-able.
        for (idx, subtask) in pending_subtasks.into_iter().enumerate() {
            let model = model.clone();
            let _provider_name = provider_name.clone();
            let provider = Arc::clone(&provider);

            // Get context from dependencies: concatenate the stored results of
            // every dependency that has already completed.
            let context = {
                let completed = completed_results.read().await;
                let mut dep_context = String::new();
                for dep_id in &subtask.dependencies {
                    if let Some(result) = completed.get(dep_id) {
                        dep_context.push_str(&format!(
                            "\n--- Result from dependency {} ---\n{}\n",
                            dep_id, result
                        ));
                    }
                }
                dep_context
            };

            let instruction = subtask.instruction.clone();
            let subtask_name = subtask.name.clone();
            let specialty = subtask.specialty.clone().unwrap_or_default();
            let subtask_id = subtask.id.clone();
            // Second copy of the id kept outside the closure for handle bookkeeping.
            let subtask_id_for_handle = subtask_id.clone();
            let max_steps = self.config.max_steps_per_subagent;
            let timeout_secs = self.config.subagent_timeout_secs;

            // Retry configuration
            let max_retries = self.config.max_retries;
            let base_delay_ms = self.config.base_delay_ms;
            let max_delay_ms = self.config.max_delay_ms;

            // Look up the pre-created worktree. Parallel creation
            // happened before this loop (see PRE-PASS 2 above). `None`
            // means either worktrees are disabled or creation failed —
            // the sub-agent falls back to the shared working directory.
            let worktree_info = worktrees_by_id.remove(&subtask_id);

            let working_dir = worktree_info
                .as_ref()
                .map(|wt| wt.path.display().to_string())
                .unwrap_or_else(|| ".".to_string());
            let working_dir_path = worktree_info.as_ref().map(|wt| wt.path.clone());

            // Clone for the async block
            let tools = tool_definitions.clone();
            let _base_registry = Arc::clone(&base_tool_registry);
            let agent_result_store = Arc::clone(&result_store);
            let sem = Arc::clone(&semaphore);
            let stagger_delay = delay_ms * idx as u64; // Stagger start times
            let event_tx = self.event_tx.clone();

            // Generate sub-agent execution ID
            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());

            // Log telemetry for this sub-agent
            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
1028
            // Spawn the subtask execution with agentic tool loop
            let handle = tokio::spawn(async move {
                // Rate limiting: stagger start and acquire semaphore. The
                // permit is held for the whole subtask, bounding concurrency
                // to max_concurrent_requests.
                if stagger_delay > 0 {
                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
                }
                let _permit = match sem.acquire().await {
                    Ok(permit) => permit,
                    Err(_) => {
                        // Semaphore closed => the whole swarm run was cancelled.
                        return (
                            subtask_id.clone(),
                            Err(anyhow::anyhow!("Swarm execution cancelled")),
                        );
                    }
                };

                let _agent_start = Instant::now();

                // Wall-clock timer for execution_time_ms in the result.
                let start = Instant::now();

                // Load AGENTS.md from working directory; empty string when absent
                // so it can be appended to the system prompt unconditionally.
                let working_path = std::path::Path::new(&working_dir);
                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
                    .map(|(content, _)| {
                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
                    })
                    .unwrap_or_default();
1056
1057                // Build the system prompt for this sub-agent
1058                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
1059                let system_prompt = format!(
1060                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
1061
1062WORKING DIRECTORY: {}
1063All file operations should be relative to this directory.
1064
1065IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
1066
1067Available tools:
1068- read: Read file contents
1069- write: Write/create files
1070- edit: Edit existing files (search and replace)
1071- multiedit: Make multiple edits at once
1072- glob: Find files by pattern
1073- grep: Search file contents
1074- bash: Run shell commands (use cwd: \"{}\" parameter)
1075- webfetch: Fetch web pages
1076- prd: Generate structured PRD for complex tasks
1077- ralph: Run autonomous agent loop on a PRD
1078- swarm_share: Share results with other sub-agents running in parallel
1079- agent: Spawn specialized helper agents when needed (smart delegation)
1080
1081SMART SPAWN POLICY (mandatory):
1082- Any spawned agent MUST use a different model than your current model ('{}')
1083- Spawned model MUST be free/subscription-eligible (e.g. '*:free', openai-codex/*, github-copilot/*, gemini-web/*, local_cuda/*)
1084- Include `model` when calling agent.spawn
1085
1086SHARING RESULTS:
1087Use swarm_share to collaborate with other sub-agents:
1088- swarm_share({{action: 'publish', key: 'my-finding', value: '...', tags: ['research']}}) to share a result
1089- swarm_share({{action: 'get', key: 'some-key'}}) to retrieve a result from another agent
1090- swarm_share({{action: 'list'}}) to see all shared results
1091- swarm_share({{action: 'query_tags', tags: ['research']}}) to find results by tag
1092
1093COMPLEX TASKS:
1094If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
10951. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
10962. Break down into user stories with acceptance criteria
10973. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
10984. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
1099
1100NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
1101
1102When done, provide a brief summary of what you accomplished.{agents_md_content}",
1103                    specialty,
1104                    subtask_id,
1105                    working_dir,
1106                    working_dir,
1107                    model,
1108                    prd_filename,
1109                    prd_filename,
1110                    prd_filename
1111                );
1112
                // Build the user prompt; dependency results (if any) are
                // appended as prior-work context.
                let user_prompt = if context.is_empty() {
                    format!("Complete this task:\n\n{}", instruction)
                } else {
                    format!(
                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
                        instruction, context
                    )
                };

                // Emit AgentStarted event (best-effort: try_send drops the
                // event if the channel is full or closed).
                if let Some(ref tx) = event_tx {
                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                        id: subtask_id.clone(),
                        name: subtask_name.clone(),
                        status: SubTaskStatus::Running,
                        agent_name: Some(format!("agent-{}", subtask_id)),
                    });
                    let _ = tx.try_send(SwarmEvent::AgentStarted {
                        subtask_id: subtask_id.clone(),
                        agent_name: format!("agent-{}", subtask_id),
                        specialty: specialty.clone(),
                    });
                }

                // Run the agentic loop
                // Create per-agent registry with SwarmShareTool for this subtask,
                // bound to the shared result store under this subtask's id.
                let mut agent_registry =
                    ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
                agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
                    Arc::clone(&agent_result_store),
                    subtask_id.clone(),
                )));
                let registry = Arc::new(agent_registry);

                // Execute with exponential backoff retry. `result` holds the
                // outcome of the most recent attempt; the placeholder error is
                // only observable if the loop body never runs.
                let mut attempt = 0u32;
                let mut result: Result<(String, usize, usize, AgentLoopExit), anyhow::Error> =
                    Err(anyhow::anyhow!("Not executed"));
1151
1152                while attempt <= max_retries {
1153                    let _attempt_start = Instant::now();
1154
1155                    // Run the agent loop
1156                    result = run_agent_loop(
1157                        Arc::clone(&provider),
1158                        &model,
1159                        &system_prompt,
1160                        &user_prompt,
1161                        tools.clone(),
1162                        Arc::clone(&registry),
1163                        max_steps,
1164                        timeout_secs,
1165                        event_tx.clone(),
1166                        subtask_id.clone(),
1167                        None,
1168                        working_dir_path.clone(),
1169                    )
1170                    .await;
1171
1172                    // Check if the attempt succeeded
1173                    match &result {
1174                        Ok((_, _, _, exit_reason)) => {
1175                            if *exit_reason == AgentLoopExit::Completed {
1176                                // Success - no need to retry
1177                                tracing::info!(
1178                                    subtask_id = %subtask_id,
1179                                    attempt = attempt + 1,
1180                                    "Sub-agent completed successfully on first attempt"
1181                                );
1182                                break;
1183                            }
1184                            // Failed but not an error - check if we should retry
1185                            let should_retry = attempt < max_retries;
1186                            if should_retry {
1187                                let delay = calculate_backoff_delay(
1188                                    attempt,
1189                                    base_delay_ms,
1190                                    max_delay_ms,
1191                                    2.0,
1192                                );
1193                                tracing::warn!(
1194                                    subtask_id = %subtask_id,
1195                                    attempt = attempt + 1,
1196                                    max_retries = max_retries,
1197                                    exit_reason = ?exit_reason,
1198                                    delay_ms = delay.as_millis(),
1199                                    "Sub-agent did not complete, retrying with backoff"
1200                                );
1201                                tokio::time::sleep(delay).await;
1202                            } else {
1203                                tracing::warn!(
1204                                    subtask_id = %subtask_id,
1205                                    attempt = attempt + 1,
1206                                    max_retries = max_retries,
1207                                    exit_reason = ?exit_reason,
1208                                    "Sub-agent did not complete, retries exhausted"
1209                                );
1210                            }
1211                        }
1212                        Err(e) => {
1213                            // Error occurred - retry with backoff if we have retries left
1214                            let should_retry = attempt < max_retries;
1215                            if should_retry {
1216                                let delay = calculate_backoff_delay(
1217                                    attempt,
1218                                    base_delay_ms,
1219                                    max_delay_ms,
1220                                    2.0,
1221                                );
1222                                tracing::warn!(
1223                                    subtask_id = %subtask_id,
1224                                    attempt = attempt + 1,
1225                                    max_retries = max_retries,
1226                                    error = %e,
1227                                    delay_ms = delay.as_millis(),
1228                                    "Sub-agent error, retrying with backoff"
1229                                );
1230                                tokio::time::sleep(delay).await;
1231                            } else {
1232                                tracing::error!(
1233                                    subtask_id = %subtask_id,
1234                                    attempt = attempt + 1,
1235                                    max_retries = max_retries,
1236                                    error = %e,
1237                                    "Sub-agent error, retries exhausted"
1238                                );
1239                            }
1240                        }
1241                    }
1242
1243                    attempt += 1;
1244                }
1245
1246                let result = result;
1247
1248                let task_result = match result {
1249                    Ok((output, steps, tool_calls, exit_reason)) => {
1250                        let (success, status, error) = match exit_reason {
1251                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
1252                            AgentLoopExit::MaxStepsReached => (
1253                                false,
1254                                SubTaskStatus::Failed,
1255                                Some(format!("Sub-agent hit max steps ({max_steps})")),
1256                            ),
1257                            AgentLoopExit::TimedOut => (
1258                                false,
1259                                SubTaskStatus::TimedOut,
1260                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
1261                            ),
1262                        };
1263
1264                        // Calculate actual retry info - attempt is 0-indexed, so attempt=0 means 1 attempt, attempt=1 means 2 attempts, etc.
1265                        let total_attempts = attempt + 1;
1266                        let actual_retry_attempts = if total_attempts > 1 {
1267                            total_attempts - 1
1268                        } else {
1269                            0
1270                        };
1271
1272                        // Emit completion events
1273                        if let Some(ref tx) = event_tx {
1274                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1275                                id: subtask_id.clone(),
1276                                name: subtask_name.clone(),
1277                                status,
1278                                agent_name: Some(format!("agent-{}", subtask_id)),
1279                            });
1280                            if let Some(ref message) = error {
1281                                let _ = tx.try_send(SwarmEvent::AgentError {
1282                                    subtask_id: subtask_id.clone(),
1283                                    error: message.clone(),
1284                                });
1285                            }
1286                            let _ = tx.try_send(SwarmEvent::AgentOutput {
1287                                subtask_id: subtask_id.clone(),
1288                                output: output.clone(),
1289                            });
1290                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1291                                subtask_id: subtask_id.clone(),
1292                                success,
1293                                steps,
1294                            });
1295                        }
1296                        Ok(SubTaskResult {
1297                            subtask_id: subtask_id.clone(),
1298                            subagent_id: format!("agent-{}", subtask_id),
1299                            success,
1300                            result: output,
1301                            steps,
1302                            tool_calls,
1303                            execution_time_ms: start.elapsed().as_millis() as u64,
1304                            error,
1305                            artifacts: Vec::new(),
1306                            retry_count: actual_retry_attempts,
1307                        })
1308                    }
1309                    Err(e) => {
1310                        // Calculate actual retry info - attempt is 0-indexed
1311                        let total_attempts = attempt + 1;
1312                        let actual_retry_attempts = if total_attempts > 1 {
1313                            total_attempts - 1
1314                        } else {
1315                            0
1316                        };
1317
1318                        // Emit error events
1319                        if let Some(ref tx) = event_tx {
1320                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1321                                id: subtask_id.clone(),
1322                                name: subtask_name.clone(),
1323                                status: SubTaskStatus::Failed,
1324                                agent_name: Some(format!("agent-{}", subtask_id)),
1325                            });
1326                            let _ = tx.try_send(SwarmEvent::AgentError {
1327                                subtask_id: subtask_id.clone(),
1328                                error: e.to_string(),
1329                            });
1330                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1331                                subtask_id: subtask_id.clone(),
1332                                success: false,
1333                                steps: 0,
1334                            });
1335                        }
1336                        Ok(SubTaskResult {
1337                            subtask_id: subtask_id.clone(),
1338                            subagent_id: format!("agent-{}", subtask_id),
1339                            success: false,
1340                            result: String::new(),
1341                            steps: 0,
1342                            tool_calls: 0,
1343                            execution_time_ms: start.elapsed().as_millis() as u64,
1344                            error: Some(e.to_string()),
1345                            artifacts: Vec::new(),
1346                            retry_count: actual_retry_attempts,
1347                        })
1348                    }
1349                };
1350
1351                (subtask_id.clone(), task_result)
1352            });
1353
            // Track the spawned task three ways: its AbortHandle (so the
            // collapse controller can kill the branch), its tokio task id →
            // subtask id mapping (to attribute join errors), and the
            // JoinHandle itself (drained by the completion loop below).
            let abort_handle = handle.abort_handle();
            abort_handles.insert(subtask_id_for_handle.clone(), abort_handle);
            task_ids.insert(handle.id(), subtask_id_for_handle.clone());
            handles.push(handle);
        }
1359
        // Deterministic collapse loop while branches are still running.
        // The controller is only instantiated when worktrees are on AND at
        // least two isolated branches exist — with fewer there is nothing
        // to compare or collapse.
        let mut collapse_controller = if worktree_manager.is_some() && active_worktrees.len() > 1 {
            Some(CollapseController::new(CollapsePolicy::default()))
        } else {
            None
        };
        // Sample on a fixed cadence; Skip (rather than Burst) avoids a flood
        // of catch-up ticks after a long join stall.
        let mut collapse_tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
        collapse_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
        // Consume the immediate first tick so sampling starts after the first interval.
        let _ = collapse_tick.tick().await;
1370
1371        while !handles.is_empty() {
1372            tokio::select! {
1373                maybe_join = handles.next() => {
1374                    let Some(joined) = maybe_join else {
1375                        continue;
1376                    };
1377                    match joined {
1378                        Ok((subtask_id, Ok(result))) => {
1379                            abort_handles.remove(&subtask_id);
1380                            let wt = active_worktrees.remove(&subtask_id).or_else(|| all_worktrees.get(&subtask_id).cloned());
1381                            completed_entries.push((result, wt));
1382                        }
1383                        Ok((subtask_id, Err(e))) => {
1384                            abort_handles.remove(&subtask_id);
1385                            active_worktrees.remove(&subtask_id);
1386                            let wt = all_worktrees.get(&subtask_id).cloned();
1387                            completed_entries.push((
1388                                SubTaskResult {
1389                                    subtask_id: subtask_id.clone(),
1390                                    subagent_id: format!("agent-{subtask_id}"),
1391                                    success: false,
1392                                    result: String::new(),
1393                                    steps: 0,
1394                                    tool_calls: 0,
1395                                    execution_time_ms: 0,
1396                                    error: Some(e.to_string()),
1397                                    artifacts: Vec::new(),
1398                                    retry_count: 0,
1399                                },
1400                                wt,
1401                            ));
1402                        }
1403                        Err(e) => {
1404                            let subtask_id = task_ids
1405                                .remove(&e.id())
1406                                .unwrap_or_else(|| "unknown".to_string());
1407                            abort_handles.remove(&subtask_id);
1408                            active_worktrees.remove(&subtask_id);
1409                            let wt = all_worktrees.get(&subtask_id).cloned();
1410                            completed_entries.push((
1411                                SubTaskResult {
1412                                    subtask_id: subtask_id.clone(),
1413                                    subagent_id: format!("agent-{subtask_id}"),
1414                                    success: false,
1415                                    result: String::new(),
1416                                    steps: 0,
1417                                    tool_calls: 0,
1418                                    execution_time_ms: 0,
1419                                    error: Some(format!("Task join error: {e}")),
1420                                    artifacts: Vec::new(),
1421                                    retry_count: 0,
1422                                },
1423                                wt,
1424                            ));
1425                        }
1426                    }
1427                }
1428                _ = collapse_tick.tick(), if collapse_controller.is_some() && !active_worktrees.is_empty() => {
1429                    let branches: Vec<BranchRuntimeState> = active_worktrees
1430                        .iter()
1431                        .map(|(subtask_id, wt)| BranchRuntimeState {
1432                            subtask_id: subtask_id.clone(),
1433                            branch: wt.branch.clone(),
1434                            worktree_path: wt.path.clone(),
1435                        })
1436                        .collect();
1437
1438                    if let Some(controller) = collapse_controller.as_mut() {
1439                        match controller.sample(&branches) {
1440                            Ok(tick) => {
1441                                if promoted_subtask_id != tick.promoted_subtask_id {
1442                                    promoted_subtask_id = tick.promoted_subtask_id.clone();
1443                                    if let Some(ref promoted) = promoted_subtask_id {
1444                                        tracing::info!(
1445                                            subtask_id = %promoted,
1446                                            "Collapse controller promoted branch"
1447                                        );
1448                                        if let Some(audit) = crate::audit::try_audit_log() {
1449                                            audit.log_with_correlation(
1450                                                crate::audit::AuditCategory::Swarm,
1451                                                "collapse_promote_branch",
1452                                                crate::audit::AuditOutcome::Success,
1453                                                Some("collapse-controller".to_string()),
1454                                                Some(serde_json::json!({
1455                                                    "swarm_id": swarm_id,
1456                                                    "subtask_id": promoted,
1457                                                })),
1458                                                None,
1459                                                None,
1460                                                Some(swarm_id.to_string()),
1461                                                None,
1462                                            ).await;
1463                                        }
1464                                    }
1465                                }
1466
1467                                for kill in tick.kills {
1468                                    if kill_reasons.contains_key(&kill.subtask_id) {
1469                                        continue;
1470                                    }
1471                                    let Some(abort_handle) = abort_handles.get(&kill.subtask_id) else {
1472                                        continue;
1473                                    };
1474
1475                                    abort_handle.abort();
1476                                    abort_handles.remove(&kill.subtask_id);
1477                                    active_worktrees.remove(&kill.subtask_id);
1478                                    kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
1479
1480                                    tracing::warn!(
1481                                        subtask_id = %kill.subtask_id,
1482                                        branch = %kill.branch,
1483                                        reason = %kill.reason,
1484                                        "Collapse controller killed branch"
1485                                    );
1486
1487                                    if let Some(ref tx) = self.event_tx {
1488                                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1489                                            id: kill.subtask_id.clone(),
1490                                            name: kill.subtask_id.clone(),
1491                                            status: SubTaskStatus::Cancelled,
1492                                            agent_name: Some(format!("agent-{}", kill.subtask_id)),
1493                                        });
1494                                        let _ = tx.try_send(SwarmEvent::AgentError {
1495                                            subtask_id: kill.subtask_id.clone(),
1496                                            error: format!("Cancelled by collapse controller: {}", kill.reason),
1497                                        });
1498                                    }
1499
1500                                    if let Some(audit) = crate::audit::try_audit_log() {
1501                                        audit.log_with_correlation(
1502                                            crate::audit::AuditCategory::Swarm,
1503                                            "collapse_kill_branch",
1504                                            crate::audit::AuditOutcome::Success,
1505                                            Some("collapse-controller".to_string()),
1506                                            Some(serde_json::json!({
1507                                                "swarm_id": swarm_id,
1508                                                "subtask_id": kill.subtask_id,
1509                                                "branch": kill.branch,
1510                                                "reason": kill.reason,
1511                                            })),
1512                                            None,
1513                                            None,
1514                                            Some(swarm_id.to_string()),
1515                                            None,
1516                                        ).await;
1517                                    }
1518                                }
1519                            }
1520                            Err(e) => {
1521                                tracing::warn!(error = %e, "Collapse controller sampling failed");
1522                            }
1523                        }
1524                    }
1525                }
1526            }
1527        }
1528
1529        // Merge in deterministic order: promoted branch first when present.
1530        if let Some(ref promoted) = promoted_subtask_id {
1531            completed_entries.sort_by_key(|(result, _)| {
1532                if &result.subtask_id == promoted {
1533                    0usize
1534                } else {
1535                    1usize
1536                }
1537            });
1538        }
1539
1540        let mut results = cached_results;
1541        let auto_merge = self.config.worktree_auto_merge;
1542
1543        for (mut result, worktree_info) in completed_entries {
1544            // Handle worktree merge/cleanup if applicable
1545            if let Some(wt) = worktree_info {
1546                if let Some(reason) = kill_reasons.get(&result.subtask_id) {
1547                    result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1548                    result.result.push_str(&format!(
1549                        "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1550                    ));
1551                    if let Some(ref mgr) = worktree_manager
1552                        && let Err(e) = mgr.cleanup(&wt.name).await
1553                    {
1554                        tracing::warn!(error = %e, "Failed to cleanup killed worktree");
1555                    }
1556                } else if result.success && auto_merge {
1557                    if let Some(ref mgr) = worktree_manager {
1558                        match mgr.merge(&wt.name).await {
1559                            Ok(merge_result) => {
1560                                if merge_result.success {
1561                                    tracing::info!(
1562                                        subtask_id = %result.subtask_id,
1563                                        files_changed = merge_result.files_changed,
1564                                        "Merged worktree changes successfully"
1565                                    );
1566                                    result.result.push_str(&format!(
1567                                        "\n\n--- Merge Result ---\n{}",
1568                                        merge_result.summary
1569                                    ));
1570                                } else if merge_result.aborted {
1571                                    tracing::warn!(
1572                                        subtask_id = %result.subtask_id,
1573                                        summary = %merge_result.summary,
1574                                        "Merge was aborted"
1575                                    );
1576                                    result.result.push_str(&format!(
1577                                        "\n\n--- Merge Aborted ---\n{}",
1578                                        merge_result.summary
1579                                    ));
1580                                } else {
1581                                    tracing::warn!(
1582                                        subtask_id = %result.subtask_id,
1583                                        conflicts = ?merge_result.conflicts,
1584                                        "Merge had conflicts"
1585                                    );
1586                                    result.result.push_str(&format!(
1587                                        "\n\n--- Merge Conflicts ---\n{}",
1588                                        merge_result.summary
1589                                    ));
1590                                }
1591                                if let Err(e) = mgr.cleanup(&wt.name).await {
1592                                    tracing::warn!(error = %e, "Failed to cleanup worktree");
1593                                }
1594                            }
1595                            Err(e) => {
1596                                tracing::error!(
1597                                    subtask_id = %result.subtask_id,
1598                                    error = %e,
1599                                    "Failed to merge worktree"
1600                                );
1601                            }
1602                        }
1603                    }
1604                } else if !result.success {
1605                    tracing::info!(
1606                        subtask_id = %result.subtask_id,
1607                        worktree_path = %wt.path.display(),
1608                        "Keeping worktree for debugging (task failed)"
1609                    );
1610                }
1611            }
1612
1613            // Cache successful result
1614            if result.success
1615                && let Some(ref cache_arc) = self.cache
1616            {
1617                let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
1618                    cache_arc.lock().await;
1619                let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1620                if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
1621                    tracing::warn!(
1622                        subtask_id = %result.subtask_id,
1623                        error = %e,
1624                        "Failed to cache subtask result"
1625                    );
1626                }
1627            }
1628
1629            results.push(result);
1630        }
1631
1632        Ok(results)
1633    }
1634
1635    async fn execute_stage_kubernetes(
1636        &self,
1637        orchestrator: &Orchestrator,
1638        subtasks: Vec<SubTask>,
1639        completed_results: Arc<RwLock<HashMap<String, String>>>,
1640        swarm_id: &str,
1641    ) -> Result<Vec<SubTaskResult>> {
1642        let k8s = K8sManager::new().await;
1643        if !k8s.is_available() {
1644            anyhow::bail!(
1645                "Kubernetes execution mode requested but K8s client is unavailable in this environment"
1646            );
1647        }
1648
1649        let provider_name = orchestrator.provider().to_string();
1650        let model = orchestrator.model().to_string();
1651        let pod_budget = self.config.k8s_pod_budget.max(1);
1652        let mut pending: VecDeque<SubTask> = subtasks.into_iter().collect();
1653        let mut active: HashMap<String, ActiveK8sBranch> = HashMap::new();
1654        let mut subtask_names: HashMap<String, String> = HashMap::new();
1655        let mut results: Vec<SubTaskResult> = Vec::new();
1656        let mut kill_reasons: HashMap<String, String> = HashMap::new();
1657        let mut promoted_subtask_id: Option<String> = None;
1658        let mut collapse_controller = CollapseController::new(CollapsePolicy::default());
1659
1660        let mut tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1661        tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1662        let _ = tick.tick().await;
1663
1664        loop {
1665            while active.len() < pod_budget {
1666                let Some(subtask) = pending.pop_front() else {
1667                    break;
1668                };
1669
1670                if let Some(ref cache) = self.cache {
1671                    let mut cache_guard = cache.lock().await;
1672                    if let Some(cached_result) = cache_guard.get(&subtask).await {
1673                        tracing::info!(
1674                            subtask_id = %subtask.id,
1675                            task_name = %subtask.name,
1676                            "Cache hit for subtask, skipping Kubernetes execution"
1677                        );
1678                        self.try_send_event(SwarmEvent::SubTaskUpdate {
1679                            id: subtask.id.clone(),
1680                            name: subtask.name.clone(),
1681                            status: SubTaskStatus::Completed,
1682                            agent_name: Some("cached".to_string()),
1683                        });
1684                        results.push(cached_result);
1685                        continue;
1686                    }
1687                }
1688
1689                let context = {
1690                    let completed = completed_results.read().await;
1691                    let mut dep_context = String::new();
1692                    for dep_id in &subtask.dependencies {
1693                        if let Some(result) = completed.get(dep_id) {
1694                            dep_context.push_str(&format!(
1695                                "\n--- Result from dependency {} ---\n{}\n",
1696                                dep_id, result
1697                            ));
1698                        }
1699                    }
1700                    dep_context
1701                };
1702
1703                let payload = RemoteSubtaskPayload {
1704                    swarm_id: swarm_id.to_string(),
1705                    subtask_id: subtask.id.clone(),
1706                    subtask_name: subtask.name.clone(),
1707                    specialty: subtask.specialty.clone().unwrap_or_default(),
1708                    instruction: subtask.instruction.clone(),
1709                    context: context.clone(),
1710                    provider: provider_name.clone(),
1711                    model: model.clone(),
1712                    max_steps: self.config.max_steps_per_subagent,
1713                    timeout_secs: self.config.subagent_timeout_secs,
1714                    working_dir: self.config.working_dir.clone(),
1715                    probe_interval_secs: COLLAPSE_SAMPLE_SECS,
1716                };
1717                let payload_b64 = match encode_payload(&payload) {
1718                    Ok(payload) => payload,
1719                    Err(error) => {
1720                        let error_text = format!("Failed to encode remote payload: {error}");
1721                        tracing::error!(subtask_id = %subtask.id, error = %error, "K8s payload encoding failed");
1722                        self.try_send_event(SwarmEvent::SubTaskUpdate {
1723                            id: subtask.id.clone(),
1724                            name: subtask.name.clone(),
1725                            status: SubTaskStatus::Failed,
1726                            agent_name: Some("k8s-encoder".to_string()),
1727                        });
1728                        self.try_send_event(SwarmEvent::AgentError {
1729                            subtask_id: subtask.id.clone(),
1730                            error: error_text.clone(),
1731                        });
1732                        results.push(SubTaskResult {
1733                            subtask_id: subtask.id.clone(),
1734                            subagent_id: format!("agent-{}", subtask.id),
1735                            success: false,
1736                            result: String::new(),
1737                            steps: 0,
1738                            tool_calls: 0,
1739                            execution_time_ms: 0,
1740                            error: Some(error_text),
1741                            artifacts: Vec::new(),
1742                            retry_count: 0,
1743                        });
1744                        continue;
1745                    }
1746                };
1747
1748                let mut env_vars = HashMap::new();
1749                env_vars.insert(SWARM_SUBTASK_PAYLOAD_ENV.to_string(), payload_b64);
1750                for key in K8S_PASSTHROUGH_ENV_VARS {
1751                    if let Ok(value) = std::env::var(key)
1752                        && !value.trim().is_empty()
1753                    {
1754                        env_vars.insert((*key).to_string(), value);
1755                    }
1756                }
1757                let fallback_prompt = if context.trim().is_empty() {
1758                    format!(
1759                        "You are executing swarm subtask '{}'.\n\nTask:\n{}\n\n\
1760Return only the final subtask answer.",
1761                        subtask.id, subtask.instruction
1762                    )
1763                } else {
1764                    format!(
1765                        "You are executing swarm subtask '{}'.\n\nTask:\n{}\n\n\
1766Dependency context:\n{}\n\nReturn only the final subtask answer.",
1767                        subtask.id, subtask.instruction, context
1768                    )
1769                };
1770                env_vars.insert(SWARM_FALLBACK_PROMPT_ENV.to_string(), fallback_prompt);
1771                env_vars.insert(SWARM_FALLBACK_MODEL_ENV.to_string(), model.clone());
1772
1773                let mut labels = HashMap::new();
1774                labels.insert("codetether.run/swarm-id".to_string(), swarm_id.to_string());
1775                labels.insert(
1776                    "codetether.run/stage".to_string(),
1777                    subtask.stage.to_string(),
1778                );
1779
1780                let spec = SubagentPodSpec {
1781                    image: self.config.k8s_subagent_image.clone(),
1782                    env_vars,
1783                    labels,
1784                    command: Some(vec!["sh".to_string(), "-lc".to_string()]),
1785                    args: Some(vec![
1786                        format!(
1787                            "if codetether help swarm-subagent >/dev/null 2>&1; then \
1788exec codetether swarm-subagent --payload-env {payload_env}; \
1789else \
1790exec codetether run \"$${fallback_prompt_env}\" --model \"$${fallback_model_env}\"; \
1791fi",
1792                            payload_env = SWARM_SUBTASK_PAYLOAD_ENV,
1793                            fallback_prompt_env = SWARM_FALLBACK_PROMPT_ENV,
1794                            fallback_model_env = SWARM_FALLBACK_MODEL_ENV,
1795                        )
1796                        .replace("$$", "$"),
1797                    ]),
1798                };
1799
1800                if let Err(error) = k8s.spawn_subagent_pod_with_spec(&subtask.id, spec).await {
1801                    let error_text = format!("Failed to spawn Kubernetes pod: {error}");
1802                    tracing::error!(subtask_id = %subtask.id, error = %error, "K8s sub-agent pod spawn failed");
1803                    self.try_send_event(SwarmEvent::SubTaskUpdate {
1804                        id: subtask.id.clone(),
1805                        name: subtask.name.clone(),
1806                        status: SubTaskStatus::Failed,
1807                        agent_name: Some("k8s-spawn".to_string()),
1808                    });
1809                    self.try_send_event(SwarmEvent::AgentError {
1810                        subtask_id: subtask.id.clone(),
1811                        error: error_text.clone(),
1812                    });
1813                    results.push(SubTaskResult {
1814                        subtask_id: subtask.id.clone(),
1815                        subagent_id: format!("agent-{}", subtask.id),
1816                        success: false,
1817                        result: String::new(),
1818                        steps: 0,
1819                        tool_calls: 0,
1820                        execution_time_ms: 0,
1821                        error: Some(error_text),
1822                        artifacts: Vec::new(),
1823                        retry_count: 0,
1824                    });
1825                    continue;
1826                }
1827
1828                let branch = K8sManager::subagent_pod_name(&subtask.id);
1829                subtask_names.insert(subtask.id.clone(), subtask.name.clone());
1830                active.insert(
1831                    subtask.id.clone(),
1832                    ActiveK8sBranch {
1833                        branch: branch.clone(),
1834                        started_at: Instant::now(),
1835                    },
1836                );
1837
1838                self.try_send_event(SwarmEvent::SubTaskUpdate {
1839                    id: subtask.id.clone(),
1840                    name: subtask.name.clone(),
1841                    status: SubTaskStatus::Running,
1842                    agent_name: Some(format!("k8s-{branch}")),
1843                });
1844                self.try_send_event(SwarmEvent::AgentStarted {
1845                    subtask_id: subtask.id.clone(),
1846                    agent_name: format!("k8s-{branch}"),
1847                    specialty: subtask
1848                        .specialty
1849                        .clone()
1850                        .unwrap_or_else(|| "generalist".to_string()),
1851                });
1852
1853                tracing::info!(
1854                    subtask_id = %subtask.id,
1855                    pod = %branch,
1856                    "Spawned Kubernetes sub-agent pod"
1857                );
1858            }
1859
1860            if pending.is_empty() && active.is_empty() {
1861                break;
1862            }
1863
1864            tick.tick().await;
1865
1866            let active_ids: Vec<String> = active.keys().cloned().collect();
1867            let mut finished_results: Vec<SubTaskResult> = Vec::new();
1868            for subtask_id in active_ids {
1869                let Some(active_state) = active.get(&subtask_id).cloned() else {
1870                    continue;
1871                };
1872
1873                if active_state.started_at.elapsed()
1874                    > Duration::from_secs(self.config.subagent_timeout_secs)
1875                {
1876                    let reason = format!(
1877                        "Timed out after {}s in Kubernetes pod",
1878                        self.config.subagent_timeout_secs
1879                    );
1880                    kill_reasons.insert(subtask_id.clone(), reason.clone());
1881                    if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
1882                        tracing::warn!(
1883                            subtask_id = %subtask_id,
1884                            error = %error,
1885                            "Failed deleting timed-out Kubernetes pod"
1886                        );
1887                    }
1888                    active.remove(&subtask_id);
1889                    finished_results.push(SubTaskResult {
1890                        subtask_id: subtask_id.clone(),
1891                        subagent_id: format!("agent-{subtask_id}"),
1892                        success: false,
1893                        result: String::new(),
1894                        steps: 0,
1895                        tool_calls: 0,
1896                        execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1897                        error: Some(reason),
1898                        artifacts: Vec::new(),
1899                        retry_count: 0,
1900                    });
1901                    continue;
1902                }
1903
1904                let pod_state = match k8s.get_subagent_pod_state(&subtask_id).await {
1905                    Ok(state) => state,
1906                    Err(error) => {
1907                        tracing::warn!(
1908                            subtask_id = %subtask_id,
1909                            error = %error,
1910                            "Failed to query Kubernetes pod state for sub-agent"
1911                        );
1912                        continue;
1913                    }
1914                };
1915                let Some(pod_state) = pod_state else {
1916                    active.remove(&subtask_id);
1917                    finished_results.push(SubTaskResult {
1918                        subtask_id: subtask_id.clone(),
1919                        subagent_id: format!("agent-{subtask_id}"),
1920                        success: false,
1921                        result: String::new(),
1922                        steps: 0,
1923                        tool_calls: 0,
1924                        execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1925                        error: Some("Sub-agent pod disappeared".to_string()),
1926                        artifacts: Vec::new(),
1927                        retry_count: 0,
1928                    });
1929                    continue;
1930                };
1931
1932                let phase = pod_state.phase.to_ascii_lowercase();
1933                let finished = pod_state.terminated || phase == "succeeded" || phase == "failed";
1934                if !finished {
1935                    continue;
1936                }
1937
1938                let logs = k8s
1939                    .subagent_logs(&subtask_id, 10_000)
1940                    .await
1941                    .unwrap_or_default();
1942                let mut result = result_from_logs(&logs).unwrap_or_else(|| SubTaskResult {
1943                    subtask_id: subtask_id.clone(),
1944                    subagent_id: format!("agent-{subtask_id}"),
1945                    success: pod_state.exit_code.unwrap_or(1) == 0,
1946                    result: logs,
1947                    steps: 0,
1948                    tool_calls: 0,
1949                    execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1950                    error: if pod_state.exit_code.unwrap_or(1) == 0 {
1951                        None
1952                    } else {
1953                        Some(
1954                            pod_state
1955                                .reason
1956                                .clone()
1957                                .unwrap_or_else(|| "Remote sub-agent failed".to_string()),
1958                        )
1959                    },
1960                    artifacts: Vec::new(),
1961                    retry_count: 0,
1962                });
1963
1964                if let Some(reason) = kill_reasons.get(&subtask_id) {
1965                    result.success = false;
1966                    result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1967                    result.result.push_str(&format!(
1968                        "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1969                    ));
1970                }
1971
1972                active.remove(&subtask_id);
1973                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
1974                    tracing::warn!(
1975                        subtask_id = %subtask_id,
1976                        error = %error,
1977                        "Failed deleting completed Kubernetes pod"
1978                    );
1979                }
1980                finished_results.push(result);
1981            }
1982
1983            for result in finished_results {
1984                if result.success {
1985                    completed_results
1986                        .write()
1987                        .await
1988                        .insert(result.subtask_id.clone(), result.result.clone());
1989                }
1990                if result.success
1991                    && let Some(ref cache_arc) = self.cache
1992                {
1993                    let mut cache_guard = cache_arc.lock().await;
1994                    let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1995                    let _ = cache_guard.put(&cache_subtask, &result).await;
1996                }
1997
1998                self.try_send_event(SwarmEvent::SubTaskUpdate {
1999                    id: result.subtask_id.clone(),
2000                    name: subtask_names
2001                        .get(&result.subtask_id)
2002                        .cloned()
2003                        .unwrap_or_else(|| result.subtask_id.clone()),
2004                    status: if result.success {
2005                        SubTaskStatus::Completed
2006                    } else {
2007                        SubTaskStatus::Failed
2008                    },
2009                    agent_name: Some(format!("k8s-{}", result.subtask_id)),
2010                });
2011                if let Some(ref error) = result.error {
2012                    self.try_send_event(SwarmEvent::AgentError {
2013                        subtask_id: result.subtask_id.clone(),
2014                        error: error.clone(),
2015                    });
2016                }
2017                self.try_send_event(SwarmEvent::AgentOutput {
2018                    subtask_id: result.subtask_id.clone(),
2019                    output: result.result.clone(),
2020                });
2021                self.try_send_event(SwarmEvent::AgentComplete {
2022                    subtask_id: result.subtask_id.clone(),
2023                    success: result.success,
2024                    steps: result.steps,
2025                });
2026                results.push(result);
2027            }
2028
2029            if active.len() > 1 {
2030                let mut observations = Vec::with_capacity(active.len());
2031                for (subtask_id, state) in &active {
2032                    let pod_state = match k8s.get_subagent_pod_state(subtask_id).await {
2033                        Ok(state) => state,
2034                        Err(error) => {
2035                            tracing::warn!(
2036                                subtask_id = %subtask_id,
2037                                error = %error,
2038                                "Failed to query pod state while sampling branch observation"
2039                            );
2040                            None
2041                        }
2042                    };
2043                    let (resource_health_score, infra_unhealthy_signals) =
2044                        compute_resource_health(pod_state.as_ref());
2045
2046                    let logs = k8s.subagent_logs(subtask_id, 500).await.unwrap_or_default();
2047                    if let Some(probe) = latest_probe_from_logs(&logs) {
2048                        let compile_ok = pod_state
2049                            .as_ref()
2050                            .map(|p| probe.compile_ok && !p.phase.eq_ignore_ascii_case("failed"))
2051                            .unwrap_or(probe.compile_ok);
2052                        observations.push(BranchObservation {
2053                            subtask_id: subtask_id.clone(),
2054                            branch: state.branch.clone(),
2055                            compile_ok,
2056                            changed_files: probe_changed_files_set(&probe),
2057                            changed_lines: probe.changed_lines,
2058                            resource_health_score,
2059                            infra_unhealthy_signals,
2060                        });
2061                        continue;
2062                    }
2063                    let compile_ok = pod_state
2064                        .as_ref()
2065                        .map(|p| !p.phase.eq_ignore_ascii_case("failed"))
2066                        .unwrap_or(false);
2067                    observations.push(BranchObservation {
2068                        subtask_id: subtask_id.clone(),
2069                        branch: state.branch.clone(),
2070                        compile_ok,
2071                        changed_files: std::collections::HashSet::new(),
2072                        changed_lines: 0,
2073                        resource_health_score,
2074                        infra_unhealthy_signals,
2075                    });
2076                }
2077
2078                let tick = collapse_controller.sample_observations(&observations);
2079                if promoted_subtask_id != tick.promoted_subtask_id {
2080                    promoted_subtask_id = tick.promoted_subtask_id.clone();
2081                    if let Some(ref promoted) = promoted_subtask_id {
2082                        tracing::info!(subtask_id = %promoted, "Collapse controller promoted branch");
2083                        if let Some(audit) = crate::audit::try_audit_log() {
2084                            audit
2085                                .log_with_correlation(
2086                                    crate::audit::AuditCategory::Swarm,
2087                                    "collapse_promote_branch",
2088                                    crate::audit::AuditOutcome::Success,
2089                                    Some("collapse-controller".to_string()),
2090                                    Some(serde_json::json!({
2091                                        "swarm_id": swarm_id,
2092                                        "subtask_id": promoted,
2093                                        "execution_mode": "kubernetes_pod",
2094                                    })),
2095                                    None,
2096                                    None,
2097                                    Some(swarm_id.to_string()),
2098                                    None,
2099                                )
2100                                .await;
2101                        }
2102                    }
2103                }
2104
2105                for kill in tick.kills {
2106                    if kill_reasons.contains_key(&kill.subtask_id) {
2107                        continue;
2108                    }
2109                    if !active.contains_key(&kill.subtask_id) {
2110                        continue;
2111                    }
2112
2113                    if let Err(error) = k8s.delete_subagent_pod(&kill.subtask_id).await {
2114                        tracing::warn!(
2115                            subtask_id = %kill.subtask_id,
2116                            error = %error,
2117                            "Failed deleting Kubernetes pod after collapse kill"
2118                        );
2119                    }
2120                    kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
2121                    let elapsed_ms = active
2122                        .remove(&kill.subtask_id)
2123                        .map(|s| s.started_at.elapsed().as_millis() as u64)
2124                        .unwrap_or(0);
2125
2126                    tracing::warn!(
2127                        subtask_id = %kill.subtask_id,
2128                        branch = %kill.branch,
2129                        reason = %kill.reason,
2130                        "Collapse controller killed Kubernetes branch"
2131                    );
2132
2133                    if let Some(audit) = crate::audit::try_audit_log() {
2134                        audit
2135                            .log_with_correlation(
2136                                crate::audit::AuditCategory::Swarm,
2137                                "collapse_kill_branch",
2138                                crate::audit::AuditOutcome::Success,
2139                                Some("collapse-controller".to_string()),
2140                                Some(serde_json::json!({
2141                                    "swarm_id": swarm_id,
2142                                    "subtask_id": kill.subtask_id.clone(),
2143                                    "branch": kill.branch.clone(),
2144                                    "reason": kill.reason.clone(),
2145                                    "execution_mode": "kubernetes_pod",
2146                                })),
2147                                None,
2148                                None,
2149                                Some(swarm_id.to_string()),
2150                                None,
2151                            )
2152                            .await;
2153                    }
2154
2155                    self.try_send_event(SwarmEvent::SubTaskUpdate {
2156                        id: kill.subtask_id.clone(),
2157                        name: subtask_names
2158                            .get(&kill.subtask_id)
2159                            .cloned()
2160                            .unwrap_or_else(|| kill.subtask_id.clone()),
2161                        status: SubTaskStatus::Cancelled,
2162                        agent_name: Some(format!("agent-{}", kill.subtask_id)),
2163                    });
2164                    self.try_send_event(SwarmEvent::AgentError {
2165                        subtask_id: kill.subtask_id.clone(),
2166                        error: format!("Cancelled by collapse controller: {}", kill.reason),
2167                    });
2168
2169                    results.push(SubTaskResult {
2170                        subtask_id: kill.subtask_id.clone(),
2171                        subagent_id: format!("agent-{}", kill.subtask_id),
2172                        success: false,
2173                        result: format!(
2174                            "\n\n--- Collapse Controller ---\nBranch terminated: {}",
2175                            kill.reason
2176                        ),
2177                        steps: 0,
2178                        tool_calls: 0,
2179                        execution_time_ms: elapsed_ms,
2180                        error: Some(format!("Cancelled by collapse controller: {}", kill.reason)),
2181                        artifacts: Vec::new(),
2182                        retry_count: 0,
2183                    });
2184                }
2185            }
2186        }
2187
2188        if let Some(ref promoted) = promoted_subtask_id {
2189            results.sort_by_key(|result| {
2190                if &result.subtask_id == promoted {
2191                    0usize
2192                } else {
2193                    1usize
2194                }
2195            });
2196        }
2197
2198        if !active.is_empty() {
2199            let residual_ids: Vec<String> = active.keys().cloned().collect();
2200            for subtask_id in residual_ids {
2201                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
2202                    tracing::warn!(
2203                        subtask_id = %subtask_id,
2204                        error = %error,
2205                        "Failed deleting residual Kubernetes pod at stage end"
2206                    );
2207                }
2208            }
2209        }
2210
2211        Ok(results)
2212    }
2213
2214    /// Aggregate results from all subtasks into a final result
2215    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
2216        let mut aggregated = String::new();
2217
2218        for (i, result) in results.iter().enumerate() {
2219            if result.success {
2220                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
2221            } else {
2222                aggregated.push_str(&format!(
2223                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
2224                    i + 1,
2225                    result.error.as_deref().unwrap_or("Unknown error")
2226                ));
2227            }
2228        }
2229
2230        Ok(aggregated)
2231    }
2232
2233    /// Execute a single task without decomposition (for simple cases)
2234    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
2235        self.execute(task, DecompositionStrategy::None).await
2236    }
2237}
2238
/// Builder for swarm execution
pub struct SwarmExecutorBuilder {
    // Accumulated configuration; starts from `SwarmConfig::default()` in
    // `new()` and is consumed by `build()`.
    config: SwarmConfig,
}
2243
2244impl SwarmExecutorBuilder {
2245    pub fn new() -> Self {
2246        Self {
2247            config: SwarmConfig::default(),
2248        }
2249    }
2250
2251    pub fn max_subagents(mut self, max: usize) -> Self {
2252        self.config.max_subagents = max;
2253        self
2254    }
2255
2256    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
2257        self.config.max_steps_per_subagent = max;
2258        self
2259    }
2260
2261    pub fn max_total_steps(mut self, max: usize) -> Self {
2262        self.config.max_total_steps = max;
2263        self
2264    }
2265
2266    pub fn timeout_secs(mut self, secs: u64) -> Self {
2267        self.config.subagent_timeout_secs = secs;
2268        self
2269    }
2270
2271    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
2272        self.config.parallel_enabled = enabled;
2273        self
2274    }
2275
2276    pub fn build(self) -> SwarmExecutor {
2277        SwarmExecutor::new(self.config)
2278    }
2279}
2280
impl Default for SwarmExecutorBuilder {
    /// Equivalent to [`SwarmExecutorBuilder::new`]: a builder seeded with
    /// `SwarmConfig::default()`.
    fn default() -> Self {
        Self::new()
    }
}
2286
/// Bounded-concurrency parallelism for `git worktree add`. Higher
/// values reduce wall time at the cost of contention on the
/// `.git/worktrees` lock. 8 is empirically a sweet spot — past 8,
/// throughput plateaus and flaky lock errors increase.
///
/// Consumed by [`precreate_worktrees`] as the `buffer_unordered` bound.
const WORKTREE_CREATE_PARALLELISM: usize = 8;
2292
2293/// Create worktrees for a batch of subtask IDs in parallel.
2294///
2295/// Each creation is bounded-concurrent via [`WORKTREE_CREATE_PARALLELISM`].
2296/// Failures are logged and dropped from the returned map — callers
2297/// fall back to the shared working directory for those subtasks.
2298///
2299/// This replaces the previous serial `for … mgr.create(slug).await` loop
2300/// inside `execute_stage`, which blocked all subsequent agents from
2301/// starting until every worktree was provisioned. For an N=26 swarm on
2302/// a mid-size repo that alone saved roughly 30–40 seconds of startup.
2303async fn precreate_worktrees(
2304    mgr: Arc<WorktreeManager>,
2305    subtask_ids: Vec<String>,
2306) -> HashMap<String, WorktreeInfo> {
2307    stream::iter(subtask_ids.into_iter().map(|sid| {
2308        let mgr = Arc::clone(&mgr);
2309        async move {
2310            let slug = sid.replace("-", "_");
2311            match mgr.create(&slug).await {
2312                Ok(wt) => {
2313                    if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
2314                        tracing::warn!(
2315                            subtask_id = %sid,
2316                            error = %e,
2317                            "Failed to inject workspace stub into worktree"
2318                        );
2319                    }
2320                    tracing::info!(
2321                        subtask_id = %sid,
2322                        worktree_path = %wt.path.display(),
2323                        worktree_branch = %wt.branch,
2324                        "Created worktree for sub-agent"
2325                    );
2326                    Some((sid, wt))
2327                }
2328                Err(e) => {
2329                    tracing::warn!(
2330                        subtask_id = %sid,
2331                        error = %e,
2332                        "Failed to create worktree, sub-agent will use shared directory"
2333                    );
2334                    None
2335                }
2336            }
2337        }
2338    }))
2339    .buffer_unordered(WORKTREE_CREATE_PARALLELISM)
2340    .filter_map(|x| async move { x })
2341    .collect()
2342    .await
2343}
2344
2345/// Run the agentic loop for a sub-agent with tool execution
2346#[allow(clippy::too_many_arguments)]
2347/// Run an agentic loop with tools - reusable for Ralph and swarm sub-agents
2348/// Resolve relative file paths in tool call arguments to use the given working directory.
2349///
2350/// When sub-agents run in worktrees, the file tools (read, write, edit, etc.) resolve
2351/// relative paths from the process CWD (the main repo), not the worktree. This function
2352/// rewrites relative paths to absolute paths within the worktree before tool execution.
2353fn resolve_tool_paths(
2354    tool_name: &str,
2355    args: &mut serde_json::Value,
2356    working_dir: &std::path::Path,
2357) {
2358    match tool_name {
2359        "read" | "write" | "list" | "grep" | "codesearch" => {
2360            if let Some(path) = args.get("path").and_then(|v| v.as_str()).map(String::from)
2361                && !std::path::Path::new(&path).is_absolute()
2362            {
2363                args["path"] = serde_json::json!(working_dir.join(&path).display().to_string());
2364            }
2365        }
2366        "edit" => {
2367            if let Some(path) = args
2368                .get("filePath")
2369                .and_then(|v| v.as_str())
2370                .map(String::from)
2371                && !std::path::Path::new(&path).is_absolute()
2372            {
2373                args["filePath"] = serde_json::json!(working_dir.join(&path).display().to_string());
2374            }
2375        }
2376        "glob" => {
2377            if let Some(pattern) = args
2378                .get("pattern")
2379                .and_then(|v| v.as_str())
2380                .map(String::from)
2381                && !std::path::Path::new(&pattern).is_absolute()
2382                && !pattern.starts_with("*")
2383            {
2384                args["pattern"] =
2385                    serde_json::json!(working_dir.join(&pattern).display().to_string());
2386            }
2387        }
2388        "multiedit" => {
2389            if let Some(edits) = args.get_mut("edits").and_then(|v| v.as_array_mut()) {
2390                for edit in edits.iter_mut() {
2391                    if let Some(file) = edit.get("file").and_then(|v| v.as_str()).map(String::from)
2392                        && !std::path::Path::new(&file).is_absolute()
2393                    {
2394                        edit["file"] =
2395                            serde_json::json!(working_dir.join(&file).display().to_string());
2396                    }
2397                }
2398            }
2399        }
2400        "patch" => {
2401            if let Some(path) = args.get("file").and_then(|v| v.as_str()).map(String::from)
2402                && !std::path::Path::new(&path).is_absolute()
2403            {
2404                args["file"] = serde_json::json!(working_dir.join(&path).display().to_string());
2405            }
2406        }
2407        "bash" => {
2408            // If bash has no cwd, set it to the working directory
2409            if args.get("cwd").and_then(|v| v.as_str()).is_none() {
2410                args["cwd"] = serde_json::json!(working_dir.display().to_string());
2411            }
2412        }
2413        _ => {}
2414    }
2415}
2416
2417pub async fn run_agent_loop(
2418    provider: Arc<dyn Provider>,
2419    model: &str,
2420    system_prompt: &str,
2421    user_prompt: &str,
2422    tools: Vec<crate::provider::ToolDefinition>,
2423    registry: Arc<ToolRegistry>,
2424    max_steps: usize,
2425    timeout_secs: u64,
2426    event_tx: Option<mpsc::Sender<SwarmEvent>>,
2427    subtask_id: String,
2428    bus: Option<Arc<AgentBus>>,
2429    working_dir: Option<std::path::PathBuf>,
2430) -> Result<(String, usize, usize, AgentLoopExit)> {
2431    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
2432    let temperature = 0.7;
2433
2434    tracing::info!(
2435        model = %model,
2436        max_steps = max_steps,
2437        timeout_secs = timeout_secs,
2438        "Sub-agent starting agentic loop"
2439    );
2440    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
2441    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
2442
2443    // Initialize conversation with system and user messages
2444    let mut messages = vec![
2445        Message {
2446            role: Role::System,
2447            content: vec![ContentPart::Text {
2448                text: system_prompt.to_string(),
2449            }],
2450        },
2451        Message {
2452            role: Role::User,
2453            content: vec![ContentPart::Text {
2454                text: user_prompt.to_string(),
2455            }],
2456        },
2457    ];
2458
2459    let mut steps = 0;
2460    let mut total_tool_calls = 0;
2461    let mut final_output = String::new();
2462
2463    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
2464
2465    let exit_reason = loop {
2466        if steps >= max_steps {
2467            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
2468            break AgentLoopExit::MaxStepsReached;
2469        }
2470
2471        if Instant::now() > deadline {
2472            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
2473            break AgentLoopExit::TimedOut;
2474        }
2475
2476        steps += 1;
2477        tracing::info!(step = steps, "Sub-agent step starting");
2478
2479        // Check context size and truncate if approaching limit
2480        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
2481
2482        let request = CompletionRequest {
2483            messages: messages.clone(),
2484            tools: tools.clone(),
2485            model: model.to_string(),
2486            temperature: Some(temperature),
2487            top_p: None,
2488            max_tokens: Some(8192),
2489            stop: Vec::new(),
2490        };
2491
2492        let step_start = Instant::now();
2493        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
2494        let step_duration = step_start.elapsed();
2495
2496        tracing::info!(
2497            step = steps,
2498            duration_ms = step_duration.as_millis() as u64,
2499            finish_reason = ?response.finish_reason,
2500            prompt_tokens = response.usage.prompt_tokens,
2501            completion_tokens = response.usage.completion_tokens,
2502            "Sub-agent step completed LLM call"
2503        );
2504
2505        // Extract text from response
2506        let mut text_parts = Vec::new();
2507        let mut thinking_parts = Vec::new();
2508        let mut tool_calls = Vec::new();
2509
2510        for part in &response.message.content {
2511            match part {
2512                ContentPart::Text { text } => {
2513                    text_parts.push(text.clone());
2514                }
2515                ContentPart::Thinking { text } if !text.is_empty() => {
2516                    thinking_parts.push(text.clone());
2517                }
2518                ContentPart::ToolCall {
2519                    id,
2520                    name,
2521                    arguments,
2522                    ..
2523                } => {
2524                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
2525                }
2526                _ => {}
2527            }
2528        }
2529
2530        // Publish thinking/reasoning to bus for training data capture
2531        if !thinking_parts.is_empty()
2532            && let Some(ref bus) = bus
2533        {
2534            let thinking_text = thinking_parts.join("\n");
2535            let handle = bus.handle(&subtask_id);
2536            handle.send(
2537                format!("agent.{subtask_id}.thinking"),
2538                BusMessage::AgentThinking {
2539                    agent_id: subtask_id.clone(),
2540                    thinking: thinking_text,
2541                    step: steps,
2542                },
2543            );
2544        }
2545
2546        // Log assistant output
2547        if !text_parts.is_empty() {
2548            let step_output = text_parts.join("\n");
2549            if !final_output.is_empty() {
2550                final_output.push('\n');
2551            }
2552            final_output.push_str(&step_output);
2553            tracing::info!(
2554                step = steps,
2555                output_len = final_output.len(),
2556                "Sub-agent text output"
2557            );
2558            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
2559
2560            // Emit assistant message event for TUI detail view
2561            if let Some(ref tx) = event_tx {
2562                let preview = if step_output.len() > 500 {
2563                    let mut end = 500;
2564                    while end > 0 && !step_output.is_char_boundary(end) {
2565                        end -= 1;
2566                    }
2567                    format!("{}...", &step_output[..end])
2568                } else {
2569                    step_output.clone()
2570                };
2571                let _ = tx.try_send(SwarmEvent::AgentMessage {
2572                    subtask_id: subtask_id.clone(),
2573                    entry: AgentMessageEntry {
2574                        role: "assistant".to_string(),
2575                        content: preview,
2576                        is_tool_call: false,
2577                    },
2578                });
2579            }
2580        }
2581
2582        // Log tool calls
2583        if !tool_calls.is_empty() {
2584            tracing::info!(
2585                step = steps,
2586                num_tool_calls = tool_calls.len(),
2587                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
2588                "Sub-agent requesting tool calls"
2589            );
2590        }
2591
2592        // Add assistant message to history
2593        messages.push(response.message.clone());
2594
2595        // If no tool calls or stop, we're done
2596        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
2597            tracing::info!(
2598                steps = steps,
2599                total_tool_calls = total_tool_calls,
2600                "Sub-agent finished"
2601            );
2602            break AgentLoopExit::Completed;
2603        }
2604
2605        // Execute tool calls
2606        let mut tool_results = Vec::new();
2607
2608        for (call_id, tool_name, arguments) in tool_calls {
2609            total_tool_calls += 1;
2610
2611            // Emit tool call event
2612            if let Some(ref tx) = event_tx {
2613                let _ = tx.try_send(SwarmEvent::AgentToolCall {
2614                    subtask_id: subtask_id.clone(),
2615                    tool_name: tool_name.clone(),
2616                });
2617            }
2618
2619            tracing::info!(
2620                step = steps,
2621                tool_call_id = %call_id,
2622                tool = %tool_name,
2623                "Executing tool"
2624            );
2625            tracing::debug!(
2626                tool = %tool_name,
2627                arguments = %arguments,
2628                "Tool call arguments"
2629            );
2630
2631            let tool_start = Instant::now();
2632            let mut tool_success = true;
2633            let result = if let Some(tool) = registry.get(&tool_name) {
2634                // Parse arguments as JSON
2635                let mut args: serde_json::Value =
2636                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
2637                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
2638                        serde_json::json!({})
2639                    });
2640
2641                // Resolve relative file paths to the working directory (critical for worktree isolation)
2642                if let Some(ref wd) = working_dir {
2643                    resolve_tool_paths(&tool_name, &mut args, wd);
2644                }
2645                let agent_name = format!("agent-{subtask_id}");
2646                let provenance =
2647                    ExecutionProvenance::for_operation(&agent_name, ExecutionOrigin::Swarm);
2648                args = enrich_tool_input_with_runtime_context(
2649                    &args,
2650                    working_dir
2651                        .as_deref()
2652                        .unwrap_or_else(|| std::path::Path::new(".")),
2653                    Some(model),
2654                    &subtask_id,
2655                    &agent_name,
2656                    Some(&provenance),
2657                );
2658
2659                match tool.execute(args).await {
2660                    Ok(r) => {
2661                        if r.success {
2662                            tracing::info!(
2663                                tool = %tool_name,
2664                                duration_ms = tool_start.elapsed().as_millis() as u64,
2665                                success = true,
2666                                "Tool execution completed"
2667                            );
2668                            r.output
2669                        } else {
2670                            tool_success = false;
2671                            tracing::warn!(
2672                                tool = %tool_name,
2673                                error = %r.output,
2674                                "Tool returned error"
2675                            );
2676                            format!("Tool error: {}", r.output)
2677                        }
2678                    }
2679                    Err(e) => {
2680                        tool_success = false;
2681                        tracing::error!(
2682                            tool = %tool_name,
2683                            error = %e,
2684                            "Tool execution failed"
2685                        );
2686                        format!("Tool execution failed: {}", e)
2687                    }
2688                }
2689            } else {
2690                tool_success = false;
2691                tracing::error!(tool = %tool_name, "Unknown tool requested");
2692                format!("Unknown tool: {}", tool_name)
2693            };
2694
2695            // Emit detailed tool call event
2696            if let Some(ref tx) = event_tx {
2697                let input_preview = if arguments.len() > 200 {
2698                    let mut end = 200;
2699                    while end > 0 && !arguments.is_char_boundary(end) {
2700                        end -= 1;
2701                    }
2702                    format!("{}...", &arguments[..end])
2703                } else {
2704                    arguments.clone()
2705                };
2706                let output_preview = if result.len() > 500 {
2707                    let mut end = 500;
2708                    while end > 0 && !result.is_char_boundary(end) {
2709                        end -= 1;
2710                    }
2711                    format!("{}...", &result[..end])
2712                } else {
2713                    result.clone()
2714                };
2715                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
2716                    subtask_id: subtask_id.clone(),
2717                    detail: AgentToolCallDetail {
2718                        tool_name: tool_name.clone(),
2719                        input_preview,
2720                        output_preview,
2721                        success: tool_success,
2722                    },
2723                });
2724            }
2725
2726            tracing::debug!(
2727                tool = %tool_name,
2728                result_len = result.len(),
2729                "Tool result"
2730            );
2731
2732            // Publish full, untruncated tool output to the bus so other
2733            // agents and the bus log see the complete result.
2734            if let Some(ref bus) = bus {
2735                let handle = bus.handle(&subtask_id);
2736                handle.send(
2737                    format!("tools.{tool_name}"),
2738                    BusMessage::ToolOutputFull {
2739                        agent_id: subtask_id.clone(),
2740                        tool_name: tool_name.clone(),
2741                        output: result.clone(),
2742                        success: tool_success,
2743                        step: steps,
2744                    },
2745                );
2746            }
2747
2748            // Process large results with RLM or truncate smaller ones
2749            let result = if result.len() > RLM_THRESHOLD_CHARS {
2750                // Use RLM for very large results
2751                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
2752                    .await
2753            } else {
2754                // Simple truncation for medium results
2755                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
2756            };
2757
2758            tool_results.push((call_id, tool_name, result));
2759        }
2760
2761        // Add tool results to conversation
2762        for (call_id, _tool_name, result) in tool_results {
2763            messages.push(Message {
2764                role: Role::Tool,
2765                content: vec![ContentPart::ToolResult {
2766                    tool_call_id: call_id,
2767                    content: result,
2768                }],
2769            });
2770        }
2771
2772        // Reset deadline after each successful step — agent is making progress
2773        deadline = Instant::now() + Duration::from_secs(timeout_secs);
2774    };
2775
2776    Ok((final_output, steps, total_tool_calls, exit_reason))
2777}