Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    CacheConfig, CacheStats, DecompositionStrategy, StageStats, SwarmCache, SwarmConfig,
8    SwarmResult,
9    orchestrator::Orchestrator,
10    result_store::ResultStore,
11    subtask::{SubTask, SubTaskResult, SubTaskStatus},
12};
13use crate::bus::{AgentBus, BusMessage};
14use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
15
16// Re-export swarm types for convenience
17pub use super::SwarmMessage;
18use crate::{
19    agent::Agent,
20    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
21    rlm::RlmExecutor,
22    swarm::{SwarmArtifact, SwarmStats},
23    telemetry::SwarmTelemetryCollector,
24    tool::ToolRegistry,
25    worktree::{WorktreeInfo, WorktreeManager},
26};
27use anyhow::Result;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::sync::{RwLock, mpsc};
32use tokio::time::{Duration, timeout};
33
/// Default context limit (256k tokens - conservative for most models)
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens reserved for the model's response generation (subtracted from the budget)
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Safety margin before we start truncating (85% of limit)
const TRUNCATION_THRESHOLD: f64 = 0.85;
42
/// Estimate token count from text (rough heuristic: ~3.5 chars per token).
///
/// Actual tokenization varies by model; most tokenizers average 3-5 chars
/// per token for English text, so 3.5 is a deliberately conservative divisor
/// (it over-estimates token counts rather than under-estimating them).
fn estimate_tokens(text: &str) -> usize {
    const CHARS_PER_TOKEN: f64 = 3.5;
    let char_count = text.len() as f64;
    (char_count / CHARS_PER_TOKEN).ceil() as usize
}
50
51/// Estimate total tokens in a message
52fn estimate_message_tokens(message: &Message) -> usize {
53    let mut tokens = 4; // Role overhead
54
55    for part in &message.content {
56        tokens += match part {
57            ContentPart::Text { text } => estimate_tokens(text),
58            ContentPart::ToolCall {
59                id,
60                name,
61                arguments,
62            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
63            ContentPart::ToolResult {
64                tool_call_id,
65                content,
66            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
67            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
68            ContentPart::Thinking { text } => estimate_tokens(text),
69        };
70    }
71
72    tokens
73}
74
75/// Estimate total tokens in all messages
76fn estimate_total_tokens(messages: &[Message]) -> usize {
77    messages.iter().map(estimate_message_tokens).sum()
78}
79
80/// Truncate messages to fit within context limit
81///
82/// Strategy:
83/// 1. First, aggressively truncate large tool results
84/// 2. Keep system message (first) and user message (second) always
85/// 3. Keep the most recent assistant + tool result pairs together
86/// 4. Drop oldest middle messages in matched pairs
87fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
88    let target_tokens =
89        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
90
91    let current_tokens = estimate_total_tokens(messages);
92    if current_tokens <= target_tokens {
93        return;
94    }
95
96    tracing::warn!(
97        current_tokens = current_tokens,
98        target_tokens = target_tokens,
99        context_limit = context_limit,
100        "Context approaching limit, truncating conversation history"
101    );
102
103    // FIRST: Aggressively truncate large tool results (this is the main culprit)
104    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
105
106    let after_tool_truncation = estimate_total_tokens(messages);
107    if after_tool_truncation <= target_tokens {
108        tracing::info!(
109            old_tokens = current_tokens,
110            new_tokens = after_tool_truncation,
111            "Truncated large tool results, context now within limits"
112        );
113        return;
114    }
115
116    // Minimum: keep first 2 (system + initial user) and last 4 messages
117    if messages.len() <= 6 {
118        tracing::warn!(
119            tokens = after_tool_truncation,
120            target = target_tokens,
121            "Cannot truncate further - conversation too short"
122        );
123        return;
124    }
125
126    // Remove messages from the middle, keeping first 2 and last 4
127    // But we need to remove in pairs to maintain tool_call_id references
128    let keep_start = 2;
129    let keep_end = 4;
130    let removable_count = messages.len() - keep_start - keep_end;
131
132    if removable_count == 0 {
133        return;
134    }
135
136    // Remove all middle messages
137    let removed_messages: Vec<_> = messages
138        .drain(keep_start..keep_start + removable_count)
139        .collect();
140    let summary = summarize_removed_messages(&removed_messages);
141
142    // Insert a summary message where we removed content
143    messages.insert(
144        keep_start,
145        Message {
146            role: Role::User,
147            content: vec![ContentPart::Text {
148                text: format!(
149                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
150                    removed_messages.len(),
151                    summary
152                ),
153            }],
154        },
155    );
156
157    let new_tokens = estimate_total_tokens(messages);
158    tracing::info!(
159        removed_messages = removed_messages.len(),
160        old_tokens = current_tokens,
161        new_tokens = new_tokens,
162        "Truncated conversation history"
163    );
164}
165
166/// Summarize removed messages for context preservation
167fn summarize_removed_messages(messages: &[Message]) -> String {
168    let mut summary = String::new();
169    let mut tool_calls: Vec<String> = Vec::new();
170
171    for msg in messages {
172        for part in &msg.content {
173            if let ContentPart::ToolCall { name, .. } = part {
174                if !tool_calls.contains(name) {
175                    tool_calls.push(name.clone());
176                }
177            }
178        }
179    }
180
181    if !tool_calls.is_empty() {
182        summary.push_str(&format!(
183            "Tools used in truncated history: {}",
184            tool_calls.join(", ")
185        ));
186    }
187
188    summary
189}
190
191/// Truncate large tool results that exceed a reasonable size
192fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
193    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
194    let mut truncated_count = 0;
195    let mut saved_tokens = 0usize;
196
197    for message in messages.iter_mut() {
198        for part in message.content.iter_mut() {
199            if let ContentPart::ToolResult { content, .. } = part {
200                let tokens = estimate_tokens(content);
201                if tokens > max_tokens_per_result {
202                    let old_len = content.len();
203                    *content = truncate_single_result(content, char_limit);
204                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
205                    if content.len() < old_len {
206                        truncated_count += 1;
207                    }
208                }
209            }
210        }
211    }
212
213    if truncated_count > 0 {
214        tracing::info!(
215            truncated_count = truncated_count,
216            saved_tokens = saved_tokens,
217            max_tokens_per_result = max_tokens_per_result,
218            "Truncated large tool results"
219        );
220    }
221}
222
223/// Truncate a single result string to a maximum character limit
224fn truncate_single_result(content: &str, max_chars: usize) -> String {
225    if content.len() <= max_chars {
226        return content.to_string();
227    }
228
229    // Find a valid char boundary at or before max_chars
230    let safe_limit = {
231        let mut limit = max_chars.min(content.len());
232        while limit > 0 && !content.is_char_boundary(limit) {
233            limit -= 1;
234        }
235        limit
236    };
237
238    // Try to find a good break point (newline) near the limit
239    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
240
241    let truncated = format!(
242        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
243        &content[..break_point],
244        content.len(),
245        break_point
246    );
247
248    tracing::debug!(
249        original_len = content.len(),
250        truncated_len = truncated.len(),
251        "Truncated large result"
252    );
253
254    truncated
255}
256
/// Size (in characters) above which RLM summarization is used instead of
/// plain truncation for oversized tool results
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Max chars kept by simple truncation (applied to content below the RLM threshold)
const SIMPLE_TRUNCATE_CHARS: usize = 6000;
262
/// Reason an agentic tool loop terminated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The agent finished its work normally.
    Completed,
    /// The loop stopped after hitting the step cap
    /// (presumably `max_steps_per_subagent` — confirm at the loop site).
    MaxStepsReached,
    /// The loop exceeded its wall-clock timeout
    /// (presumably `subagent_timeout_secs` — confirm at the loop site).
    TimedOut,
}
269
270/// Process a large tool result using RLM to intelligently summarize it
271async fn process_large_result_with_rlm(
272    content: &str,
273    tool_name: &str,
274    provider: Arc<dyn Provider>,
275    model: &str,
276) -> String {
277    if content.len() <= SIMPLE_TRUNCATE_CHARS {
278        return content.to_string();
279    }
280
281    // For medium-sized content, just truncate
282    if content.len() <= RLM_THRESHOLD_CHARS {
283        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
284    }
285
286    // For very large content, use RLM to intelligently summarize
287    tracing::info!(
288        tool = %tool_name,
289        content_len = content.len(),
290        "Using RLM to process large tool result"
291    );
292
293    let query = format!(
294        "Summarize the key information from this {} output. \
295         Focus on: errors, warnings, important findings, and actionable items. \
296         Be concise but thorough.",
297        tool_name
298    );
299
300    let mut executor =
301        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
302
303    match executor.analyze(&query).await {
304        Ok(result) => {
305            tracing::info!(
306                tool = %tool_name,
307                original_len = content.len(),
308                summary_len = result.answer.len(),
309                iterations = result.iterations,
310                "RLM summarized large result"
311            );
312
313            format!(
314                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
315                tool_name,
316                content.len(),
317                result.answer.len(),
318                result.answer
319            )
320        }
321        Err(e) => {
322            tracing::warn!(
323                tool = %tool_name,
324                error = %e,
325                "RLM analysis failed, falling back to truncation"
326            );
327            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
328        }
329    }
330}
331
/// The swarm executor runs subtasks in parallel.
///
/// Build with `new` (or `with_cache`), customize via the `with_*` builder
/// methods, then call `execute` to run a task end-to-end.
pub struct SwarmExecutor {
    /// Swarm-wide configuration (concurrency limits, delays, timeouts,
    /// step caps, worktree settings).
    config: SwarmConfig,
    /// Optional agent for handling swarm-level coordination (reserved for future use)
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Optional event channel for TUI real-time updates
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    /// Telemetry collector for swarm execution metrics
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    /// Cache for avoiding duplicate subtask execution (None = caching disabled)
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Shared result store for sub-agent result sharing
    result_store: Arc<ResultStore>,
    /// Optional agent bus for inter-agent communication
    bus: Option<Arc<AgentBus>>,
}
348
349impl SwarmExecutor {
350    /// Create a new executor
351    pub fn new(config: SwarmConfig) -> Self {
352        Self {
353            config,
354            coordinator_agent: None,
355            event_tx: None,
356            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
357            cache: None,
358            result_store: ResultStore::new_arc(),
359            bus: None,
360        }
361    }
362
363    /// Create a new executor with caching enabled
364    pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
365        let cache = SwarmCache::new(cache_config).await?;
366        Ok(Self {
367            config,
368            coordinator_agent: None,
369            event_tx: None,
370            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
371            cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
372            result_store: ResultStore::new_arc(),
373            bus: None,
374        })
375    }
376
    /// Set a pre-initialized cache (builder-style), enabling subtask caching.
    pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
        self.cache = Some(cache);
        self
    }
382
    /// Set an agent bus for inter-agent communication (builder-style).
    pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
        self.bus = Some(bus);
        self
    }
388
    /// Get the agent bus if one was attached via `with_bus`.
    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
        self.bus.as_ref()
    }
393
    /// Set an event channel for real-time TUI updates (builder-style).
    /// Events are sent non-blocking and dropped if the channel is full.
    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
        self.event_tx = Some(tx);
        self
    }
399
    /// Set a coordinator agent for swarm-level coordination (builder-style).
    /// The field is currently reserved for future use (see struct docs).
    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
        tracing::debug!("Setting coordinator agent for swarm execution");
        self.coordinator_agent = Some(agent);
        self
    }
406
    /// Set a telemetry collector for swarm execution metrics (builder-style),
    /// replacing the default collector created by `new`.
    pub fn with_telemetry(
        mut self,
        telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    ) -> Self {
        self.telemetry = telemetry;
        self
    }
415
    /// Get a cloned `Arc` handle to the telemetry collector.
    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
        Arc::clone(&self.telemetry)
    }
    /// Get the coordinator agent if one was attached via `with_coordinator_agent`.
    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        self.coordinator_agent.as_ref()
    }
424
    /// Get the shared result store used for sub-agent result sharing.
    pub fn result_store(&self) -> &Arc<ResultStore> {
        &self.result_store
    }
429
430    /// Get cache statistics if caching is enabled
431    pub async fn cache_stats(&self) -> Option<CacheStats> {
432        if let Some(ref cache) = self.cache {
433            let cache_guard = cache.lock().await;
434            Some(cache_guard.stats().clone())
435        } else {
436            None
437        }
438    }
439
440    /// Clear the cache if enabled
441    pub async fn clear_cache(&self) -> Result<()> {
442        if let Some(ref cache) = self.cache {
443            let mut cache_guard = cache.lock().await;
444            cache_guard.clear().await?;
445        }
446        Ok(())
447    }
448
449    /// Send an event to the TUI if channel is connected (non-blocking)
450    fn try_send_event(&self, event: SwarmEvent) {
451        // Also emit on the agent bus if connected
452        if let Some(ref bus) = self.bus {
453            let handle = bus.handle("swarm-executor");
454            match &event {
455                SwarmEvent::Started { task, .. } => {
456                    handle.send(
457                        "broadcast",
458                        BusMessage::AgentReady {
459                            agent_id: "swarm-executor".to_string(),
460                            capabilities: vec![format!("executing:{task}")],
461                        },
462                    );
463                }
464                SwarmEvent::Complete { success, .. } => {
465                    let state = if *success {
466                        crate::a2a::types::TaskState::Completed
467                    } else {
468                        crate::a2a::types::TaskState::Failed
469                    };
470                    handle.send_task_update("swarm", state, None);
471                }
472                _ => {} // Other events are TUI-specific
473            }
474        }
475
476        if let Some(ref tx) = self.event_tx {
477            let _ = tx.try_send(event);
478        }
479    }
480
    /// Execute a task using the swarm.
    ///
    /// Pipeline: decompose `task` into staged subtasks via the orchestrator,
    /// run each stage's subtasks in parallel, feed completed results forward
    /// as context for later stages, then aggregate all subtask outputs into
    /// a single result. Emits `SwarmEvent`s throughout for a connected TUI.
    ///
    /// # Errors
    /// Propagates orchestrator creation/decomposition failures and stage
    /// execution errors. A decomposition yielding zero subtasks is NOT an
    /// `Err`: it returns `Ok` with `success == false` and `error` set.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        // Create orchestrator
        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        // Decompose the task
        let subtasks = orchestrator.decompose(task, strategy).await?;

        if subtasks.is_empty() {
            // NOTE(review): this early path emits Error but never a Complete
            // event — confirm the TUI tolerates a swarm ending without one.
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Emit decomposition event for TUI (all subtasks start as Pending)
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        // Execute stages in order; dependencies are assumed to live in
        // earlier stages (decomposition invariant — confirm in orchestrator).
        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        // NOTE(review): `artifacts` is never populated in this method.
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        // Initialize telemetry for this swarm execution
        let swarm_id = uuid::Uuid::new_v4().to_string();
        self.telemetry.lock().await.start_swarm(
            swarm_id.clone(),
            subtasks.len(),
            &format!("{:?}", strategy),
        );

        // Shared map of subtask_id -> result text, read by later stages to
        // build dependency context.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            // Stage numbers may be sparse; skip empty ones.
            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            // Execute all subtasks in this stage in parallel
            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            // Update completed results for next stage
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                    // Also publish to shared ResultStore for richer querying
                    let tags = vec![
                        format!("stage:{stage}"),
                        format!("subtask:{}", result.subtask_id),
                    ];
                    // Best-effort: a publish failure must not abort the swarm.
                    let _ = self
                        .result_store
                        .publish(
                            &result.subtask_id,
                            &result.subagent_id,
                            &result.result,
                            tags,
                            None,
                        )
                        .await;
                }
            }

            // Update orchestrator stats
            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            // Mark subtasks as completed
            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            // Emit stage complete event
            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        // Get provider name before mutable borrow (stats_mut below)
        let provider_name = orchestrator.provider().to_string();

        // Record overall execution latency
        self.telemetry
            .lock()
            .await
            .record_swarm_latency("total_execution", start_time.elapsed());

        // Calculate final stats
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        // Estimate of what a serial run would have cost: sum of per-subtask times.
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        // Aggregate results: success requires EVERY subtask to have succeeded.
        let success = all_results.iter().all(|r| r.success);

        // Complete telemetry collection
        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }
690
691    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
692    async fn execute_stage(
693        &self,
694        orchestrator: &Orchestrator,
695        subtasks: Vec<SubTask>,
696        completed_results: Arc<RwLock<HashMap<String, String>>>,
697        swarm_id: &str,
698    ) -> Result<Vec<SubTaskResult>> {
699        let mut handles: Vec<(
700            String,
701            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
702        )> = Vec::new();
703        let mut cached_results: Vec<SubTaskResult> = Vec::new();
704
705        // Rate limiting: semaphore for max concurrent requests
706        let semaphore = Arc::new(tokio::sync::Semaphore::new(
707            self.config.max_concurrent_requests,
708        ));
709        let delay_ms = self.config.request_delay_ms;
710
711        // Get provider info for tool registry
712        let model = orchestrator.model().to_string();
713        let provider_name = orchestrator.provider().to_string();
714        let providers = orchestrator.providers();
715        let provider = providers
716            .get(&provider_name)
717            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
718
719        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
720
721        // Create base tool registry with provider for ralph and batch tool
722        let base_tool_registry =
723            ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
724        // Filter out 'question' tool - sub-agents must be autonomous, not interactive
725        // Include 'swarm_share' definition so LLMs know about it (registered per-agent below)
726        let mut tool_definitions: Vec<_> = base_tool_registry
727            .definitions()
728            .into_iter()
729            .filter(|t| t.name != "question")
730            .collect();
731
732        // Add swarm_share tool definition so LLMs know it's available
733        let swarm_share_def = crate::provider::ToolDefinition {
734            name: "swarm_share".to_string(),
735            description: "Share results with other sub-agents in the swarm. Actions: publish \
736                          (share a result), get (retrieve a result by key), query_tags (find \
737                          results by tags), query_prefix (find results by key prefix), list \
738                          (show all shared results)."
739                .to_string(),
740            parameters: serde_json::json!({
741                "type": "object",
742                "properties": {
743                    "action": {
744                        "type": "string",
745                        "enum": ["publish", "get", "query_tags", "query_prefix", "list"],
746                        "description": "Action to perform"
747                    },
748                    "key": {
749                        "type": "string",
750                        "description": "Result key (for publish/get)"
751                    },
752                    "value": {
753                        "description": "Result value to publish (any JSON value)"
754                    },
755                    "tags": {
756                        "type": "array",
757                        "items": {"type": "string"},
758                        "description": "Tags for publish or query_tags"
759                    },
760                    "prefix": {
761                        "type": "string",
762                        "description": "Key prefix for query_prefix"
763                    }
764                },
765                "required": ["action"]
766            }),
767        };
768        tool_definitions.push(swarm_share_def);
769
770        // Clone the result store for sub-agent sharing
771        let result_store = Arc::clone(&self.result_store);
772
773        // Create worktree manager if enabled
774        let worktree_manager = if self.config.worktree_enabled {
775            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
776                std::env::current_dir()
777                    .map(|p| p.to_string_lossy().to_string())
778                    .unwrap_or_else(|_| ".".to_string())
779            });
780
781            match WorktreeManager::new(&working_dir) {
782                Ok(mgr) => {
783                    tracing::info!(
784                        working_dir = %working_dir,
785                        "Worktree isolation enabled for parallel sub-agents"
786                    );
787                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
788                }
789                Err(e) => {
790                    tracing::warn!(
791                        error = %e,
792                        "Failed to create worktree manager, falling back to shared directory"
793                    );
794                    None
795                }
796            }
797        } else {
798            None
799        };
800
801        for (idx, subtask) in subtasks.into_iter().enumerate() {
802            let model = model.clone();
803            let _provider_name = provider_name.clone();
804            let provider = Arc::clone(&provider);
805
806            // Check cache first
807            if let Some(ref cache) = self.cache {
808                let mut cache_guard = cache.lock().await;
809                if let Some(cached_result) = cache_guard.get(&subtask).await {
810                    tracing::info!(
811                        subtask_id = %subtask.id,
812                        task_name = %subtask.name,
813                        "Cache hit for subtask, skipping execution"
814                    );
815                    self.try_send_event(SwarmEvent::SubTaskUpdate {
816                        id: subtask.id.clone(),
817                        name: subtask.name.clone(),
818                        status: SubTaskStatus::Completed,
819                        agent_name: Some("cached".to_string()),
820                    });
821                    cached_results.push(cached_result);
822                    continue;
823                }
824            }
825
826            // Get context from dependencies
827            let context = {
828                let completed = completed_results.read().await;
829                let mut dep_context = String::new();
830                for dep_id in &subtask.dependencies {
831                    if let Some(result) = completed.get(dep_id) {
832                        dep_context.push_str(&format!(
833                            "\n--- Result from dependency {} ---\n{}\n",
834                            dep_id, result
835                        ));
836                    }
837                }
838                dep_context
839            };
840
841            let instruction = subtask.instruction.clone();
842            let subtask_name = subtask.name.clone();
843            let specialty = subtask.specialty.clone().unwrap_or_default();
844            let subtask_id = subtask.id.clone();
845            let subtask_id_for_handle = subtask_id.clone();
846            let max_steps = self.config.max_steps_per_subagent;
847            let timeout_secs = self.config.subagent_timeout_secs;
848
849            // Clone for the async block
850            let tools = tool_definitions.clone();
851            let _base_registry = Arc::clone(&base_tool_registry);
852            let agent_result_store = Arc::clone(&result_store);
853            let sem = Arc::clone(&semaphore);
854            let stagger_delay = delay_ms * idx as u64; // Stagger start times
855            let worktree_mgr = worktree_manager.clone();
856            let event_tx = self.event_tx.clone();
857
858            // Generate sub-agent execution ID
859            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
860
861            // Log telemetry for this sub-agent
862            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
863
864            // Spawn the subtask execution with agentic tool loop
865            let handle = tokio::spawn(async move {
866                // Rate limiting: stagger start and acquire semaphore
867                if stagger_delay > 0 {
868                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
869                }
870                let _permit = sem
871                    .acquire()
872                    .await
873                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;
874
875                let _agent_start = Instant::now();
876
877                let start = Instant::now();
878
879                // Create worktree for this sub-agent if enabled
880                let worktree_info = if let Some(ref mgr) = worktree_mgr {
881                    let task_slug = subtask_id.replace("-", "_");
882                    match mgr.create(&task_slug) {
883                        Ok(wt) => {
884                            tracing::info!(
885                                subtask_id = %subtask_id,
886                                worktree_path = %wt.path.display(),
887                                worktree_branch = %wt.branch,
888                                "Created worktree for sub-agent"
889                            );
890                            Some(wt)
891                        }
892                        Err(e) => {
893                            tracing::warn!(
894                                subtask_id = %subtask_id,
895                                error = %e,
896                                "Failed to create worktree, using shared directory"
897                            );
898                            None
899                        }
900                    }
901                } else {
902                    None
903                };
904
905                // Determine working directory
906                let working_dir = worktree_info
907                    .as_ref()
908                    .map(|wt| wt.path.display().to_string())
909                    .unwrap_or_else(|| ".".to_string());
910
911                // Load AGENTS.md from working directory
912                let working_path = std::path::Path::new(&working_dir);
913                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
914                    .map(|(content, _)| {
915                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
916                    })
917                    .unwrap_or_default();
918
919                // Build the system prompt for this sub-agent
920                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
921                let system_prompt = format!(
922                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
923
924WORKING DIRECTORY: {}
925All file operations should be relative to this directory.
926
927IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
928
929Available tools:
930- read: Read file contents
931- write: Write/create files  
932- edit: Edit existing files (search and replace)
933- multiedit: Make multiple edits at once
934- glob: Find files by pattern
935- grep: Search file contents
936- bash: Run shell commands (use cwd: \"{}\" parameter)
937- webfetch: Fetch web pages
938- prd: Generate structured PRD for complex tasks
939- ralph: Run autonomous agent loop on a PRD
940- swarm_share: Share results with other sub-agents running in parallel
941
942SHARING RESULTS:
943Use swarm_share to collaborate with other sub-agents:
944- swarm_share({{action: 'publish', key: 'my-finding', value: '...', tags: ['research']}}) to share a result
945- swarm_share({{action: 'get', key: 'some-key'}}) to retrieve a result from another agent
946- swarm_share({{action: 'list'}}) to see all shared results
947- swarm_share({{action: 'query_tags', tags: ['research']}}) to find results by tag
948
949COMPLEX TASKS:
950If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
9511. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
9522. Break down into user stories with acceptance criteria
9533. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
9544. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
955
956NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
957
958When done, provide a brief summary of what you accomplished.{agents_md_content}",
959                    specialty,
960                    subtask_id,
961                    working_dir,
962                    working_dir,
963                    prd_filename,
964                    prd_filename,
965                    prd_filename
966                );
967
968                let user_prompt = if context.is_empty() {
969                    format!("Complete this task:\n\n{}", instruction)
970                } else {
971                    format!(
972                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
973                        instruction, context
974                    )
975                };
976
977                // Emit AgentStarted event
978                if let Some(ref tx) = event_tx {
979                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
980                        id: subtask_id.clone(),
981                        name: subtask_name.clone(),
982                        status: SubTaskStatus::Running,
983                        agent_name: Some(format!("agent-{}", subtask_id)),
984                    });
985                    let _ = tx.try_send(SwarmEvent::AgentStarted {
986                        subtask_id: subtask_id.clone(),
987                        agent_name: format!("agent-{}", subtask_id),
988                        specialty: specialty.clone(),
989                    });
990                }
991
992                // Run the agentic loop
993                // Create per-agent registry with SwarmShareTool for this subtask
994                let mut agent_registry =
995                    ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
996                agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
997                    Arc::clone(&agent_result_store),
998                    subtask_id.clone(),
999                )));
1000                let registry = Arc::new(agent_registry);
1001
1002                let result = run_agent_loop(
1003                    provider,
1004                    &model,
1005                    &system_prompt,
1006                    &user_prompt,
1007                    tools,
1008                    registry,
1009                    max_steps,
1010                    timeout_secs,
1011                    event_tx.clone(),
1012                    subtask_id.clone(),
1013                )
1014                .await;
1015
1016                match result {
1017                    Ok((output, steps, tool_calls, exit_reason)) => {
1018                        let (success, status, error) = match exit_reason {
1019                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
1020                            AgentLoopExit::MaxStepsReached => (
1021                                false,
1022                                SubTaskStatus::Failed,
1023                                Some(format!("Sub-agent hit max steps ({max_steps})")),
1024                            ),
1025                            AgentLoopExit::TimedOut => (
1026                                false,
1027                                SubTaskStatus::TimedOut,
1028                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
1029                            ),
1030                        };
1031
1032                        // Emit completion events
1033                        if let Some(ref tx) = event_tx {
1034                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1035                                id: subtask_id.clone(),
1036                                name: subtask_name.clone(),
1037                                status,
1038                                agent_name: Some(format!("agent-{}", subtask_id)),
1039                            });
1040                            if let Some(ref message) = error {
1041                                let _ = tx.try_send(SwarmEvent::AgentError {
1042                                    subtask_id: subtask_id.clone(),
1043                                    error: message.clone(),
1044                                });
1045                            }
1046                            let _ = tx.try_send(SwarmEvent::AgentOutput {
1047                                subtask_id: subtask_id.clone(),
1048                                output: output.clone(),
1049                            });
1050                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1051                                subtask_id: subtask_id.clone(),
1052                                success,
1053                                steps,
1054                            });
1055                        }
1056                        Ok((
1057                            SubTaskResult {
1058                                subtask_id: subtask_id.clone(),
1059                                subagent_id: format!("agent-{}", subtask_id),
1060                                success,
1061                                result: output,
1062                                steps,
1063                                tool_calls,
1064                                execution_time_ms: start.elapsed().as_millis() as u64,
1065                                error,
1066                                artifacts: Vec::new(),
1067                            },
1068                            worktree_info,
1069                        ))
1070                    }
1071                    Err(e) => {
1072                        // Emit error events
1073                        if let Some(ref tx) = event_tx {
1074                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1075                                id: subtask_id.clone(),
1076                                name: subtask_name.clone(),
1077                                status: SubTaskStatus::Failed,
1078                                agent_name: Some(format!("agent-{}", subtask_id)),
1079                            });
1080                            let _ = tx.try_send(SwarmEvent::AgentError {
1081                                subtask_id: subtask_id.clone(),
1082                                error: e.to_string(),
1083                            });
1084                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1085                                subtask_id: subtask_id.clone(),
1086                                success: false,
1087                                steps: 0,
1088                            });
1089                        }
1090                        Ok((
1091                            SubTaskResult {
1092                                subtask_id: subtask_id.clone(),
1093                                subagent_id: format!("agent-{}", subtask_id),
1094                                success: false,
1095                                result: String::new(),
1096                                steps: 0,
1097                                tool_calls: 0,
1098                                execution_time_ms: start.elapsed().as_millis() as u64,
1099                                error: Some(e.to_string()),
1100                                artifacts: Vec::new(),
1101                            },
1102                            worktree_info,
1103                        ))
1104                    }
1105                }
1106            });
1107
1108            handles.push((subtask_id_for_handle, handle));
1109        }
1110
1111        // Wait for all handles and handle worktree merging
1112        let mut results = cached_results;
1113        let auto_merge = self.config.worktree_auto_merge;
1114
1115        for (subtask_id, handle) in handles {
1116            match handle.await {
1117                Ok(Ok((mut result, worktree_info))) => {
1118                    // Handle worktree merge if applicable
1119                    if let Some(wt) = worktree_info {
1120                        if result.success && auto_merge {
1121                            if let Some(ref mgr) = worktree_manager {
1122                                match mgr.merge(&wt) {
1123                                    Ok(merge_result) => {
1124                                        if merge_result.success {
1125                                            tracing::info!(
1126                                                subtask_id = %result.subtask_id,
1127                                                files_changed = merge_result.files_changed,
1128                                                "Merged worktree changes successfully"
1129                                            );
1130                                            result.result.push_str(&format!(
1131                                                "\n\n--- Merge Result ---\n{}",
1132                                                merge_result.summary
1133                                            ));
1134                                        } else if merge_result.aborted {
1135                                            // Merge was aborted due to non-conflict failure
1136                                            tracing::warn!(
1137                                                subtask_id = %result.subtask_id,
1138                                                summary = %merge_result.summary,
1139                                                "Merge was aborted"
1140                                            );
1141                                            result.result.push_str(&format!(
1142                                                "\n\n--- Merge Aborted ---\n{}",
1143                                                merge_result.summary
1144                                            ));
1145                                        } else {
1146                                            tracing::warn!(
1147                                                subtask_id = %result.subtask_id,
1148                                                conflicts = ?merge_result.conflicts,
1149                                                "Merge had conflicts"
1150                                            );
1151                                            result.result.push_str(&format!(
1152                                                "\n\n--- Merge Conflicts ---\n{}",
1153                                                merge_result.summary
1154                                            ));
1155                                        }
1156
1157                                        // Cleanup worktree after merge
1158                                        if let Err(e) = mgr.cleanup(&wt) {
1159                                            tracing::warn!(
1160                                                error = %e,
1161                                                "Failed to cleanup worktree"
1162                                            );
1163                                        }
1164                                    }
1165                                    Err(e) => {
1166                                        tracing::error!(
1167                                            subtask_id = %result.subtask_id,
1168                                            error = %e,
1169                                            "Failed to merge worktree"
1170                                        );
1171                                    }
1172                                }
1173                            }
1174                        } else if !result.success {
1175                            // Keep worktree for debugging on failure
1176                            tracing::info!(
1177                                subtask_id = %result.subtask_id,
1178                                worktree_path = %wt.path.display(),
1179                                "Keeping worktree for debugging (task failed)"
1180                            );
1181                        }
1182                    }
1183
1184                    // Cache successful result
1185                    if result.success {
1186                        if let Some(ref cache_arc) = self.cache {
1187                            let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
1188                                cache_arc.lock().await;
1189                            // Create a minimal subtask for cache lookup key
1190                            let cache_subtask = SubTask::new(&subtask_id, &result.result);
1191                            if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
1192                                tracing::warn!(
1193                                    subtask_id = %result.subtask_id,
1194                                    error = %e,
1195                                    "Failed to cache subtask result"
1196                                );
1197                            }
1198                        }
1199                    }
1200
1201                    results.push(result);
1202                }
1203                Ok(Err(e)) => {
1204                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
1205                    if let Some(ref tx) = self.event_tx {
1206                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1207                            id: subtask_id.clone(),
1208                            name: subtask_id.clone(),
1209                            status: SubTaskStatus::Failed,
1210                            agent_name: Some(format!("agent-{}", subtask_id)),
1211                        });
1212                        let _ = tx.try_send(SwarmEvent::AgentError {
1213                            subtask_id: subtask_id.clone(),
1214                            error: e.to_string(),
1215                        });
1216                        let _ = tx.try_send(SwarmEvent::AgentComplete {
1217                            subtask_id: subtask_id.clone(),
1218                            success: false,
1219                            steps: 0,
1220                        });
1221                    }
1222                    results.push(SubTaskResult {
1223                        subtask_id: subtask_id.clone(),
1224                        subagent_id: format!("agent-{}", subtask_id),
1225                        success: false,
1226                        result: String::new(),
1227                        steps: 0,
1228                        tool_calls: 0,
1229                        execution_time_ms: 0,
1230                        error: Some(e.to_string()),
1231                        artifacts: Vec::new(),
1232                    });
1233                }
1234                Err(e) => {
1235                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
1236                    if let Some(ref tx) = self.event_tx {
1237                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1238                            id: subtask_id.clone(),
1239                            name: subtask_id.clone(),
1240                            status: SubTaskStatus::Failed,
1241                            agent_name: Some(format!("agent-{}", subtask_id)),
1242                        });
1243                        let _ = tx.try_send(SwarmEvent::AgentError {
1244                            subtask_id: subtask_id.clone(),
1245                            error: format!("Task join error: {}", e),
1246                        });
1247                        let _ = tx.try_send(SwarmEvent::AgentComplete {
1248                            subtask_id: subtask_id.clone(),
1249                            success: false,
1250                            steps: 0,
1251                        });
1252                    }
1253                    results.push(SubTaskResult {
1254                        subtask_id: subtask_id.clone(),
1255                        subagent_id: format!("agent-{}", subtask_id),
1256                        success: false,
1257                        result: String::new(),
1258                        steps: 0,
1259                        tool_calls: 0,
1260                        execution_time_ms: 0,
1261                        error: Some(format!("Task join error: {}", e)),
1262                        artifacts: Vec::new(),
1263                    });
1264                }
1265            }
1266        }
1267
1268        Ok(results)
1269    }
1270
1271    /// Aggregate results from all subtasks into a final result
1272    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
1273        let mut aggregated = String::new();
1274
1275        for (i, result) in results.iter().enumerate() {
1276            if result.success {
1277                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
1278            } else {
1279                aggregated.push_str(&format!(
1280                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
1281                    i + 1,
1282                    result.error.as_deref().unwrap_or("Unknown error")
1283                ));
1284            }
1285        }
1286
1287        Ok(aggregated)
1288    }
1289
1290    /// Execute a single task without decomposition (for simple cases)
1291    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
1292        self.execute(task, DecompositionStrategy::None).await
1293    }
1294}
1295
/// Builder for swarm execution.
///
/// Accumulates a [`SwarmConfig`] through chained setter calls and
/// produces a configured `SwarmExecutor` via `build()`.
pub struct SwarmExecutorBuilder {
    /// Configuration under construction; consumed by `build()`.
    config: SwarmConfig,
}
1300
1301impl SwarmExecutorBuilder {
1302    pub fn new() -> Self {
1303        Self {
1304            config: SwarmConfig::default(),
1305        }
1306    }
1307
1308    pub fn max_subagents(mut self, max: usize) -> Self {
1309        self.config.max_subagents = max;
1310        self
1311    }
1312
1313    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
1314        self.config.max_steps_per_subagent = max;
1315        self
1316    }
1317
1318    pub fn max_total_steps(mut self, max: usize) -> Self {
1319        self.config.max_total_steps = max;
1320        self
1321    }
1322
1323    pub fn timeout_secs(mut self, secs: u64) -> Self {
1324        self.config.subagent_timeout_secs = secs;
1325        self
1326    }
1327
1328    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
1329        self.config.parallel_enabled = enabled;
1330        self
1331    }
1332
1333    pub fn build(self) -> SwarmExecutor {
1334        SwarmExecutor::new(self.config)
1335    }
1336}
1337
1338impl Default for SwarmExecutorBuilder {
1339    fn default() -> Self {
1340        Self::new()
1341    }
1342}
1343
/// Run the agentic loop for a sub-agent with tool execution.
///
/// Reusable for both Ralph and swarm sub-agents.
#[allow(clippy::too_many_arguments)]
1347pub async fn run_agent_loop(
1348    provider: Arc<dyn Provider>,
1349    model: &str,
1350    system_prompt: &str,
1351    user_prompt: &str,
1352    tools: Vec<crate::provider::ToolDefinition>,
1353    registry: Arc<ToolRegistry>,
1354    max_steps: usize,
1355    timeout_secs: u64,
1356    event_tx: Option<mpsc::Sender<SwarmEvent>>,
1357    subtask_id: String,
1358) -> Result<(String, usize, usize, AgentLoopExit)> {
1359    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
1360    let temperature = 0.7;
1361
1362    tracing::info!(
1363        model = %model,
1364        max_steps = max_steps,
1365        timeout_secs = timeout_secs,
1366        "Sub-agent starting agentic loop"
1367    );
1368    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
1369    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
1370
1371    // Initialize conversation with system and user messages
1372    let mut messages = vec![
1373        Message {
1374            role: Role::System,
1375            content: vec![ContentPart::Text {
1376                text: system_prompt.to_string(),
1377            }],
1378        },
1379        Message {
1380            role: Role::User,
1381            content: vec![ContentPart::Text {
1382                text: user_prompt.to_string(),
1383            }],
1384        },
1385    ];
1386
1387    let mut steps = 0;
1388    let mut total_tool_calls = 0;
1389    let mut final_output = String::new();
1390
1391    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
1392
1393    let exit_reason = loop {
1394        if steps >= max_steps {
1395            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
1396            break AgentLoopExit::MaxStepsReached;
1397        }
1398
1399        if Instant::now() > deadline {
1400            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
1401            break AgentLoopExit::TimedOut;
1402        }
1403
1404        steps += 1;
1405        tracing::info!(step = steps, "Sub-agent step starting");
1406
1407        // Check context size and truncate if approaching limit
1408        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
1409
1410        let request = CompletionRequest {
1411            messages: messages.clone(),
1412            tools: tools.clone(),
1413            model: model.to_string(),
1414            temperature: Some(temperature),
1415            top_p: None,
1416            max_tokens: Some(8192),
1417            stop: Vec::new(),
1418        };
1419
1420        let step_start = Instant::now();
1421        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
1422        let step_duration = step_start.elapsed();
1423
1424        tracing::info!(
1425            step = steps,
1426            duration_ms = step_duration.as_millis() as u64,
1427            finish_reason = ?response.finish_reason,
1428            prompt_tokens = response.usage.prompt_tokens,
1429            completion_tokens = response.usage.completion_tokens,
1430            "Sub-agent step completed LLM call"
1431        );
1432
1433        // Extract text from response
1434        let mut text_parts = Vec::new();
1435        let mut tool_calls = Vec::new();
1436
1437        for part in &response.message.content {
1438            match part {
1439                ContentPart::Text { text } => {
1440                    text_parts.push(text.clone());
1441                }
1442                ContentPart::ToolCall {
1443                    id,
1444                    name,
1445                    arguments,
1446                } => {
1447                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
1448                }
1449                _ => {}
1450            }
1451        }
1452
1453        // Log assistant output
1454        if !text_parts.is_empty() {
1455            let step_output = text_parts.join("\n");
1456            if !final_output.is_empty() {
1457                final_output.push('\n');
1458            }
1459            final_output.push_str(&step_output);
1460            tracing::info!(
1461                step = steps,
1462                output_len = final_output.len(),
1463                "Sub-agent text output"
1464            );
1465            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
1466
1467            // Emit assistant message event for TUI detail view
1468            if let Some(ref tx) = event_tx {
1469                let preview = if step_output.len() > 500 {
1470                    let mut end = 500;
1471                    while end > 0 && !step_output.is_char_boundary(end) {
1472                        end -= 1;
1473                    }
1474                    format!("{}...", &step_output[..end])
1475                } else {
1476                    step_output.clone()
1477                };
                // Surface the assistant's text to the TUI. try_send is
                // best-effort: if the event channel is full the update is
                // dropped rather than blocking the agent loop.
                let _ = tx.try_send(SwarmEvent::AgentMessage {
                    subtask_id: subtask_id.clone(),
                    entry: AgentMessageEntry {
                        role: "assistant".to_string(),
                        content: preview,
                        is_tool_call: false,
                    },
                });
            }
        }

        // Log tool calls requested by the model for this step.
        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        // Add assistant message to history so the model sees its own
        // tool-call turn on the next iteration.
        messages.push(response.message.clone());

        // Exit condition: the model stopped without asking for tools (or
        // claimed ToolCalls but supplied none). The break value becomes the
        // result of the enclosing `loop` expression (bound outside this view).
        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break AgentLoopExit::Completed;
        }

        // Execute tool calls sequentially, collecting (call_id, name, output)
        // tuples to append to the conversation afterwards.
        let mut tool_results = Vec::new();

        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            // Emit a lightweight "tool started" event (best-effort).
            if let Some(ref tx) = event_tx {
                let _ = tx.try_send(SwarmEvent::AgentToolCall {
                    subtask_id: subtask_id.clone(),
                    tool_name: tool_name.clone(),
                });
            }

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let mut tool_success = true;
            let result = if let Some(tool) = registry.get(&tool_name) {
                // Parse arguments as JSON. Malformed arguments degrade to an
                // empty object (with a warning) instead of aborting the step,
                // so the tool still gets a chance to run / report its own error.
                let args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
                        serde_json::json!({})
                    });

                // All three failure shapes (tool-reported error, execution
                // error, unknown tool) are folded into a descriptive string
                // that is fed back to the model rather than propagated as Err.
                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tool_success = false;
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tool_success = false;
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                tool_success = false;
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            // Emit a detailed tool-call event with truncated input/output
            // previews for the TUI.
            if let Some(ref tx) = event_tx {
                let input_preview = if arguments.len() > 200 {
                    let mut end = 200;
                    // Walk back to a UTF-8 char boundary so the slice below
                    // cannot panic mid-codepoint on multi-byte characters.
                    while end > 0 && !arguments.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &arguments[..end])
                } else {
                    arguments.clone()
                };
                let output_preview = if result.len() > 500 {
                    let mut end = 500;
                    // Same boundary-safe backoff as for input_preview.
                    while end > 0 && !result.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &result[..end])
                } else {
                    result.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
                    subtask_id: subtask_id.clone(),
                    detail: AgentToolCallDetail {
                        tool_name: tool_name.clone(),
                        input_preview,
                        output_preview,
                        success: tool_success,
                    },
                });
            }

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            // Keep context size bounded: very large results (above
            // RLM_THRESHOLD_CHARS, defined outside this view) are condensed
            // via the RLM pipeline; medium ones are simply truncated.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                // Use RLM for very large results
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                // Simple truncation for medium results
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        // Append each tool output as a Role::Tool message keyed by its
        // tool_call_id so the model can correlate results with its requests.
        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }

        // Reset deadline after each successful step — agent is making progress
        // (timeout is per-step inactivity, not a hard wall-clock cap).
        deadline = Instant::now() + Duration::from_secs(timeout_secs);
    };

    // final_output / exit_reason are bound above this fragment — presumably
    // from the loop's break value and the last assistant text. TODO confirm
    // against the function head, which is outside this view.
    Ok((final_output, steps, total_tool_calls, exit_reason))
}