1use super::{
7 BranchObservation, BranchRuntimeState, CacheConfig, CacheStats, CollapseController,
8 CollapsePolicy, DecompositionStrategy, ExecutionMode, StageStats, SwarmCache, SwarmConfig,
9 SwarmResult, k8s_result,
10 kubernetes_executor::{
11 RemoteSubtaskPayload, SWARM_SUBTASK_PAYLOAD_ENV, encode_payload, latest_probe_from_logs,
12 probe_changed_files_set,
13 },
14 orchestrator::Orchestrator,
15 result_store::ResultStore,
16 subtask::{SubTask, SubTaskResult, SubTaskStatus},
17 tool_policy,
18};
19use crate::bus::{AgentBus, BusMessage};
20use crate::k8s::{K8sManager, SubagentPodSpec, SubagentPodState};
21use crate::session::delegation::DelegationState;
22use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
23use std::sync::Mutex as StdMutex;
24
25pub use super::SwarmMessage;
27use crate::{
28 agent::Agent,
29 provenance::{ExecutionOrigin, ExecutionProvenance},
30 provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
31 rlm::RlmExecutor,
32 session::helper::runtime::enrich_tool_input_with_runtime_context,
33 swarm::{SwarmArtifact, SwarmStats},
34 telemetry::SwarmTelemetryCollector,
35 tool::ToolRegistry,
36 worktree::{WorktreeInfo, WorktreeManager},
37};
38use anyhow::Result;
39use futures::stream::{self, FuturesUnordered, StreamExt};
40use std::collections::{HashMap, VecDeque};
41use std::sync::Arc;
42use std::time::Instant;
43use tokio::sync::{RwLock, mpsc};
44use tokio::task::AbortHandle;
45use tokio::time::{Duration, MissedTickBehavior, timeout};
46#[path = "path_guard/mod.rs"]
47mod path_guard;
48
/// Fallback context-window size, expressed in estimated tokens.
/// NOTE(review): not referenced in the visible part of this file; presumably used when a
/// model-specific limit is unknown. Confirm at the use site.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Token count subtracted from the truncation target in `truncate_messages_to_fit`
/// (presumably headroom for the model's reply).
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context limit used as the budget in `truncate_messages_to_fit`
/// before the response reserve is subtracted.
const TRUNCATION_THRESHOLD: f64 = 0.85;
57
/// Roughly estimates how many tokens `text` occupies.
///
/// This is a heuristic, not a tokenizer: the UTF-8 byte length is divided by 3.5 and
/// rounded up.
fn estimate_tokens(text: &str) -> usize {
    let byte_len = text.len() as f64;
    let approx_tokens = byte_len / 3.5;
    approx_tokens.ceil() as usize
}
65
66fn estimate_message_tokens(message: &Message) -> usize {
68 let mut tokens = 4; for part in &message.content {
71 tokens += match part {
72 ContentPart::Text { text } => estimate_tokens(text),
73 ContentPart::ToolCall {
74 id,
75 name,
76 arguments,
77 ..
78 } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
79 ContentPart::ToolResult {
80 tool_call_id,
81 content,
82 } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
83 ContentPart::Image { .. } | ContentPart::File { .. } => 2000, ContentPart::Thinking { text } => estimate_tokens(text),
85 };
86 }
87
88 tokens
89}
90
91fn estimate_total_tokens(messages: &[Message]) -> usize {
93 messages.iter().map(estimate_message_tokens).sum()
94}
95
96fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
104 let target_tokens =
105 ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
106
107 let current_tokens = estimate_total_tokens(messages);
108 if current_tokens <= target_tokens {
109 return;
110 }
111
112 tracing::warn!(
113 current_tokens = current_tokens,
114 target_tokens = target_tokens,
115 context_limit = context_limit,
116 "Context approaching limit, truncating conversation history"
117 );
118
119 truncate_large_tool_results(messages, 2000); let after_tool_truncation = estimate_total_tokens(messages);
123 if after_tool_truncation <= target_tokens {
124 tracing::info!(
125 old_tokens = current_tokens,
126 new_tokens = after_tool_truncation,
127 "Truncated large tool results, context now within limits"
128 );
129 return;
130 }
131
132 if messages.len() <= 6 {
134 tracing::warn!(
135 tokens = after_tool_truncation,
136 target = target_tokens,
137 "Cannot truncate further - conversation too short"
138 );
139 return;
140 }
141
142 let keep_start = 2;
145 let keep_end = 4;
146 let removable_count = messages.len() - keep_start - keep_end;
147
148 if removable_count == 0 {
149 return;
150 }
151
152 let removed_messages: Vec<_> = messages
154 .drain(keep_start..keep_start + removable_count)
155 .collect();
156 let summary = summarize_removed_messages(&removed_messages);
157
158 messages.insert(
160 keep_start,
161 Message {
162 role: Role::User,
163 content: vec![ContentPart::Text {
164 text: format!(
165 "[Context truncated: {} earlier messages removed to fit context window]\n{}",
166 removed_messages.len(),
167 summary
168 ),
169 }],
170 },
171 );
172
173 let new_tokens = estimate_total_tokens(messages);
174 tracing::info!(
175 removed_messages = removed_messages.len(),
176 old_tokens = current_tokens,
177 new_tokens = new_tokens,
178 "Truncated conversation history"
179 );
180}
181
182fn summarize_removed_messages(messages: &[Message]) -> String {
184 let mut summary = String::new();
185 let mut tool_calls: Vec<String> = Vec::new();
186
187 for msg in messages {
188 for part in &msg.content {
189 if let ContentPart::ToolCall { name, .. } = part
190 && !tool_calls.contains(name)
191 {
192 tool_calls.push(name.clone());
193 }
194 }
195 }
196
197 if !tool_calls.is_empty() {
198 summary.push_str(&format!(
199 "Tools used in truncated history: {}",
200 tool_calls.join(", ")
201 ));
202 }
203
204 summary
205}
206
207fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
209 let char_limit = max_tokens_per_result * 3; let mut truncated_count = 0;
211 let mut saved_tokens = 0usize;
212
213 for message in messages.iter_mut() {
214 for part in message.content.iter_mut() {
215 if let ContentPart::ToolResult { content, .. } = part {
216 let tokens = estimate_tokens(content);
217 if tokens > max_tokens_per_result {
218 let old_len = content.len();
219 *content = truncate_single_result(content, char_limit);
220 saved_tokens += tokens.saturating_sub(estimate_tokens(content));
221 if content.len() < old_len {
222 truncated_count += 1;
223 }
224 }
225 }
226 }
227 }
228
229 if truncated_count > 0 {
230 tracing::info!(
231 truncated_count = truncated_count,
232 saved_tokens = saved_tokens,
233 max_tokens_per_result = max_tokens_per_result,
234 "Truncated large tool results"
235 );
236 }
237}
238
239fn truncate_single_result(content: &str, max_chars: usize) -> String {
241 if content.len() <= max_chars {
242 return content.to_string();
243 }
244
245 let safe_limit = {
247 let mut limit = max_chars.min(content.len());
248 while limit > 0 && !content.is_char_boundary(limit) {
249 limit -= 1;
250 }
251 limit
252 };
253
254 let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
256
257 let truncated = format!(
258 "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
259 &content[..break_point],
260 content.len(),
261 break_point
262 );
263
264 tracing::debug!(
265 original_len = content.len(),
266 truncated_len = truncated.len(),
267 "Truncated large result"
268 );
269
270 truncated
271}
272
/// Byte length above which `process_large_result_with_rlm` summarizes a tool result with
/// the RLM executor instead of simply truncating it.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Byte length up to which `process_large_result_with_rlm` returns a tool result unchanged;
/// also the size that larger results are truncated to when RLM is not used or fails.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// NOTE(review): not referenced in the visible part of this file; presumably the sampling
/// interval (seconds) for collapse-controller checks. Confirm at the use site.
const COLLAPSE_SAMPLE_SECS: u64 = 5;
/// Names of environment variables listed for pass-through.
/// NOTE(review): presumably copied from this process into Kubernetes sub-agent pods;
/// the consumer is outside the visible source. Confirm.
const K8S_PASSTHROUGH_ENV_VARS: &[&str] = &[
    "VAULT_ADDR",
    "VAULT_TOKEN",
    "VAULT_MOUNT",
    "VAULT_SECRETS_PATH",
    "VAULT_NAMESPACE",
    "CODETETHER_AUTH_TOKEN",
];
289
/// Bookkeeping record for a branch tracked during Kubernetes execution.
/// NOTE(review): the users of this struct are outside the visible source; field meanings
/// below are inferred from their names and types. Confirm.
#[derive(Debug, Clone)]
struct ActiveK8sBranch {
    /// Branch name (presumably the git branch the sub-agent works on).
    branch: String,
    /// Instant at which tracking of this branch began.
    started_at: Instant,
}
295
/// Reason a sub-agent loop stopped.
///
/// `execute_stage` treats only `Completed` as success; the other variants mark the
/// subtask as failed or timed out and may trigger a retry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The agent finished its task.
    Completed,
    /// The agent used up its step budget without finishing.
    MaxStepsReached,
    /// The agent exceeded its time budget.
    TimedOut,
}
302
/// Computes the exponential backoff delay for a retry.
///
/// The delay is `initial_delay_ms * multiplier^attempt`, capped at `max_delay_ms`.
///
/// * `attempt` - zero-based retry index (0 yields `initial_delay_ms`).
/// * `initial_delay_ms` - delay for the first retry, in milliseconds.
/// * `max_delay_ms` - upper bound for the returned delay, in milliseconds.
/// * `multiplier` - growth factor applied per attempt.
fn calculate_backoff_delay(
    attempt: u32,
    initial_delay_ms: u64,
    max_delay_ms: u64,
    multiplier: f64,
) -> Duration {
    // Clamp before narrowing: a plain `attempt as i32` wraps to a negative exponent for
    // attempts above `i32::MAX`, which would shrink the delay instead of capping it.
    let exponent = attempt.min(i32::MAX as u32) as i32;
    let uncapped_ms = initial_delay_ms as f64 * multiplier.powi(exponent);
    let delay_ms = uncapped_ms.min(max_delay_ms as f64);
    // Float-to-int `as` saturates, so infinities cannot overflow here.
    Duration::from_millis(delay_ms as u64)
}
316
317fn compute_resource_health(pod_state: Option<&SubagentPodState>) -> (f32, u32) {
318 let Some(pod_state) = pod_state else {
319 return (0.2, 1);
320 };
321
322 let reason = pod_state
323 .reason
324 .as_deref()
325 .unwrap_or_default()
326 .to_ascii_lowercase();
327 let phase = pod_state.phase.to_ascii_lowercase();
328
329 if reason.contains("oomkilled") {
330 return (0.0, 3);
331 }
332 if reason.contains("imagepullbackoff") || reason.contains("errimagepull") {
333 return (0.0, 3);
334 }
335 if reason.contains("crashloopbackoff") {
336 return (0.1, 2);
337 }
338 if phase == "failed" {
339 return (0.1, 2);
340 }
341
342 let mut score = 1.0f32;
343 let mut unhealthy_signals = 0u32;
344
345 if !pod_state.ready {
346 score -= 0.2;
347 }
348 if !reason.is_empty() {
349 score -= 0.3;
350 unhealthy_signals += 1;
351 }
352 if pod_state.restart_count > 0 {
353 score -= (pod_state.restart_count.min(3) as f32) * 0.2;
354 unhealthy_signals += 1;
355 }
356
357 (score.clamp(0.0, 1.0), unhealthy_signals)
358}
359
360async fn process_large_result_with_rlm(
362 content: &str,
363 tool_name: &str,
364 provider: Arc<dyn Provider>,
365 model: &str,
366) -> String {
367 if content.len() <= SIMPLE_TRUNCATE_CHARS {
368 return content.to_string();
369 }
370
371 if content.len() <= RLM_THRESHOLD_CHARS {
373 return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
374 }
375
376 tracing::info!(
378 tool = %tool_name,
379 content_len = content.len(),
380 "Using RLM to process large tool result"
381 );
382
383 let query = format!(
384 "Summarize the key information from this {} output. \
385 Focus on: errors, warnings, important findings, and actionable items. \
386 Be concise but thorough.",
387 tool_name
388 );
389
390 let mut executor =
391 RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
392
393 match executor.analyze(&query).await {
394 Ok(result) => {
395 let bounded_answer = truncate_single_result(&result.answer, SIMPLE_TRUNCATE_CHARS * 2);
396 tracing::info!(
397 tool = %tool_name,
398 original_len = content.len(),
399 summary_len = bounded_answer.len(),
400 iterations = result.iterations,
401 "RLM summarized large result"
402 );
403
404 format!(
405 "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
406 tool_name,
407 content.len(),
408 bounded_answer.len(),
409 bounded_answer
410 )
411 }
412 Err(e) => {
413 tracing::warn!(
414 tool = %tool_name,
415 error = %e,
416 "RLM analysis failed, falling back to truncation"
417 );
418 truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
419 }
420 }
421}
422
/// Runs a task as a swarm: decomposes it into subtasks and executes them stage by stage.
///
/// Built with [`SwarmExecutor::new`] (or `with_cache`) and configured through the
/// `with_*` builder methods.
pub struct SwarmExecutor {
    /// Swarm settings (retry limits, step limits, execution mode, ...).
    config: SwarmConfig,
    /// Optional coordinator agent shared behind an async mutex.
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Optional channel on which `SwarmEvent`s are emitted (best-effort, via `try_send`).
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    /// Collector that receives swarm-level telemetry.
    telemetry: Arc<SwarmTelemetryCollector>,
    /// Optional subtask-result cache consulted before a subtask is executed.
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Store to which each subtask result is published after its stage completes.
    result_store: Arc<ResultStore>,
    /// Optional agent bus that receives start/completion notifications.
    bus: Option<Arc<AgentBus>>,
    /// Optional delegation state used to pick a provider per subtask.
    delegation: Option<Arc<StdMutex<DelegationState>>>,
}
445
446impl SwarmExecutor {
447 pub fn new(config: SwarmConfig) -> Self {
449 Self {
450 config,
451 coordinator_agent: None,
452 event_tx: None,
453 telemetry: Arc::new(SwarmTelemetryCollector::default()),
454 cache: None,
455 result_store: ResultStore::new_arc(),
456 bus: None,
457 delegation: None,
458 }
459 }
460
461 pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
463 let cache = SwarmCache::new(cache_config).await?;
464 Ok(Self {
465 config,
466 coordinator_agent: None,
467 event_tx: None,
468 telemetry: Arc::new(SwarmTelemetryCollector::default()),
469 cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
470 result_store: ResultStore::new_arc(),
471 bus: None,
472 delegation: None,
473 })
474 }
475
476 pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
478 self.cache = Some(cache);
479 self
480 }
481
482 pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
484 self.bus = Some(bus);
485 self
486 }
487
488 pub fn with_delegation(mut self, state: DelegationState) -> Self {
490 self.delegation = Some(Arc::new(StdMutex::new(state)));
491 self
492 }
493
494 pub fn bus(&self) -> Option<&Arc<AgentBus>> {
496 self.bus.as_ref()
497 }
498
499 pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
501 self.event_tx = Some(tx);
502 self
503 }
504
505 pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
507 tracing::debug!("Setting coordinator agent for swarm execution");
508 self.coordinator_agent = Some(agent);
509 self
510 }
511
512 pub fn with_telemetry(mut self, telemetry: Arc<SwarmTelemetryCollector>) -> Self {
514 self.telemetry = telemetry;
515 self
516 }
517
518 pub fn telemetry_arc(&self) -> Arc<SwarmTelemetryCollector> {
520 Arc::clone(&self.telemetry)
521 }
522 pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
524 tracing::debug!(
525 has_coordinator = self.coordinator_agent.is_some(),
526 "Getting coordinator agent"
527 );
528 self.coordinator_agent.as_ref()
529 }
530
531 pub fn result_store(&self) -> &Arc<ResultStore> {
533 &self.result_store
534 }
535
536 pub async fn cache_stats(&self) -> Option<CacheStats> {
538 if let Some(ref cache) = self.cache {
539 let cache_guard = cache.lock().await;
540 Some(cache_guard.stats().clone())
541 } else {
542 None
543 }
544 }
545
546 pub async fn clear_cache(&self) -> Result<()> {
548 if let Some(ref cache) = self.cache {
549 let mut cache_guard = cache.lock().await;
550 cache_guard.clear().await?;
551 }
552 Ok(())
553 }
554
555 pub fn retry_config(&self) -> (u32, u64, u64, f64) {
557 (
558 self.config.max_retries,
559 self.config.base_delay_ms,
560 self.config.max_delay_ms,
561 2.0, )
563 }
564
565 pub fn retries_enabled(&self) -> bool {
567 self.config.max_retries > 0
568 }
569
570 fn try_send_event(&self, event: SwarmEvent) {
572 if let Some(ref bus) = self.bus {
574 let handle = bus.handle("swarm-executor");
575 match &event {
576 SwarmEvent::Started { task, .. } => {
577 handle.send(
578 "broadcast",
579 BusMessage::AgentReady {
580 agent_id: "swarm-executor".to_string(),
581 capabilities: vec![format!("executing:{task}")],
582 },
583 );
584 }
585 SwarmEvent::Complete { success, .. } => {
586 let state = if *success {
587 crate::a2a::types::TaskState::Completed
588 } else {
589 crate::a2a::types::TaskState::Failed
590 };
591 handle.send_task_update("swarm", state, None);
592 }
593 _ => {} }
595 }
596
597 if let Some(ref tx) = self.event_tx {
598 let _ = tx.try_send(event);
599 }
600 }
601
/// Decomposes `task` with `strategy` and runs the resulting subtasks stage by stage.
///
/// Stages run in ascending order from 0 to the highest stage number; stages without
/// subtasks are skipped. After each stage its results are recorded for later stages,
/// published to the result store, and reported through events and stats.
///
/// Returns a [`SwarmResult`] whose `success` is `true` only when every subtask succeeded.
/// When decomposition yields no subtasks, an unsuccessful result with an error message is
/// returned instead of an `Err`.
///
/// # Errors
/// Propagates failures from orchestrator creation, decomposition, `execute_stage`, and
/// `aggregate_results`.
pub async fn execute(
    &self,
    task: &str,
    strategy: DecompositionStrategy,
) -> Result<SwarmResult> {
    let start_time = Instant::now();

    let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

    tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

    let subtasks = orchestrator.decompose(task, strategy).await?;

    // Nothing to run: report the problem via event and result rather than as an `Err`.
    if subtasks.is_empty() {
        self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
        return Ok(SwarmResult {
            success: false,
            result: String::new(),
            subtask_results: Vec::new(),
            stats: SwarmStats::default(),
            artifacts: Vec::new(),
            error: Some("No subtasks generated".to_string()),
        });
    }

    tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

    self.try_send_event(SwarmEvent::Started {
        task: task.to_string(),
        total_subtasks: subtasks.len(),
    });

    // Announce the full plan, with every subtask initially pending.
    self.try_send_event(SwarmEvent::Decomposed {
        subtasks: subtasks
            .iter()
            .map(|s| SubTaskInfo {
                id: s.id.clone(),
                name: s.name.clone(),
                status: SubTaskStatus::Pending,
                stage: s.stage,
                dependencies: s.dependencies.clone(),
                agent_name: s.specialty.clone(),
                current_tool: None,
                steps: 0,
                max_steps: self.config.max_steps_per_subagent,
                tool_call_history: Vec::new(),
                messages: Vec::new(),
                output: None,
                error: None,
            })
            .collect(),
    });

    let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
    let mut all_results: Vec<SubTaskResult> = Vec::new();
    // Never populated in this function; returned empty in the final result.
    let artifacts: Vec<SwarmArtifact> = Vec::new();

    // Fresh id identifying this swarm run in telemetry and sub-agent logs.
    let swarm_id = uuid::Uuid::new_v4().to_string();
    let strategy_str = format!("{:?}", strategy);
    self.telemetry
        .start_swarm(&swarm_id, subtasks.len(), &strategy_str)
        .await;

    // Subtask id -> result text, shared with stage execution so later stages can read
    // the output of their dependencies.
    let completed_results: Arc<RwLock<HashMap<String, String>>> =
        Arc::new(RwLock::new(HashMap::new()));

    for stage in 0..=max_stage {
        let stage_start = Instant::now();

        let stage_subtasks: Vec<SubTask> = orchestrator
            .subtasks_for_stage(stage)
            .into_iter()
            .cloned()
            .collect();

        tracing::debug!(
            "Stage {} has {} subtasks (max_stage={})",
            stage,
            stage_subtasks.len(),
            max_stage
        );

        if stage_subtasks.is_empty() {
            continue;
        }

        tracing::info!(
            provider_name = %orchestrator.provider(),
            "Executing stage {} with {} subtasks",
            stage,
            stage_subtasks.len()
        );

        // A stage-level error aborts the whole run.
        let stage_results = self
            .execute_stage(
                &orchestrator,
                stage_subtasks,
                completed_results.clone(),
                &swarm_id,
            )
            .await?;

        // Scoped so the write lock is released before the next stage starts.
        {
            let mut completed = completed_results.write().await;
            for result in &stage_results {
                completed.insert(result.subtask_id.clone(), result.result.clone());
                let tags = vec![
                    format!("stage:{stage}"),
                    format!("subtask:{}", result.subtask_id),
                ];
                // Publishing is best-effort; a failure here does not fail the stage.
                let _ = self
                    .result_store
                    .publish(
                        &result.subtask_id,
                        &result.subagent_id,
                        &result.result,
                        tags,
                        None,
                    )
                    .await;
            }
        }

        let stage_time = stage_start.elapsed().as_millis() as u64;
        let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
        let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

        orchestrator.stats_mut().stages.push(StageStats {
            stage,
            subagent_count: stage_results.len(),
            max_steps,
            total_steps,
            execution_time_ms: stage_time,
        });

        for result in &stage_results {
            orchestrator.complete_subtask(&result.subtask_id, result.clone());
        }

        let stage_completed = stage_results.iter().filter(|r| r.success).count();
        let stage_failed = stage_results.iter().filter(|r| !r.success).count();
        self.try_send_event(SwarmEvent::StageComplete {
            stage,
            completed: stage_completed,
            failed: stage_failed,
        });

        all_results.extend(stage_results);
    }

    // Copied out before `stats_mut()` borrows the orchestrator mutably below.
    let provider_name = orchestrator.provider().to_string();

    self.telemetry
        .record_swarm_latency("total_execution", start_time.elapsed())
        .await;

    let stats = orchestrator.stats_mut();
    stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
    // Sum of per-subtask times, i.e. the cost had everything run one after another.
    stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
    stats.calculate_critical_path();
    stats.calculate_speedup();

    // The run counts as successful only if no subtask failed.
    let success = all_results.iter().all(|r| r.success);

    let _telemetry_metrics = self.telemetry.complete_swarm(success).await;
    let result = self.aggregate_results(&all_results).await?;

    tracing::info!(
        provider_name = %provider_name,
        "Swarm execution complete: {} subtasks, {:.1}x speedup",
        all_results.len(),
        stats.speedup_factor
    );

    let final_stats = orchestrator.stats().clone();
    self.try_send_event(SwarmEvent::Complete {
        success,
        stats: final_stats.clone(),
    });

    // Note: `error` stays `None` even when `success` is false; per-subtask errors are
    // available in `subtask_results`.
    Ok(SwarmResult {
        success,
        result,
        subtask_results: all_results,
        stats: final_stats,
        artifacts,
        error: None,
    })
}
809
810 async fn execute_stage(
812 &self,
813 orchestrator: &Orchestrator,
814 subtasks: Vec<SubTask>,
815 completed_results: Arc<RwLock<HashMap<String, String>>>,
816 swarm_id: &str,
817 ) -> Result<Vec<SubTaskResult>> {
818 if self.config.execution_mode == ExecutionMode::KubernetesPod {
819 return self
820 .execute_stage_kubernetes(orchestrator, subtasks, completed_results, swarm_id)
821 .await;
822 }
823
824 let mut handles: FuturesUnordered<
825 tokio::task::JoinHandle<(String, Result<SubTaskResult, anyhow::Error>)>,
826 > = FuturesUnordered::new();
827 let mut abort_handles: HashMap<String, AbortHandle> = HashMap::new();
828 let mut task_ids: HashMap<tokio::task::Id, String> = HashMap::new();
829 let mut active_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
830 let mut all_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
831 let mut cached_results: Vec<SubTaskResult> = Vec::new();
832 let mut completed_entries: Vec<(SubTaskResult, Option<WorktreeInfo>)> = Vec::new();
833 let mut subtask_assignments: HashMap<String, (String, String)> = HashMap::new();
836 let mut kill_reasons: HashMap<String, String> = HashMap::new();
837 let mut promoted_subtask_id: Option<String> = None;
838
839 let semaphore = Arc::new(tokio::sync::Semaphore::new(
841 self.config.max_concurrent_requests,
842 ));
843 let delay_ms = self.config.request_delay_ms;
844
845 let model = orchestrator.model().to_string();
847 let provider_name = orchestrator.provider().to_string();
848 let providers = orchestrator.providers();
849 let provider = providers
850 .get(&provider_name)
851 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
852
853 tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
854
855 let base_tool_registry =
857 ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
858 let tool_definitions = base_tool_registry.definitions();
859
860 let result_store = Arc::clone(&self.result_store);
862
863 let worktree_manager = if self.config.worktree_enabled {
865 let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
866 std::env::current_dir()
867 .map(|p| p.to_string_lossy().to_string())
868 .unwrap_or_else(|_| ".".to_string())
869 });
870
871 let mgr = WorktreeManager::new(&working_dir);
872 tracing::info!(
873 working_dir = %working_dir,
874 "Worktree isolation enabled for parallel sub-agents"
875 );
876 Some(Arc::new(mgr) as Arc<WorktreeManager>)
877 } else {
878 None
879 };
880
881 let mut pending_subtasks: Vec<SubTask> = Vec::with_capacity(subtasks.len());
885 for subtask in subtasks.into_iter() {
886 if let Some(ref cache) = self.cache {
887 let mut cache_guard = cache.lock().await;
888 if let Some(cached_result) = cache_guard.get(&subtask).await {
889 tracing::info!(
890 subtask_id = %subtask.id,
891 task_name = %subtask.name,
892 "Cache hit for subtask, skipping execution"
893 );
894 self.try_send_event(SwarmEvent::SubTaskUpdate {
895 id: subtask.id.clone(),
896 name: subtask.name.clone(),
897 status: SubTaskStatus::Completed,
898 agent_name: Some("cached".to_string()),
899 });
900 cached_results.push(cached_result);
901 continue;
902 }
903 }
904 pending_subtasks.push(subtask);
905 }
906
907 let mut worktrees_by_id: HashMap<String, WorktreeInfo> = HashMap::new();
912 if let Some(ref mgr) = worktree_manager {
913 let ids: Vec<String> = pending_subtasks
919 .iter()
920 .filter(|s| s.needs_worktree())
921 .map(|s| s.id.clone())
922 .collect();
923 let skipped = pending_subtasks.len().saturating_sub(ids.len());
924 if skipped > 0 {
925 tracing::info!(
926 skipped,
927 total = pending_subtasks.len(),
928 "Skipping worktree creation for read-only sub-agents"
929 );
930 }
931 worktrees_by_id = precreate_worktrees(Arc::clone(mgr), ids).await;
932 for (sid, wt) in &worktrees_by_id {
933 active_worktrees.insert(sid.clone(), wt.clone());
934 all_worktrees.insert(sid.clone(), wt.clone());
935 }
936 let mut runnable = Vec::with_capacity(pending_subtasks.len());
937 for subtask in pending_subtasks.into_iter() {
938 if subtask.needs_worktree() && !worktrees_by_id.contains_key(&subtask.id) {
939 cached_results.push(required_worktree_failure(&subtask));
940 } else {
941 runnable.push(subtask);
942 }
943 }
944 pending_subtasks = runnable;
945 }
946
947 for (idx, subtask) in pending_subtasks.into_iter().enumerate() {
948 let (chosen_provider, chosen_model) = if let Some(ref state) = self.delegation {
950 match state.lock() {
951 Ok(guard) => super::delegation::choose_provider_for_subtask(
952 &providers,
953 &guard,
954 subtask.specialty.as_deref().unwrap_or("General"),
955 &provider_name,
956 &model,
957 ),
958 Err(e) => {
959 tracing::warn!("delegation state mutex poisoned; recovering");
960 let guard = e.into_inner();
961 super::delegation::choose_provider_for_subtask(
962 &providers,
963 &guard,
964 subtask.specialty.as_deref().unwrap_or("General"),
965 &provider_name,
966 &model,
967 )
968 }
969 }
970 } else {
971 (provider_name.clone(), model.clone())
972 };
973 let subtask_provider = providers
974 .get(&chosen_provider)
975 .unwrap_or_else(|| Arc::clone(&provider));
976 let model = chosen_model;
977 subtask_assignments.insert(
978 subtask.id.clone(),
979 (
980 chosen_provider.clone(),
981 subtask
982 .specialty
983 .clone()
984 .unwrap_or_else(|| "General".into()),
985 ),
986 );
987 let _provider_name = chosen_provider;
988 let provider = subtask_provider;
989
990 let context = {
992 let completed = completed_results.read().await;
993 let mut dep_context = String::new();
994 for dep_id in &subtask.dependencies {
995 if let Some(result) = completed.get(dep_id) {
996 dep_context.push_str(&format!(
997 "\n--- Result from dependency {} ---\n{}\n",
998 dep_id, result
999 ));
1000 }
1001 }
1002 dep_context
1003 };
1004
1005 let instruction = subtask.instruction.clone();
1006 let subtask_name = subtask.name.clone();
1007 let specialty = subtask.specialty.clone().unwrap_or_default();
1008 let read_only_task = tool_policy::is_read_only_task(&subtask);
1009 let subtask_id = subtask.id.clone();
1010 let subtask_id_for_handle = subtask_id.clone();
1011 let max_steps = self.config.max_steps_per_subagent;
1012 let timeout_secs = self.config.subagent_timeout_secs;
1013
1014 let max_retries = self.config.max_retries;
1016 let base_delay_ms = self.config.base_delay_ms;
1017 let max_delay_ms = self.config.max_delay_ms;
1018
1019 let worktree_info = worktrees_by_id.remove(&subtask_id);
1022
1023 let working_dir = worktree_info
1024 .as_ref()
1025 .map(|wt| wt.path.display().to_string())
1026 .unwrap_or_else(|| ".".to_string());
1027 let working_dir_path = worktree_info.as_ref().map(|wt| wt.path.clone());
1028
1029 let tools = tool_policy::definitions(&tool_definitions, read_only_task);
1031 let _base_registry = Arc::clone(&base_tool_registry);
1032 let agent_result_store = Arc::clone(&result_store);
1033 let sem = Arc::clone(&semaphore);
1034 let stagger_delay = delay_ms * idx as u64; let event_tx = self.event_tx.clone();
1036
1037 let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
1039
1040 tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
1042
1043 let handle = tokio::spawn(async move {
1045 if stagger_delay > 0 {
1047 tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
1048 }
1049 let _permit = match sem.acquire().await {
1050 Ok(permit) => permit,
1051 Err(_) => {
1052 return (
1053 subtask_id.clone(),
1054 Err(anyhow::anyhow!("Swarm execution cancelled")),
1055 );
1056 }
1057 };
1058
1059 let _agent_start = Instant::now();
1060
1061 let start = Instant::now();
1062
1063 let working_path = std::path::Path::new(&working_dir);
1065 let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
1066 .map(|(content, _)| {
1067 format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
1068 })
1069 .unwrap_or_default();
1070
1071 let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
1073 let system_prompt = tool_policy::system_prompt(tool_policy::SystemPromptInput {
1074 specialty: &specialty,
1075 subtask_id: &subtask_id,
1076 working_dir: &working_dir,
1077 model: &model,
1078 prd_filename: &prd_filename,
1079 agents_md: &agents_md_content,
1080 read_only: read_only_task,
1081 });
1082
1083 let user_prompt = if context.is_empty() {
1084 format!("Complete this task:\n\n{}", instruction)
1085 } else {
1086 format!(
1087 "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
1088 instruction, context
1089 )
1090 };
1091
1092 if let Some(ref tx) = event_tx {
1094 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1095 id: subtask_id.clone(),
1096 name: subtask_name.clone(),
1097 status: SubTaskStatus::Running,
1098 agent_name: Some(format!("agent-{}", subtask_id)),
1099 });
1100 let _ = tx.try_send(SwarmEvent::AgentStarted {
1101 subtask_id: subtask_id.clone(),
1102 agent_name: format!("agent-{}", subtask_id),
1103 specialty: specialty.clone(),
1104 });
1105 }
1106
1107 let mut agent_registry =
1110 ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
1111 agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
1112 Arc::clone(&agent_result_store),
1113 subtask_id.clone(),
1114 )));
1115 let batch_tool = Arc::new(crate::tool::batch::BatchTool::new());
1116 agent_registry.register(batch_tool.clone());
1117 let mut search_providers = crate::provider::ProviderRegistry::new();
1118 search_providers.register(Arc::clone(&provider));
1119 agent_registry.register_search(Arc::new(search_providers));
1120 tool_policy::restrict_registry(&mut agent_registry, read_only_task);
1121 let registry = Arc::new(agent_registry);
1122 batch_tool.set_registry(Arc::downgrade(®istry));
1123
1124 let mut attempt = 0u32;
1126 let mut result: Result<(String, usize, usize, AgentLoopExit), anyhow::Error> =
1127 Err(anyhow::anyhow!("Not executed"));
1128
1129 while attempt <= max_retries {
1130 let _attempt_start = Instant::now();
1131
1132 result = run_agent_loop(
1134 Arc::clone(&provider),
1135 &model,
1136 &system_prompt,
1137 &user_prompt,
1138 tools.clone(),
1139 Arc::clone(®istry),
1140 max_steps,
1141 timeout_secs,
1142 event_tx.clone(),
1143 subtask_id.clone(),
1144 None,
1145 working_dir_path.clone(),
1146 )
1147 .await;
1148
1149 match &result {
1151 Ok((_, _, _, exit_reason)) => {
1152 if *exit_reason == AgentLoopExit::Completed {
1153 tracing::info!(
1155 subtask_id = %subtask_id,
1156 attempt = attempt + 1,
1157 "Sub-agent completed successfully on first attempt"
1158 );
1159 break;
1160 }
1161 let should_retry = attempt < max_retries;
1163 if should_retry {
1164 let delay = calculate_backoff_delay(
1165 attempt,
1166 base_delay_ms,
1167 max_delay_ms,
1168 2.0,
1169 );
1170 tracing::warn!(
1171 subtask_id = %subtask_id,
1172 attempt = attempt + 1,
1173 max_retries = max_retries,
1174 exit_reason = ?exit_reason,
1175 delay_ms = delay.as_millis(),
1176 "Sub-agent did not complete, retrying with backoff"
1177 );
1178 tokio::time::sleep(delay).await;
1179 } else {
1180 tracing::warn!(
1181 subtask_id = %subtask_id,
1182 attempt = attempt + 1,
1183 max_retries = max_retries,
1184 exit_reason = ?exit_reason,
1185 "Sub-agent did not complete, retries exhausted"
1186 );
1187 }
1188 }
1189 Err(e) => {
1190 let should_retry = attempt < max_retries;
1192 if should_retry {
1193 let delay = calculate_backoff_delay(
1194 attempt,
1195 base_delay_ms,
1196 max_delay_ms,
1197 2.0,
1198 );
1199 tracing::warn!(
1200 subtask_id = %subtask_id,
1201 attempt = attempt + 1,
1202 max_retries = max_retries,
1203 error = %e,
1204 delay_ms = delay.as_millis(),
1205 "Sub-agent error, retrying with backoff"
1206 );
1207 tokio::time::sleep(delay).await;
1208 } else {
1209 tracing::error!(
1210 subtask_id = %subtask_id,
1211 attempt = attempt + 1,
1212 max_retries = max_retries,
1213 error = %e,
1214 "Sub-agent error, retries exhausted"
1215 );
1216 }
1217 }
1218 }
1219
1220 attempt += 1;
1221 }
1222
1223 let result = result;
1224
1225 let task_result = match result {
1226 Ok((output, steps, tool_calls, exit_reason)) => {
1227 let (success, status, error) = match exit_reason {
1228 AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
1229 AgentLoopExit::MaxStepsReached => (
1230 false,
1231 SubTaskStatus::Failed,
1232 Some(format!("Sub-agent hit max steps ({max_steps})")),
1233 ),
1234 AgentLoopExit::TimedOut => (
1235 false,
1236 SubTaskStatus::TimedOut,
1237 Some(format!("Sub-agent timed out after {timeout_secs}s")),
1238 ),
1239 };
1240
1241 let total_attempts = attempt + 1;
1243 let actual_retry_attempts = if total_attempts > 1 {
1244 total_attempts - 1
1245 } else {
1246 0
1247 };
1248
1249 if let Some(ref tx) = event_tx {
1251 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1252 id: subtask_id.clone(),
1253 name: subtask_name.clone(),
1254 status,
1255 agent_name: Some(format!("agent-{}", subtask_id)),
1256 });
1257 if let Some(ref message) = error {
1258 let _ = tx.try_send(SwarmEvent::AgentError {
1259 subtask_id: subtask_id.clone(),
1260 error: message.clone(),
1261 });
1262 }
1263 let _ = tx.try_send(SwarmEvent::AgentOutput {
1264 subtask_id: subtask_id.clone(),
1265 output: output.clone(),
1266 });
1267 let _ = tx.try_send(SwarmEvent::AgentComplete {
1268 subtask_id: subtask_id.clone(),
1269 success,
1270 steps,
1271 });
1272 }
1273 Ok(SubTaskResult {
1274 subtask_id: subtask_id.clone(),
1275 subagent_id: format!("agent-{}", subtask_id),
1276 success,
1277 result: output,
1278 steps,
1279 tool_calls,
1280 execution_time_ms: start.elapsed().as_millis() as u64,
1281 error,
1282 artifacts: Vec::new(),
1283 retry_count: actual_retry_attempts,
1284 })
1285 }
1286 Err(e) => {
1287 let total_attempts = attempt + 1;
1289 let actual_retry_attempts = if total_attempts > 1 {
1290 total_attempts - 1
1291 } else {
1292 0
1293 };
1294
1295 if let Some(ref tx) = event_tx {
1297 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1298 id: subtask_id.clone(),
1299 name: subtask_name.clone(),
1300 status: SubTaskStatus::Failed,
1301 agent_name: Some(format!("agent-{}", subtask_id)),
1302 });
1303 let _ = tx.try_send(SwarmEvent::AgentError {
1304 subtask_id: subtask_id.clone(),
1305 error: e.to_string(),
1306 });
1307 let _ = tx.try_send(SwarmEvent::AgentComplete {
1308 subtask_id: subtask_id.clone(),
1309 success: false,
1310 steps: 0,
1311 });
1312 }
1313 Ok(SubTaskResult {
1314 subtask_id: subtask_id.clone(),
1315 subagent_id: format!("agent-{}", subtask_id),
1316 success: false,
1317 result: String::new(),
1318 steps: 0,
1319 tool_calls: 0,
1320 execution_time_ms: start.elapsed().as_millis() as u64,
1321 error: Some(e.to_string()),
1322 artifacts: Vec::new(),
1323 retry_count: actual_retry_attempts,
1324 })
1325 }
1326 };
1327
1328 (subtask_id.clone(), task_result)
1329 });
1330
1331 let abort_handle = handle.abort_handle();
1332 abort_handles.insert(subtask_id_for_handle.clone(), abort_handle);
1333 task_ids.insert(handle.id(), subtask_id_for_handle.clone());
1334 handles.push(handle);
1335 }
1336
1337 let mut collapse_controller = if worktree_manager.is_some() && active_worktrees.len() > 1 {
1339 Some(CollapseController::new(CollapsePolicy::default()))
1340 } else {
1341 None
1342 };
1343 let mut collapse_tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1344 collapse_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1345 let _ = collapse_tick.tick().await;
1347
1348 while !handles.is_empty() {
1349 tokio::select! {
1350 maybe_join = handles.next() => {
1351 let Some(joined) = maybe_join else {
1352 continue;
1353 };
1354 match joined {
1355 Ok((subtask_id, Ok(result))) => {
1356 abort_handles.remove(&subtask_id);
1357 let wt = active_worktrees.remove(&subtask_id).or_else(|| all_worktrees.get(&subtask_id).cloned());
1358 if let (Some(state), Some((prov, spec))) =
1359 (self.delegation.as_ref(), subtask_assignments.get(&subtask_id))
1360 {
1361 super::delegation_outcome::record_subtask_outcome(
1362 state,
1363 prov,
1364 spec,
1365 result.success,
1366 );
1367 }
1368 completed_entries.push((result, wt));
1369 }
1370 Ok((subtask_id, Err(e))) => {
1371 abort_handles.remove(&subtask_id);
1372 active_worktrees.remove(&subtask_id);
1373 let wt = all_worktrees.get(&subtask_id).cloned();
1374 if let (Some(state), Some((prov, spec))) =
1375 (self.delegation.as_ref(), subtask_assignments.get(&subtask_id))
1376 {
1377 super::delegation_outcome::record_subtask_outcome(
1378 state, prov, spec, false,
1379 );
1380 }
1381 completed_entries.push((
1382 SubTaskResult {
1383 subtask_id: subtask_id.clone(),
1384 subagent_id: format!("agent-{subtask_id}"),
1385 success: false,
1386 result: String::new(),
1387 steps: 0,
1388 tool_calls: 0,
1389 execution_time_ms: 0,
1390 error: Some(e.to_string()),
1391 artifacts: Vec::new(),
1392 retry_count: 0,
1393 },
1394 wt,
1395 ));
1396 }
1397 Err(e) => {
1398 let subtask_id = task_ids
1399 .remove(&e.id())
1400 .unwrap_or_else(|| "unknown".to_string());
1401 abort_handles.remove(&subtask_id);
1402 active_worktrees.remove(&subtask_id);
1403 let wt = all_worktrees.get(&subtask_id).cloned();
1404 if let (Some(state), Some((prov, spec))) =
1405 (self.delegation.as_ref(), subtask_assignments.get(&subtask_id))
1406 {
1407 super::delegation_outcome::record_subtask_outcome(
1408 state, prov, spec, false,
1409 );
1410 }
1411 completed_entries.push((
1412 SubTaskResult {
1413 subtask_id: subtask_id.clone(),
1414 subagent_id: format!("agent-{subtask_id}"),
1415 success: false,
1416 result: String::new(),
1417 steps: 0,
1418 tool_calls: 0,
1419 execution_time_ms: 0,
1420 error: Some(format!("Task join error: {e}")),
1421 artifacts: Vec::new(),
1422 retry_count: 0,
1423 },
1424 wt,
1425 ));
1426 }
1427 }
1428 }
1429 _ = collapse_tick.tick(), if collapse_controller.is_some() && !active_worktrees.is_empty() => {
1430 let branches: Vec<BranchRuntimeState> = active_worktrees
1431 .iter()
1432 .map(|(subtask_id, wt)| BranchRuntimeState {
1433 subtask_id: subtask_id.clone(),
1434 branch: wt.branch.clone(),
1435 worktree_path: wt.path.clone(),
1436 })
1437 .collect();
1438
1439 if let Some(controller) = collapse_controller.as_mut() {
1440 match controller.sample(&branches) {
1441 Ok(tick) => {
1442 if promoted_subtask_id != tick.promoted_subtask_id {
1443 promoted_subtask_id = tick.promoted_subtask_id.clone();
1444 if let Some(ref promoted) = promoted_subtask_id {
1445 tracing::info!(
1446 subtask_id = %promoted,
1447 "Collapse controller promoted branch"
1448 );
1449 if let Some(audit) = crate::audit::try_audit_log() {
1450 audit.log_with_correlation(
1451 crate::audit::AuditCategory::Swarm,
1452 "collapse_promote_branch",
1453 crate::audit::AuditOutcome::Success,
1454 Some("collapse-controller".to_string()),
1455 Some(serde_json::json!({
1456 "swarm_id": swarm_id,
1457 "subtask_id": promoted,
1458 })),
1459 None,
1460 None,
1461 Some(swarm_id.to_string()),
1462 None,
1463 ).await;
1464 }
1465 }
1466 }
1467
1468 for kill in tick.kills {
1469 if kill_reasons.contains_key(&kill.subtask_id) {
1470 continue;
1471 }
1472 let Some(abort_handle) = abort_handles.get(&kill.subtask_id) else {
1473 continue;
1474 };
1475
1476 abort_handle.abort();
1477 abort_handles.remove(&kill.subtask_id);
1478 active_worktrees.remove(&kill.subtask_id);
1479 kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
1480
1481 tracing::warn!(
1482 subtask_id = %kill.subtask_id,
1483 branch = %kill.branch,
1484 reason = %kill.reason,
1485 "Collapse controller killed branch"
1486 );
1487
1488 if let Some(ref tx) = self.event_tx {
1489 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1490 id: kill.subtask_id.clone(),
1491 name: kill.subtask_id.clone(),
1492 status: SubTaskStatus::Cancelled,
1493 agent_name: Some(format!("agent-{}", kill.subtask_id)),
1494 });
1495 let _ = tx.try_send(SwarmEvent::AgentError {
1496 subtask_id: kill.subtask_id.clone(),
1497 error: format!("Cancelled by collapse controller: {}", kill.reason),
1498 });
1499 }
1500
1501 if let Some(audit) = crate::audit::try_audit_log() {
1502 audit.log_with_correlation(
1503 crate::audit::AuditCategory::Swarm,
1504 "collapse_kill_branch",
1505 crate::audit::AuditOutcome::Success,
1506 Some("collapse-controller".to_string()),
1507 Some(serde_json::json!({
1508 "swarm_id": swarm_id,
1509 "subtask_id": kill.subtask_id,
1510 "branch": kill.branch,
1511 "reason": kill.reason,
1512 })),
1513 None,
1514 None,
1515 Some(swarm_id.to_string()),
1516 None,
1517 ).await;
1518 }
1519 }
1520 }
1521 Err(e) => {
1522 tracing::warn!(error = %e, "Collapse controller sampling failed");
1523 }
1524 }
1525 }
1526 }
1527 }
1528 }
1529
1530 if let Some(ref promoted) = promoted_subtask_id {
1532 completed_entries.sort_by_key(|(result, _)| {
1533 if &result.subtask_id == promoted {
1534 0usize
1535 } else {
1536 1usize
1537 }
1538 });
1539 }
1540
1541 let mut results = cached_results;
1542 let auto_merge = self.config.worktree_auto_merge;
1543
1544 for (mut result, worktree_info) in completed_entries {
1545 if let Some(wt) = worktree_info {
1547 if let Some(reason) = kill_reasons.get(&result.subtask_id) {
1548 result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1549 result.result.push_str(&format!(
1550 "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1551 ));
1552 if let Some(ref mgr) = worktree_manager
1553 && let Err(e) = mgr.cleanup(&wt.name).await
1554 {
1555 tracing::warn!(error = %e, "Failed to cleanup killed worktree");
1556 }
1557 } else if result.success && auto_merge {
1558 if let Some(ref mgr) = worktree_manager {
1559 match mgr.merge(&wt.name).await {
1560 Ok(merge_result) => {
1561 if merge_result.success {
1562 tracing::info!(
1563 subtask_id = %result.subtask_id,
1564 files_changed = merge_result.files_changed,
1565 "Merged worktree changes successfully"
1566 );
1567 result.result.push_str(&format!(
1568 "\n\n--- Merge Result ---\n{}",
1569 merge_result.summary
1570 ));
1571 } else if merge_result.aborted {
1572 tracing::warn!(
1573 subtask_id = %result.subtask_id,
1574 summary = %merge_result.summary,
1575 "Merge was aborted"
1576 );
1577 result.result.push_str(&format!(
1578 "\n\n--- Merge Aborted ---\n{}",
1579 merge_result.summary
1580 ));
1581 } else {
1582 tracing::warn!(
1583 subtask_id = %result.subtask_id,
1584 conflicts = ?merge_result.conflicts,
1585 "Merge had conflicts"
1586 );
1587 result.result.push_str(&format!(
1588 "\n\n--- Merge Conflicts ---\n{}",
1589 merge_result.summary
1590 ));
1591 }
1592 if let Err(e) = mgr.cleanup(&wt.name).await {
1593 tracing::warn!(error = %e, "Failed to cleanup worktree");
1594 }
1595 }
1596 Err(e) => {
1597 tracing::error!(
1598 subtask_id = %result.subtask_id,
1599 error = %e,
1600 "Failed to merge worktree"
1601 );
1602 }
1603 }
1604 }
1605 } else if !result.success {
1606 tracing::info!(
1607 subtask_id = %result.subtask_id,
1608 worktree_path = %wt.path.display(),
1609 "Keeping worktree for debugging (task failed)"
1610 );
1611 }
1612 }
1613
1614 if result.success
1616 && let Some(ref cache_arc) = self.cache
1617 {
1618 let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
1619 cache_arc.lock().await;
1620 let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1621 if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
1622 tracing::warn!(
1623 subtask_id = %result.subtask_id,
1624 error = %e,
1625 "Failed to cache subtask result"
1626 );
1627 }
1628 }
1629
1630 results.push(result);
1631 }
1632
1633 Ok(results)
1634 }
1635
1636 async fn execute_stage_kubernetes(
1637 &self,
1638 orchestrator: &Orchestrator,
1639 subtasks: Vec<SubTask>,
1640 completed_results: Arc<RwLock<HashMap<String, String>>>,
1641 swarm_id: &str,
1642 ) -> Result<Vec<SubTaskResult>> {
1643 let k8s = K8sManager::new().await;
1644 if !k8s.is_available() {
1645 anyhow::bail!(
1646 "Kubernetes execution mode requested but K8s client is unavailable in this environment"
1647 );
1648 }
1649
1650 let provider_name = orchestrator.provider().to_string();
1651 let model = orchestrator.model().to_string();
1652 let pod_budget = self.config.k8s_pod_budget.max(1);
1653 let mut pending: VecDeque<SubTask> = subtasks.into_iter().collect();
1654 let mut active: HashMap<String, ActiveK8sBranch> = HashMap::new();
1655 let mut subtask_names: HashMap<String, String> = HashMap::new();
1656 let mut results: Vec<SubTaskResult> = Vec::new();
1657 let mut kill_reasons: HashMap<String, String> = HashMap::new();
1658 let mut promoted_subtask_id: Option<String> = None;
1659 let mut collapse_controller = CollapseController::new(CollapsePolicy::default());
1660
1661 let mut tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1662 tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1663 let _ = tick.tick().await;
1664
1665 loop {
1666 while active.len() < pod_budget {
1667 let Some(subtask) = pending.pop_front() else {
1668 break;
1669 };
1670
1671 if let Some(ref cache) = self.cache {
1672 let mut cache_guard = cache.lock().await;
1673 if let Some(cached_result) = cache_guard.get(&subtask).await {
1674 tracing::info!(
1675 subtask_id = %subtask.id,
1676 task_name = %subtask.name,
1677 "Cache hit for subtask, skipping Kubernetes execution"
1678 );
1679 self.try_send_event(SwarmEvent::SubTaskUpdate {
1680 id: subtask.id.clone(),
1681 name: subtask.name.clone(),
1682 status: SubTaskStatus::Completed,
1683 agent_name: Some("cached".to_string()),
1684 });
1685 results.push(cached_result);
1686 continue;
1687 }
1688 }
1689
1690 let context = {
1691 let completed = completed_results.read().await;
1692 let mut dep_context = String::new();
1693 for dep_id in &subtask.dependencies {
1694 if let Some(result) = completed.get(dep_id) {
1695 dep_context.push_str(&format!(
1696 "\n--- Result from dependency {} ---\n{}\n",
1697 dep_id, result
1698 ));
1699 }
1700 }
1701 dep_context
1702 };
1703
1704 let payload = RemoteSubtaskPayload {
1705 swarm_id: swarm_id.to_string(),
1706 subtask_id: subtask.id.clone(),
1707 subtask_name: subtask.name.clone(),
1708 specialty: subtask.specialty.clone().unwrap_or_default(),
1709 instruction: subtask.instruction.clone(),
1710 context: context.clone(),
1711 provider: provider_name.clone(),
1712 model: model.clone(),
1713 max_steps: self.config.max_steps_per_subagent,
1714 timeout_secs: self.config.subagent_timeout_secs,
1715 working_dir: self.config.working_dir.clone(),
1716 read_only: tool_policy::is_read_only_task(&subtask),
1717 probe_interval_secs: COLLAPSE_SAMPLE_SECS,
1718 };
1719 let payload_b64 = match encode_payload(&payload) {
1720 Ok(payload) => payload,
1721 Err(error) => {
1722 let error_text = format!("Failed to encode remote payload: {error}");
1723 tracing::error!(subtask_id = %subtask.id, error = %error, "K8s payload encoding failed");
1724 self.try_send_event(SwarmEvent::SubTaskUpdate {
1725 id: subtask.id.clone(),
1726 name: subtask.name.clone(),
1727 status: SubTaskStatus::Failed,
1728 agent_name: Some("k8s-encoder".to_string()),
1729 });
1730 self.try_send_event(SwarmEvent::AgentError {
1731 subtask_id: subtask.id.clone(),
1732 error: error_text.clone(),
1733 });
1734 results.push(SubTaskResult {
1735 subtask_id: subtask.id.clone(),
1736 subagent_id: format!("agent-{}", subtask.id),
1737 success: false,
1738 result: String::new(),
1739 steps: 0,
1740 tool_calls: 0,
1741 execution_time_ms: 0,
1742 error: Some(error_text),
1743 artifacts: Vec::new(),
1744 retry_count: 0,
1745 });
1746 continue;
1747 }
1748 };
1749
1750 let mut env_vars = HashMap::new();
1751 env_vars.insert(SWARM_SUBTASK_PAYLOAD_ENV.to_string(), payload_b64);
1752 for key in K8S_PASSTHROUGH_ENV_VARS {
1753 if let Ok(value) = std::env::var(key)
1754 && !value.trim().is_empty()
1755 {
1756 env_vars.insert((*key).to_string(), value);
1757 }
1758 }
1759 let mut labels = HashMap::new();
1760 labels.insert("codetether.run/swarm-id".to_string(), swarm_id.to_string());
1761 labels.insert(
1762 "codetether.run/stage".to_string(),
1763 subtask.stage.to_string(),
1764 );
1765
1766 let spec = SubagentPodSpec {
1767 image: self.config.k8s_subagent_image.clone(),
1768 env_vars,
1769 labels,
1770 command: Some(vec!["sh".to_string(), "-lc".to_string()]),
1771 args: Some(vec![format!(
1772 "exec codetether swarm-subagent --payload-env {payload_env}",
1773 payload_env = SWARM_SUBTASK_PAYLOAD_ENV,
1774 )]),
1775 };
1776
1777 if let Err(error) = k8s.spawn_subagent_pod_with_spec(&subtask.id, spec).await {
1778 let error_text = format!("Failed to spawn Kubernetes pod: {error}");
1779 tracing::error!(subtask_id = %subtask.id, error = %error, "K8s sub-agent pod spawn failed");
1780 self.try_send_event(SwarmEvent::SubTaskUpdate {
1781 id: subtask.id.clone(),
1782 name: subtask.name.clone(),
1783 status: SubTaskStatus::Failed,
1784 agent_name: Some("k8s-spawn".to_string()),
1785 });
1786 self.try_send_event(SwarmEvent::AgentError {
1787 subtask_id: subtask.id.clone(),
1788 error: error_text.clone(),
1789 });
1790 results.push(SubTaskResult {
1791 subtask_id: subtask.id.clone(),
1792 subagent_id: format!("agent-{}", subtask.id),
1793 success: false,
1794 result: String::new(),
1795 steps: 0,
1796 tool_calls: 0,
1797 execution_time_ms: 0,
1798 error: Some(error_text),
1799 artifacts: Vec::new(),
1800 retry_count: 0,
1801 });
1802 continue;
1803 }
1804
1805 let branch = K8sManager::subagent_pod_name(&subtask.id);
1806 subtask_names.insert(subtask.id.clone(), subtask.name.clone());
1807 active.insert(
1808 subtask.id.clone(),
1809 ActiveK8sBranch {
1810 branch: branch.clone(),
1811 started_at: Instant::now(),
1812 },
1813 );
1814
1815 self.try_send_event(SwarmEvent::SubTaskUpdate {
1816 id: subtask.id.clone(),
1817 name: subtask.name.clone(),
1818 status: SubTaskStatus::Running,
1819 agent_name: Some(format!("k8s-{branch}")),
1820 });
1821 self.try_send_event(SwarmEvent::AgentStarted {
1822 subtask_id: subtask.id.clone(),
1823 agent_name: format!("k8s-{branch}"),
1824 specialty: subtask
1825 .specialty
1826 .clone()
1827 .unwrap_or_else(|| "generalist".to_string()),
1828 });
1829
1830 tracing::info!(
1831 subtask_id = %subtask.id,
1832 pod = %branch,
1833 "Spawned Kubernetes sub-agent pod"
1834 );
1835 }
1836
1837 if pending.is_empty() && active.is_empty() {
1838 break;
1839 }
1840
1841 tick.tick().await;
1842
1843 let active_ids: Vec<String> = active.keys().cloned().collect();
1844 let mut finished_results: Vec<SubTaskResult> = Vec::new();
1845 for subtask_id in active_ids {
1846 let Some(active_state) = active.get(&subtask_id).cloned() else {
1847 continue;
1848 };
1849
1850 if active_state.started_at.elapsed()
1851 > Duration::from_secs(self.config.subagent_timeout_secs)
1852 {
1853 let reason = format!(
1854 "Timed out after {}s in Kubernetes pod",
1855 self.config.subagent_timeout_secs
1856 );
1857 kill_reasons.insert(subtask_id.clone(), reason.clone());
1858 if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
1859 tracing::warn!(
1860 subtask_id = %subtask_id,
1861 error = %error,
1862 "Failed deleting timed-out Kubernetes pod"
1863 );
1864 }
1865 active.remove(&subtask_id);
1866 finished_results.push(SubTaskResult {
1867 subtask_id: subtask_id.clone(),
1868 subagent_id: format!("agent-{subtask_id}"),
1869 success: false,
1870 result: String::new(),
1871 steps: 0,
1872 tool_calls: 0,
1873 execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1874 error: Some(reason),
1875 artifacts: Vec::new(),
1876 retry_count: 0,
1877 });
1878 continue;
1879 }
1880
1881 let pod_state = match k8s.get_subagent_pod_state(&subtask_id).await {
1882 Ok(state) => state,
1883 Err(error) => {
1884 tracing::warn!(
1885 subtask_id = %subtask_id,
1886 error = %error,
1887 "Failed to query Kubernetes pod state for sub-agent"
1888 );
1889 continue;
1890 }
1891 };
1892 let Some(pod_state) = pod_state else {
1893 active.remove(&subtask_id);
1894 finished_results.push(SubTaskResult {
1895 subtask_id: subtask_id.clone(),
1896 subagent_id: format!("agent-{subtask_id}"),
1897 success: false,
1898 result: String::new(),
1899 steps: 0,
1900 tool_calls: 0,
1901 execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
1902 error: Some("Sub-agent pod disappeared".to_string()),
1903 artifacts: Vec::new(),
1904 retry_count: 0,
1905 });
1906 continue;
1907 };
1908
1909 let phase = pod_state.phase.to_ascii_lowercase();
1910 let finished = pod_state.terminated || phase == "succeeded" || phase == "failed";
1911 if !finished {
1912 continue;
1913 }
1914
1915 let logs = k8s
1916 .subagent_logs(&subtask_id, 10_000)
1917 .await
1918 .unwrap_or_default();
1919 let elapsed_ms = active_state.started_at.elapsed().as_millis() as u64;
1920 let mut result = k8s_result::final_result(
1921 &subtask_id,
1922 elapsed_ms,
1923 pod_state.exit_code,
1924 pod_state.reason.as_deref(),
1925 &logs,
1926 );
1927
1928 if let Some(reason) = kill_reasons.get(&subtask_id) {
1929 result.success = false;
1930 result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1931 result.result.push_str(&format!(
1932 "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1933 ));
1934 }
1935
1936 active.remove(&subtask_id);
1937 if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
1938 tracing::warn!(
1939 subtask_id = %subtask_id,
1940 error = %error,
1941 "Failed deleting completed Kubernetes pod"
1942 );
1943 }
1944 finished_results.push(result);
1945 }
1946
1947 for result in finished_results {
1948 if result.success {
1949 completed_results
1950 .write()
1951 .await
1952 .insert(result.subtask_id.clone(), result.result.clone());
1953 }
1954 if result.success
1955 && let Some(ref cache_arc) = self.cache
1956 {
1957 let mut cache_guard = cache_arc.lock().await;
1958 let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1959 let _ = cache_guard.put(&cache_subtask, &result).await;
1960 }
1961
1962 self.try_send_event(SwarmEvent::SubTaskUpdate {
1963 id: result.subtask_id.clone(),
1964 name: subtask_names
1965 .get(&result.subtask_id)
1966 .cloned()
1967 .unwrap_or_else(|| result.subtask_id.clone()),
1968 status: if result.success {
1969 SubTaskStatus::Completed
1970 } else {
1971 SubTaskStatus::Failed
1972 },
1973 agent_name: Some(format!("k8s-{}", result.subtask_id)),
1974 });
1975 if let Some(ref error) = result.error {
1976 self.try_send_event(SwarmEvent::AgentError {
1977 subtask_id: result.subtask_id.clone(),
1978 error: error.clone(),
1979 });
1980 }
1981 self.try_send_event(SwarmEvent::AgentOutput {
1982 subtask_id: result.subtask_id.clone(),
1983 output: result.result.clone(),
1984 });
1985 self.try_send_event(SwarmEvent::AgentComplete {
1986 subtask_id: result.subtask_id.clone(),
1987 success: result.success,
1988 steps: result.steps,
1989 });
1990 results.push(result);
1991 }
1992
1993 if active.len() > 1 {
1994 let mut observations = Vec::with_capacity(active.len());
1995 for (subtask_id, state) in &active {
1996 let pod_state = match k8s.get_subagent_pod_state(subtask_id).await {
1997 Ok(state) => state,
1998 Err(error) => {
1999 tracing::warn!(
2000 subtask_id = %subtask_id,
2001 error = %error,
2002 "Failed to query pod state while sampling branch observation"
2003 );
2004 None
2005 }
2006 };
2007 let (resource_health_score, infra_unhealthy_signals) =
2008 compute_resource_health(pod_state.as_ref());
2009
2010 let logs = k8s.subagent_logs(subtask_id, 500).await.unwrap_or_default();
2011 if let Some(probe) = latest_probe_from_logs(&logs) {
2012 let compile_ok = pod_state
2013 .as_ref()
2014 .map(|p| probe.compile_ok && !p.phase.eq_ignore_ascii_case("failed"))
2015 .unwrap_or(probe.compile_ok);
2016 observations.push(BranchObservation {
2017 subtask_id: subtask_id.clone(),
2018 branch: state.branch.clone(),
2019 compile_ok,
2020 changed_files: probe_changed_files_set(&probe),
2021 changed_lines: probe.changed_lines,
2022 resource_health_score,
2023 infra_unhealthy_signals,
2024 });
2025 continue;
2026 }
2027 let compile_ok = pod_state
2028 .as_ref()
2029 .map(|p| !p.phase.eq_ignore_ascii_case("failed"))
2030 .unwrap_or(false);
2031 observations.push(BranchObservation {
2032 subtask_id: subtask_id.clone(),
2033 branch: state.branch.clone(),
2034 compile_ok,
2035 changed_files: std::collections::HashSet::new(),
2036 changed_lines: 0,
2037 resource_health_score,
2038 infra_unhealthy_signals,
2039 });
2040 }
2041
2042 let tick = collapse_controller.sample_observations(&observations);
2043 if promoted_subtask_id != tick.promoted_subtask_id {
2044 promoted_subtask_id = tick.promoted_subtask_id.clone();
2045 if let Some(ref promoted) = promoted_subtask_id {
2046 tracing::info!(subtask_id = %promoted, "Collapse controller promoted branch");
2047 if let Some(audit) = crate::audit::try_audit_log() {
2048 audit
2049 .log_with_correlation(
2050 crate::audit::AuditCategory::Swarm,
2051 "collapse_promote_branch",
2052 crate::audit::AuditOutcome::Success,
2053 Some("collapse-controller".to_string()),
2054 Some(serde_json::json!({
2055 "swarm_id": swarm_id,
2056 "subtask_id": promoted,
2057 "execution_mode": "kubernetes_pod",
2058 })),
2059 None,
2060 None,
2061 Some(swarm_id.to_string()),
2062 None,
2063 )
2064 .await;
2065 }
2066 }
2067 }
2068
2069 for kill in tick.kills {
2070 if kill_reasons.contains_key(&kill.subtask_id) {
2071 continue;
2072 }
2073 if !active.contains_key(&kill.subtask_id) {
2074 continue;
2075 }
2076
2077 if let Err(error) = k8s.delete_subagent_pod(&kill.subtask_id).await {
2078 tracing::warn!(
2079 subtask_id = %kill.subtask_id,
2080 error = %error,
2081 "Failed deleting Kubernetes pod after collapse kill"
2082 );
2083 }
2084 kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
2085 let elapsed_ms = active
2086 .remove(&kill.subtask_id)
2087 .map(|s| s.started_at.elapsed().as_millis() as u64)
2088 .unwrap_or(0);
2089
2090 tracing::warn!(
2091 subtask_id = %kill.subtask_id,
2092 branch = %kill.branch,
2093 reason = %kill.reason,
2094 "Collapse controller killed Kubernetes branch"
2095 );
2096
2097 if let Some(audit) = crate::audit::try_audit_log() {
2098 audit
2099 .log_with_correlation(
2100 crate::audit::AuditCategory::Swarm,
2101 "collapse_kill_branch",
2102 crate::audit::AuditOutcome::Success,
2103 Some("collapse-controller".to_string()),
2104 Some(serde_json::json!({
2105 "swarm_id": swarm_id,
2106 "subtask_id": kill.subtask_id.clone(),
2107 "branch": kill.branch.clone(),
2108 "reason": kill.reason.clone(),
2109 "execution_mode": "kubernetes_pod",
2110 })),
2111 None,
2112 None,
2113 Some(swarm_id.to_string()),
2114 None,
2115 )
2116 .await;
2117 }
2118
2119 self.try_send_event(SwarmEvent::SubTaskUpdate {
2120 id: kill.subtask_id.clone(),
2121 name: subtask_names
2122 .get(&kill.subtask_id)
2123 .cloned()
2124 .unwrap_or_else(|| kill.subtask_id.clone()),
2125 status: SubTaskStatus::Cancelled,
2126 agent_name: Some(format!("agent-{}", kill.subtask_id)),
2127 });
2128 self.try_send_event(SwarmEvent::AgentError {
2129 subtask_id: kill.subtask_id.clone(),
2130 error: format!("Cancelled by collapse controller: {}", kill.reason),
2131 });
2132
2133 results.push(SubTaskResult {
2134 subtask_id: kill.subtask_id.clone(),
2135 subagent_id: format!("agent-{}", kill.subtask_id),
2136 success: false,
2137 result: format!(
2138 "\n\n--- Collapse Controller ---\nBranch terminated: {}",
2139 kill.reason
2140 ),
2141 steps: 0,
2142 tool_calls: 0,
2143 execution_time_ms: elapsed_ms,
2144 error: Some(format!("Cancelled by collapse controller: {}", kill.reason)),
2145 artifacts: Vec::new(),
2146 retry_count: 0,
2147 });
2148 }
2149 }
2150 }
2151
2152 if let Some(ref promoted) = promoted_subtask_id {
2153 results.sort_by_key(|result| {
2154 if &result.subtask_id == promoted {
2155 0usize
2156 } else {
2157 1usize
2158 }
2159 });
2160 }
2161
2162 if !active.is_empty() {
2163 let residual_ids: Vec<String> = active.keys().cloned().collect();
2164 for subtask_id in residual_ids {
2165 if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
2166 tracing::warn!(
2167 subtask_id = %subtask_id,
2168 error = %error,
2169 "Failed deleting residual Kubernetes pod at stage end"
2170 );
2171 }
2172 }
2173 }
2174
2175 Ok(results)
2176 }
2177
2178 async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
2180 let mut aggregated = String::new();
2181
2182 for (i, result) in results.iter().enumerate() {
2183 if result.success {
2184 aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
2185 } else {
2186 aggregated.push_str(&format!(
2187 "=== Subtask {} (FAILED) ===\nError: {}\n\n",
2188 i + 1,
2189 result.error.as_deref().unwrap_or("Unknown error")
2190 ));
2191 }
2192 }
2193
2194 Ok(aggregated)
2195 }
2196
2197 pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
2199 self.execute(task, DecompositionStrategy::None).await
2200 }
2201}
2202
/// Builder that accumulates a [`SwarmConfig`] and turns it into a [`SwarmExecutor`]
/// via `build`.
pub struct SwarmExecutorBuilder {
    // Configuration under construction; each setter overwrites one of its fields.
    config: SwarmConfig,
}
2207
2208impl SwarmExecutorBuilder {
2209 pub fn new() -> Self {
2210 Self {
2211 config: SwarmConfig::default(),
2212 }
2213 }
2214
2215 pub fn max_subagents(mut self, max: usize) -> Self {
2216 self.config.max_subagents = max;
2217 self
2218 }
2219
2220 pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
2221 self.config.max_steps_per_subagent = max;
2222 self
2223 }
2224
2225 pub fn max_total_steps(mut self, max: usize) -> Self {
2226 self.config.max_total_steps = max;
2227 self
2228 }
2229
2230 pub fn timeout_secs(mut self, secs: u64) -> Self {
2231 self.config.subagent_timeout_secs = secs;
2232 self
2233 }
2234
2235 pub fn parallel_enabled(mut self, enabled: bool) -> Self {
2236 self.config.parallel_enabled = enabled;
2237 self
2238 }
2239
2240 pub fn build(self) -> SwarmExecutor {
2241 SwarmExecutor::new(self.config)
2242 }
2243}
2244
2245impl Default for SwarmExecutorBuilder {
2246 fn default() -> Self {
2247 Self::new()
2248 }
2249}
2250
/// Upper bound on how many worktree creations `precreate_worktrees` drives concurrently
/// (passed to `buffer_unordered`).
const WORKTREE_CREATE_PARALLELISM: usize = 8;
2256
2257fn required_worktree_failure(subtask: &SubTask) -> SubTaskResult {
2258 let error = "Failed to create required worktree for mutating subtask".to_string();
2259 SubTaskResult {
2260 subtask_id: subtask.id.clone(),
2261 subagent_id: "worktree-preflight".to_string(),
2262 success: false,
2263 result: error.clone(),
2264 steps: 0,
2265 tool_calls: 0,
2266 execution_time_ms: 0,
2267 error: Some(error),
2268 artifacts: Vec::new(),
2269 retry_count: 0,
2270 }
2271}
2272
2273async fn precreate_worktrees(
2284 mgr: Arc<WorktreeManager>,
2285 subtask_ids: Vec<String>,
2286) -> HashMap<String, WorktreeInfo> {
2287 stream::iter(subtask_ids.into_iter().map(|sid| {
2288 let mgr = Arc::clone(&mgr);
2289 async move {
2290 let slug = sid.replace("-", "_");
2291 match mgr.create(&slug).await {
2292 Ok(wt) => {
2293 if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
2294 tracing::warn!(
2295 subtask_id = %sid,
2296 error = %e,
2297 "Failed to inject workspace stub into worktree"
2298 );
2299 }
2300 tracing::info!(
2301 subtask_id = %sid,
2302 worktree_path = %wt.path.display(),
2303 worktree_branch = %wt.branch,
2304 "Created worktree for sub-agent"
2305 );
2306 Some((sid, wt))
2307 }
2308 Err(e) => {
2309 tracing::warn!(
2310 subtask_id = %sid,
2311 error = %e,
2312 "Failed to create worktree, sub-agent will use shared directory"
2313 );
2314 None
2315 }
2316 }
2317 }
2318 }))
2319 .buffer_unordered(WORKTREE_CREATE_PARALLELISM)
2320 .filter_map(|x| async move { x })
2321 .collect()
2322 .await
2323}
2324
2325#[allow(clippy::too_many_arguments)]
2327pub async fn run_agent_loop(
2330 provider: Arc<dyn Provider>,
2331 model: &str,
2332 system_prompt: &str,
2333 user_prompt: &str,
2334 tools: Vec<crate::provider::ToolDefinition>,
2335 registry: Arc<ToolRegistry>,
2336 max_steps: usize,
2337 timeout_secs: u64,
2338 event_tx: Option<mpsc::Sender<SwarmEvent>>,
2339 subtask_id: String,
2340 bus: Option<Arc<AgentBus>>,
2341 working_dir: Option<std::path::PathBuf>,
2342) -> Result<(String, usize, usize, AgentLoopExit)> {
2343 let temperature = 0.7;
2345
2346 tracing::info!(
2347 model = %model,
2348 max_steps = max_steps,
2349 timeout_secs = timeout_secs,
2350 "Sub-agent starting agentic loop"
2351 );
2352 tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
2353 tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");
2354
2355 let mut messages = vec![
2357 Message {
2358 role: Role::System,
2359 content: vec![ContentPart::Text {
2360 text: system_prompt.to_string(),
2361 }],
2362 },
2363 Message {
2364 role: Role::User,
2365 content: vec![ContentPart::Text {
2366 text: user_prompt.to_string(),
2367 }],
2368 },
2369 ];
2370
2371 let mut steps = 0;
2372 let mut total_tool_calls = 0;
2373 let mut final_output = String::new();
2374
2375 let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);
2376
2377 let exit_reason = loop {
2378 if steps >= max_steps {
2379 tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
2380 break AgentLoopExit::MaxStepsReached;
2381 }
2382
2383 if Instant::now() > deadline {
2384 tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
2385 break AgentLoopExit::TimedOut;
2386 }
2387
2388 steps += 1;
2389 tracing::info!(step = steps, "Sub-agent step starting");
2390
2391 truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
2393
2394 let request = CompletionRequest {
2395 messages: messages.clone(),
2396 tools: tools.clone(),
2397 model: model.to_string(),
2398 temperature: Some(temperature),
2399 top_p: None,
2400 max_tokens: Some(8192),
2401 stop: Vec::new(),
2402 };
2403
2404 let step_start = Instant::now();
2405 let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
2406 let step_duration = step_start.elapsed();
2407
2408 tracing::info!(
2409 step = steps,
2410 duration_ms = step_duration.as_millis() as u64,
2411 finish_reason = ?response.finish_reason,
2412 prompt_tokens = response.usage.prompt_tokens,
2413 completion_tokens = response.usage.completion_tokens,
2414 "Sub-agent step completed LLM call"
2415 );
2416
2417 let mut text_parts = Vec::new();
2419 let mut thinking_parts = Vec::new();
2420 let mut tool_calls = Vec::new();
2421
2422 for part in &response.message.content {
2423 match part {
2424 ContentPart::Text { text } => {
2425 text_parts.push(text.clone());
2426 }
2427 ContentPart::Thinking { text } if !text.is_empty() => {
2428 thinking_parts.push(text.clone());
2429 }
2430 ContentPart::ToolCall {
2431 id,
2432 name,
2433 arguments,
2434 ..
2435 } => {
2436 tool_calls.push((id.clone(), name.clone(), arguments.clone()));
2437 }
2438 _ => {}
2439 }
2440 }
2441
2442 if !thinking_parts.is_empty()
2444 && let Some(ref bus) = bus
2445 {
2446 let thinking_text = thinking_parts.join("\n");
2447 let handle = bus.handle(&subtask_id);
2448 handle.send(
2449 format!("agent.{subtask_id}.thinking"),
2450 BusMessage::AgentThinking {
2451 agent_id: subtask_id.clone(),
2452 thinking: crate::swarm::live_bus::thinking(&thinking_text),
2453 step: steps,
2454 },
2455 );
2456 }
2457
2458 if !text_parts.is_empty() {
2460 let step_output = text_parts.join("\n");
2461 if !final_output.is_empty() {
2462 final_output.push('\n');
2463 }
2464 final_output.push_str(&step_output);
2465 tracing::info!(
2466 step = steps,
2467 output_len = final_output.len(),
2468 "Sub-agent text output"
2469 );
2470 tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
2471
2472 if let Some(ref tx) = event_tx {
2474 let preview = if step_output.len() > 500 {
2475 let mut end = 500;
2476 while end > 0 && !step_output.is_char_boundary(end) {
2477 end -= 1;
2478 }
2479 format!("{}...", &step_output[..end])
2480 } else {
2481 step_output.clone()
2482 };
2483 let _ = tx.try_send(SwarmEvent::AgentMessage {
2484 subtask_id: subtask_id.clone(),
2485 entry: AgentMessageEntry {
2486 role: "assistant".to_string(),
2487 content: preview,
2488 is_tool_call: false,
2489 },
2490 });
2491 }
2492 }
2493
2494 if !tool_calls.is_empty() {
2496 tracing::info!(
2497 step = steps,
2498 num_tool_calls = tool_calls.len(),
2499 tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
2500 "Sub-agent requesting tool calls"
2501 );
2502 }
2503
2504 messages.push(response.message.clone());
2506
2507 if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
2509 tracing::info!(
2510 steps = steps,
2511 total_tool_calls = total_tool_calls,
2512 "Sub-agent finished"
2513 );
2514 break AgentLoopExit::Completed;
2515 }
2516
2517 let mut tool_results = Vec::new();
2519
2520 for (call_id, tool_name, arguments) in tool_calls {
2521 total_tool_calls += 1;
2522
2523 if let Some(ref tx) = event_tx {
2525 let _ = tx.try_send(SwarmEvent::AgentToolCall {
2526 subtask_id: subtask_id.clone(),
2527 tool_name: tool_name.clone(),
2528 });
2529 }
2530
2531 tracing::info!(
2532 step = steps,
2533 tool_call_id = %call_id,
2534 tool = %tool_name,
2535 "Executing tool"
2536 );
2537 tracing::debug!(
2538 tool = %tool_name,
2539 arguments = %arguments,
2540 "Tool call arguments"
2541 );
2542
2543 let tool_start = Instant::now();
2544 let mut tool_success = true;
2545 let result = if let Some(tool) = registry.get(&tool_name) {
2546 let mut args: serde_json::Value =
2548 serde_json::from_str(&arguments).unwrap_or_else(|e| {
2549 tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
2550 serde_json::json!({})
2551 });
2552
2553 let path_denial = working_dir.as_ref().and_then(|wd| {
2555 path_guard::normalize_tool_args(&tool_name, &mut args, wd).err()
2556 });
2557 if let Some(e) = path_denial {
2558 tool_success = false;
2559 tracing::warn!(tool = %tool_name, error = %e, "Tool path policy denied");
2560 format!("Tool path policy denied: {e}")
2561 } else {
2562 let agent_name = format!("agent-{subtask_id}");
2563 let provenance =
2564 ExecutionProvenance::for_operation(&agent_name, ExecutionOrigin::Swarm);
2565 args = enrich_tool_input_with_runtime_context(
2566 &args,
2567 working_dir
2568 .as_deref()
2569 .unwrap_or_else(|| std::path::Path::new(".")),
2570 Some(model),
2571 &subtask_id,
2572 &agent_name,
2573 Some(&provenance),
2574 );
2575
2576 match tool.execute(args).await {
2577 Ok(r) => {
2578 if r.success {
2579 tracing::info!(
2580 tool = %tool_name,
2581 duration_ms = tool_start.elapsed().as_millis() as u64,
2582 success = true,
2583 "Tool execution completed"
2584 );
2585 r.output
2586 } else {
2587 tool_success = false;
2588 tracing::warn!(
2589 tool = %tool_name,
2590 error = %r.output,
2591 "Tool returned error"
2592 );
2593 format!("Tool error: {}", r.output)
2594 }
2595 }
2596 Err(e) => {
2597 tool_success = false;
2598 tracing::error!(
2599 tool = %tool_name,
2600 error = %e,
2601 "Tool execution failed"
2602 );
2603 format!("Tool execution failed: {}", e)
2604 }
2605 }
2606 }
2607 } else {
2608 tool_success = false;
2609 tracing::error!(tool = %tool_name, "Unknown tool requested");
2610 format!("Unknown tool: {}", tool_name)
2611 };
2612
2613 if let Some(ref tx) = event_tx {
2615 let input_preview = if arguments.len() > 200 {
2616 let mut end = 200;
2617 while end > 0 && !arguments.is_char_boundary(end) {
2618 end -= 1;
2619 }
2620 format!("{}...", &arguments[..end])
2621 } else {
2622 arguments.clone()
2623 };
2624 let output_preview = if result.len() > 500 {
2625 let mut end = 500;
2626 while end > 0 && !result.is_char_boundary(end) {
2627 end -= 1;
2628 }
2629 format!("{}...", &result[..end])
2630 } else {
2631 result.clone()
2632 };
2633 let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
2634 subtask_id: subtask_id.clone(),
2635 detail: AgentToolCallDetail {
2636 tool_name: tool_name.clone(),
2637 input_preview,
2638 output_preview,
2639 success: tool_success,
2640 },
2641 });
2642 }
2643
2644 tracing::debug!(
2645 tool = %tool_name,
2646 result_len = result.len(),
2647 "Tool result"
2648 );
2649
2650 if let Some(ref bus) = bus {
2653 let handle = bus.handle(&subtask_id);
2654 handle.send(
2655 format!("tools.{tool_name}"),
2656 BusMessage::ToolOutputFull {
2657 agent_id: subtask_id.clone(),
2658 tool_name: tool_name.clone(),
2659 output: crate::swarm::live_bus::tool_output(&result),
2660 success: tool_success,
2661 step: steps,
2662 },
2663 );
2664 }
2665
2666 let result = if result.len() > RLM_THRESHOLD_CHARS {
2668 process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
2670 .await
2671 } else {
2672 truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
2674 };
2675
2676 tool_results.push((call_id, tool_name, result));
2677 }
2678
2679 for (call_id, _tool_name, result) in tool_results {
2681 messages.push(Message {
2682 role: Role::Tool,
2683 content: vec![ContentPart::ToolResult {
2684 tool_call_id: call_id,
2685 content: result,
2686 }],
2687 });
2688 }
2689
2690 deadline = Instant::now() + Duration::from_secs(timeout_secs);
2692 };
2693
2694 Ok((final_output, steps, total_tool_calls, exit_reason))
2695}