use super::{
    BranchObservation, BranchRuntimeState, CacheConfig, CacheStats, CollapseController,
    CollapsePolicy, DecompositionStrategy, ExecutionMode, StageStats, SwarmCache, SwarmConfig,
    SwarmResult,
    kubernetes_executor::{
        RemoteSubtaskPayload, SWARM_SUBTASK_PAYLOAD_ENV, encode_payload, latest_probe_from_logs,
        probe_changed_files_set, result_from_logs,
    },
    orchestrator::Orchestrator,
    result_store::ResultStore,
    subtask::{SubTask, SubTaskResult, SubTaskStatus},
};
use crate::bus::{AgentBus, BusMessage};
use crate::k8s::{K8sManager, SubagentPodSpec, SubagentPodState};
use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};

pub use super::SwarmMessage;

use crate::{
    agent::Agent,
    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
    rlm::RlmExecutor,
    swarm::{SwarmArtifact, SwarmStats},
    telemetry::SwarmTelemetryCollector,
    tool::ToolRegistry,
    worktree::{WorktreeInfo, WorktreeManager},
};
use anyhow::Result;
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{RwLock, mpsc};
use tokio::task::AbortHandle;
use tokio::time::{Duration, MissedTickBehavior, timeout};

/// Default context-window budget (in tokens) assumed for a sub-agent conversation.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens reserved for the model's response when deciding whether to truncate.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context window the conversation may fill before truncation kicks in.
const TRUNCATION_THRESHOLD: f64 = 0.85;

/// Cheap token estimate for a string, assuming roughly 3.5 characters per token.
fn estimate_tokens(text: &str) -> usize {
    (text.len() as f64 / 3.5).ceil() as usize
}

/// Estimate the token cost of a single message, including a small per-message overhead.
fn estimate_message_tokens(message: &Message) -> usize {
    // Small fixed overhead per message for role/formatting tokens.
    let mut tokens = 4;

    for part in &message.content {
        tokens += match part {
            ContentPart::Text { text } => estimate_tokens(text),
            ContentPart::ToolCall {
                id,
                name,
                arguments,
                ..
            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
            ContentPart::ToolResult {
                tool_call_id,
                content,
            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
            // Images and files get a flat estimate.
            ContentPart::Image { .. } | ContentPart::File { .. } => 2000,
            ContentPart::Thinking { text } => estimate_tokens(text),
        };
    }

    tokens
}

/// Estimate the total token count of a conversation.
fn estimate_total_tokens(messages: &[Message]) -> usize {
    messages.iter().map(estimate_message_tokens).sum()
}

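/// Truncate `messages` in place so the conversation fits within `context_limit` tokens.
///
/// Strategy, in order: first shrink oversized tool results; if the conversation is
/// still over budget, drop the middle of the history (keeping the first 2 and last 4
/// messages) and insert a short summary of what was removed.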
fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
    let target_tokens =
        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;

    let current_tokens = estimate_total_tokens(messages);
    if current_tokens <= target_tokens {
        return;
    }

    tracing::warn!(
        current_tokens = current_tokens,
        target_tokens = target_tokens,
        context_limit = context_limit,
        "Context approaching limit, truncating conversation history"
    );

    // First pass: shrink oversized tool results, which usually dominate the context.
    truncate_large_tool_results(messages, 2000);

    let after_tool_truncation = estimate_total_tokens(messages);
    if after_tool_truncation <= target_tokens {
        tracing::info!(
            old_tokens = current_tokens,
            new_tokens = after_tool_truncation,
            "Truncated large tool results, context now within limits"
        );
        return;
    }

    if messages.len() <= 6 {
        tracing::warn!(
            tokens = after_tool_truncation,
            target = target_tokens,
            "Cannot truncate further - conversation too short"
        );
        return;
    }

    // Second pass: drop the middle of the conversation, keeping the setup messages
    // at the start and the most recent exchange at the end.
    let keep_start = 2;
    let keep_end = 4;
    let removable_count = messages.len() - keep_start - keep_end;

    if removable_count == 0 {
        return;
    }

    let removed_messages: Vec<_> = messages
        .drain(keep_start..keep_start + removable_count)
        .collect();
    let summary = summarize_removed_messages(&removed_messages);

    messages.insert(
        keep_start,
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: format!(
                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
                    removed_messages.len(),
                    summary
                ),
            }],
        },
    );

    let new_tokens = estimate_total_tokens(messages);
    tracing::info!(
        removed_messages = removed_messages.len(),
        old_tokens = current_tokens,
        new_tokens = new_tokens,
        "Truncated conversation history"
    );
}

/// Build a short summary of the messages removed during truncation, listing the
/// distinct tools that were called in them.
fn summarize_removed_messages(messages: &[Message]) -> String {
    let mut summary = String::new();
    let mut tool_calls: Vec<String> = Vec::new();

    for msg in messages {
        for part in &msg.content {
            if let ContentPart::ToolCall { name, .. } = part {
                if !tool_calls.contains(name) {
                    tool_calls.push(name.clone());
                }
            }
        }
    }

    if !tool_calls.is_empty() {
        summary.push_str(&format!(
            "Tools used in truncated history: {}",
            tool_calls.join(", ")
        ));
    }

    summary
}

/// Truncate any tool result larger than `max_tokens_per_result`, logging how much was saved.
fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
    // Convert the per-result token budget into a rough character budget.
    let char_limit = max_tokens_per_result * 3;
    let mut truncated_count = 0;
    let mut saved_tokens = 0usize;

    for message in messages.iter_mut() {
        for part in message.content.iter_mut() {
            if let ContentPart::ToolResult { content, .. } = part {
                let tokens = estimate_tokens(content);
                if tokens > max_tokens_per_result {
                    let old_len = content.len();
                    *content = truncate_single_result(content, char_limit);
                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
                    if content.len() < old_len {
                        truncated_count += 1;
                    }
                }
            }
        }
    }

    if truncated_count > 0 {
        tracing::info!(
            truncated_count = truncated_count,
            saved_tokens = saved_tokens,
            max_tokens_per_result = max_tokens_per_result,
            "Truncated large tool results"
        );
    }
}

/// Truncate a single result string to roughly `max_chars`, cutting on a valid char
/// boundary (preferring the last newline) and appending a note about what was removed.
fn truncate_single_result(content: &str, max_chars: usize) -> String {
    if content.len() <= max_chars {
        return content.to_string();
    }

    // Walk back from the limit until we are on a valid UTF-8 char boundary.
    let safe_limit = {
        let mut limit = max_chars.min(content.len());
        while limit > 0 && !content.is_char_boundary(limit) {
            limit -= 1;
        }
        limit
    };

    // Prefer breaking at the last newline before the limit.
    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);

    let truncated = format!(
        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
        &content[..break_point],
        content.len(),
        break_point
    );

    tracing::debug!(
        original_len = content.len(),
        truncated_len = truncated.len(),
        "Truncated large result"
    );

    truncated
}

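// Minimal sanity-check sketch (not part of the original file): illustrates the
// token-estimation heuristic and the truncation helpers above. Expected values
// follow directly from the 3.5-chars-per-token formula and the truncation logic.
#[cfg(test)]
mod truncation_sketch_tests {
    use super::{estimate_tokens, truncate_single_result};

    #[test]
    fn estimate_tokens_uses_char_ratio() {
        // 35 characters / 3.5 chars per token = 10 tokens.
        assert_eq!(estimate_tokens(&"a".repeat(35)), 10);
        assert_eq!(estimate_tokens(""), 0);
    }

    #[test]
    fn short_results_are_left_untouched() {
        assert_eq!(truncate_single_result("ok", 100), "ok");
    }

    #[test]
    fn long_results_are_cut_with_a_marker() {
        let content = "line one\n".repeat(100);
        let truncated = truncate_single_result(&content, 50);
        assert!(truncated.len() < content.len());
        assert!(truncated.contains("[OUTPUT TRUNCATED"));
    }
}
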
/// Tool results larger than this (in chars) are summarized with the RLM executor.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Tool results below the RLM threshold are simply truncated to this many chars.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// How often (in seconds) the collapse controller samples branch state.
const COLLAPSE_SAMPLE_SECS: u64 = 5;

const SWARM_FALLBACK_PROMPT_ENV: &str = "CODETETHER_SWARM_FALLBACK_PROMPT";
const SWARM_FALLBACK_MODEL_ENV: &str = "CODETETHER_SWARM_FALLBACK_MODEL";
/// Environment variables forwarded verbatim into Kubernetes sub-agent pods.
const K8S_PASSTHROUGH_ENV_VARS: &[&str] = &[
    "VAULT_ADDR",
    "VAULT_TOKEN",
    "VAULT_MOUNT",
    "VAULT_SECRETS_PATH",
    "VAULT_NAMESPACE",
    "CODETETHER_AUTH_TOKEN",
];

/// Bookkeeping for a sub-agent branch currently running in a Kubernetes pod.
#[derive(Debug, Clone)]
struct ActiveK8sBranch {
    branch: String,
    started_at: Instant,
}

/// Why a sub-agent loop stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    Completed,
    MaxStepsReached,
    TimedOut,
}

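/// Score the infrastructure health of a sub-agent pod in the range [0.0, 1.0] and
/// count unhealthy signals. Fatal conditions (OOMKilled, image pull failures, a
/// failed phase) short-circuit to a low score; a missing pod state yields (0.2, 1).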
fn compute_resource_health(pod_state: Option<&SubagentPodState>) -> (f32, u32) {
    let Some(pod_state) = pod_state else {
        return (0.2, 1);
    };

    let reason = pod_state
        .reason
        .as_deref()
        .unwrap_or_default()
        .to_ascii_lowercase();
    let phase = pod_state.phase.to_ascii_lowercase();

    if reason.contains("oomkilled") {
        return (0.0, 3);
    }
    if reason.contains("imagepullbackoff") || reason.contains("errimagepull") {
        return (0.0, 3);
    }
    if reason.contains("crashloopbackoff") {
        return (0.1, 2);
    }
    if phase == "failed" {
        return (0.1, 2);
    }

    let mut score = 1.0f32;
    let mut unhealthy_signals = 0u32;

    if !pod_state.ready {
        score -= 0.2;
    }
    if !reason.is_empty() {
        score -= 0.3;
        unhealthy_signals += 1;
    }
    if pod_state.restart_count > 0 {
        score -= (pod_state.restart_count.min(3) as f32) * 0.2;
        unhealthy_signals += 1;
    }

    (score.clamp(0.0, 1.0), unhealthy_signals)
}

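/// Reduce a large tool result to something that fits in context: small results pass
/// through unchanged, mid-sized results are truncated, and results larger than
/// `RLM_THRESHOLD_CHARS` are summarized via the RLM executor (falling back to
/// truncation if the analysis fails).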
async fn process_large_result_with_rlm(
    content: &str,
    tool_name: &str,
    provider: Arc<dyn Provider>,
    model: &str,
) -> String {
    if content.len() <= SIMPLE_TRUNCATE_CHARS {
        return content.to_string();
    }

    if content.len() <= RLM_THRESHOLD_CHARS {
        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
    }

    tracing::info!(
        tool = %tool_name,
        content_len = content.len(),
        "Using RLM to process large tool result"
    );

    let query = format!(
        "Summarize the key information from this {} output. \
         Focus on: errors, warnings, important findings, and actionable items. \
         Be concise but thorough.",
        tool_name
    );

    let mut executor =
        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);

    match executor.analyze(&query).await {
        Ok(result) => {
            tracing::info!(
                tool = %tool_name,
                original_len = content.len(),
                summary_len = result.answer.len(),
                iterations = result.iterations,
                "RLM summarized large result"
            );

            format!(
                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
                tool_name,
                content.len(),
                result.answer.len(),
                result.answer
            )
        }
        Err(e) => {
            tracing::warn!(
                tool = %tool_name,
                error = %e,
                "RLM analysis failed, falling back to truncation"
            );
            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
        }
    }
}

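/// Executes a decomposed task as a swarm of sub-agents: decomposes the task via the
/// orchestrator, runs each stage's subtasks in parallel (in-process or in Kubernetes
/// pods), lets agents share intermediate results, and aggregates the final output.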
pub struct SwarmExecutor {
    config: SwarmConfig,
    /// Optional coordinator agent associated with this swarm execution.
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Channel for streaming swarm events to the TUI.
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    /// Optional cache of previously computed subtask results.
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Store through which sub-agents publish and query shared results.
    result_store: Arc<ResultStore>,
    /// Optional agent bus for broadcasting swarm lifecycle messages.
    bus: Option<Arc<AgentBus>>,
}

impl SwarmExecutor {
    /// Create an executor with the given configuration and no cache, bus, or event channel.
    pub fn new(config: SwarmConfig) -> Self {
        Self {
            config,
            coordinator_agent: None,
            event_tx: None,
            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
            cache: None,
            result_store: ResultStore::new_arc(),
            bus: None,
        }
    }

    /// Create an executor with a freshly initialized subtask result cache.
    pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
        let cache = SwarmCache::new(cache_config).await?;
        Ok(Self {
            config,
            coordinator_agent: None,
            event_tx: None,
            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
            cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
            result_store: ResultStore::new_arc(),
            bus: None,
        })
    }

    /// Attach an existing shared cache instance.
    pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
        self.cache = Some(cache);
        self
    }

    /// Attach an agent bus for broadcasting swarm lifecycle events.
    pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
        self.bus = Some(bus);
        self
    }

    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
        self.bus.as_ref()
    }

    /// Attach a channel that will receive `SwarmEvent`s for UI updates.
    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
        self.event_tx = Some(tx);
        self
    }

    /// Attach the coordinator agent that initiated this swarm.
    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
        tracing::debug!("Setting coordinator agent for swarm execution");
        self.coordinator_agent = Some(agent);
        self
    }

    /// Use a shared telemetry collector instead of the default one.
    pub fn with_telemetry(
        mut self,
        telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    ) -> Self {
        self.telemetry = telemetry;
        self
    }

    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
        Arc::clone(&self.telemetry)
    }

    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        tracing::debug!(
            has_coordinator = self.coordinator_agent.is_some(),
            "Getting coordinator agent"
        );
        self.coordinator_agent.as_ref()
    }

    pub fn result_store(&self) -> &Arc<ResultStore> {
        &self.result_store
    }

    /// Return current cache statistics, if a cache is configured.
    pub async fn cache_stats(&self) -> Option<CacheStats> {
        if let Some(ref cache) = self.cache {
            let cache_guard = cache.lock().await;
            Some(cache_guard.stats().clone())
        } else {
            None
        }
    }

    /// Clear the subtask result cache, if one is configured.
    pub async fn clear_cache(&self) -> Result<()> {
        if let Some(ref cache) = self.cache {
            let mut cache_guard = cache.lock().await;
            cache_guard.clear().await?;
        }
        Ok(())
    }

    /// Forward a swarm event to the agent bus (for lifecycle messages) and to the TUI
    /// event channel; the event is dropped silently if neither is attached.
    fn try_send_event(&self, event: SwarmEvent) {
        if let Some(ref bus) = self.bus {
            let handle = bus.handle("swarm-executor");
            match &event {
                SwarmEvent::Started { task, .. } => {
                    handle.send(
                        "broadcast",
                        BusMessage::AgentReady {
                            agent_id: "swarm-executor".to_string(),
                            capabilities: vec![format!("executing:{task}")],
                        },
                    );
                }
                SwarmEvent::Complete { success, .. } => {
                    let state = if *success {
                        crate::a2a::types::TaskState::Completed
                    } else {
                        crate::a2a::types::TaskState::Failed
                    };
                    handle.send_task_update("swarm", state, None);
                }
                _ => {}
            }
        }

        if let Some(ref tx) = self.event_tx {
            let _ = tx.try_send(event);
        }
    }
555
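    /// Run the full swarm pipeline for `task`: decompose it with the orchestrator,
    /// execute each stage's subtasks (propagating dependency results between stages),
    /// record telemetry, and aggregate all subtask outputs into a single result.
    /// Returns a failed `SwarmResult` rather than an error when decomposition
    /// produces no subtasks.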
556 pub async fn execute(
558 &self,
559 task: &str,
560 strategy: DecompositionStrategy,
561 ) -> Result<SwarmResult> {
562 let start_time = Instant::now();
563
564 let mut orchestrator = Orchestrator::new(self.config.clone()).await?;
566
567 tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");
568
569 let subtasks = orchestrator.decompose(task, strategy).await?;
571
572 if subtasks.is_empty() {
573 self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
574 return Ok(SwarmResult {
575 success: false,
576 result: String::new(),
577 subtask_results: Vec::new(),
578 stats: SwarmStats::default(),
579 artifacts: Vec::new(),
580 error: Some("No subtasks generated".to_string()),
581 });
582 }
583
584 tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());
585
586 self.try_send_event(SwarmEvent::Started {
587 task: task.to_string(),
588 total_subtasks: subtasks.len(),
589 });
590
591 self.try_send_event(SwarmEvent::Decomposed {
593 subtasks: subtasks
594 .iter()
595 .map(|s| SubTaskInfo {
596 id: s.id.clone(),
597 name: s.name.clone(),
598 status: SubTaskStatus::Pending,
599 stage: s.stage,
600 dependencies: s.dependencies.clone(),
601 agent_name: s.specialty.clone(),
602 current_tool: None,
603 steps: 0,
604 max_steps: self.config.max_steps_per_subagent,
605 tool_call_history: Vec::new(),
606 messages: Vec::new(),
607 output: None,
608 error: None,
609 })
610 .collect(),
611 });
612
613 let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
615 let mut all_results: Vec<SubTaskResult> = Vec::new();
616 let artifacts: Vec<SwarmArtifact> = Vec::new();
617
618 let swarm_id = uuid::Uuid::new_v4().to_string();
620 let strategy_str = format!("{:?}", strategy);
621 self.telemetry.lock().await.start_swarm(
622 &swarm_id,
623 subtasks.len(),
624 &strategy_str,
625 );
626
627 let completed_results: Arc<RwLock<HashMap<String, String>>> =
629 Arc::new(RwLock::new(HashMap::new()));
630
631 for stage in 0..=max_stage {
632 let stage_start = Instant::now();
633
634 let stage_subtasks: Vec<SubTask> = orchestrator
635 .subtasks_for_stage(stage)
636 .into_iter()
637 .cloned()
638 .collect();
639
640 tracing::debug!(
641 "Stage {} has {} subtasks (max_stage={})",
642 stage,
643 stage_subtasks.len(),
644 max_stage
645 );
646
647 if stage_subtasks.is_empty() {
648 continue;
649 }
650
651 tracing::info!(
652 provider_name = %orchestrator.provider(),
653 "Executing stage {} with {} subtasks",
654 stage,
655 stage_subtasks.len()
656 );
657
658 let stage_results = self
660 .execute_stage(
661 &orchestrator,
662 stage_subtasks,
663 completed_results.clone(),
664 &swarm_id,
665 )
666 .await?;
667
668 {
670 let mut completed = completed_results.write().await;
671 for result in &stage_results {
672 completed.insert(result.subtask_id.clone(), result.result.clone());
673 let tags = vec![
675 format!("stage:{stage}"),
676 format!("subtask:{}", result.subtask_id),
677 ];
678 let _ = self
679 .result_store
680 .publish(
681 &result.subtask_id,
682 &result.subagent_id,
683 &result.result,
684 tags,
685 None,
686 )
687 .await;
688 }
689 }
690
691 let stage_time = stage_start.elapsed().as_millis() as u64;
693 let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
694 let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();
695
696 orchestrator.stats_mut().stages.push(StageStats {
697 stage,
698 subagent_count: stage_results.len(),
699 max_steps,
700 total_steps,
701 execution_time_ms: stage_time,
702 });
703
704 for result in &stage_results {
706 orchestrator.complete_subtask(&result.subtask_id, result.clone());
707 }
708
709 let stage_completed = stage_results.iter().filter(|r| r.success).count();
711 let stage_failed = stage_results.iter().filter(|r| !r.success).count();
712 self.try_send_event(SwarmEvent::StageComplete {
713 stage,
714 completed: stage_completed,
715 failed: stage_failed,
716 });
717
718 all_results.extend(stage_results);
719 }
720
721 let provider_name = orchestrator.provider().to_string();
723
724 self.telemetry
726 .lock()
727 .await
728 .record_swarm_latency("total_execution", start_time.elapsed());
729
730 let stats = orchestrator.stats_mut();
732 stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
733 stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
734 stats.calculate_critical_path();
735 stats.calculate_speedup();
736
737 let success = all_results.iter().all(|r| r.success);
739
740 let telemetry_metrics = self.telemetry.lock().await.complete_swarm(success).await;
742 let result = self.aggregate_results(&all_results).await?;
743
744 tracing::info!(
745 provider_name = %provider_name,
746 "Swarm execution complete: {} subtasks, {:.1}x speedup",
747 all_results.len(),
748 stats.speedup_factor
749 );
750
751 let final_stats = orchestrator.stats().clone();
752 self.try_send_event(SwarmEvent::Complete {
753 success,
754 stats: final_stats.clone(),
755 });
756
757 Ok(SwarmResult {
758 success,
759 result,
760 subtask_results: all_results,
761 stats: final_stats,
762 artifacts,
763 error: None,
764 })
765 }
766
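    /// Execute all subtasks of one stage as parallel in-process sub-agents, or delegate
    /// to Kubernetes pods when `ExecutionMode::KubernetesPod` is configured. Handles
    /// cache hits, per-subtask git worktrees, the collapse controller that can promote
    /// or kill branches mid-flight, and optional auto-merge of worktree changes.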
767 async fn execute_stage(
769 &self,
770 orchestrator: &Orchestrator,
771 subtasks: Vec<SubTask>,
772 completed_results: Arc<RwLock<HashMap<String, String>>>,
773 swarm_id: &str,
774 ) -> Result<Vec<SubTaskResult>> {
775 if self.config.execution_mode == ExecutionMode::KubernetesPod {
776 return self
777 .execute_stage_kubernetes(orchestrator, subtasks, completed_results, swarm_id)
778 .await;
779 }
780
781 let mut handles: FuturesUnordered<
782 tokio::task::JoinHandle<(String, Result<SubTaskResult, anyhow::Error>)>,
783 > = FuturesUnordered::new();
784 let mut abort_handles: HashMap<String, AbortHandle> = HashMap::new();
785 let mut task_ids: HashMap<tokio::task::Id, String> = HashMap::new();
786 let mut active_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
787 let mut all_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
788 let mut cached_results: Vec<SubTaskResult> = Vec::new();
789 let mut completed_entries: Vec<(SubTaskResult, Option<WorktreeInfo>)> = Vec::new();
790 let mut kill_reasons: HashMap<String, String> = HashMap::new();
791 let mut promoted_subtask_id: Option<String> = None;
792
793 let semaphore = Arc::new(tokio::sync::Semaphore::new(
795 self.config.max_concurrent_requests,
796 ));
797 let delay_ms = self.config.request_delay_ms;
798
799 let model = orchestrator.model().to_string();
801 let provider_name = orchestrator.provider().to_string();
802 let providers = orchestrator.providers();
803 let provider = providers
804 .get(&provider_name)
805 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
806
807 tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
808
809 let base_tool_registry =
811 ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
812 let mut tool_definitions: Vec<_> = base_tool_registry
815 .definitions()
816 .into_iter()
817 .filter(|t| t.name != "question")
818 .collect();
819
820 let swarm_share_def = crate::provider::ToolDefinition {
822 name: "swarm_share".to_string(),
823 description: "Share results with other sub-agents in the swarm. Actions: publish \
824 (share a result), get (retrieve a result by key), query_tags (find \
825 results by tags), query_prefix (find results by key prefix), list \
826 (show all shared results)."
827 .to_string(),
828 parameters: serde_json::json!({
829 "type": "object",
830 "properties": {
831 "action": {
832 "type": "string",
833 "enum": ["publish", "get", "query_tags", "query_prefix", "list"],
834 "description": "Action to perform"
835 },
836 "key": {
837 "type": "string",
838 "description": "Result key (for publish/get)"
839 },
840 "value": {
841 "description": "Result value to publish (any JSON value)"
842 },
843 "tags": {
844 "type": "array",
845 "items": {"type": "string"},
846 "description": "Tags for publish or query_tags"
847 },
848 "prefix": {
849 "type": "string",
850 "description": "Key prefix for query_prefix"
851 }
852 },
853 "required": ["action"]
854 }),
855 };
856 tool_definitions.push(swarm_share_def);
857
858 let result_store = Arc::clone(&self.result_store);
860
861 let worktree_manager = if self.config.worktree_enabled {
863 let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
864 std::env::current_dir()
865 .map(|p| p.to_string_lossy().to_string())
866 .unwrap_or_else(|_| ".".to_string())
867 });
868
869 let mgr = WorktreeManager::new(&working_dir);
870 tracing::info!(
871 working_dir = %working_dir,
872 "Worktree isolation enabled for parallel sub-agents"
873 );
874 Some(Arc::new(mgr) as Arc<WorktreeManager>)
875 } else {
876 None
877 };
878
879 for (idx, subtask) in subtasks.into_iter().enumerate() {
880 let model = model.clone();
881 let _provider_name = provider_name.clone();
882 let provider = Arc::clone(&provider);
883
884 if let Some(ref cache) = self.cache {
886 let mut cache_guard = cache.lock().await;
887 if let Some(cached_result) = cache_guard.get(&subtask).await {
888 tracing::info!(
889 subtask_id = %subtask.id,
890 task_name = %subtask.name,
891 "Cache hit for subtask, skipping execution"
892 );
893 self.try_send_event(SwarmEvent::SubTaskUpdate {
894 id: subtask.id.clone(),
895 name: subtask.name.clone(),
896 status: SubTaskStatus::Completed,
897 agent_name: Some("cached".to_string()),
898 });
899 cached_results.push(cached_result);
900 continue;
901 }
902 }
903
904 let context = {
906 let completed = completed_results.read().await;
907 let mut dep_context = String::new();
908 for dep_id in &subtask.dependencies {
909 if let Some(result) = completed.get(dep_id) {
910 dep_context.push_str(&format!(
911 "\n--- Result from dependency {} ---\n{}\n",
912 dep_id, result
913 ));
914 }
915 }
916 dep_context
917 };
918
919 let instruction = subtask.instruction.clone();
920 let subtask_name = subtask.name.clone();
921 let specialty = subtask.specialty.clone().unwrap_or_default();
922 let subtask_id = subtask.id.clone();
923 let subtask_id_for_handle = subtask_id.clone();
924 let max_steps = self.config.max_steps_per_subagent;
925 let timeout_secs = self.config.subagent_timeout_secs;
926
927 let worktree_info = if let Some(ref mgr) = worktree_manager {
930 let task_slug = subtask_id.replace("-", "_");
931 match mgr.create(&task_slug).await {
932 Ok(wt) => {
933 if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
934 tracing::warn!(
935 subtask_id = %subtask_id,
936 error = %e,
937 "Failed to inject workspace stub into worktree"
938 );
939 }
940 tracing::info!(
941 subtask_id = %subtask_id,
942 worktree_path = %wt.path.display(),
943 worktree_branch = %wt.branch,
944 "Created worktree for sub-agent"
945 );
946 active_worktrees.insert(subtask_id.clone(), wt.clone());
947 all_worktrees.insert(subtask_id.clone(), wt.clone());
948 Some(wt)
949 }
950 Err(e) => {
951 tracing::warn!(
952 subtask_id = %subtask_id,
953 error = %e,
954 "Failed to create worktree, using shared directory"
955 );
956 None
957 }
958 }
959 } else {
960 None
961 };
962
963 let working_dir = worktree_info
964 .as_ref()
965 .map(|wt| wt.path.display().to_string())
966 .unwrap_or_else(|| ".".to_string());
967 let working_dir_path = worktree_info.as_ref().map(|wt| wt.path.clone());
968
969 let tools = tool_definitions.clone();
971 let _base_registry = Arc::clone(&base_tool_registry);
972 let agent_result_store = Arc::clone(&result_store);
973 let sem = Arc::clone(&semaphore);
            // Stagger sub-agent start times to avoid hitting the provider all at once.
            let stagger_delay = delay_ms * idx as u64;
            let event_tx = self.event_tx.clone();
976
977 let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
979
980 tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
982
983 let handle = tokio::spawn(async move {
985 if stagger_delay > 0 {
987 tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
988 }
989 let _permit = match sem.acquire().await {
990 Ok(permit) => permit,
991 Err(_) => {
992 return (
993 subtask_id.clone(),
994 Err(anyhow::anyhow!("Swarm execution cancelled")),
995 );
996 }
997 };
998
999 let _agent_start = Instant::now();
1000
1001 let start = Instant::now();
1002
1003 let working_path = std::path::Path::new(&working_dir);
1005 let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
1006 .map(|(content, _)| {
1007 format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
1008 })
1009 .unwrap_or_default();
1010
1011 let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
1013 let system_prompt = format!(
1014 "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
1015
1016WORKING DIRECTORY: {}
1017All file operations should be relative to this directory.
1018
1019IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
1020
1021Available tools:
1022- read: Read file contents
1023- write: Write/create files
1024- edit: Edit existing files (search and replace)
1025- multiedit: Make multiple edits at once
1026- glob: Find files by pattern
1027- grep: Search file contents
1028- bash: Run shell commands (use cwd: \"{}\" parameter)
1029- webfetch: Fetch web pages
1030- prd: Generate structured PRD for complex tasks
1031- ralph: Run autonomous agent loop on a PRD
1032- swarm_share: Share results with other sub-agents running in parallel
1033
1034SHARING RESULTS:
1035Use swarm_share to collaborate with other sub-agents:
1036- swarm_share({{action: 'publish', key: 'my-finding', value: '...', tags: ['research']}}) to share a result
1037- swarm_share({{action: 'get', key: 'some-key'}}) to retrieve a result from another agent
1038- swarm_share({{action: 'list'}}) to see all shared results
1039- swarm_share({{action: 'query_tags', tags: ['research']}}) to find results by tag
1040
1041COMPLEX TASKS:
1042If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
10431. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
10442. Break down into user stories with acceptance criteria
10453. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
10464. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
1047
1048NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
1049
1050When done, provide a brief summary of what you accomplished.{agents_md_content}",
1051 specialty,
1052 subtask_id,
1053 working_dir,
1054 working_dir,
1055 prd_filename,
1056 prd_filename,
1057 prd_filename
1058 );
1059
1060 let user_prompt = if context.is_empty() {
1061 format!("Complete this task:\n\n{}", instruction)
1062 } else {
1063 format!(
1064 "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
1065 instruction, context
1066 )
1067 };
1068
1069 if let Some(ref tx) = event_tx {
1071 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1072 id: subtask_id.clone(),
1073 name: subtask_name.clone(),
1074 status: SubTaskStatus::Running,
1075 agent_name: Some(format!("agent-{}", subtask_id)),
1076 });
1077 let _ = tx.try_send(SwarmEvent::AgentStarted {
1078 subtask_id: subtask_id.clone(),
1079 agent_name: format!("agent-{}", subtask_id),
1080 specialty: specialty.clone(),
1081 });
1082 }
1083
1084 let mut agent_registry =
1087 ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
1088 agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
1089 Arc::clone(&agent_result_store),
1090 subtask_id.clone(),
1091 )));
1092 let registry = Arc::new(agent_registry);
1093
1094 let result = run_agent_loop(
1095 provider,
1096 &model,
1097 &system_prompt,
1098 &user_prompt,
1099 tools,
1100 registry,
1101 max_steps,
1102 timeout_secs,
1103 event_tx.clone(),
1104 subtask_id.clone(),
1105 None,
1106 working_dir_path.clone(),
1107 )
1108 .await;
1109
1110 let task_result = match result {
1111 Ok((output, steps, tool_calls, exit_reason)) => {
1112 let (success, status, error) = match exit_reason {
1113 AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
1114 AgentLoopExit::MaxStepsReached => (
1115 false,
1116 SubTaskStatus::Failed,
1117 Some(format!("Sub-agent hit max steps ({max_steps})")),
1118 ),
1119 AgentLoopExit::TimedOut => (
1120 false,
1121 SubTaskStatus::TimedOut,
1122 Some(format!("Sub-agent timed out after {timeout_secs}s")),
1123 ),
1124 };
1125
1126 if let Some(ref tx) = event_tx {
1128 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1129 id: subtask_id.clone(),
1130 name: subtask_name.clone(),
1131 status,
1132 agent_name: Some(format!("agent-{}", subtask_id)),
1133 });
1134 if let Some(ref message) = error {
1135 let _ = tx.try_send(SwarmEvent::AgentError {
1136 subtask_id: subtask_id.clone(),
1137 error: message.clone(),
1138 });
1139 }
1140 let _ = tx.try_send(SwarmEvent::AgentOutput {
1141 subtask_id: subtask_id.clone(),
1142 output: output.clone(),
1143 });
1144 let _ = tx.try_send(SwarmEvent::AgentComplete {
1145 subtask_id: subtask_id.clone(),
1146 success,
1147 steps,
1148 });
1149 }
1150 Ok(SubTaskResult {
1151 subtask_id: subtask_id.clone(),
1152 subagent_id: format!("agent-{}", subtask_id),
1153 success,
1154 result: output,
1155 steps,
1156 tool_calls,
1157 execution_time_ms: start.elapsed().as_millis() as u64,
1158 error,
1159 artifacts: Vec::new(),
1160 })
1161 }
1162 Err(e) => {
1163 if let Some(ref tx) = event_tx {
1165 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1166 id: subtask_id.clone(),
1167 name: subtask_name.clone(),
1168 status: SubTaskStatus::Failed,
1169 agent_name: Some(format!("agent-{}", subtask_id)),
1170 });
1171 let _ = tx.try_send(SwarmEvent::AgentError {
1172 subtask_id: subtask_id.clone(),
1173 error: e.to_string(),
1174 });
1175 let _ = tx.try_send(SwarmEvent::AgentComplete {
1176 subtask_id: subtask_id.clone(),
1177 success: false,
1178 steps: 0,
1179 });
1180 }
1181 Ok(SubTaskResult {
1182 subtask_id: subtask_id.clone(),
1183 subagent_id: format!("agent-{}", subtask_id),
1184 success: false,
1185 result: String::new(),
1186 steps: 0,
1187 tool_calls: 0,
1188 execution_time_ms: start.elapsed().as_millis() as u64,
1189 error: Some(e.to_string()),
1190 artifacts: Vec::new(),
1191 })
1192 }
1193 };
1194
1195 (subtask_id.clone(), task_result)
1196 });
1197
1198 let abort_handle = handle.abort_handle();
1199 abort_handles.insert(subtask_id_for_handle.clone(), abort_handle);
1200 task_ids.insert(handle.id(), subtask_id_for_handle.clone());
1201 handles.push(handle);
1202 }
1203
1204 let mut collapse_controller = if worktree_manager.is_some() && active_worktrees.len() > 1 {
1206 Some(CollapseController::new(CollapsePolicy::default()))
1207 } else {
1208 None
1209 };
1210 let mut collapse_tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1211 collapse_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1212 let _ = collapse_tick.tick().await;
1214
1215 while !handles.is_empty() {
1216 tokio::select! {
1217 maybe_join = handles.next() => {
1218 let Some(joined) = maybe_join else {
1219 continue;
1220 };
1221 match joined {
1222 Ok((subtask_id, Ok(result))) => {
1223 abort_handles.remove(&subtask_id);
1224 let wt = active_worktrees.remove(&subtask_id).or_else(|| all_worktrees.get(&subtask_id).cloned());
1225 completed_entries.push((result, wt));
1226 }
1227 Ok((subtask_id, Err(e))) => {
1228 abort_handles.remove(&subtask_id);
1229 active_worktrees.remove(&subtask_id);
1230 let wt = all_worktrees.get(&subtask_id).cloned();
1231 completed_entries.push((
1232 SubTaskResult {
1233 subtask_id: subtask_id.clone(),
1234 subagent_id: format!("agent-{subtask_id}"),
1235 success: false,
1236 result: String::new(),
1237 steps: 0,
1238 tool_calls: 0,
1239 execution_time_ms: 0,
1240 error: Some(e.to_string()),
1241 artifacts: Vec::new(),
1242 },
1243 wt,
1244 ));
1245 }
1246 Err(e) => {
1247 let subtask_id = task_ids
1248 .remove(&e.id())
1249 .unwrap_or_else(|| "unknown".to_string());
1250 abort_handles.remove(&subtask_id);
1251 active_worktrees.remove(&subtask_id);
1252 let wt = all_worktrees.get(&subtask_id).cloned();
1253 completed_entries.push((
1254 SubTaskResult {
1255 subtask_id: subtask_id.clone(),
1256 subagent_id: format!("agent-{subtask_id}"),
1257 success: false,
1258 result: String::new(),
1259 steps: 0,
1260 tool_calls: 0,
1261 execution_time_ms: 0,
1262 error: Some(format!("Task join error: {e}")),
1263 artifacts: Vec::new(),
1264 },
1265 wt,
1266 ));
1267 }
1268 }
1269 }
1270 _ = collapse_tick.tick(), if collapse_controller.is_some() && !active_worktrees.is_empty() => {
1271 let branches: Vec<BranchRuntimeState> = active_worktrees
1272 .iter()
1273 .map(|(subtask_id, wt)| BranchRuntimeState {
1274 subtask_id: subtask_id.clone(),
1275 branch: wt.branch.clone(),
1276 worktree_path: wt.path.clone(),
1277 })
1278 .collect();
1279
1280 if let Some(controller) = collapse_controller.as_mut() {
1281 match controller.sample(&branches) {
1282 Ok(tick) => {
1283 if promoted_subtask_id != tick.promoted_subtask_id {
1284 promoted_subtask_id = tick.promoted_subtask_id.clone();
1285 if let Some(ref promoted) = promoted_subtask_id {
1286 tracing::info!(
1287 subtask_id = %promoted,
1288 "Collapse controller promoted branch"
1289 );
1290 if let Some(audit) = crate::audit::try_audit_log() {
1291 audit.log_with_correlation(
1292 crate::audit::AuditCategory::Swarm,
1293 "collapse_promote_branch",
1294 crate::audit::AuditOutcome::Success,
1295 Some("collapse-controller".to_string()),
1296 Some(serde_json::json!({
1297 "swarm_id": swarm_id,
1298 "subtask_id": promoted,
1299 })),
1300 None,
1301 None,
1302 Some(swarm_id.to_string()),
1303 None,
1304 ).await;
1305 }
1306 }
1307 }
1308
1309 for kill in tick.kills {
1310 if kill_reasons.contains_key(&kill.subtask_id) {
1311 continue;
1312 }
1313 let Some(abort_handle) = abort_handles.get(&kill.subtask_id) else {
1314 continue;
1315 };
1316
1317 abort_handle.abort();
1318 abort_handles.remove(&kill.subtask_id);
1319 active_worktrees.remove(&kill.subtask_id);
1320 kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
1321
1322 tracing::warn!(
1323 subtask_id = %kill.subtask_id,
1324 branch = %kill.branch,
1325 reason = %kill.reason,
1326 "Collapse controller killed branch"
1327 );
1328
1329 if let Some(ref tx) = self.event_tx {
1330 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1331 id: kill.subtask_id.clone(),
1332 name: kill.subtask_id.clone(),
1333 status: SubTaskStatus::Cancelled,
1334 agent_name: Some(format!("agent-{}", kill.subtask_id)),
1335 });
1336 let _ = tx.try_send(SwarmEvent::AgentError {
1337 subtask_id: kill.subtask_id.clone(),
1338 error: format!("Cancelled by collapse controller: {}", kill.reason),
1339 });
1340 }
1341
1342 if let Some(audit) = crate::audit::try_audit_log() {
1343 audit.log_with_correlation(
1344 crate::audit::AuditCategory::Swarm,
1345 "collapse_kill_branch",
1346 crate::audit::AuditOutcome::Success,
1347 Some("collapse-controller".to_string()),
1348 Some(serde_json::json!({
1349 "swarm_id": swarm_id,
1350 "subtask_id": kill.subtask_id,
1351 "branch": kill.branch,
1352 "reason": kill.reason,
1353 })),
1354 None,
1355 None,
1356 Some(swarm_id.to_string()),
1357 None,
1358 ).await;
1359 }
1360 }
1361 }
1362 Err(e) => {
1363 tracing::warn!(error = %e, "Collapse controller sampling failed");
1364 }
1365 }
1366 }
1367 }
1368 }
1369 }
1370
1371 if let Some(ref promoted) = promoted_subtask_id {
1373 completed_entries.sort_by_key(|(result, _)| {
1374 if &result.subtask_id == promoted {
1375 0usize
1376 } else {
1377 1usize
1378 }
1379 });
1380 }
1381
1382 let mut results = cached_results;
1383 let auto_merge = self.config.worktree_auto_merge;
1384
1385 for (mut result, worktree_info) in completed_entries {
1386 if let Some(wt) = worktree_info {
1388 if let Some(reason) = kill_reasons.get(&result.subtask_id) {
1389 result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1390 result.result.push_str(&format!(
1391 "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1392 ));
1393 if let Some(ref mgr) = worktree_manager {
1394 if let Err(e) = mgr.cleanup(&wt.name).await {
1395 tracing::warn!(error = %e, "Failed to cleanup killed worktree");
1396 }
1397 }
1398 } else if result.success && auto_merge {
1399 if let Some(ref mgr) = worktree_manager {
1400 match mgr.merge(&wt.name).await {
1401 Ok(merge_result) => {
1402 if merge_result.success {
1403 tracing::info!(
1404 subtask_id = %result.subtask_id,
1405 files_changed = merge_result.files_changed,
1406 "Merged worktree changes successfully"
1407 );
1408 result.result.push_str(&format!(
1409 "\n\n--- Merge Result ---\n{}",
1410 merge_result.summary
1411 ));
1412 } else if merge_result.aborted {
1413 tracing::warn!(
1414 subtask_id = %result.subtask_id,
1415 summary = %merge_result.summary,
1416 "Merge was aborted"
1417 );
1418 result.result.push_str(&format!(
1419 "\n\n--- Merge Aborted ---\n{}",
1420 merge_result.summary
1421 ));
1422 } else {
1423 tracing::warn!(
1424 subtask_id = %result.subtask_id,
1425 conflicts = ?merge_result.conflicts,
1426 "Merge had conflicts"
1427 );
1428 result.result.push_str(&format!(
1429 "\n\n--- Merge Conflicts ---\n{}",
1430 merge_result.summary
1431 ));
1432 }
1433 if let Err(e) = mgr.cleanup(&wt.name).await {
1434 tracing::warn!(error = %e, "Failed to cleanup worktree");
1435 }
1436 }
1437 Err(e) => {
1438 tracing::error!(
1439 subtask_id = %result.subtask_id,
1440 error = %e,
1441 "Failed to merge worktree"
1442 );
1443 }
1444 }
1445 }
1446 } else if !result.success {
1447 tracing::info!(
1448 subtask_id = %result.subtask_id,
1449 worktree_path = %wt.path.display(),
1450 "Keeping worktree for debugging (task failed)"
1451 );
1452 }
1453 }
1454
1455 if result.success {
1457 if let Some(ref cache_arc) = self.cache {
1458 let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
1459 cache_arc.lock().await;
1460 let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1461 if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
1462 tracing::warn!(
1463 subtask_id = %result.subtask_id,
1464 error = %e,
1465 "Failed to cache subtask result"
1466 );
1467 }
1468 }
1469 }
1470
1471 results.push(result);
1472 }
1473
1474 Ok(results)
1475 }
1476
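    /// Execute a stage by spawning one Kubernetes pod per subtask (bounded by the
    /// configured pod budget), polling pod state and logs for results, applying the
    /// collapse controller to promote or kill branches, and cleaning up pods at the end.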
1477 async fn execute_stage_kubernetes(
1478 &self,
1479 orchestrator: &Orchestrator,
1480 subtasks: Vec<SubTask>,
1481 completed_results: Arc<RwLock<HashMap<String, String>>>,
1482 swarm_id: &str,
1483 ) -> Result<Vec<SubTaskResult>> {
1484 let k8s = K8sManager::new().await;
1485 if !k8s.is_available() {
1486 anyhow::bail!(
1487 "Kubernetes execution mode requested but K8s client is unavailable in this environment"
1488 );
1489 }
1490
1491 let provider_name = orchestrator.provider().to_string();
1492 let model = orchestrator.model().to_string();
1493 let pod_budget = self.config.k8s_pod_budget.max(1);
1494 let mut pending: VecDeque<SubTask> = subtasks.into_iter().collect();
1495 let mut active: HashMap<String, ActiveK8sBranch> = HashMap::new();
1496 let mut subtask_names: HashMap<String, String> = HashMap::new();
1497 let mut results: Vec<SubTaskResult> = Vec::new();
1498 let mut kill_reasons: HashMap<String, String> = HashMap::new();
1499 let mut promoted_subtask_id: Option<String> = None;
1500 let mut collapse_controller = CollapseController::new(CollapsePolicy::default());
1501
1502 let mut tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1503 tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1504 let _ = tick.tick().await;
1505
1506 loop {
1507 while active.len() < pod_budget {
1508 let Some(subtask) = pending.pop_front() else {
1509 break;
1510 };
1511
1512 if let Some(ref cache) = self.cache {
1513 let mut cache_guard = cache.lock().await;
1514 if let Some(cached_result) = cache_guard.get(&subtask).await {
1515 tracing::info!(
1516 subtask_id = %subtask.id,
1517 task_name = %subtask.name,
1518 "Cache hit for subtask, skipping Kubernetes execution"
1519 );
1520 self.try_send_event(SwarmEvent::SubTaskUpdate {
1521 id: subtask.id.clone(),
1522 name: subtask.name.clone(),
1523 status: SubTaskStatus::Completed,
1524 agent_name: Some("cached".to_string()),
1525 });
1526 results.push(cached_result);
1527 continue;
1528 }
1529 }
1530
1531 let context = {
1532 let completed = completed_results.read().await;
1533 let mut dep_context = String::new();
1534 for dep_id in &subtask.dependencies {
1535 if let Some(result) = completed.get(dep_id) {
1536 dep_context.push_str(&format!(
1537 "\n--- Result from dependency {} ---\n{}\n",
1538 dep_id, result
1539 ));
1540 }
1541 }
1542 dep_context
1543 };
1544
1545 let payload = RemoteSubtaskPayload {
1546 swarm_id: swarm_id.to_string(),
1547 subtask_id: subtask.id.clone(),
1548 subtask_name: subtask.name.clone(),
1549 specialty: subtask.specialty.clone().unwrap_or_default(),
1550 instruction: subtask.instruction.clone(),
1551 context: context.clone(),
1552 provider: provider_name.clone(),
1553 model: model.clone(),
1554 max_steps: self.config.max_steps_per_subagent,
1555 timeout_secs: self.config.subagent_timeout_secs,
1556 working_dir: self.config.working_dir.clone(),
1557 probe_interval_secs: COLLAPSE_SAMPLE_SECS,
1558 };
1559 let payload_b64 = match encode_payload(&payload) {
1560 Ok(payload) => payload,
1561 Err(error) => {
1562 let error_text = format!("Failed to encode remote payload: {error}");
1563 tracing::error!(subtask_id = %subtask.id, error = %error, "K8s payload encoding failed");
1564 self.try_send_event(SwarmEvent::SubTaskUpdate {
1565 id: subtask.id.clone(),
1566 name: subtask.name.clone(),
1567 status: SubTaskStatus::Failed,
1568 agent_name: Some("k8s-encoder".to_string()),
1569 });
1570 self.try_send_event(SwarmEvent::AgentError {
1571 subtask_id: subtask.id.clone(),
1572 error: error_text.clone(),
1573 });
1574 results.push(SubTaskResult {
1575 subtask_id: subtask.id.clone(),
1576 subagent_id: format!("agent-{}", subtask.id),
1577 success: false,
1578 result: String::new(),
1579 steps: 0,
1580 tool_calls: 0,
1581 execution_time_ms: 0,
1582 error: Some(error_text),
1583 artifacts: Vec::new(),
1584 });
1585 continue;
1586 }
1587 };
1588
1589 let mut env_vars = HashMap::new();
1590 env_vars.insert(SWARM_SUBTASK_PAYLOAD_ENV.to_string(), payload_b64);
1591 for key in K8S_PASSTHROUGH_ENV_VARS {
1592 if let Ok(value) = std::env::var(key)
1593 && !value.trim().is_empty()
1594 {
1595 env_vars.insert((*key).to_string(), value);
1596 }
1597 }
1598 let fallback_prompt = if context.trim().is_empty() {
1599 format!(
1600 "You are executing swarm subtask '{}'.\n\nTask:\n{}\n\n\
1601Return only the final subtask answer.",
1602 subtask.id, subtask.instruction
1603 )
1604 } else {
1605 format!(
1606 "You are executing swarm subtask '{}'.\n\nTask:\n{}\n\n\
1607Dependency context:\n{}\n\nReturn only the final subtask answer.",
1608 subtask.id, subtask.instruction, context
1609 )
1610 };
1611 env_vars.insert(SWARM_FALLBACK_PROMPT_ENV.to_string(), fallback_prompt);
1612 env_vars.insert(SWARM_FALLBACK_MODEL_ENV.to_string(), model.clone());
1613
1614 let mut labels = HashMap::new();
1615 labels.insert("codetether.run/swarm-id".to_string(), swarm_id.to_string());
1616 labels.insert(
1617 "codetether.run/stage".to_string(),
1618 subtask.stage.to_string(),
1619 );
1620
1621 let spec = SubagentPodSpec {
1622 image: self.config.k8s_subagent_image.clone(),
1623 env_vars,
1624 labels,
1625 command: Some(vec!["sh".to_string(), "-lc".to_string()]),
1626 args: Some(vec![
1627 format!(
1628 "if codetether help swarm-subagent >/dev/null 2>&1; then \
1629exec codetether swarm-subagent --payload-env {payload_env}; \
1630else \
1631exec codetether run \"$${fallback_prompt_env}\" --model \"$${fallback_model_env}\"; \
1632fi",
1633 payload_env = SWARM_SUBTASK_PAYLOAD_ENV,
1634 fallback_prompt_env = SWARM_FALLBACK_PROMPT_ENV,
1635 fallback_model_env = SWARM_FALLBACK_MODEL_ENV,
1636 )
1637 .replace("$$", "$"),
1638 ]),
1639 };
1640
1641 if let Err(error) = k8s.spawn_subagent_pod_with_spec(&subtask.id, spec).await {
1642 let error_text = format!("Failed to spawn Kubernetes pod: {error}");
1643 tracing::error!(subtask_id = %subtask.id, error = %error, "K8s sub-agent pod spawn failed");
1644 self.try_send_event(SwarmEvent::SubTaskUpdate {
1645 id: subtask.id.clone(),
1646 name: subtask.name.clone(),
1647 status: SubTaskStatus::Failed,
1648 agent_name: Some("k8s-spawn".to_string()),
1649 });
1650 self.try_send_event(SwarmEvent::AgentError {
1651 subtask_id: subtask.id.clone(),
1652 error: error_text.clone(),
1653 });
1654 results.push(SubTaskResult {
1655 subtask_id: subtask.id.clone(),
1656 subagent_id: format!("agent-{}", subtask.id),
1657 success: false,
1658 result: String::new(),
1659 steps: 0,
1660 tool_calls: 0,
1661 execution_time_ms: 0,
1662 error: Some(error_text),
1663 artifacts: Vec::new(),
1664 });
1665 continue;
1666 }
1667
1668 let branch = K8sManager::subagent_pod_name(&subtask.id);
1669 subtask_names.insert(subtask.id.clone(), subtask.name.clone());
1670 active.insert(
1671 subtask.id.clone(),
1672 ActiveK8sBranch {
1673 branch: branch.clone(),
1674 started_at: Instant::now(),
1675 },
1676 );
1677
1678 self.try_send_event(SwarmEvent::SubTaskUpdate {
1679 id: subtask.id.clone(),
1680 name: subtask.name.clone(),
1681 status: SubTaskStatus::Running,
1682 agent_name: Some(format!("k8s-{branch}")),
1683 });
1684 self.try_send_event(SwarmEvent::AgentStarted {
1685 subtask_id: subtask.id.clone(),
1686 agent_name: format!("k8s-{branch}"),
1687 specialty: subtask
1688 .specialty
1689 .clone()
1690 .unwrap_or_else(|| "generalist".to_string()),
1691 });
1692
1693 tracing::info!(
1694 subtask_id = %subtask.id,
1695 pod = %branch,
1696 "Spawned Kubernetes sub-agent pod"
1697 );
1698 }
1699
1700 if pending.is_empty() && active.is_empty() {
1701 break;
1702 }
1703
1704 tick.tick().await;
1705
1706 let active_ids: Vec<String> = active.keys().cloned().collect();
1707 let mut finished_results: Vec<SubTaskResult> = Vec::new();
1708 for subtask_id in active_ids {
1709 let Some(active_state) = active.get(&subtask_id).cloned() else {
1710 continue;
1711 };
1712
1713 if active_state.started_at.elapsed()
1714 > Duration::from_secs(self.config.subagent_timeout_secs)
1715 {
1716 let reason = format!(
1717 "Timed out after {}s in Kubernetes pod",
1718 self.config.subagent_timeout_secs
1719 );
1720 kill_reasons.insert(subtask_id.clone(), reason.clone());
1721 if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
1722 tracing::warn!(
1723 subtask_id = %subtask_id,
1724 error = %error,
1725 "Failed deleting timed-out Kubernetes pod"
1726 );
1727 }
1728 active.remove(&subtask_id);
1729 finished_results.push(SubTaskResult {
1730 subtask_id: subtask_id.clone(),
1731 subagent_id: format!("agent-{subtask_id}"),
1732 success: false,
1733 result: String::new(),
1734 steps: 0,
1735 tool_calls: 0,
1736 execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1737 error: Some(reason),
1738 artifacts: Vec::new(),
1739 });
1740 continue;
1741 }
1742
1743 let pod_state = match k8s.get_subagent_pod_state(&subtask_id).await {
1744 Ok(state) => state,
1745 Err(error) => {
1746 tracing::warn!(
1747 subtask_id = %subtask_id,
1748 error = %error,
1749 "Failed to query Kubernetes pod state for sub-agent"
1750 );
1751 continue;
1752 }
1753 };
1754 let Some(pod_state) = pod_state else {
1755 active.remove(&subtask_id);
1756 finished_results.push(SubTaskResult {
1757 subtask_id: subtask_id.clone(),
1758 subagent_id: format!("agent-{subtask_id}"),
1759 success: false,
1760 result: String::new(),
1761 steps: 0,
1762 tool_calls: 0,
1763 execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1764 error: Some("Sub-agent pod disappeared".to_string()),
1765 artifacts: Vec::new(),
1766 });
1767 continue;
1768 };
1769
1770 let phase = pod_state.phase.to_ascii_lowercase();
1771 let finished = pod_state.terminated || phase == "succeeded" || phase == "failed";
1772 if !finished {
1773 continue;
1774 }
1775
1776 let logs = k8s
1777 .subagent_logs(&subtask_id, 10_000)
1778 .await
1779 .unwrap_or_default();
1780 let mut result = result_from_logs(&logs).unwrap_or_else(|| SubTaskResult {
1781 subtask_id: subtask_id.clone(),
1782 subagent_id: format!("agent-{subtask_id}"),
1783 success: pod_state.exit_code.unwrap_or(1) == 0,
1784 result: logs,
1785 steps: 0,
1786 tool_calls: 0,
1787 execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1788 error: if pod_state.exit_code.unwrap_or(1) == 0 {
1789 None
1790 } else {
1791 Some(
1792 pod_state
1793 .reason
1794 .clone()
1795 .unwrap_or_else(|| "Remote sub-agent failed".to_string()),
1796 )
1797 },
1798 artifacts: Vec::new(),
1799 });
1800
1801 if let Some(reason) = kill_reasons.get(&subtask_id) {
1802 result.success = false;
1803 result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1804 result.result.push_str(&format!(
1805 "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1806 ));
1807 }
1808
1809 active.remove(&subtask_id);
1810 if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
1811 tracing::warn!(
1812 subtask_id = %subtask_id,
1813 error = %error,
1814 "Failed deleting completed Kubernetes pod"
1815 );
1816 }
1817 finished_results.push(result);
1818 }
1819
1820 for result in finished_results {
1821 if result.success {
1822 completed_results
1823 .write()
1824 .await
1825 .insert(result.subtask_id.clone(), result.result.clone());
1826 }
1827 if result.success {
1828 if let Some(ref cache_arc) = self.cache {
1829 let mut cache_guard = cache_arc.lock().await;
1830 let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1831 let _ = cache_guard.put(&cache_subtask, &result).await;
1832 }
1833 }
1834
1835 self.try_send_event(SwarmEvent::SubTaskUpdate {
1836 id: result.subtask_id.clone(),
1837 name: subtask_names
1838 .get(&result.subtask_id)
1839 .cloned()
1840 .unwrap_or_else(|| result.subtask_id.clone()),
1841 status: if result.success {
1842 SubTaskStatus::Completed
1843 } else {
1844 SubTaskStatus::Failed
1845 },
1846 agent_name: Some(format!("k8s-{}", result.subtask_id)),
1847 });
1848 if let Some(ref error) = result.error {
1849 self.try_send_event(SwarmEvent::AgentError {
1850 subtask_id: result.subtask_id.clone(),
1851 error: error.clone(),
1852 });
1853 }
1854 self.try_send_event(SwarmEvent::AgentOutput {
1855 subtask_id: result.subtask_id.clone(),
1856 output: result.result.clone(),
1857 });
1858 self.try_send_event(SwarmEvent::AgentComplete {
1859 subtask_id: result.subtask_id.clone(),
1860 success: result.success,
1861 steps: result.steps,
1862 });
1863 results.push(result);
1864 }
1865
1866 if active.len() > 1 {
1867 let mut observations = Vec::with_capacity(active.len());
1868 for (subtask_id, state) in &active {
1869 let pod_state = match k8s.get_subagent_pod_state(subtask_id).await {
1870 Ok(state) => state,
1871 Err(error) => {
1872 tracing::warn!(
1873 subtask_id = %subtask_id,
1874 error = %error,
1875 "Failed to query pod state while sampling branch observation"
1876 );
1877 None
1878 }
1879 };
1880 let (resource_health_score, infra_unhealthy_signals) =
1881 compute_resource_health(pod_state.as_ref());
1882
1883 let logs = k8s.subagent_logs(subtask_id, 500).await.unwrap_or_default();
1884 if let Some(probe) = latest_probe_from_logs(&logs) {
1885 let compile_ok = pod_state
1886 .as_ref()
1887 .map(|p| probe.compile_ok && p.phase.to_ascii_lowercase() != "failed")
1888 .unwrap_or(probe.compile_ok);
1889 observations.push(BranchObservation {
1890 subtask_id: subtask_id.clone(),
1891 branch: state.branch.clone(),
1892 compile_ok,
1893 changed_files: probe_changed_files_set(&probe),
1894 changed_lines: probe.changed_lines,
1895 resource_health_score,
1896 infra_unhealthy_signals,
1897 });
1898 continue;
1899 }
1900 let compile_ok = pod_state
1901 .as_ref()
1902 .map(|p| p.phase.to_ascii_lowercase() != "failed")
1903 .unwrap_or(false);
1904 observations.push(BranchObservation {
1905 subtask_id: subtask_id.clone(),
1906 branch: state.branch.clone(),
1907 compile_ok,
1908 changed_files: std::collections::HashSet::new(),
1909 changed_lines: 0,
1910 resource_health_score,
1911 infra_unhealthy_signals,
1912 });
1913 }
1914
1915 let tick = collapse_controller.sample_observations(&observations);
1916 if promoted_subtask_id != tick.promoted_subtask_id {
1917 promoted_subtask_id = tick.promoted_subtask_id.clone();
1918 if let Some(ref promoted) = promoted_subtask_id {
1919 tracing::info!(subtask_id = %promoted, "Collapse controller promoted branch");
1920 if let Some(audit) = crate::audit::try_audit_log() {
1921 audit
1922 .log_with_correlation(
1923 crate::audit::AuditCategory::Swarm,
1924 "collapse_promote_branch",
1925 crate::audit::AuditOutcome::Success,
1926 Some("collapse-controller".to_string()),
1927 Some(serde_json::json!({
1928 "swarm_id": swarm_id,
1929 "subtask_id": promoted,
1930 "execution_mode": "kubernetes_pod",
1931 })),
1932 None,
1933 None,
1934 Some(swarm_id.to_string()),
1935 None,
1936 )
1937 .await;
1938 }
1939 }
1940 }
1941
1942 for kill in tick.kills {
1943 if kill_reasons.contains_key(&kill.subtask_id) {
1944 continue;
1945 }
1946 if !active.contains_key(&kill.subtask_id) {
1947 continue;
1948 }
1949
1950 if let Err(error) = k8s.delete_subagent_pod(&kill.subtask_id).await {
1951 tracing::warn!(
1952 subtask_id = %kill.subtask_id,
1953 error = %error,
1954 "Failed deleting Kubernetes pod after collapse kill"
1955 );
1956 }
1957 kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
1958 let elapsed_ms = active
1959 .remove(&kill.subtask_id)
1960 .map(|s| s.started_at.elapsed().as_millis() as u64)
1961 .unwrap_or(0);
1962
1963 tracing::warn!(
1964 subtask_id = %kill.subtask_id,
1965 branch = %kill.branch,
1966 reason = %kill.reason,
1967 "Collapse controller killed Kubernetes branch"
1968 );
1969
1970 if let Some(audit) = crate::audit::try_audit_log() {
1971 audit
1972 .log_with_correlation(
1973 crate::audit::AuditCategory::Swarm,
1974 "collapse_kill_branch",
1975 crate::audit::AuditOutcome::Success,
1976 Some("collapse-controller".to_string()),
1977 Some(serde_json::json!({
1978 "swarm_id": swarm_id,
1979 "subtask_id": kill.subtask_id.clone(),
1980 "branch": kill.branch.clone(),
1981 "reason": kill.reason.clone(),
1982 "execution_mode": "kubernetes_pod",
1983 })),
1984 None,
1985 None,
1986 Some(swarm_id.to_string()),
1987 None,
1988 )
1989 .await;
1990 }
1991
1992 self.try_send_event(SwarmEvent::SubTaskUpdate {
1993 id: kill.subtask_id.clone(),
1994 name: subtask_names
1995 .get(&kill.subtask_id)
1996 .cloned()
1997 .unwrap_or_else(|| kill.subtask_id.clone()),
1998 status: SubTaskStatus::Cancelled,
1999 agent_name: Some(format!("agent-{}", kill.subtask_id)),
2000 });
2001 self.try_send_event(SwarmEvent::AgentError {
2002 subtask_id: kill.subtask_id.clone(),
2003 error: format!("Cancelled by collapse controller: {}", kill.reason),
2004 });
2005
2006 results.push(SubTaskResult {
2007 subtask_id: kill.subtask_id.clone(),
2008 subagent_id: format!("agent-{}", kill.subtask_id),
2009 success: false,
2010 result: format!(
2011 "\n\n--- Collapse Controller ---\nBranch terminated: {}",
2012 kill.reason
2013 ),
2014 steps: 0,
2015 tool_calls: 0,
2016 execution_time_ms: elapsed_ms,
2017 error: Some(format!("Cancelled by collapse controller: {}", kill.reason)),
2018 artifacts: Vec::new(),
2019 });
2020 }
2021 }
2022 }
2023
        if let Some(ref promoted) = promoted_subtask_id {
            results.sort_by_key(|result| {
                if &result.subtask_id == promoted {
                    0usize
                } else {
                    1usize
                }
            });
        }

        // Delete any pods still tracked as active once the stage completes.
        if !active.is_empty() {
            let residual_ids: Vec<String> = active.keys().cloned().collect();
            for subtask_id in residual_ids {
                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
                    tracing::warn!(
                        subtask_id = %subtask_id,
                        error = %error,
                        "Failed deleting residual Kubernetes pod at stage end"
                    );
                }
            }
        }

        Ok(results)
    }

    /// Concatenate every subtask's output (or error) into a single plain-text report.
    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
        let mut aggregated = String::new();

        for (i, result) in results.iter().enumerate() {
            if result.success {
                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
            } else {
                aggregated.push_str(&format!(
                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
                    i + 1,
                    result.error.as_deref().unwrap_or("Unknown error")
                ));
            }
        }

        Ok(aggregated)
    }

    /// Execute a task as a single sub-agent, skipping decomposition.
    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
}

/// Fluent builder for configuring and constructing a `SwarmExecutor`.
pub struct SwarmExecutorBuilder {
    config: SwarmConfig,
}

impl SwarmExecutorBuilder {
    pub fn new() -> Self {
        Self {
            config: SwarmConfig::default(),
        }
    }

    pub fn max_subagents(mut self, max: usize) -> Self {
        self.config.max_subagents = max;
        self
    }

    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
        self.config.max_steps_per_subagent = max;
        self
    }

    pub fn max_total_steps(mut self, max: usize) -> Self {
        self.config.max_total_steps = max;
        self
    }

    pub fn timeout_secs(mut self, secs: u64) -> Self {
        self.config.subagent_timeout_secs = secs;
        self
    }

    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
        self.config.parallel_enabled = enabled;
        self
    }

    pub fn build(self) -> SwarmExecutor {
        SwarmExecutor::new(self.config)
    }
}

impl Default for SwarmExecutorBuilder {
    fn default() -> Self {
        Self::new()
    }
}
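
// Illustrative usage sketch (not part of the original module): configure and build a
// `SwarmExecutor` via the builder above, then run a task without decomposition. The
// task string and the chosen limits are placeholders.
#[allow(dead_code)]
async fn _builder_usage_sketch() -> Result<()> {
    let executor = SwarmExecutorBuilder::new()
        .max_subagents(4)
        .max_steps_per_subagent(20)
        .timeout_secs(600)
        .parallel_enabled(true)
        .build();

    // `execute_single` maps to `execute(task, DecompositionStrategy::None)`.
    let _result = executor.execute_single("Summarize the repository layout").await?;
    Ok(())
}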

/// Rewrite relative paths in tool arguments so they resolve against the
/// sub-agent's working directory (and default `bash` to that directory when
/// no `cwd` is supplied).
#[allow(clippy::too_many_arguments)]
fn resolve_tool_paths(
    tool_name: &str,
    args: &mut serde_json::Value,
    working_dir: &std::path::Path,
) {
    match tool_name {
        "read" | "write" | "list" | "grep" | "codesearch" => {
            if let Some(path) = args.get("path").and_then(|v| v.as_str()).map(String::from) {
                if !std::path::Path::new(&path).is_absolute() {
                    args["path"] = serde_json::json!(working_dir.join(&path).display().to_string());
                }
            }
        }
        "edit" => {
            if let Some(path) = args
                .get("filePath")
                .and_then(|v| v.as_str())
                .map(String::from)
            {
                if !std::path::Path::new(&path).is_absolute() {
                    args["filePath"] =
                        serde_json::json!(working_dir.join(&path).display().to_string());
                }
            }
        }
        "glob" => {
            if let Some(pattern) = args
                .get("pattern")
                .and_then(|v| v.as_str())
                .map(String::from)
            {
                if !std::path::Path::new(&pattern).is_absolute() && !pattern.starts_with("*") {
                    args["pattern"] =
                        serde_json::json!(working_dir.join(&pattern).display().to_string());
                }
            }
        }
        "multiedit" => {
            if let Some(edits) = args.get_mut("edits").and_then(|v| v.as_array_mut()) {
                for edit in edits.iter_mut() {
                    if let Some(file) = edit.get("file").and_then(|v| v.as_str()).map(String::from)
                    {
                        if !std::path::Path::new(&file).is_absolute() {
                            edit["file"] =
                                serde_json::json!(working_dir.join(&file).display().to_string());
                        }
                    }
                }
            }
        }
        "patch" => {
            if let Some(path) = args.get("file").and_then(|v| v.as_str()).map(String::from) {
                if !std::path::Path::new(&path).is_absolute() {
                    args["file"] = serde_json::json!(working_dir.join(&path).display().to_string());
                }
            }
        }
        "bash" => {
            if args.get("cwd").and_then(|v| v.as_str()).is_none() {
                args["cwd"] = serde_json::json!(working_dir.display().to_string());
            }
        }
        _ => {}
    }
}
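
// Minimal test-only sketch of the path-rewriting behaviour above; the concrete
// paths are illustrative and assume Unix-style separators.
#[cfg(test)]
mod resolve_tool_paths_sketch {
    use super::resolve_tool_paths;

    #[test]
    fn rewrites_relative_read_path_against_working_dir() {
        let working_dir = std::path::Path::new("/tmp/swarm-workdir");
        let mut args = serde_json::json!({ "path": "src/main.rs" });
        resolve_tool_paths("read", &mut args, working_dir);
        assert_eq!(args["path"], serde_json::json!("/tmp/swarm-workdir/src/main.rs"));
    }

    #[test]
    fn leaves_absolute_paths_untouched() {
        let working_dir = std::path::Path::new("/tmp/swarm-workdir");
        let mut args = serde_json::json!({ "path": "/etc/hosts" });
        resolve_tool_paths("read", &mut args, working_dir);
        assert_eq!(args["path"], serde_json::json!("/etc/hosts"));
    }
}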

/// Core agentic loop for a single sub-agent: call the provider, execute any
/// requested tools, feed the results back, and repeat until the model stops
/// requesting tools, the step budget is exhausted, or the deadline passes.
///
/// Returns the accumulated text output, the number of steps taken, the number
/// of tool calls made, and the reason the loop exited.
pub async fn run_agent_loop(
    provider: Arc<dyn Provider>,
    model: &str,
    system_prompt: &str,
    user_prompt: &str,
    tools: Vec<crate::provider::ToolDefinition>,
    registry: Arc<ToolRegistry>,
    max_steps: usize,
    timeout_secs: u64,
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    subtask_id: String,
    bus: Option<Arc<AgentBus>>,
    working_dir: Option<std::path::PathBuf>,
) -> Result<(String, usize, usize, AgentLoopExit)> {
    let temperature = 0.7;

    tracing::info!(
        model = %model,
        max_steps = max_steps,
        timeout_secs = timeout_secs,
        "Sub-agent starting agentic loop"
    );
    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");

    let mut messages = vec![
        Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.to_string(),
            }],
        },
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: user_prompt.to_string(),
            }],
        },
    ];

    let mut steps = 0;
    let mut total_tool_calls = 0;
    let mut final_output = String::new();

    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);

    let exit_reason = loop {
        if steps >= max_steps {
            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
            break AgentLoopExit::MaxStepsReached;
        }

        if Instant::now() > deadline {
            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
            break AgentLoopExit::TimedOut;
        }

        steps += 1;
        tracing::info!(step = steps, "Sub-agent step starting");

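        // Trim the conversation so the next completion request stays within the
        // estimated context limit.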
        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);

        let request = CompletionRequest {
            messages: messages.clone(),
            tools: tools.clone(),
            model: model.to_string(),
            temperature: Some(temperature),
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let step_start = Instant::now();
        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
        let step_duration = step_start.elapsed();

        tracing::info!(
            step = steps,
            duration_ms = step_duration.as_millis() as u64,
            finish_reason = ?response.finish_reason,
            prompt_tokens = response.usage.prompt_tokens,
            completion_tokens = response.usage.completion_tokens,
            "Sub-agent step completed LLM call"
        );

        let mut text_parts = Vec::new();
        let mut thinking_parts = Vec::new();
        let mut tool_calls = Vec::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } => {
                    text_parts.push(text.clone());
                }
                ContentPart::Thinking { text } if !text.is_empty() => {
                    thinking_parts.push(text.clone());
                }
                ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } => {
                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
                }
                _ => {}
            }
        }

        if !thinking_parts.is_empty() {
            if let Some(ref bus) = bus {
                let thinking_text = thinking_parts.join("\n");
                let handle = bus.handle(&subtask_id);
                handle.send(
                    format!("agent.{subtask_id}.thinking"),
                    BusMessage::AgentThinking {
                        agent_id: subtask_id.clone(),
                        thinking: thinking_text,
                        step: steps,
                    },
                );
            }
        }

        if !text_parts.is_empty() {
            let step_output = text_parts.join("\n");
            if !final_output.is_empty() {
                final_output.push('\n');
            }
            final_output.push_str(&step_output);
            tracing::info!(
                step = steps,
                output_len = final_output.len(),
                "Sub-agent text output"
            );
            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");

            if let Some(ref tx) = event_tx {
                let preview = if step_output.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !step_output.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &step_output[..end])
                } else {
                    step_output.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentMessage {
                    subtask_id: subtask_id.clone(),
                    entry: AgentMessageEntry {
                        role: "assistant".to_string(),
                        content: preview,
                        is_tool_call: false,
                    },
                });
            }
        }

        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        // Keep the assistant turn in history before appending tool results.
        messages.push(response.message.clone());

        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break AgentLoopExit::Completed;
        }

        let mut tool_results = Vec::new();

        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            if let Some(ref tx) = event_tx {
                let _ = tx.try_send(SwarmEvent::AgentToolCall {
                    subtask_id: subtask_id.clone(),
                    tool_name: tool_name.clone(),
                });
            }

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let mut tool_success = true;
            let result = if let Some(tool) = registry.get(&tool_name) {
                let mut args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
                        serde_json::json!({})
                    });

                if let Some(ref wd) = working_dir {
                    resolve_tool_paths(&tool_name, &mut args, wd);
                }

                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tool_success = false;
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tool_success = false;
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                tool_success = false;
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            if let Some(ref tx) = event_tx {
                let input_preview = if arguments.len() > 200 {
                    let mut end = 200;
                    while end > 0 && !arguments.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &arguments[..end])
                } else {
                    arguments.clone()
                };
                let output_preview = if result.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !result.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &result[..end])
                } else {
                    result.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
                    subtask_id: subtask_id.clone(),
                    detail: AgentToolCallDetail {
                        tool_name: tool_name.clone(),
                        input_preview,
                        output_preview,
                        success: tool_success,
                    },
                });
            }

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            if let Some(ref bus) = bus {
                let handle = bus.handle(&subtask_id);
                handle.send(
                    format!("tools.{tool_name}"),
                    BusMessage::ToolOutputFull {
                        agent_id: subtask_id.clone(),
                        tool_name: tool_name.clone(),
                        output: result.clone(),
                        success: tool_success,
                        step: steps,
                    },
                );
            }

            // Oversized tool output is summarized via the RLM path; smaller output is
            // simply truncated before being fed back to the model.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }

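        // Refresh the deadline after a completed round of tool calls, so the timeout
        // budget applies per LLM/tool round rather than to the whole loop.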
        deadline = Instant::now() + Duration::from_secs(timeout_secs);
    };

    Ok((final_output, steps, total_tool_calls, exit_reason))
}