Skip to main content

codetether_agent/swarm/
executor.rs

1//! Parallel execution engine for swarm operations
2//!
3//! Executes subtasks in parallel across multiple sub-agents,
4//! respecting dependencies and optimizing for critical path.
5
6use super::{
7    CacheConfig, CacheStats, DecompositionStrategy, StageStats, SwarmCache, SwarmConfig,
8    SwarmResult,
9    orchestrator::Orchestrator,
10    result_store::ResultStore,
11    subtask::{SubTask, SubTaskResult, SubTaskStatus},
12};
13use crate::bus::{AgentBus, BusMessage};
14use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
15
16// Re-export swarm types for convenience
17pub use super::SwarmMessage;
18use crate::{
19    agent::Agent,
20    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
21    rlm::RlmExecutor,
22    swarm::{SwarmArtifact, SwarmStats},
23    telemetry::SwarmTelemetryCollector,
24    tool::ToolRegistry,
25    worktree::{WorktreeInfo, WorktreeManager},
26};
27use anyhow::Result;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::sync::{RwLock, mpsc};
32use tokio::time::{Duration, timeout};
33
/// Default context limit (256k tokens - conservative for most models)
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Reserve tokens for the response generation
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Safety margin before we start truncating (85% of limit)
const TRUNCATION_THRESHOLD: f64 = 0.85;
42
/// Estimate token count from text (rough heuristic: ~4 chars per token)
fn estimate_tokens(text: &str) -> usize {
    // Real tokenization is model-specific; English text typically averages
    // 3-5 chars per token. 3.5 errs toward over-counting so truncation
    // kicks in a little early rather than a little late.
    const CHARS_PER_TOKEN: f64 = 3.5;
    (text.len() as f64 / CHARS_PER_TOKEN).ceil() as usize
}
50
51/// Estimate total tokens in a message
52fn estimate_message_tokens(message: &Message) -> usize {
53    let mut tokens = 4; // Role overhead
54
55    for part in &message.content {
56        tokens += match part {
57            ContentPart::Text { text } => estimate_tokens(text),
58            ContentPart::ToolCall {
59                id,
60                name,
61                arguments,
62            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
63            ContentPart::ToolResult {
64                tool_call_id,
65                content,
66            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
67            ContentPart::Image { .. } | ContentPart::File { .. } => 2000, // Binary content is expensive
68            ContentPart::Thinking { text } => estimate_tokens(text),
69        };
70    }
71
72    tokens
73}
74
75/// Estimate total tokens in all messages
76fn estimate_total_tokens(messages: &[Message]) -> usize {
77    messages.iter().map(estimate_message_tokens).sum()
78}
79
80/// Truncate messages to fit within context limit
81///
82/// Strategy:
83/// 1. First, aggressively truncate large tool results
84/// 2. Keep system message (first) and user message (second) always
85/// 3. Keep the most recent assistant + tool result pairs together
86/// 4. Drop oldest middle messages in matched pairs
87fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
88    let target_tokens =
89        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
90
91    let current_tokens = estimate_total_tokens(messages);
92    if current_tokens <= target_tokens {
93        return;
94    }
95
96    tracing::warn!(
97        current_tokens = current_tokens,
98        target_tokens = target_tokens,
99        context_limit = context_limit,
100        "Context approaching limit, truncating conversation history"
101    );
102
103    // FIRST: Aggressively truncate large tool results (this is the main culprit)
104    truncate_large_tool_results(messages, 2000); // Max 2k tokens per tool result
105
106    let after_tool_truncation = estimate_total_tokens(messages);
107    if after_tool_truncation <= target_tokens {
108        tracing::info!(
109            old_tokens = current_tokens,
110            new_tokens = after_tool_truncation,
111            "Truncated large tool results, context now within limits"
112        );
113        return;
114    }
115
116    // Minimum: keep first 2 (system + initial user) and last 4 messages
117    if messages.len() <= 6 {
118        tracing::warn!(
119            tokens = after_tool_truncation,
120            target = target_tokens,
121            "Cannot truncate further - conversation too short"
122        );
123        return;
124    }
125
126    // Remove messages from the middle, keeping first 2 and last 4
127    // But we need to remove in pairs to maintain tool_call_id references
128    let keep_start = 2;
129    let keep_end = 4;
130    let removable_count = messages.len() - keep_start - keep_end;
131
132    if removable_count == 0 {
133        return;
134    }
135
136    // Remove all middle messages
137    let removed_messages: Vec<_> = messages
138        .drain(keep_start..keep_start + removable_count)
139        .collect();
140    let summary = summarize_removed_messages(&removed_messages);
141
142    // Insert a summary message where we removed content
143    messages.insert(
144        keep_start,
145        Message {
146            role: Role::User,
147            content: vec![ContentPart::Text {
148                text: format!(
149                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
150                    removed_messages.len(),
151                    summary
152                ),
153            }],
154        },
155    );
156
157    let new_tokens = estimate_total_tokens(messages);
158    tracing::info!(
159        removed_messages = removed_messages.len(),
160        old_tokens = current_tokens,
161        new_tokens = new_tokens,
162        "Truncated conversation history"
163    );
164}
165
166/// Summarize removed messages for context preservation
167fn summarize_removed_messages(messages: &[Message]) -> String {
168    let mut summary = String::new();
169    let mut tool_calls: Vec<String> = Vec::new();
170
171    for msg in messages {
172        for part in &msg.content {
173            if let ContentPart::ToolCall { name, .. } = part {
174                if !tool_calls.contains(name) {
175                    tool_calls.push(name.clone());
176                }
177            }
178        }
179    }
180
181    if !tool_calls.is_empty() {
182        summary.push_str(&format!(
183            "Tools used in truncated history: {}",
184            tool_calls.join(", ")
185        ));
186    }
187
188    summary
189}
190
191/// Truncate large tool results that exceed a reasonable size
192fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
193    let char_limit = max_tokens_per_result * 3; // ~3 chars per token
194    let mut truncated_count = 0;
195    let mut saved_tokens = 0usize;
196
197    for message in messages.iter_mut() {
198        for part in message.content.iter_mut() {
199            if let ContentPart::ToolResult { content, .. } = part {
200                let tokens = estimate_tokens(content);
201                if tokens > max_tokens_per_result {
202                    let old_len = content.len();
203                    *content = truncate_single_result(content, char_limit);
204                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
205                    if content.len() < old_len {
206                        truncated_count += 1;
207                    }
208                }
209            }
210        }
211    }
212
213    if truncated_count > 0 {
214        tracing::info!(
215            truncated_count = truncated_count,
216            saved_tokens = saved_tokens,
217            max_tokens_per_result = max_tokens_per_result,
218            "Truncated large tool results"
219        );
220    }
221}
222
223/// Truncate a single result string to a maximum character limit
224fn truncate_single_result(content: &str, max_chars: usize) -> String {
225    if content.len() <= max_chars {
226        return content.to_string();
227    }
228
229    // Find a valid char boundary at or before max_chars
230    let safe_limit = {
231        let mut limit = max_chars.min(content.len());
232        while limit > 0 && !content.is_char_boundary(limit) {
233            limit -= 1;
234        }
235        limit
236    };
237
238    // Try to find a good break point (newline) near the limit
239    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
240
241    let truncated = format!(
242        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
243        &content[..break_point],
244        content.len(),
245        break_point
246    );
247
248    tracing::debug!(
249        original_len = content.len(),
250        truncated_len = truncated.len(),
251        "Truncated large result"
252    );
253
254    truncated
255}
256
/// Threshold for when to use RLM vs truncation (in characters);
/// results larger than this get an RLM summary instead of a hard cut
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Max chars for simple truncation (below RLM threshold)
const SIMPLE_TRUNCATE_CHARS: usize = 6000;
262
/// Reason an agent's execution loop stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The loop finished normally.
    Completed,
    /// The loop stopped after reaching its step budget
    /// (presumably `max_steps_per_subagent` — not used in this chunk; confirm at call site).
    MaxStepsReached,
    /// The loop stopped after exceeding its wall-clock timeout.
    TimedOut,
}
269
270/// Process a large tool result using RLM to intelligently summarize it
271async fn process_large_result_with_rlm(
272    content: &str,
273    tool_name: &str,
274    provider: Arc<dyn Provider>,
275    model: &str,
276) -> String {
277    if content.len() <= SIMPLE_TRUNCATE_CHARS {
278        return content.to_string();
279    }
280
281    // For medium-sized content, just truncate
282    if content.len() <= RLM_THRESHOLD_CHARS {
283        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
284    }
285
286    // For very large content, use RLM to intelligently summarize
287    tracing::info!(
288        tool = %tool_name,
289        content_len = content.len(),
290        "Using RLM to process large tool result"
291    );
292
293    let query = format!(
294        "Summarize the key information from this {} output. \
295         Focus on: errors, warnings, important findings, and actionable items. \
296         Be concise but thorough.",
297        tool_name
298    );
299
300    let mut executor =
301        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
302
303    match executor.analyze(&query).await {
304        Ok(result) => {
305            tracing::info!(
306                tool = %tool_name,
307                original_len = content.len(),
308                summary_len = result.answer.len(),
309                iterations = result.iterations,
310                "RLM summarized large result"
311            );
312
313            format!(
314                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
315                tool_name,
316                content.len(),
317                result.answer.len(),
318                result.answer
319            )
320        }
321        Err(e) => {
322            tracing::warn!(
323                tool = %tool_name,
324                error = %e,
325                "RLM analysis failed, falling back to truncation"
326            );
327            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
328        }
329    }
330}
331
/// The swarm executor runs subtasks in parallel
///
/// Constructed via [`SwarmExecutor::new`] (or [`SwarmExecutor::with_cache`]);
/// optional collaborators are attached with the `with_*` builder methods.
pub struct SwarmExecutor {
    /// Swarm-wide configuration (concurrency limits, timeouts, worktree settings)
    config: SwarmConfig,
    /// Optional agent for handling swarm-level coordination (reserved for future use)
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Optional event channel for TUI real-time updates
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    /// Telemetry collector for swarm execution metrics
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    /// Cache for avoiding duplicate subtask execution; `None` disables caching
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Shared result store for sub-agent result sharing
    result_store: Arc<ResultStore>,
    /// Optional agent bus for inter-agent communication
    bus: Option<Arc<AgentBus>>,
}
348
349impl SwarmExecutor {
350    /// Create a new executor
351    pub fn new(config: SwarmConfig) -> Self {
352        Self {
353            config,
354            coordinator_agent: None,
355            event_tx: None,
356            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
357            cache: None,
358            result_store: ResultStore::new_arc(),
359            bus: None,
360        }
361    }
362
363    /// Create a new executor with caching enabled
364    pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
365        let cache = SwarmCache::new(cache_config).await?;
366        Ok(Self {
367            config,
368            coordinator_agent: None,
369            event_tx: None,
370            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
371            cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
372            result_store: ResultStore::new_arc(),
373            bus: None,
374        })
375    }
376
377    /// Set a pre-initialized cache
378    pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
379        self.cache = Some(cache);
380        self
381    }
382
383    /// Set an agent bus for inter-agent communication
384    pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
385        self.bus = Some(bus);
386        self
387    }
388
389    /// Get the agent bus if set
390    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
391        self.bus.as_ref()
392    }
393
394    /// Set an event channel for real-time TUI updates
395    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
396        self.event_tx = Some(tx);
397        self
398    }
399
400    /// Set a coordinator agent for swarm-level coordination
401    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
402        tracing::debug!("Setting coordinator agent for swarm execution");
403        self.coordinator_agent = Some(agent);
404        self
405    }
406
407    /// Set a telemetry collector for swarm execution metrics
408    pub fn with_telemetry(
409        mut self,
410        telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
411    ) -> Self {
412        self.telemetry = telemetry;
413        self
414    }
415
416    /// Get the telemetry collector as an Arc
417    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
418        Arc::clone(&self.telemetry)
419    }
420    /// Get the coordinator agent if set
421    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
422        tracing::debug!(
423            has_coordinator = self.coordinator_agent.is_some(),
424            "Getting coordinator agent"
425        );
426        self.coordinator_agent.as_ref()
427    }
428
429    /// Get the shared result store
430    pub fn result_store(&self) -> &Arc<ResultStore> {
431        &self.result_store
432    }
433
434    /// Get cache statistics if caching is enabled
435    pub async fn cache_stats(&self) -> Option<CacheStats> {
436        if let Some(ref cache) = self.cache {
437            let cache_guard = cache.lock().await;
438            Some(cache_guard.stats().clone())
439        } else {
440            None
441        }
442    }
443
444    /// Clear the cache if enabled
445    pub async fn clear_cache(&self) -> Result<()> {
446        if let Some(ref cache) = self.cache {
447            let mut cache_guard = cache.lock().await;
448            cache_guard.clear().await?;
449        }
450        Ok(())
451    }
452
453    /// Send an event to the TUI if channel is connected (non-blocking)
454    fn try_send_event(&self, event: SwarmEvent) {
455        // Also emit on the agent bus if connected
456        if let Some(ref bus) = self.bus {
457            let handle = bus.handle("swarm-executor");
458            match &event {
459                SwarmEvent::Started { task, .. } => {
460                    handle.send(
461                        "broadcast",
462                        BusMessage::AgentReady {
463                            agent_id: "swarm-executor".to_string(),
464                            capabilities: vec![format!("executing:{task}")],
465                        },
466                    );
467                }
468                SwarmEvent::Complete { success, .. } => {
469                    let state = if *success {
470                        crate::a2a::types::TaskState::Completed
471                    } else {
472                        crate::a2a::types::TaskState::Failed
473                    };
474                    handle.send_task_update("swarm", state, None);
475                }
476                _ => {} // Other events are TUI-specific
477            }
478        }
479
480        if let Some(ref tx) = self.event_tx {
481            let _ = tx.try_send(event);
482        }
483    }
484
    /// Execute a task using the swarm
    ///
    /// Pipeline: decompose `task` into subtasks using `strategy`, run each
    /// stage's subtasks in parallel (stages run sequentially in dependency
    /// order), share stage outputs with later stages, then aggregate all
    /// subtask outputs into one result.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        // Create orchestrator
        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        // Decompose the task
        let subtasks = orchestrator.decompose(task, strategy).await?;

        if subtasks.is_empty() {
            // Nothing to run: report failure via the event channel and the
            // returned result rather than propagating an error.
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Emit decomposition event for TUI
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        // Execute stages in order
        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        // NOTE(review): `artifacts` is never populated in this function, so
        // the returned result always has an empty artifact list — presumably
        // reserved for future use; confirm before relying on it.
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        // Initialize telemetry for this swarm execution
        let swarm_id = uuid::Uuid::new_v4().to_string();
        self.telemetry.lock().await.start_swarm(
            swarm_id.clone(),
            subtasks.len(),
            &format!("{:?}", strategy),
        );

        // Shared state for completed results, readable by later stages.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            // Stage numbers may be sparse; skip empty stages.
            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            // Execute all subtasks in this stage in parallel
            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            // Update completed results for next stage
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                    // Also publish to shared ResultStore for richer querying
                    let tags = vec![
                        format!("stage:{stage}"),
                        format!("subtask:{}", result.subtask_id),
                    ];
                    // Best-effort: a publish failure does not fail the stage.
                    let _ = self
                        .result_store
                        .publish(
                            &result.subtask_id,
                            &result.subagent_id,
                            &result.result,
                            tags,
                            None,
                        )
                        .await;
                }
            }

            // Update orchestrator stats
            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            // Mark subtasks as completed
            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            // Emit stage complete event
            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        // Get provider name before mutable borrow
        let provider_name = orchestrator.provider().to_string();

        // Record overall execution latency
        self.telemetry
            .lock()
            .await
            .record_swarm_latency("total_execution", start_time.elapsed());

        // Calculate final stats
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        // Aggregate results: the swarm succeeds only if every subtask did.
        let success = all_results.iter().all(|r| r.success);

        // Complete telemetry collection
        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }
694
695    /// Execute a single stage of subtasks in parallel (with rate limiting and worktree isolation)
696    async fn execute_stage(
697        &self,
698        orchestrator: &Orchestrator,
699        subtasks: Vec<SubTask>,
700        completed_results: Arc<RwLock<HashMap<String, String>>>,
701        swarm_id: &str,
702    ) -> Result<Vec<SubTaskResult>> {
703        let mut handles: Vec<(
704            String,
705            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
706        )> = Vec::new();
707        let mut cached_results: Vec<SubTaskResult> = Vec::new();
708
709        // Rate limiting: semaphore for max concurrent requests
710        let semaphore = Arc::new(tokio::sync::Semaphore::new(
711            self.config.max_concurrent_requests,
712        ));
713        let delay_ms = self.config.request_delay_ms;
714
715        // Get provider info for tool registry
716        let model = orchestrator.model().to_string();
717        let provider_name = orchestrator.provider().to_string();
718        let providers = orchestrator.providers();
719        let provider = providers
720            .get(&provider_name)
721            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
722
723        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
724
725        // Create base tool registry with provider for ralph and batch tool
726        let base_tool_registry =
727            ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
728        // Filter out 'question' tool - sub-agents must be autonomous, not interactive
729        // Include 'swarm_share' definition so LLMs know about it (registered per-agent below)
730        let mut tool_definitions: Vec<_> = base_tool_registry
731            .definitions()
732            .into_iter()
733            .filter(|t| t.name != "question")
734            .collect();
735
736        // Add swarm_share tool definition so LLMs know it's available
737        let swarm_share_def = crate::provider::ToolDefinition {
738            name: "swarm_share".to_string(),
739            description: "Share results with other sub-agents in the swarm. Actions: publish \
740                          (share a result), get (retrieve a result by key), query_tags (find \
741                          results by tags), query_prefix (find results by key prefix), list \
742                          (show all shared results)."
743                .to_string(),
744            parameters: serde_json::json!({
745                "type": "object",
746                "properties": {
747                    "action": {
748                        "type": "string",
749                        "enum": ["publish", "get", "query_tags", "query_prefix", "list"],
750                        "description": "Action to perform"
751                    },
752                    "key": {
753                        "type": "string",
754                        "description": "Result key (for publish/get)"
755                    },
756                    "value": {
757                        "description": "Result value to publish (any JSON value)"
758                    },
759                    "tags": {
760                        "type": "array",
761                        "items": {"type": "string"},
762                        "description": "Tags for publish or query_tags"
763                    },
764                    "prefix": {
765                        "type": "string",
766                        "description": "Key prefix for query_prefix"
767                    }
768                },
769                "required": ["action"]
770            }),
771        };
772        tool_definitions.push(swarm_share_def);
773
774        // Clone the result store for sub-agent sharing
775        let result_store = Arc::clone(&self.result_store);
776
777        // Create worktree manager if enabled
778        let worktree_manager = if self.config.worktree_enabled {
779            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
780                std::env::current_dir()
781                    .map(|p| p.to_string_lossy().to_string())
782                    .unwrap_or_else(|_| ".".to_string())
783            });
784
785            match WorktreeManager::new(&working_dir) {
786                Ok(mgr) => {
787                    tracing::info!(
788                        working_dir = %working_dir,
789                        "Worktree isolation enabled for parallel sub-agents"
790                    );
791                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
792                }
793                Err(e) => {
794                    tracing::warn!(
795                        error = %e,
796                        "Failed to create worktree manager, falling back to shared directory"
797                    );
798                    None
799                }
800            }
801        } else {
802            None
803        };
804
805        for (idx, subtask) in subtasks.into_iter().enumerate() {
806            let model = model.clone();
807            let _provider_name = provider_name.clone();
808            let provider = Arc::clone(&provider);
809
810            // Check cache first
811            if let Some(ref cache) = self.cache {
812                let mut cache_guard = cache.lock().await;
813                if let Some(cached_result) = cache_guard.get(&subtask).await {
814                    tracing::info!(
815                        subtask_id = %subtask.id,
816                        task_name = %subtask.name,
817                        "Cache hit for subtask, skipping execution"
818                    );
819                    self.try_send_event(SwarmEvent::SubTaskUpdate {
820                        id: subtask.id.clone(),
821                        name: subtask.name.clone(),
822                        status: SubTaskStatus::Completed,
823                        agent_name: Some("cached".to_string()),
824                    });
825                    cached_results.push(cached_result);
826                    continue;
827                }
828            }
829
830            // Get context from dependencies
831            let context = {
832                let completed = completed_results.read().await;
833                let mut dep_context = String::new();
834                for dep_id in &subtask.dependencies {
835                    if let Some(result) = completed.get(dep_id) {
836                        dep_context.push_str(&format!(
837                            "\n--- Result from dependency {} ---\n{}\n",
838                            dep_id, result
839                        ));
840                    }
841                }
842                dep_context
843            };
844
845            let instruction = subtask.instruction.clone();
846            let subtask_name = subtask.name.clone();
847            let specialty = subtask.specialty.clone().unwrap_or_default();
848            let subtask_id = subtask.id.clone();
849            let subtask_id_for_handle = subtask_id.clone();
850            let max_steps = self.config.max_steps_per_subagent;
851            let timeout_secs = self.config.subagent_timeout_secs;
852
853            // Clone for the async block
854            let tools = tool_definitions.clone();
855            let _base_registry = Arc::clone(&base_tool_registry);
856            let agent_result_store = Arc::clone(&result_store);
857            let sem = Arc::clone(&semaphore);
858            let stagger_delay = delay_ms * idx as u64; // Stagger start times
859            let worktree_mgr = worktree_manager.clone();
860            let event_tx = self.event_tx.clone();
861
862            // Generate sub-agent execution ID
863            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
864
865            // Log telemetry for this sub-agent
866            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
867
868            // Spawn the subtask execution with agentic tool loop
869            let handle = tokio::spawn(async move {
870                // Rate limiting: stagger start and acquire semaphore
871                if stagger_delay > 0 {
872                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
873                }
874                let _permit = sem
875                    .acquire()
876                    .await
877                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;
878
879                let _agent_start = Instant::now();
880
881                let start = Instant::now();
882
883                // Create worktree for this sub-agent if enabled
884                let worktree_info = if let Some(ref mgr) = worktree_mgr {
885                    let task_slug = subtask_id.replace("-", "_");
886                    match mgr.create(&task_slug) {
887                        Ok(wt) => {
888                            tracing::info!(
889                                subtask_id = %subtask_id,
890                                worktree_path = %wt.path.display(),
891                                worktree_branch = %wt.branch,
892                                "Created worktree for sub-agent"
893                            );
894                            Some(wt)
895                        }
896                        Err(e) => {
897                            tracing::warn!(
898                                subtask_id = %subtask_id,
899                                error = %e,
900                                "Failed to create worktree, using shared directory"
901                            );
902                            None
903                        }
904                    }
905                } else {
906                    None
907                };
908
909                // Determine working directory
910                let working_dir = worktree_info
911                    .as_ref()
912                    .map(|wt| wt.path.display().to_string())
913                    .unwrap_or_else(|| ".".to_string());
914
915                // Load AGENTS.md from working directory
916                let working_path = std::path::Path::new(&working_dir);
917                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
918                    .map(|(content, _)| {
919                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
920                    })
921                    .unwrap_or_default();
922
923                // Build the system prompt for this sub-agent
924                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
925                let system_prompt = format!(
926                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
927
928WORKING DIRECTORY: {}
929All file operations should be relative to this directory.
930
931IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
932
933Available tools:
934- read: Read file contents
935- write: Write/create files  
936- edit: Edit existing files (search and replace)
937- multiedit: Make multiple edits at once
938- glob: Find files by pattern
939- grep: Search file contents
940- bash: Run shell commands (use cwd: \"{}\" parameter)
941- webfetch: Fetch web pages
942- prd: Generate structured PRD for complex tasks
943- ralph: Run autonomous agent loop on a PRD
944- swarm_share: Share results with other sub-agents running in parallel
945
946SHARING RESULTS:
947Use swarm_share to collaborate with other sub-agents:
948- swarm_share({{action: 'publish', key: 'my-finding', value: '...', tags: ['research']}}) to share a result
949- swarm_share({{action: 'get', key: 'some-key'}}) to retrieve a result from another agent
950- swarm_share({{action: 'list'}}) to see all shared results
951- swarm_share({{action: 'query_tags', tags: ['research']}}) to find results by tag
952
953COMPLEX TASKS:
954If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
9551. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
9562. Break down into user stories with acceptance criteria
9573. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
9584. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
959
960NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
961
962When done, provide a brief summary of what you accomplished.{agents_md_content}",
963                    specialty,
964                    subtask_id,
965                    working_dir,
966                    working_dir,
967                    prd_filename,
968                    prd_filename,
969                    prd_filename
970                );
971
972                let user_prompt = if context.is_empty() {
973                    format!("Complete this task:\n\n{}", instruction)
974                } else {
975                    format!(
976                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
977                        instruction, context
978                    )
979                };
980
981                // Emit AgentStarted event
982                if let Some(ref tx) = event_tx {
983                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
984                        id: subtask_id.clone(),
985                        name: subtask_name.clone(),
986                        status: SubTaskStatus::Running,
987                        agent_name: Some(format!("agent-{}", subtask_id)),
988                    });
989                    let _ = tx.try_send(SwarmEvent::AgentStarted {
990                        subtask_id: subtask_id.clone(),
991                        agent_name: format!("agent-{}", subtask_id),
992                        specialty: specialty.clone(),
993                    });
994                }
995
996                // Run the agentic loop
997                // Create per-agent registry with SwarmShareTool for this subtask
998                let mut agent_registry =
999                    ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
1000                agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
1001                    Arc::clone(&agent_result_store),
1002                    subtask_id.clone(),
1003                )));
1004                let registry = Arc::new(agent_registry);
1005
1006                let result = run_agent_loop(
1007                    provider,
1008                    &model,
1009                    &system_prompt,
1010                    &user_prompt,
1011                    tools,
1012                    registry,
1013                    max_steps,
1014                    timeout_secs,
1015                    event_tx.clone(),
1016                    subtask_id.clone(),
1017                )
1018                .await;
1019
1020                match result {
1021                    Ok((output, steps, tool_calls, exit_reason)) => {
1022                        let (success, status, error) = match exit_reason {
1023                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
1024                            AgentLoopExit::MaxStepsReached => (
1025                                false,
1026                                SubTaskStatus::Failed,
1027                                Some(format!("Sub-agent hit max steps ({max_steps})")),
1028                            ),
1029                            AgentLoopExit::TimedOut => (
1030                                false,
1031                                SubTaskStatus::TimedOut,
1032                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
1033                            ),
1034                        };
1035
1036                        // Emit completion events
1037                        if let Some(ref tx) = event_tx {
1038                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1039                                id: subtask_id.clone(),
1040                                name: subtask_name.clone(),
1041                                status,
1042                                agent_name: Some(format!("agent-{}", subtask_id)),
1043                            });
1044                            if let Some(ref message) = error {
1045                                let _ = tx.try_send(SwarmEvent::AgentError {
1046                                    subtask_id: subtask_id.clone(),
1047                                    error: message.clone(),
1048                                });
1049                            }
1050                            let _ = tx.try_send(SwarmEvent::AgentOutput {
1051                                subtask_id: subtask_id.clone(),
1052                                output: output.clone(),
1053                            });
1054                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1055                                subtask_id: subtask_id.clone(),
1056                                success,
1057                                steps,
1058                            });
1059                        }
1060                        Ok((
1061                            SubTaskResult {
1062                                subtask_id: subtask_id.clone(),
1063                                subagent_id: format!("agent-{}", subtask_id),
1064                                success,
1065                                result: output,
1066                                steps,
1067                                tool_calls,
1068                                execution_time_ms: start.elapsed().as_millis() as u64,
1069                                error,
1070                                artifacts: Vec::new(),
1071                            },
1072                            worktree_info,
1073                        ))
1074                    }
1075                    Err(e) => {
1076                        // Emit error events
1077                        if let Some(ref tx) = event_tx {
1078                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1079                                id: subtask_id.clone(),
1080                                name: subtask_name.clone(),
1081                                status: SubTaskStatus::Failed,
1082                                agent_name: Some(format!("agent-{}", subtask_id)),
1083                            });
1084                            let _ = tx.try_send(SwarmEvent::AgentError {
1085                                subtask_id: subtask_id.clone(),
1086                                error: e.to_string(),
1087                            });
1088                            let _ = tx.try_send(SwarmEvent::AgentComplete {
1089                                subtask_id: subtask_id.clone(),
1090                                success: false,
1091                                steps: 0,
1092                            });
1093                        }
1094                        Ok((
1095                            SubTaskResult {
1096                                subtask_id: subtask_id.clone(),
1097                                subagent_id: format!("agent-{}", subtask_id),
1098                                success: false,
1099                                result: String::new(),
1100                                steps: 0,
1101                                tool_calls: 0,
1102                                execution_time_ms: start.elapsed().as_millis() as u64,
1103                                error: Some(e.to_string()),
1104                                artifacts: Vec::new(),
1105                            },
1106                            worktree_info,
1107                        ))
1108                    }
1109                }
1110            });
1111
1112            handles.push((subtask_id_for_handle, handle));
1113        }
1114
1115        // Wait for all handles and handle worktree merging
1116        let mut results = cached_results;
1117        let auto_merge = self.config.worktree_auto_merge;
1118
1119        for (subtask_id, handle) in handles {
1120            match handle.await {
1121                Ok(Ok((mut result, worktree_info))) => {
1122                    // Handle worktree merge if applicable
1123                    if let Some(wt) = worktree_info {
1124                        if result.success && auto_merge {
1125                            if let Some(ref mgr) = worktree_manager {
1126                                match mgr.merge(&wt) {
1127                                    Ok(merge_result) => {
1128                                        if merge_result.success {
1129                                            tracing::info!(
1130                                                subtask_id = %result.subtask_id,
1131                                                files_changed = merge_result.files_changed,
1132                                                "Merged worktree changes successfully"
1133                                            );
1134                                            result.result.push_str(&format!(
1135                                                "\n\n--- Merge Result ---\n{}",
1136                                                merge_result.summary
1137                                            ));
1138                                        } else if merge_result.aborted {
1139                                            // Merge was aborted due to non-conflict failure
1140                                            tracing::warn!(
1141                                                subtask_id = %result.subtask_id,
1142                                                summary = %merge_result.summary,
1143                                                "Merge was aborted"
1144                                            );
1145                                            result.result.push_str(&format!(
1146                                                "\n\n--- Merge Aborted ---\n{}",
1147                                                merge_result.summary
1148                                            ));
1149                                        } else {
1150                                            tracing::warn!(
1151                                                subtask_id = %result.subtask_id,
1152                                                conflicts = ?merge_result.conflicts,
1153                                                "Merge had conflicts"
1154                                            );
1155                                            result.result.push_str(&format!(
1156                                                "\n\n--- Merge Conflicts ---\n{}",
1157                                                merge_result.summary
1158                                            ));
1159                                        }
1160
1161                                        // Cleanup worktree after merge
1162                                        if let Err(e) = mgr.cleanup(&wt) {
1163                                            tracing::warn!(
1164                                                error = %e,
1165                                                "Failed to cleanup worktree"
1166                                            );
1167                                        }
1168                                    }
1169                                    Err(e) => {
1170                                        tracing::error!(
1171                                            subtask_id = %result.subtask_id,
1172                                            error = %e,
1173                                            "Failed to merge worktree"
1174                                        );
1175                                    }
1176                                }
1177                            }
1178                        } else if !result.success {
1179                            // Keep worktree for debugging on failure
1180                            tracing::info!(
1181                                subtask_id = %result.subtask_id,
1182                                worktree_path = %wt.path.display(),
1183                                "Keeping worktree for debugging (task failed)"
1184                            );
1185                        }
1186                    }
1187
1188                    // Cache successful result
1189                    if result.success {
1190                        if let Some(ref cache_arc) = self.cache {
1191                            let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
1192                                cache_arc.lock().await;
1193                            // Create a minimal subtask for cache lookup key
1194                            let cache_subtask = SubTask::new(&subtask_id, &result.result);
1195                            if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
1196                                tracing::warn!(
1197                                    subtask_id = %result.subtask_id,
1198                                    error = %e,
1199                                    "Failed to cache subtask result"
1200                                );
1201                            }
1202                        }
1203                    }
1204
1205                    results.push(result);
1206                }
1207                Ok(Err(e)) => {
1208                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
1209                    if let Some(ref tx) = self.event_tx {
1210                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1211                            id: subtask_id.clone(),
1212                            name: subtask_id.clone(),
1213                            status: SubTaskStatus::Failed,
1214                            agent_name: Some(format!("agent-{}", subtask_id)),
1215                        });
1216                        let _ = tx.try_send(SwarmEvent::AgentError {
1217                            subtask_id: subtask_id.clone(),
1218                            error: e.to_string(),
1219                        });
1220                        let _ = tx.try_send(SwarmEvent::AgentComplete {
1221                            subtask_id: subtask_id.clone(),
1222                            success: false,
1223                            steps: 0,
1224                        });
1225                    }
1226                    results.push(SubTaskResult {
1227                        subtask_id: subtask_id.clone(),
1228                        subagent_id: format!("agent-{}", subtask_id),
1229                        success: false,
1230                        result: String::new(),
1231                        steps: 0,
1232                        tool_calls: 0,
1233                        execution_time_ms: 0,
1234                        error: Some(e.to_string()),
1235                        artifacts: Vec::new(),
1236                    });
1237                }
1238                Err(e) => {
1239                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
1240                    if let Some(ref tx) = self.event_tx {
1241                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1242                            id: subtask_id.clone(),
1243                            name: subtask_id.clone(),
1244                            status: SubTaskStatus::Failed,
1245                            agent_name: Some(format!("agent-{}", subtask_id)),
1246                        });
1247                        let _ = tx.try_send(SwarmEvent::AgentError {
1248                            subtask_id: subtask_id.clone(),
1249                            error: format!("Task join error: {}", e),
1250                        });
1251                        let _ = tx.try_send(SwarmEvent::AgentComplete {
1252                            subtask_id: subtask_id.clone(),
1253                            success: false,
1254                            steps: 0,
1255                        });
1256                    }
1257                    results.push(SubTaskResult {
1258                        subtask_id: subtask_id.clone(),
1259                        subagent_id: format!("agent-{}", subtask_id),
1260                        success: false,
1261                        result: String::new(),
1262                        steps: 0,
1263                        tool_calls: 0,
1264                        execution_time_ms: 0,
1265                        error: Some(format!("Task join error: {}", e)),
1266                        artifacts: Vec::new(),
1267                    });
1268                }
1269            }
1270        }
1271
1272        Ok(results)
1273    }
1274
1275    /// Aggregate results from all subtasks into a final result
1276    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
1277        let mut aggregated = String::new();
1278
1279        for (i, result) in results.iter().enumerate() {
1280            if result.success {
1281                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
1282            } else {
1283                aggregated.push_str(&format!(
1284                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
1285                    i + 1,
1286                    result.error.as_deref().unwrap_or("Unknown error")
1287                ));
1288            }
1289        }
1290
1291        Ok(aggregated)
1292    }
1293
1294    /// Execute a single task without decomposition (for simple cases)
1295    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
1296        self.execute(task, DecompositionStrategy::None).await
1297    }
1298}
1299
/// Builder for swarm execution
///
/// Accumulates a [`SwarmConfig`] through chained setter calls and produces a
/// `SwarmExecutor` via `build()`. Construct with [`SwarmExecutorBuilder::new`]
/// or `Default::default()`.
pub struct SwarmExecutorBuilder {
    // Configuration assembled incrementally by the builder's setter methods;
    // starts from SwarmConfig::default().
    config: SwarmConfig,
}
1304
1305impl SwarmExecutorBuilder {
1306    pub fn new() -> Self {
1307        Self {
1308            config: SwarmConfig::default(),
1309        }
1310    }
1311
1312    pub fn max_subagents(mut self, max: usize) -> Self {
1313        self.config.max_subagents = max;
1314        self
1315    }
1316
1317    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
1318        self.config.max_steps_per_subagent = max;
1319        self
1320    }
1321
1322    pub fn max_total_steps(mut self, max: usize) -> Self {
1323        self.config.max_total_steps = max;
1324        self
1325    }
1326
1327    pub fn timeout_secs(mut self, secs: u64) -> Self {
1328        self.config.subagent_timeout_secs = secs;
1329        self
1330    }
1331
1332    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
1333        self.config.parallel_enabled = enabled;
1334        self
1335    }
1336
1337    pub fn build(self) -> SwarmExecutor {
1338        SwarmExecutor::new(self.config)
1339    }
1340}
1341
1342impl Default for SwarmExecutorBuilder {
1343    fn default() -> Self {
1344        Self::new()
1345    }
1346}
1347
1348/// Run the agentic loop for a sub-agent with tool execution
1349#[allow(clippy::too_many_arguments)]
1350/// Run an agentic loop with tools - reusable for Ralph and swarm sub-agents
1351pub async fn run_agent_loop(
1352    provider: Arc<dyn Provider>,
1353    model: &str,
1354    system_prompt: &str,
1355    user_prompt: &str,
1356    tools: Vec<crate::provider::ToolDefinition>,
1357    registry: Arc<ToolRegistry>,
1358    max_steps: usize,
1359    timeout_secs: u64,
1360    event_tx: Option<mpsc::Sender<SwarmEvent>>,
1361    subtask_id: String,
1362) -> Result<(String, usize, usize, AgentLoopExit)> {
1363    // Let the provider handle temperature - K2 models need 0.6 when thinking is disabled
1364    let temperature = 0.7;
1365
1366    tracing::info!(
1367        model = %model,
1368        max_steps = max_steps,
1369        timeout_secs = timeout_secs,
1370        "Sub-agent starting agentic loop"
1371    );
1372    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
1373    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
1374
1375    // Initialize conversation with system and user messages
1376    let mut messages = vec![
1377        Message {
1378            role: Role::System,
1379            content: vec![ContentPart::Text {
1380                text: system_prompt.to_string(),
1381            }],
1382        },
1383        Message {
1384            role: Role::User,
1385            content: vec![ContentPart::Text {
1386                text: user_prompt.to_string(),
1387            }],
1388        },
1389    ];
1390
1391    let mut steps = 0;
1392    let mut total_tool_calls = 0;
1393    let mut final_output = String::new();
1394
1395    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
1396
1397    let exit_reason = loop {
1398        if steps >= max_steps {
1399            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
1400            break AgentLoopExit::MaxStepsReached;
1401        }
1402
1403        if Instant::now() > deadline {
1404            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
1405            break AgentLoopExit::TimedOut;
1406        }
1407
1408        steps += 1;
1409        tracing::info!(step = steps, "Sub-agent step starting");
1410
1411        // Check context size and truncate if approaching limit
1412        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
1413
1414        let request = CompletionRequest {
1415            messages: messages.clone(),
1416            tools: tools.clone(),
1417            model: model.to_string(),
1418            temperature: Some(temperature),
1419            top_p: None,
1420            max_tokens: Some(8192),
1421            stop: Vec::new(),
1422        };
1423
1424        let step_start = Instant::now();
1425        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
1426        let step_duration = step_start.elapsed();
1427
1428        tracing::info!(
1429            step = steps,
1430            duration_ms = step_duration.as_millis() as u64,
1431            finish_reason = ?response.finish_reason,
1432            prompt_tokens = response.usage.prompt_tokens,
1433            completion_tokens = response.usage.completion_tokens,
1434            "Sub-agent step completed LLM call"
1435        );
1436
1437        // Extract text from response
1438        let mut text_parts = Vec::new();
1439        let mut tool_calls = Vec::new();
1440
1441        for part in &response.message.content {
1442            match part {
1443                ContentPart::Text { text } => {
1444                    text_parts.push(text.clone());
1445                }
1446                ContentPart::ToolCall {
1447                    id,
1448                    name,
1449                    arguments,
1450                } => {
1451                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
1452                }
1453                _ => {}
1454            }
1455        }
1456
1457        // Log assistant output
1458        if !text_parts.is_empty() {
1459            let step_output = text_parts.join("\n");
1460            if !final_output.is_empty() {
1461                final_output.push('\n');
1462            }
1463            final_output.push_str(&step_output);
1464            tracing::info!(
1465                step = steps,
1466                output_len = final_output.len(),
1467                "Sub-agent text output"
1468            );
1469            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
1470
1471            // Emit assistant message event for TUI detail view
1472            if let Some(ref tx) = event_tx {
1473                let preview = if step_output.len() > 500 {
1474                    let mut end = 500;
1475                    while end > 0 && !step_output.is_char_boundary(end) {
1476                        end -= 1;
1477                    }
1478                    format!("{}...", &step_output[..end])
                } else {
                    step_output.clone()
                };
                // Surface the assistant's text in the TUI. try_send is
                // non-blocking: if the event channel is full, the update is
                // dropped instead of stalling the agent loop.
                let _ = tx.try_send(SwarmEvent::AgentMessage {
                    subtask_id: subtask_id.clone(),
                    entry: AgentMessageEntry {
                        role: "assistant".to_string(),
                        content: preview,
                        is_tool_call: false,
                    },
                });
            }
        }

        // Log tool calls requested by the model for this step (names only;
        // full arguments are logged per-call at debug level below).
        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        // Add assistant message to history so the next completion request
        // sees the model's own turn.
        messages.push(response.message.clone());

        // Termination check: the model either stopped on its own, or claimed
        // ToolCalls without actually supplying any — treat both as done.
        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break AgentLoopExit::Completed;
        }

        // Execute tool calls sequentially, collecting (id, name, result)
        // triples to append to the conversation afterwards.
        let mut tool_results = Vec::new();

        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            // Emit a lightweight "tool started" event (name only); the
            // detailed input/output event is sent after execution below.
            if let Some(ref tx) = event_tx {
                let _ = tx.try_send(SwarmEvent::AgentToolCall {
                    subtask_id: subtask_id.clone(),
                    tool_name: tool_name.clone(),
                });
            }

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let mut tool_success = true;
            let result = if let Some(tool) = registry.get(&tool_name) {
                // Parse arguments as JSON; malformed arguments degrade to an
                // empty object so the tool still runs rather than aborting
                // the whole step.
                let args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
                        serde_json::json!({})
                    });

                // Errors are never propagated out of the loop: both tool-level
                // failure (r.success == false) and execution failure (Err) are
                // converted into text and fed back to the model so it can
                // adjust its approach.
                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tool_success = false;
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tool_success = false;
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                // Model asked for a tool that is not in the registry; report
                // it back as a normal tool result.
                tool_success = false;
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            // Emit detailed tool call event with truncated input/output
            // previews. Truncation walks back to a char boundary so slicing
            // multi-byte UTF-8 text cannot panic.
            if let Some(ref tx) = event_tx {
                let input_preview = if arguments.len() > 200 {
                    let mut end = 200;
                    while end > 0 && !arguments.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &arguments[..end])
                } else {
                    arguments.clone()
                };
                let output_preview = if result.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !result.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &result[..end])
                } else {
                    result.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
                    subtask_id: subtask_id.clone(),
                    detail: AgentToolCallDetail {
                        tool_name: tool_name.clone(),
                        input_preview,
                        output_preview,
                        success: tool_success,
                    },
                });
            }

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            // Process large results with RLM or truncate smaller ones.
            // RLM_THRESHOLD_CHARS / SIMPLE_TRUNCATE_CHARS are file-level
            // consts defined outside this view.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                // Use RLM for very large results
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                // Simple truncation for medium results
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        // Add tool results to conversation as Role::Tool messages, each
        // carrying the tool_call_id so the provider can match result to call.
        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }

        // Reset deadline after each successful step — agent is making progress
        deadline = Instant::now() + Duration::from_secs(timeout_secs);
    };
    // NOTE(review): the `;` above suggests the loop's break value
    // (AgentLoopExit) is bound to a variable — presumably `exit_reason`,
    // returned below. The binding itself is outside this view; confirm.

    Ok((final_output, steps, total_tool_calls, exit_reason))
}