1use super::{
7 BranchObservation, BranchRuntimeState, CacheConfig, CacheStats, CollapseController,
8 CollapsePolicy, DecompositionStrategy, ExecutionMode, StageStats, SwarmCache, SwarmConfig,
9 SwarmResult,
10 kubernetes_executor::{
11 RemoteSubtaskPayload, SWARM_SUBTASK_PAYLOAD_ENV, encode_payload, latest_probe_from_logs,
12 probe_changed_files_set, result_from_logs,
13 },
14 orchestrator::Orchestrator,
15 result_store::ResultStore,
16 subtask::{SubTask, SubTaskResult, SubTaskStatus},
17};
18use crate::bus::{AgentBus, BusMessage};
19use crate::k8s::{K8sManager, SubagentPodSpec, SubagentPodState};
20use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};
21
22pub use super::SwarmMessage;
24use crate::{
25 agent::Agent,
26 provenance::{ExecutionOrigin, ExecutionProvenance},
27 provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
28 rlm::RlmExecutor,
29 session::helper::runtime::enrich_tool_input_with_runtime_context,
30 swarm::{SwarmArtifact, SwarmStats},
31 telemetry::SwarmTelemetryCollector,
32 tool::ToolRegistry,
33 worktree::{WorktreeInfo, WorktreeManager},
34};
35use anyhow::Result;
36use futures::stream::{self, FuturesUnordered, StreamExt};
37use std::collections::{HashMap, VecDeque};
38use std::sync::Arc;
39use std::time::Instant;
40use tokio::sync::{RwLock, mpsc};
41use tokio::task::AbortHandle;
42use tokio::time::{Duration, MissedTickBehavior, timeout};
43
/// Fallback context-window size in tokens.
/// NOTE(review): not referenced in this chunk — presumably used by callers
/// of the truncation helpers below; confirm before changing.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens reserved for the model's response when budgeting how much of the
/// context window conversation history may occupy.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context window history may fill before
/// `truncate_messages_to_fit` starts trimming.
const TRUNCATION_THRESHOLD: f64 = 0.85;
/// Rough token-count estimate for `text`, assuming ~3.5 characters per token.
fn estimate_tokens(text: &str) -> usize {
    let chars = text.len() as f64;
    (chars / 3.5).ceil() as usize
}
60
61fn estimate_message_tokens(message: &Message) -> usize {
63 let mut tokens = 4; for part in &message.content {
66 tokens += match part {
67 ContentPart::Text { text } => estimate_tokens(text),
68 ContentPart::ToolCall {
69 id,
70 name,
71 arguments,
72 ..
73 } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
74 ContentPart::ToolResult {
75 tool_call_id,
76 content,
77 } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
78 ContentPart::Image { .. } | ContentPart::File { .. } => 2000, ContentPart::Thinking { text } => estimate_tokens(text),
80 };
81 }
82
83 tokens
84}
85
86fn estimate_total_tokens(messages: &[Message]) -> usize {
88 messages.iter().map(estimate_message_tokens).sum()
89}
90
91fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
99 let target_tokens =
100 ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
101
102 let current_tokens = estimate_total_tokens(messages);
103 if current_tokens <= target_tokens {
104 return;
105 }
106
107 tracing::warn!(
108 current_tokens = current_tokens,
109 target_tokens = target_tokens,
110 context_limit = context_limit,
111 "Context approaching limit, truncating conversation history"
112 );
113
114 truncate_large_tool_results(messages, 2000); let after_tool_truncation = estimate_total_tokens(messages);
118 if after_tool_truncation <= target_tokens {
119 tracing::info!(
120 old_tokens = current_tokens,
121 new_tokens = after_tool_truncation,
122 "Truncated large tool results, context now within limits"
123 );
124 return;
125 }
126
127 if messages.len() <= 6 {
129 tracing::warn!(
130 tokens = after_tool_truncation,
131 target = target_tokens,
132 "Cannot truncate further - conversation too short"
133 );
134 return;
135 }
136
137 let keep_start = 2;
140 let keep_end = 4;
141 let removable_count = messages.len() - keep_start - keep_end;
142
143 if removable_count == 0 {
144 return;
145 }
146
147 let removed_messages: Vec<_> = messages
149 .drain(keep_start..keep_start + removable_count)
150 .collect();
151 let summary = summarize_removed_messages(&removed_messages);
152
153 messages.insert(
155 keep_start,
156 Message {
157 role: Role::User,
158 content: vec![ContentPart::Text {
159 text: format!(
160 "[Context truncated: {} earlier messages removed to fit context window]\n{}",
161 removed_messages.len(),
162 summary
163 ),
164 }],
165 },
166 );
167
168 let new_tokens = estimate_total_tokens(messages);
169 tracing::info!(
170 removed_messages = removed_messages.len(),
171 old_tokens = current_tokens,
172 new_tokens = new_tokens,
173 "Truncated conversation history"
174 );
175}
176
177fn summarize_removed_messages(messages: &[Message]) -> String {
179 let mut summary = String::new();
180 let mut tool_calls: Vec<String> = Vec::new();
181
182 for msg in messages {
183 for part in &msg.content {
184 if let ContentPart::ToolCall { name, .. } = part
185 && !tool_calls.contains(name)
186 {
187 tool_calls.push(name.clone());
188 }
189 }
190 }
191
192 if !tool_calls.is_empty() {
193 summary.push_str(&format!(
194 "Tools used in truncated history: {}",
195 tool_calls.join(", ")
196 ));
197 }
198
199 summary
200}
201
202fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
204 let char_limit = max_tokens_per_result * 3; let mut truncated_count = 0;
206 let mut saved_tokens = 0usize;
207
208 for message in messages.iter_mut() {
209 for part in message.content.iter_mut() {
210 if let ContentPart::ToolResult { content, .. } = part {
211 let tokens = estimate_tokens(content);
212 if tokens > max_tokens_per_result {
213 let old_len = content.len();
214 *content = truncate_single_result(content, char_limit);
215 saved_tokens += tokens.saturating_sub(estimate_tokens(content));
216 if content.len() < old_len {
217 truncated_count += 1;
218 }
219 }
220 }
221 }
222 }
223
224 if truncated_count > 0 {
225 tracing::info!(
226 truncated_count = truncated_count,
227 saved_tokens = saved_tokens,
228 max_tokens_per_result = max_tokens_per_result,
229 "Truncated large tool results"
230 );
231 }
232}
233
234fn truncate_single_result(content: &str, max_chars: usize) -> String {
236 if content.len() <= max_chars {
237 return content.to_string();
238 }
239
240 let safe_limit = {
242 let mut limit = max_chars.min(content.len());
243 while limit > 0 && !content.is_char_boundary(limit) {
244 limit -= 1;
245 }
246 limit
247 };
248
249 let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);
251
252 let truncated = format!(
253 "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
254 &content[..break_point],
255 content.len(),
256 break_point
257 );
258
259 tracing::debug!(
260 original_len = content.len(),
261 truncated_len = truncated.len(),
262 "Truncated large result"
263 );
264
265 truncated
266}
267
/// Tool results larger than this (in chars) are summarized via the RLM
/// executor instead of simply truncated.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Character budget used for plain truncation of oversized tool results.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// Sampling interval in seconds.
/// NOTE(review): not referenced in this chunk — presumably drives the
/// collapse-policy observation loop; confirm downstream.
const COLLAPSE_SAMPLE_SECS: u64 = 5;
/// Env var name for a fallback prompt override (usage not visible here).
const SWARM_FALLBACK_PROMPT_ENV: &str = "CODETETHER_SWARM_FALLBACK_PROMPT";
/// Env var name for a fallback model override (usage not visible here).
const SWARM_FALLBACK_MODEL_ENV: &str = "CODETETHER_SWARM_FALLBACK_MODEL";
/// Environment variables to pass through to Kubernetes subagent pods
/// (auth and Vault configuration).
const K8S_PASSTHROUGH_ENV_VARS: &[&str] = &[
    "VAULT_ADDR",
    "VAULT_TOKEN",
    "VAULT_MOUNT",
    "VAULT_SECRETS_PATH",
    "VAULT_NAMESPACE",
    "CODETETHER_AUTH_TOKEN",
];
286
/// Bookkeeping record for a branch actively executing on a Kubernetes pod.
/// NOTE(review): fields are not read within this chunk; confirm semantics
/// against the Kubernetes execution path.
#[derive(Debug, Clone)]
struct ActiveK8sBranch {
    /// Branch identifier under execution.
    branch: String,
    /// When execution of this branch began.
    started_at: Instant,
}
292
/// Why a sub-agent's tool loop stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    /// The agent finished its task normally (treated as success).
    Completed,
    /// The loop stopped after exhausting the configured step budget.
    MaxStepsReached,
    /// The loop exceeded the configured per-subagent timeout.
    TimedOut,
}
299
/// Exponential-backoff delay for a 0-based retry `attempt`:
/// `initial_delay_ms * multiplier^attempt`, capped at `max_delay_ms`.
fn calculate_backoff_delay(
    attempt: u32,
    initial_delay_ms: u64,
    max_delay_ms: u64,
    multiplier: f64,
) -> Duration {
    let scaled = initial_delay_ms as f64 * multiplier.powi(attempt as i32);
    let capped = scaled.min(max_delay_ms as f64);
    Duration::from_millis(capped as u64)
}
313
314fn compute_resource_health(pod_state: Option<&SubagentPodState>) -> (f32, u32) {
315 let Some(pod_state) = pod_state else {
316 return (0.2, 1);
317 };
318
319 let reason = pod_state
320 .reason
321 .as_deref()
322 .unwrap_or_default()
323 .to_ascii_lowercase();
324 let phase = pod_state.phase.to_ascii_lowercase();
325
326 if reason.contains("oomkilled") {
327 return (0.0, 3);
328 }
329 if reason.contains("imagepullbackoff") || reason.contains("errimagepull") {
330 return (0.0, 3);
331 }
332 if reason.contains("crashloopbackoff") {
333 return (0.1, 2);
334 }
335 if phase == "failed" {
336 return (0.1, 2);
337 }
338
339 let mut score = 1.0f32;
340 let mut unhealthy_signals = 0u32;
341
342 if !pod_state.ready {
343 score -= 0.2;
344 }
345 if !reason.is_empty() {
346 score -= 0.3;
347 unhealthy_signals += 1;
348 }
349 if pod_state.restart_count > 0 {
350 score -= (pod_state.restart_count.min(3) as f32) * 0.2;
351 unhealthy_signals += 1;
352 }
353
354 (score.clamp(0.0, 1.0), unhealthy_signals)
355}
356
357async fn process_large_result_with_rlm(
359 content: &str,
360 tool_name: &str,
361 provider: Arc<dyn Provider>,
362 model: &str,
363) -> String {
364 if content.len() <= SIMPLE_TRUNCATE_CHARS {
365 return content.to_string();
366 }
367
368 if content.len() <= RLM_THRESHOLD_CHARS {
370 return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
371 }
372
373 tracing::info!(
375 tool = %tool_name,
376 content_len = content.len(),
377 "Using RLM to process large tool result"
378 );
379
380 let query = format!(
381 "Summarize the key information from this {} output. \
382 Focus on: errors, warnings, important findings, and actionable items. \
383 Be concise but thorough.",
384 tool_name
385 );
386
387 let mut executor =
388 RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
389
390 match executor.analyze(&query).await {
391 Ok(result) => {
392 let bounded_answer = truncate_single_result(&result.answer, SIMPLE_TRUNCATE_CHARS * 2);
393 tracing::info!(
394 tool = %tool_name,
395 original_len = content.len(),
396 summary_len = bounded_answer.len(),
397 iterations = result.iterations,
398 "RLM summarized large result"
399 );
400
401 format!(
402 "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
403 tool_name,
404 content.len(),
405 bounded_answer.len(),
406 bounded_answer
407 )
408 }
409 Err(e) => {
410 tracing::warn!(
411 tool = %tool_name,
412 error = %e,
413 "RLM analysis failed, falling back to truncation"
414 );
415 truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
416 }
417 }
418}
419
/// Drives execution of a swarm: decomposes a task into subtasks, runs
/// sub-agents stage by stage, and aggregates their results.
pub struct SwarmExecutor {
    /// Swarm-wide execution settings (concurrency, retries, timeouts, ...).
    config: SwarmConfig,
    /// Optional coordinator agent handle (see `with_coordinator_agent`).
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Optional channel for streaming `SwarmEvent`s to a UI listener.
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    /// Telemetry sink for swarm latency and outcome metrics.
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    /// Optional subtask-result cache; cache hits skip re-execution.
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Shared store through which sub-agents publish and query results.
    result_store: Arc<ResultStore>,
    /// Optional agent bus used to broadcast lifecycle messages.
    bus: Option<Arc<AgentBus>>,
}
436
437impl SwarmExecutor {
    /// Create an executor with the given configuration and no optional
    /// collaborators: no cache, no bus, no event channel, no coordinator.
    pub fn new(config: SwarmConfig) -> Self {
        Self {
            config,
            coordinator_agent: None,
            event_tx: None,
            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
            cache: None,
            result_store: ResultStore::new_arc(),
            bus: None,
        }
    }
450
451 pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
453 let cache = SwarmCache::new(cache_config).await?;
454 Ok(Self {
455 config,
456 coordinator_agent: None,
457 event_tx: None,
458 telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
459 cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
460 result_store: ResultStore::new_arc(),
461 bus: None,
462 })
463 }
464
465 pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
467 self.cache = Some(cache);
468 self
469 }
470
471 pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
473 self.bus = Some(bus);
474 self
475 }
476
    /// The agent bus attached via `with_bus`, if any.
    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
        self.bus.as_ref()
    }
481
482 pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
484 self.event_tx = Some(tx);
485 self
486 }
487
488 pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
490 tracing::debug!("Setting coordinator agent for swarm execution");
491 self.coordinator_agent = Some(agent);
492 self
493 }
494
495 pub fn with_telemetry(
497 mut self,
498 telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
499 ) -> Self {
500 self.telemetry = telemetry;
501 self
502 }
503
    /// A cloned handle to the shared telemetry collector.
    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
        Arc::clone(&self.telemetry)
    }
    /// The coordinator agent attached via `with_coordinator_agent`, if any.
    /// Logs presence at debug level for troubleshooting.
    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        tracing::debug!(
            has_coordinator = self.coordinator_agent.is_some(),
            "Getting coordinator agent"
        );
        self.coordinator_agent.as_ref()
    }
516
    /// The shared result store sub-agents use to exchange results.
    pub fn result_store(&self) -> &Arc<ResultStore> {
        &self.result_store
    }
521
522 pub async fn cache_stats(&self) -> Option<CacheStats> {
524 if let Some(ref cache) = self.cache {
525 let cache_guard = cache.lock().await;
526 Some(cache_guard.stats().clone())
527 } else {
528 None
529 }
530 }
531
532 pub async fn clear_cache(&self) -> Result<()> {
534 if let Some(ref cache) = self.cache {
535 let mut cache_guard = cache.lock().await;
536 cache_guard.clear().await?;
537 }
538 Ok(())
539 }
540
541 pub fn retry_config(&self) -> (u32, u64, u64, f64) {
543 (
544 self.config.max_retries,
545 self.config.base_delay_ms,
546 self.config.max_delay_ms,
547 2.0, )
549 }
550
    /// True when configuration allows at least one retry per subtask.
    pub fn retries_enabled(&self) -> bool {
        self.config.max_retries > 0
    }
555
556 fn try_send_event(&self, event: SwarmEvent) {
558 if let Some(ref bus) = self.bus {
560 let handle = bus.handle("swarm-executor");
561 match &event {
562 SwarmEvent::Started { task, .. } => {
563 handle.send(
564 "broadcast",
565 BusMessage::AgentReady {
566 agent_id: "swarm-executor".to_string(),
567 capabilities: vec![format!("executing:{task}")],
568 },
569 );
570 }
571 SwarmEvent::Complete { success, .. } => {
572 let state = if *success {
573 crate::a2a::types::TaskState::Completed
574 } else {
575 crate::a2a::types::TaskState::Failed
576 };
577 handle.send_task_update("swarm", state, None);
578 }
579 _ => {} }
581 }
582
583 if let Some(ref tx) = self.event_tx {
584 let _ = tx.try_send(event);
585 }
586 }
587
    /// Run the full swarm pipeline for `task`: decompose into subtasks,
    /// execute them stage by stage (stages sequential, subtasks within a
    /// stage concurrent), then aggregate all subtask outputs.
    ///
    /// Returns a `SwarmResult` whose `success` flag is true only when every
    /// subtask succeeded. A decomposition that yields zero subtasks is
    /// reported as a failed `SwarmResult`, not as an `Err`.
    ///
    /// # Errors
    /// Propagates orchestrator construction/decomposition failures, stage
    /// execution failures, and aggregation failures.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        // Break the task into subtasks according to the chosen strategy.
        let subtasks = orchestrator.decompose(task, strategy).await?;

        if subtasks.is_empty() {
            // Empty plan: surface as a failed result rather than an error.
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        // Announce the full plan to any UI listener before execution begins.
        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        let swarm_id = uuid::Uuid::new_v4().to_string();
        let strategy_str = format!("{:?}", strategy);
        self.telemetry
            .lock()
            .await
            .start_swarm(&swarm_id, subtasks.len(), &strategy_str)
            .await;

        // Finished subtask outputs keyed by subtask id, shared with later
        // stages so dependents can read their dependencies' results.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            {
                // Record this stage's outputs and publish each one to the
                // shared result store for cross-agent queries.
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                    let tags = vec![
                        format!("stage:{stage}"),
                        format!("subtask:{}", result.subtask_id),
                    ];
                    let _ = self
                        .result_store
                        .publish(
                            &result.subtask_id,
                            &result.subagent_id,
                            &result.result,
                            tags,
                            None,
                        )
                        .await;
                }
            }

            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        let provider_name = orchestrator.provider().to_string();

        self.telemetry
            .lock()
            .await
            .record_swarm_latency("total_execution", start_time.elapsed())
            .await;

        // Finalize aggregate stats: wall-clock time, estimated sequential
        // time, critical path, and the resulting speedup factor.
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        // The swarm succeeds only if every subtask succeeded.
        let success = all_results.iter().all(|r| r.success);

        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success).await;
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }
799
800 async fn execute_stage(
802 &self,
803 orchestrator: &Orchestrator,
804 subtasks: Vec<SubTask>,
805 completed_results: Arc<RwLock<HashMap<String, String>>>,
806 swarm_id: &str,
807 ) -> Result<Vec<SubTaskResult>> {
808 if self.config.execution_mode == ExecutionMode::KubernetesPod {
809 return self
810 .execute_stage_kubernetes(orchestrator, subtasks, completed_results, swarm_id)
811 .await;
812 }
813
814 let mut handles: FuturesUnordered<
815 tokio::task::JoinHandle<(String, Result<SubTaskResult, anyhow::Error>)>,
816 > = FuturesUnordered::new();
817 let mut abort_handles: HashMap<String, AbortHandle> = HashMap::new();
818 let mut task_ids: HashMap<tokio::task::Id, String> = HashMap::new();
819 let mut active_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
820 let mut all_worktrees: HashMap<String, WorktreeInfo> = HashMap::new();
821 let mut cached_results: Vec<SubTaskResult> = Vec::new();
822 let mut completed_entries: Vec<(SubTaskResult, Option<WorktreeInfo>)> = Vec::new();
823 let mut kill_reasons: HashMap<String, String> = HashMap::new();
824 let mut promoted_subtask_id: Option<String> = None;
825
826 let semaphore = Arc::new(tokio::sync::Semaphore::new(
828 self.config.max_concurrent_requests,
829 ));
830 let delay_ms = self.config.request_delay_ms;
831
832 let model = orchestrator.model().to_string();
834 let provider_name = orchestrator.provider().to_string();
835 let providers = orchestrator.providers();
836 let provider = providers
837 .get(&provider_name)
838 .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;
839
840 tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");
841
842 let base_tool_registry =
844 ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
845 let mut tool_definitions: Vec<_> = base_tool_registry
848 .definitions()
849 .into_iter()
850 .filter(|t| t.name != "question")
851 .collect();
852
853 let swarm_share_def = crate::provider::ToolDefinition {
855 name: "swarm_share".to_string(),
856 description: "Share results with other sub-agents in the swarm. Actions: publish \
857 (share a result), get (retrieve a result by key), query_tags (find \
858 results by tags), query_prefix (find results by key prefix), list \
859 (show all shared results)."
860 .to_string(),
861 parameters: serde_json::json!({
862 "type": "object",
863 "properties": {
864 "action": {
865 "type": "string",
866 "enum": ["publish", "get", "query_tags", "query_prefix", "list"],
867 "description": "Action to perform"
868 },
869 "key": {
870 "type": "string",
871 "description": "Result key (for publish/get)"
872 },
873 "value": {
874 "description": "Result value to publish (any JSON value)"
875 },
876 "tags": {
877 "type": "array",
878 "items": {"type": "string"},
879 "description": "Tags for publish or query_tags"
880 },
881 "prefix": {
882 "type": "string",
883 "description": "Key prefix for query_prefix"
884 }
885 },
886 "required": ["action"]
887 }),
888 };
889 tool_definitions.push(swarm_share_def);
890
891 let result_store = Arc::clone(&self.result_store);
893
894 let worktree_manager = if self.config.worktree_enabled {
896 let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
897 std::env::current_dir()
898 .map(|p| p.to_string_lossy().to_string())
899 .unwrap_or_else(|_| ".".to_string())
900 });
901
902 let mgr = WorktreeManager::new(&working_dir);
903 tracing::info!(
904 working_dir = %working_dir,
905 "Worktree isolation enabled for parallel sub-agents"
906 );
907 Some(Arc::new(mgr) as Arc<WorktreeManager>)
908 } else {
909 None
910 };
911
912 let mut pending_subtasks: Vec<SubTask> = Vec::with_capacity(subtasks.len());
916 for subtask in subtasks.into_iter() {
917 if let Some(ref cache) = self.cache {
918 let mut cache_guard = cache.lock().await;
919 if let Some(cached_result) = cache_guard.get(&subtask).await {
920 tracing::info!(
921 subtask_id = %subtask.id,
922 task_name = %subtask.name,
923 "Cache hit for subtask, skipping execution"
924 );
925 self.try_send_event(SwarmEvent::SubTaskUpdate {
926 id: subtask.id.clone(),
927 name: subtask.name.clone(),
928 status: SubTaskStatus::Completed,
929 agent_name: Some("cached".to_string()),
930 });
931 cached_results.push(cached_result);
932 continue;
933 }
934 }
935 pending_subtasks.push(subtask);
936 }
937
938 let mut worktrees_by_id: HashMap<String, WorktreeInfo> = HashMap::new();
944 if let Some(ref mgr) = worktree_manager {
945 let ids: Vec<String> = pending_subtasks
951 .iter()
952 .filter(|s| s.needs_worktree())
953 .map(|s| s.id.clone())
954 .collect();
955 let skipped = pending_subtasks.len().saturating_sub(ids.len());
956 if skipped > 0 {
957 tracing::info!(
958 skipped,
959 total = pending_subtasks.len(),
960 "Skipping worktree creation for read-only sub-agents"
961 );
962 }
963 worktrees_by_id = precreate_worktrees(Arc::clone(mgr), ids).await;
964 for (sid, wt) in &worktrees_by_id {
965 active_worktrees.insert(sid.clone(), wt.clone());
966 all_worktrees.insert(sid.clone(), wt.clone());
967 }
968 }
969
970 for (idx, subtask) in pending_subtasks.into_iter().enumerate() {
971 let model = model.clone();
972 let _provider_name = provider_name.clone();
973 let provider = Arc::clone(&provider);
974
975 let context = {
977 let completed = completed_results.read().await;
978 let mut dep_context = String::new();
979 for dep_id in &subtask.dependencies {
980 if let Some(result) = completed.get(dep_id) {
981 dep_context.push_str(&format!(
982 "\n--- Result from dependency {} ---\n{}\n",
983 dep_id, result
984 ));
985 }
986 }
987 dep_context
988 };
989
990 let instruction = subtask.instruction.clone();
991 let subtask_name = subtask.name.clone();
992 let specialty = subtask.specialty.clone().unwrap_or_default();
993 let subtask_id = subtask.id.clone();
994 let subtask_id_for_handle = subtask_id.clone();
995 let max_steps = self.config.max_steps_per_subagent;
996 let timeout_secs = self.config.subagent_timeout_secs;
997
998 let max_retries = self.config.max_retries;
1000 let base_delay_ms = self.config.base_delay_ms;
1001 let max_delay_ms = self.config.max_delay_ms;
1002
1003 let worktree_info = worktrees_by_id.remove(&subtask_id);
1008
1009 let working_dir = worktree_info
1010 .as_ref()
1011 .map(|wt| wt.path.display().to_string())
1012 .unwrap_or_else(|| ".".to_string());
1013 let working_dir_path = worktree_info.as_ref().map(|wt| wt.path.clone());
1014
1015 let tools = tool_definitions.clone();
1017 let _base_registry = Arc::clone(&base_tool_registry);
1018 let agent_result_store = Arc::clone(&result_store);
1019 let sem = Arc::clone(&semaphore);
1020 let stagger_delay = delay_ms * idx as u64; let event_tx = self.event_tx.clone();
1022
1023 let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());
1025
1026 tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");
1028
1029 let handle = tokio::spawn(async move {
1031 if stagger_delay > 0 {
1033 tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
1034 }
1035 let _permit = match sem.acquire().await {
1036 Ok(permit) => permit,
1037 Err(_) => {
1038 return (
1039 subtask_id.clone(),
1040 Err(anyhow::anyhow!("Swarm execution cancelled")),
1041 );
1042 }
1043 };
1044
1045 let _agent_start = Instant::now();
1046
1047 let start = Instant::now();
1048
1049 let working_path = std::path::Path::new(&working_dir);
1051 let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
1052 .map(|(content, _)| {
1053 format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
1054 })
1055 .unwrap_or_default();
1056
1057 let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
1059 let system_prompt = format!(
1060 "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.
1061
1062WORKING DIRECTORY: {}
1063All file operations should be relative to this directory.
1064
1065IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.
1066
1067Available tools:
1068- read: Read file contents
1069- write: Write/create files
1070- edit: Edit existing files (search and replace)
1071- multiedit: Make multiple edits at once
1072- glob: Find files by pattern
1073- grep: Search file contents
1074- bash: Run shell commands (use cwd: \"{}\" parameter)
1075- webfetch: Fetch web pages
1076- prd: Generate structured PRD for complex tasks
1077- ralph: Run autonomous agent loop on a PRD
1078- swarm_share: Share results with other sub-agents running in parallel
1079- agent: Spawn specialized helper agents when needed (smart delegation)
1080
1081SMART SPAWN POLICY (mandatory):
1082- Any spawned agent MUST use a different model than your current model ('{}')
1083- Spawned model MUST be free/subscription-eligible (e.g. '*:free', openai-codex/*, github-copilot/*, gemini-web/*, local_cuda/*)
1084- Include `model` when calling agent.spawn
1085
1086SHARING RESULTS:
1087Use swarm_share to collaborate with other sub-agents:
1088- swarm_share({{action: 'publish', key: 'my-finding', value: '...', tags: ['research']}}) to share a result
1089- swarm_share({{action: 'get', key: 'some-key'}}) to retrieve a result from another agent
1090- swarm_share({{action: 'list'}}) to see all shared results
1091- swarm_share({{action: 'query_tags', tags: ['research']}}) to find results by tag
1092
1093COMPLEX TASKS:
1094If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
10951. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
10962. Break down into user stories with acceptance criteria
10973. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
10984. Call ralph({{action: 'run', prd_path: '{}'}}) to execute
1099
1100NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.
1101
1102When done, provide a brief summary of what you accomplished.{agents_md_content}",
1103 specialty,
1104 subtask_id,
1105 working_dir,
1106 working_dir,
1107 model,
1108 prd_filename,
1109 prd_filename,
1110 prd_filename
1111 );
1112
1113 let user_prompt = if context.is_empty() {
1114 format!("Complete this task:\n\n{}", instruction)
1115 } else {
1116 format!(
1117 "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
1118 instruction, context
1119 )
1120 };
1121
1122 if let Some(ref tx) = event_tx {
1124 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1125 id: subtask_id.clone(),
1126 name: subtask_name.clone(),
1127 status: SubTaskStatus::Running,
1128 agent_name: Some(format!("agent-{}", subtask_id)),
1129 });
1130 let _ = tx.try_send(SwarmEvent::AgentStarted {
1131 subtask_id: subtask_id.clone(),
1132 agent_name: format!("agent-{}", subtask_id),
1133 specialty: specialty.clone(),
1134 });
1135 }
1136
1137 let mut agent_registry =
1140 ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
1141 agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
1142 Arc::clone(&agent_result_store),
1143 subtask_id.clone(),
1144 )));
1145 let registry = Arc::new(agent_registry);
1146
1147 let mut attempt = 0u32;
1149 let mut result: Result<(String, usize, usize, AgentLoopExit), anyhow::Error> =
1150 Err(anyhow::anyhow!("Not executed"));
1151
1152 while attempt <= max_retries {
1153 let _attempt_start = Instant::now();
1154
1155 result = run_agent_loop(
1157 Arc::clone(&provider),
1158 &model,
1159 &system_prompt,
1160 &user_prompt,
1161 tools.clone(),
1162 Arc::clone(®istry),
1163 max_steps,
1164 timeout_secs,
1165 event_tx.clone(),
1166 subtask_id.clone(),
1167 None,
1168 working_dir_path.clone(),
1169 )
1170 .await;
1171
1172 match &result {
1174 Ok((_, _, _, exit_reason)) => {
1175 if *exit_reason == AgentLoopExit::Completed {
1176 tracing::info!(
1178 subtask_id = %subtask_id,
1179 attempt = attempt + 1,
1180 "Sub-agent completed successfully on first attempt"
1181 );
1182 break;
1183 }
1184 let should_retry = attempt < max_retries;
1186 if should_retry {
1187 let delay = calculate_backoff_delay(
1188 attempt,
1189 base_delay_ms,
1190 max_delay_ms,
1191 2.0,
1192 );
1193 tracing::warn!(
1194 subtask_id = %subtask_id,
1195 attempt = attempt + 1,
1196 max_retries = max_retries,
1197 exit_reason = ?exit_reason,
1198 delay_ms = delay.as_millis(),
1199 "Sub-agent did not complete, retrying with backoff"
1200 );
1201 tokio::time::sleep(delay).await;
1202 } else {
1203 tracing::warn!(
1204 subtask_id = %subtask_id,
1205 attempt = attempt + 1,
1206 max_retries = max_retries,
1207 exit_reason = ?exit_reason,
1208 "Sub-agent did not complete, retries exhausted"
1209 );
1210 }
1211 }
1212 Err(e) => {
1213 let should_retry = attempt < max_retries;
1215 if should_retry {
1216 let delay = calculate_backoff_delay(
1217 attempt,
1218 base_delay_ms,
1219 max_delay_ms,
1220 2.0,
1221 );
1222 tracing::warn!(
1223 subtask_id = %subtask_id,
1224 attempt = attempt + 1,
1225 max_retries = max_retries,
1226 error = %e,
1227 delay_ms = delay.as_millis(),
1228 "Sub-agent error, retrying with backoff"
1229 );
1230 tokio::time::sleep(delay).await;
1231 } else {
1232 tracing::error!(
1233 subtask_id = %subtask_id,
1234 attempt = attempt + 1,
1235 max_retries = max_retries,
1236 error = %e,
1237 "Sub-agent error, retries exhausted"
1238 );
1239 }
1240 }
1241 }
1242
1243 attempt += 1;
1244 }
1245
1246 let result = result;
1247
1248 let task_result = match result {
1249 Ok((output, steps, tool_calls, exit_reason)) => {
1250 let (success, status, error) = match exit_reason {
1251 AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
1252 AgentLoopExit::MaxStepsReached => (
1253 false,
1254 SubTaskStatus::Failed,
1255 Some(format!("Sub-agent hit max steps ({max_steps})")),
1256 ),
1257 AgentLoopExit::TimedOut => (
1258 false,
1259 SubTaskStatus::TimedOut,
1260 Some(format!("Sub-agent timed out after {timeout_secs}s")),
1261 ),
1262 };
1263
1264 let total_attempts = attempt + 1;
1266 let actual_retry_attempts = if total_attempts > 1 {
1267 total_attempts - 1
1268 } else {
1269 0
1270 };
1271
1272 if let Some(ref tx) = event_tx {
1274 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1275 id: subtask_id.clone(),
1276 name: subtask_name.clone(),
1277 status,
1278 agent_name: Some(format!("agent-{}", subtask_id)),
1279 });
1280 if let Some(ref message) = error {
1281 let _ = tx.try_send(SwarmEvent::AgentError {
1282 subtask_id: subtask_id.clone(),
1283 error: message.clone(),
1284 });
1285 }
1286 let _ = tx.try_send(SwarmEvent::AgentOutput {
1287 subtask_id: subtask_id.clone(),
1288 output: output.clone(),
1289 });
1290 let _ = tx.try_send(SwarmEvent::AgentComplete {
1291 subtask_id: subtask_id.clone(),
1292 success,
1293 steps,
1294 });
1295 }
1296 Ok(SubTaskResult {
1297 subtask_id: subtask_id.clone(),
1298 subagent_id: format!("agent-{}", subtask_id),
1299 success,
1300 result: output,
1301 steps,
1302 tool_calls,
1303 execution_time_ms: start.elapsed().as_millis() as u64,
1304 error,
1305 artifacts: Vec::new(),
1306 retry_count: actual_retry_attempts,
1307 })
1308 }
1309 Err(e) => {
1310 let total_attempts = attempt + 1;
1312 let actual_retry_attempts = if total_attempts > 1 {
1313 total_attempts - 1
1314 } else {
1315 0
1316 };
1317
1318 if let Some(ref tx) = event_tx {
1320 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1321 id: subtask_id.clone(),
1322 name: subtask_name.clone(),
1323 status: SubTaskStatus::Failed,
1324 agent_name: Some(format!("agent-{}", subtask_id)),
1325 });
1326 let _ = tx.try_send(SwarmEvent::AgentError {
1327 subtask_id: subtask_id.clone(),
1328 error: e.to_string(),
1329 });
1330 let _ = tx.try_send(SwarmEvent::AgentComplete {
1331 subtask_id: subtask_id.clone(),
1332 success: false,
1333 steps: 0,
1334 });
1335 }
1336 Ok(SubTaskResult {
1337 subtask_id: subtask_id.clone(),
1338 subagent_id: format!("agent-{}", subtask_id),
1339 success: false,
1340 result: String::new(),
1341 steps: 0,
1342 tool_calls: 0,
1343 execution_time_ms: start.elapsed().as_millis() as u64,
1344 error: Some(e.to_string()),
1345 artifacts: Vec::new(),
1346 retry_count: actual_retry_attempts,
1347 })
1348 }
1349 };
1350
1351 (subtask_id.clone(), task_result)
1352 });
1353
1354 let abort_handle = handle.abort_handle();
1355 abort_handles.insert(subtask_id_for_handle.clone(), abort_handle);
1356 task_ids.insert(handle.id(), subtask_id_for_handle.clone());
1357 handles.push(handle);
1358 }
1359
1360 let mut collapse_controller = if worktree_manager.is_some() && active_worktrees.len() > 1 {
1362 Some(CollapseController::new(CollapsePolicy::default()))
1363 } else {
1364 None
1365 };
1366 let mut collapse_tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
1367 collapse_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
1368 let _ = collapse_tick.tick().await;
1370
1371 while !handles.is_empty() {
1372 tokio::select! {
1373 maybe_join = handles.next() => {
1374 let Some(joined) = maybe_join else {
1375 continue;
1376 };
1377 match joined {
1378 Ok((subtask_id, Ok(result))) => {
1379 abort_handles.remove(&subtask_id);
1380 let wt = active_worktrees.remove(&subtask_id).or_else(|| all_worktrees.get(&subtask_id).cloned());
1381 completed_entries.push((result, wt));
1382 }
1383 Ok((subtask_id, Err(e))) => {
1384 abort_handles.remove(&subtask_id);
1385 active_worktrees.remove(&subtask_id);
1386 let wt = all_worktrees.get(&subtask_id).cloned();
1387 completed_entries.push((
1388 SubTaskResult {
1389 subtask_id: subtask_id.clone(),
1390 subagent_id: format!("agent-{subtask_id}"),
1391 success: false,
1392 result: String::new(),
1393 steps: 0,
1394 tool_calls: 0,
1395 execution_time_ms: 0,
1396 error: Some(e.to_string()),
1397 artifacts: Vec::new(),
1398 retry_count: 0,
1399 },
1400 wt,
1401 ));
1402 }
1403 Err(e) => {
1404 let subtask_id = task_ids
1405 .remove(&e.id())
1406 .unwrap_or_else(|| "unknown".to_string());
1407 abort_handles.remove(&subtask_id);
1408 active_worktrees.remove(&subtask_id);
1409 let wt = all_worktrees.get(&subtask_id).cloned();
1410 completed_entries.push((
1411 SubTaskResult {
1412 subtask_id: subtask_id.clone(),
1413 subagent_id: format!("agent-{subtask_id}"),
1414 success: false,
1415 result: String::new(),
1416 steps: 0,
1417 tool_calls: 0,
1418 execution_time_ms: 0,
1419 error: Some(format!("Task join error: {e}")),
1420 artifacts: Vec::new(),
1421 retry_count: 0,
1422 },
1423 wt,
1424 ));
1425 }
1426 }
1427 }
1428 _ = collapse_tick.tick(), if collapse_controller.is_some() && !active_worktrees.is_empty() => {
1429 let branches: Vec<BranchRuntimeState> = active_worktrees
1430 .iter()
1431 .map(|(subtask_id, wt)| BranchRuntimeState {
1432 subtask_id: subtask_id.clone(),
1433 branch: wt.branch.clone(),
1434 worktree_path: wt.path.clone(),
1435 })
1436 .collect();
1437
1438 if let Some(controller) = collapse_controller.as_mut() {
1439 match controller.sample(&branches) {
1440 Ok(tick) => {
1441 if promoted_subtask_id != tick.promoted_subtask_id {
1442 promoted_subtask_id = tick.promoted_subtask_id.clone();
1443 if let Some(ref promoted) = promoted_subtask_id {
1444 tracing::info!(
1445 subtask_id = %promoted,
1446 "Collapse controller promoted branch"
1447 );
1448 if let Some(audit) = crate::audit::try_audit_log() {
1449 audit.log_with_correlation(
1450 crate::audit::AuditCategory::Swarm,
1451 "collapse_promote_branch",
1452 crate::audit::AuditOutcome::Success,
1453 Some("collapse-controller".to_string()),
1454 Some(serde_json::json!({
1455 "swarm_id": swarm_id,
1456 "subtask_id": promoted,
1457 })),
1458 None,
1459 None,
1460 Some(swarm_id.to_string()),
1461 None,
1462 ).await;
1463 }
1464 }
1465 }
1466
1467 for kill in tick.kills {
1468 if kill_reasons.contains_key(&kill.subtask_id) {
1469 continue;
1470 }
1471 let Some(abort_handle) = abort_handles.get(&kill.subtask_id) else {
1472 continue;
1473 };
1474
1475 abort_handle.abort();
1476 abort_handles.remove(&kill.subtask_id);
1477 active_worktrees.remove(&kill.subtask_id);
1478 kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
1479
1480 tracing::warn!(
1481 subtask_id = %kill.subtask_id,
1482 branch = %kill.branch,
1483 reason = %kill.reason,
1484 "Collapse controller killed branch"
1485 );
1486
1487 if let Some(ref tx) = self.event_tx {
1488 let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
1489 id: kill.subtask_id.clone(),
1490 name: kill.subtask_id.clone(),
1491 status: SubTaskStatus::Cancelled,
1492 agent_name: Some(format!("agent-{}", kill.subtask_id)),
1493 });
1494 let _ = tx.try_send(SwarmEvent::AgentError {
1495 subtask_id: kill.subtask_id.clone(),
1496 error: format!("Cancelled by collapse controller: {}", kill.reason),
1497 });
1498 }
1499
1500 if let Some(audit) = crate::audit::try_audit_log() {
1501 audit.log_with_correlation(
1502 crate::audit::AuditCategory::Swarm,
1503 "collapse_kill_branch",
1504 crate::audit::AuditOutcome::Success,
1505 Some("collapse-controller".to_string()),
1506 Some(serde_json::json!({
1507 "swarm_id": swarm_id,
1508 "subtask_id": kill.subtask_id,
1509 "branch": kill.branch,
1510 "reason": kill.reason,
1511 })),
1512 None,
1513 None,
1514 Some(swarm_id.to_string()),
1515 None,
1516 ).await;
1517 }
1518 }
1519 }
1520 Err(e) => {
1521 tracing::warn!(error = %e, "Collapse controller sampling failed");
1522 }
1523 }
1524 }
1525 }
1526 }
1527 }
1528
1529 if let Some(ref promoted) = promoted_subtask_id {
1531 completed_entries.sort_by_key(|(result, _)| {
1532 if &result.subtask_id == promoted {
1533 0usize
1534 } else {
1535 1usize
1536 }
1537 });
1538 }
1539
1540 let mut results = cached_results;
1541 let auto_merge = self.config.worktree_auto_merge;
1542
1543 for (mut result, worktree_info) in completed_entries {
1544 if let Some(wt) = worktree_info {
1546 if let Some(reason) = kill_reasons.get(&result.subtask_id) {
1547 result.error = Some(format!("Cancelled by collapse controller: {reason}"));
1548 result.result.push_str(&format!(
1549 "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
1550 ));
1551 if let Some(ref mgr) = worktree_manager
1552 && let Err(e) = mgr.cleanup(&wt.name).await
1553 {
1554 tracing::warn!(error = %e, "Failed to cleanup killed worktree");
1555 }
1556 } else if result.success && auto_merge {
1557 if let Some(ref mgr) = worktree_manager {
1558 match mgr.merge(&wt.name).await {
1559 Ok(merge_result) => {
1560 if merge_result.success {
1561 tracing::info!(
1562 subtask_id = %result.subtask_id,
1563 files_changed = merge_result.files_changed,
1564 "Merged worktree changes successfully"
1565 );
1566 result.result.push_str(&format!(
1567 "\n\n--- Merge Result ---\n{}",
1568 merge_result.summary
1569 ));
1570 } else if merge_result.aborted {
1571 tracing::warn!(
1572 subtask_id = %result.subtask_id,
1573 summary = %merge_result.summary,
1574 "Merge was aborted"
1575 );
1576 result.result.push_str(&format!(
1577 "\n\n--- Merge Aborted ---\n{}",
1578 merge_result.summary
1579 ));
1580 } else {
1581 tracing::warn!(
1582 subtask_id = %result.subtask_id,
1583 conflicts = ?merge_result.conflicts,
1584 "Merge had conflicts"
1585 );
1586 result.result.push_str(&format!(
1587 "\n\n--- Merge Conflicts ---\n{}",
1588 merge_result.summary
1589 ));
1590 }
1591 if let Err(e) = mgr.cleanup(&wt.name).await {
1592 tracing::warn!(error = %e, "Failed to cleanup worktree");
1593 }
1594 }
1595 Err(e) => {
1596 tracing::error!(
1597 subtask_id = %result.subtask_id,
1598 error = %e,
1599 "Failed to merge worktree"
1600 );
1601 }
1602 }
1603 }
1604 } else if !result.success {
1605 tracing::info!(
1606 subtask_id = %result.subtask_id,
1607 worktree_path = %wt.path.display(),
1608 "Keeping worktree for debugging (task failed)"
1609 );
1610 }
1611 }
1612
1613 if result.success
1615 && let Some(ref cache_arc) = self.cache
1616 {
1617 let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
1618 cache_arc.lock().await;
1619 let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
1620 if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
1621 tracing::warn!(
1622 subtask_id = %result.subtask_id,
1623 error = %e,
1624 "Failed to cache subtask result"
1625 );
1626 }
1627 }
1628
1629 results.push(result);
1630 }
1631
1632 Ok(results)
1633 }
1634
    /// Executes one stage of subtasks by running each sub-agent in its own
    /// Kubernetes pod, polling pod state until all work completes.
    ///
    /// Flow per scheduler iteration:
    /// 1. Fill the pod budget from `pending` (serving cache hits locally).
    /// 2. Wait one collapse-sample tick.
    /// 3. Poll every active pod: time out, detect disappearance, or harvest
    ///    logs from finished pods into `SubTaskResult`s.
    /// 4. Publish finished results (dependency map, cache, TUI events).
    /// 5. With >1 active pods, feed branch observations to the collapse
    ///    controller, which may promote one branch and kill others.
    ///
    /// Returns results for all subtasks; if a branch was promoted its result
    /// is sorted to the front. Residual pods are deleted before returning.
    ///
    /// # Errors
    /// Bails immediately if no Kubernetes client is available.
    async fn execute_stage_kubernetes(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
        swarm_id: &str,
    ) -> Result<Vec<SubTaskResult>> {
        let k8s = K8sManager::new().await;
        if !k8s.is_available() {
            anyhow::bail!(
                "Kubernetes execution mode requested but K8s client is unavailable in this environment"
            );
        }

        let provider_name = orchestrator.provider().to_string();
        let model = orchestrator.model().to_string();
        // At least one pod must be allowed or the scheduler below would spin forever.
        let pod_budget = self.config.k8s_pod_budget.max(1);
        let mut pending: VecDeque<SubTask> = subtasks.into_iter().collect();
        // subtask_id -> in-flight pod bookkeeping (pod name + start time).
        let mut active: HashMap<String, ActiveK8sBranch> = HashMap::new();
        let mut subtask_names: HashMap<String, String> = HashMap::new();
        let mut results: Vec<SubTaskResult> = Vec::new();
        // Remembers why a branch was killed so late-arriving logs are annotated
        // and a branch is never killed twice.
        let mut kill_reasons: HashMap<String, String> = HashMap::new();
        let mut promoted_subtask_id: Option<String> = None;
        let mut collapse_controller = CollapseController::new(CollapsePolicy::default());

        let mut tick = tokio::time::interval(Duration::from_secs(COLLAPSE_SAMPLE_SECS));
        tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
        // First tick of a tokio interval fires immediately; consume it so the
        // loop below waits a full sample period between polls.
        let _ = tick.tick().await;

        loop {
            // Phase 1: launch pods up to the budget.
            while active.len() < pod_budget {
                let Some(subtask) = pending.pop_front() else {
                    break;
                };

                // Serve cached results without spawning a pod.
                if let Some(ref cache) = self.cache {
                    let mut cache_guard = cache.lock().await;
                    if let Some(cached_result) = cache_guard.get(&subtask).await {
                        tracing::info!(
                            subtask_id = %subtask.id,
                            task_name = %subtask.name,
                            "Cache hit for subtask, skipping Kubernetes execution"
                        );
                        self.try_send_event(SwarmEvent::SubTaskUpdate {
                            id: subtask.id.clone(),
                            name: subtask.name.clone(),
                            status: SubTaskStatus::Completed,
                            agent_name: Some("cached".to_string()),
                        });
                        results.push(cached_result);
                        continue;
                    }
                }

                // Collect outputs of this subtask's dependencies into a text
                // context block for the remote agent.
                let context = {
                    let completed = completed_results.read().await;
                    let mut dep_context = String::new();
                    for dep_id in &subtask.dependencies {
                        if let Some(result) = completed.get(dep_id) {
                            dep_context.push_str(&format!(
                                "\n--- Result from dependency {} ---\n{}\n",
                                dep_id, result
                            ));
                        }
                    }
                    dep_context
                };

                // Full task description shipped to the pod via env var.
                let payload = RemoteSubtaskPayload {
                    swarm_id: swarm_id.to_string(),
                    subtask_id: subtask.id.clone(),
                    subtask_name: subtask.name.clone(),
                    specialty: subtask.specialty.clone().unwrap_or_default(),
                    instruction: subtask.instruction.clone(),
                    context: context.clone(),
                    provider: provider_name.clone(),
                    model: model.clone(),
                    max_steps: self.config.max_steps_per_subagent,
                    timeout_secs: self.config.subagent_timeout_secs,
                    working_dir: self.config.working_dir.clone(),
                    probe_interval_secs: COLLAPSE_SAMPLE_SECS,
                };
                let payload_b64 = match encode_payload(&payload) {
                    Ok(payload) => payload,
                    // Encoding failure: record a failed result and move on to
                    // the next pending subtask.
                    Err(error) => {
                        let error_text = format!("Failed to encode remote payload: {error}");
                        tracing::error!(subtask_id = %subtask.id, error = %error, "K8s payload encoding failed");
                        self.try_send_event(SwarmEvent::SubTaskUpdate {
                            id: subtask.id.clone(),
                            name: subtask.name.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some("k8s-encoder".to_string()),
                        });
                        self.try_send_event(SwarmEvent::AgentError {
                            subtask_id: subtask.id.clone(),
                            error: error_text.clone(),
                        });
                        results.push(SubTaskResult {
                            subtask_id: subtask.id.clone(),
                            subagent_id: format!("agent-{}", subtask.id),
                            success: false,
                            result: String::new(),
                            steps: 0,
                            tool_calls: 0,
                            execution_time_ms: 0,
                            error: Some(error_text),
                            artifacts: Vec::new(),
                            retry_count: 0,
                        });
                        continue;
                    }
                };

                let mut env_vars = HashMap::new();
                env_vars.insert(SWARM_SUBTASK_PAYLOAD_ENV.to_string(), payload_b64);
                // Forward selected non-empty host env vars (e.g. credentials)
                // into the pod.
                for key in K8S_PASSTHROUGH_ENV_VARS {
                    if let Ok(value) = std::env::var(key)
                        && !value.trim().is_empty()
                    {
                        env_vars.insert((*key).to_string(), value);
                    }
                }
                // Plain-text prompt used when the pod image lacks the
                // `swarm-subagent` subcommand (see the `args` shell snippet).
                let fallback_prompt = if context.trim().is_empty() {
                    format!(
                        "You are executing swarm subtask '{}'.\n\nTask:\n{}\n\n\
Return only the final subtask answer.",
                        subtask.id, subtask.instruction
                    )
                } else {
                    format!(
                        "You are executing swarm subtask '{}'.\n\nTask:\n{}\n\n\
Dependency context:\n{}\n\nReturn only the final subtask answer.",
                        subtask.id, subtask.instruction, context
                    )
                };
                env_vars.insert(SWARM_FALLBACK_PROMPT_ENV.to_string(), fallback_prompt);
                env_vars.insert(SWARM_FALLBACK_MODEL_ENV.to_string(), model.clone());

                let mut labels = HashMap::new();
                labels.insert("codetether.run/swarm-id".to_string(), swarm_id.to_string());
                labels.insert(
                    "codetether.run/stage".to_string(),
                    subtask.stage.to_string(),
                );

                let spec = SubagentPodSpec {
                    image: self.config.k8s_subagent_image.clone(),
                    env_vars,
                    labels,
                    command: Some(vec!["sh".to_string(), "-lc".to_string()]),
                    // Prefer the dedicated subagent entrypoint; fall back to a
                    // one-shot `codetether run` with the prompt from env vars.
                    // `$$` escapes `$` from Rust's format! and is mapped back
                    // to a single `$` for the shell via `.replace`.
                    args: Some(vec![
                        format!(
                            "if codetether help swarm-subagent >/dev/null 2>&1; then \
exec codetether swarm-subagent --payload-env {payload_env}; \
else \
exec codetether run \"$${fallback_prompt_env}\" --model \"$${fallback_model_env}\"; \
fi",
                            payload_env = SWARM_SUBTASK_PAYLOAD_ENV,
                            fallback_prompt_env = SWARM_FALLBACK_PROMPT_ENV,
                            fallback_model_env = SWARM_FALLBACK_MODEL_ENV,
                        )
                        .replace("$$", "$"),
                    ]),
                };

                // Spawn failure: record a failed result and continue scheduling.
                if let Err(error) = k8s.spawn_subagent_pod_with_spec(&subtask.id, spec).await {
                    let error_text = format!("Failed to spawn Kubernetes pod: {error}");
                    tracing::error!(subtask_id = %subtask.id, error = %error, "K8s sub-agent pod spawn failed");
                    self.try_send_event(SwarmEvent::SubTaskUpdate {
                        id: subtask.id.clone(),
                        name: subtask.name.clone(),
                        status: SubTaskStatus::Failed,
                        agent_name: Some("k8s-spawn".to_string()),
                    });
                    self.try_send_event(SwarmEvent::AgentError {
                        subtask_id: subtask.id.clone(),
                        error: error_text.clone(),
                    });
                    results.push(SubTaskResult {
                        subtask_id: subtask.id.clone(),
                        subagent_id: format!("agent-{}", subtask.id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(error_text),
                        artifacts: Vec::new(),
                        retry_count: 0,
                    });
                    continue;
                }

                let branch = K8sManager::subagent_pod_name(&subtask.id);
                subtask_names.insert(subtask.id.clone(), subtask.name.clone());
                active.insert(
                    subtask.id.clone(),
                    ActiveK8sBranch {
                        branch: branch.clone(),
                        started_at: Instant::now(),
                    },
                );

                self.try_send_event(SwarmEvent::SubTaskUpdate {
                    id: subtask.id.clone(),
                    name: subtask.name.clone(),
                    status: SubTaskStatus::Running,
                    agent_name: Some(format!("k8s-{branch}")),
                });
                self.try_send_event(SwarmEvent::AgentStarted {
                    subtask_id: subtask.id.clone(),
                    agent_name: format!("k8s-{branch}"),
                    specialty: subtask
                        .specialty
                        .clone()
                        .unwrap_or_else(|| "generalist".to_string()),
                });

                tracing::info!(
                    subtask_id = %subtask.id,
                    pod = %branch,
                    "Spawned Kubernetes sub-agent pod"
                );
            }

            // All work launched and finished: stage is done.
            if pending.is_empty() && active.is_empty() {
                break;
            }

            tick.tick().await;

            // Phase 2: poll every active pod. Iterate over a snapshot of ids
            // because entries are removed from `active` during the loop.
            let active_ids: Vec<String> = active.keys().cloned().collect();
            let mut finished_results: Vec<SubTaskResult> = Vec::new();
            for subtask_id in active_ids {
                let Some(active_state) = active.get(&subtask_id).cloned() else {
                    continue;
                };

                // Enforce the per-subagent wall-clock timeout.
                if active_state.started_at.elapsed()
                    > Duration::from_secs(self.config.subagent_timeout_secs)
                {
                    let reason = format!(
                        "Timed out after {}s in Kubernetes pod",
                        self.config.subagent_timeout_secs
                    );
                    kill_reasons.insert(subtask_id.clone(), reason.clone());
                    if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
                        tracing::warn!(
                            subtask_id = %subtask_id,
                            error = %error,
                            "Failed deleting timed-out Kubernetes pod"
                        );
                    }
                    active.remove(&subtask_id);
                    finished_results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{subtask_id}"),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
                        error: Some(reason),
                        artifacts: Vec::new(),
                        retry_count: 0,
                    });
                    continue;
                }

                // Transient API errors: keep the pod active and retry next tick.
                let pod_state = match k8s.get_subagent_pod_state(&subtask_id).await {
                    Ok(state) => state,
                    Err(error) => {
                        tracing::warn!(
                            subtask_id = %subtask_id,
                            error = %error,
                            "Failed to query Kubernetes pod state for sub-agent"
                        );
                        continue;
                    }
                };
                // `None` pod state means the pod vanished out from under us.
                let Some(pod_state) = pod_state else {
                    active.remove(&subtask_id);
                    finished_results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{subtask_id}"),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
                        error: Some("Sub-agent pod disappeared".to_string()),
                        artifacts: Vec::new(),
                        retry_count: 0,
                    });
                    continue;
                };

                let phase = pod_state.phase.to_ascii_lowercase();
                let finished = pod_state.terminated || phase == "succeeded" || phase == "failed";
                if !finished {
                    continue;
                }

                // Pod finished: parse the structured result from its logs, or
                // synthesize one from raw logs + exit code when parsing fails.
                let logs = k8s
                    .subagent_logs(&subtask_id, 10_000)
                    .await
                    .unwrap_or_default();
                let mut result = result_from_logs(&logs).unwrap_or_else(|| SubTaskResult {
                    subtask_id: subtask_id.clone(),
                    subagent_id: format!("agent-{subtask_id}"),
                    success: pod_state.exit_code.unwrap_or(1) == 0,
                    result: logs,
                    steps: 0,
                    tool_calls: 0,
                    execution_time_ms: active_state.started_at.elapsed().as_millis() as u64,
                    error: if pod_state.exit_code.unwrap_or(1) == 0 {
                        None
                    } else {
                        Some(
                            pod_state
                                .reason
                                .clone()
                                .unwrap_or_else(|| "Remote sub-agent failed".to_string()),
                        )
                    },
                    artifacts: Vec::new(),
                    retry_count: 0,
                });

                // A branch killed by the collapse controller is never a success,
                // even if its pod happened to exit cleanly.
                if let Some(reason) = kill_reasons.get(&subtask_id) {
                    result.success = false;
                    result.error = Some(format!("Cancelled by collapse controller: {reason}"));
                    result.result.push_str(&format!(
                        "\n\n--- Collapse Controller ---\nBranch terminated: {reason}"
                    ));
                }

                active.remove(&subtask_id);
                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
                    tracing::warn!(
                        subtask_id = %subtask_id,
                        error = %error,
                        "Failed deleting completed Kubernetes pod"
                    );
                }
                finished_results.push(result);
            }

            // Phase 3: publish finished results (dependency map, cache, events).
            for result in finished_results {
                if result.success {
                    completed_results
                        .write()
                        .await
                        .insert(result.subtask_id.clone(), result.result.clone());
                }
                if result.success
                    && let Some(ref cache_arc) = self.cache
                {
                    let mut cache_guard = cache_arc.lock().await;
                    let cache_subtask = SubTask::new(&result.subtask_id, &result.result);
                    let _ = cache_guard.put(&cache_subtask, &result).await;
                }

                self.try_send_event(SwarmEvent::SubTaskUpdate {
                    id: result.subtask_id.clone(),
                    name: subtask_names
                        .get(&result.subtask_id)
                        .cloned()
                        .unwrap_or_else(|| result.subtask_id.clone()),
                    status: if result.success {
                        SubTaskStatus::Completed
                    } else {
                        SubTaskStatus::Failed
                    },
                    agent_name: Some(format!("k8s-{}", result.subtask_id)),
                });
                if let Some(ref error) = result.error {
                    self.try_send_event(SwarmEvent::AgentError {
                        subtask_id: result.subtask_id.clone(),
                        error: error.clone(),
                    });
                }
                self.try_send_event(SwarmEvent::AgentOutput {
                    subtask_id: result.subtask_id.clone(),
                    output: result.result.clone(),
                });
                self.try_send_event(SwarmEvent::AgentComplete {
                    subtask_id: result.subtask_id.clone(),
                    success: result.success,
                    steps: result.steps,
                });
                results.push(result);
            }

            // Phase 4: with multiple live branches, sample them and let the
            // collapse controller promote/kill.
            if active.len() > 1 {
                let mut observations = Vec::with_capacity(active.len());
                for (subtask_id, state) in &active {
                    let pod_state = match k8s.get_subagent_pod_state(subtask_id).await {
                        Ok(state) => state,
                        Err(error) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %error,
                                "Failed to query pod state while sampling branch observation"
                            );
                            None
                        }
                    };
                    let (resource_health_score, infra_unhealthy_signals) =
                        compute_resource_health(pod_state.as_ref());

                    // Prefer the agent's own probe output from recent logs;
                    // fall back to pod phase alone when no probe is present.
                    let logs = k8s.subagent_logs(subtask_id, 500).await.unwrap_or_default();
                    if let Some(probe) = latest_probe_from_logs(&logs) {
                        let compile_ok = pod_state
                            .as_ref()
                            .map(|p| probe.compile_ok && !p.phase.eq_ignore_ascii_case("failed"))
                            .unwrap_or(probe.compile_ok);
                        observations.push(BranchObservation {
                            subtask_id: subtask_id.clone(),
                            branch: state.branch.clone(),
                            compile_ok,
                            changed_files: probe_changed_files_set(&probe),
                            changed_lines: probe.changed_lines,
                            resource_health_score,
                            infra_unhealthy_signals,
                        });
                        continue;
                    }
                    let compile_ok = pod_state
                        .as_ref()
                        .map(|p| !p.phase.eq_ignore_ascii_case("failed"))
                        .unwrap_or(false);
                    observations.push(BranchObservation {
                        subtask_id: subtask_id.clone(),
                        branch: state.branch.clone(),
                        compile_ok,
                        changed_files: std::collections::HashSet::new(),
                        changed_lines: 0,
                        resource_health_score,
                        infra_unhealthy_signals,
                    });
                }

                let tick = collapse_controller.sample_observations(&observations);
                // Audit-log promotion only when the promoted branch changes.
                if promoted_subtask_id != tick.promoted_subtask_id {
                    promoted_subtask_id = tick.promoted_subtask_id.clone();
                    if let Some(ref promoted) = promoted_subtask_id {
                        tracing::info!(subtask_id = %promoted, "Collapse controller promoted branch");
                        if let Some(audit) = crate::audit::try_audit_log() {
                            audit
                                .log_with_correlation(
                                    crate::audit::AuditCategory::Swarm,
                                    "collapse_promote_branch",
                                    crate::audit::AuditOutcome::Success,
                                    Some("collapse-controller".to_string()),
                                    Some(serde_json::json!({
                                        "swarm_id": swarm_id,
                                        "subtask_id": promoted,
                                        "execution_mode": "kubernetes_pod",
                                    })),
                                    None,
                                    None,
                                    Some(swarm_id.to_string()),
                                    None,
                                )
                                .await;
                        }
                    }
                }

                for kill in tick.kills {
                    // Skip branches already killed or no longer active.
                    if kill_reasons.contains_key(&kill.subtask_id) {
                        continue;
                    }
                    if !active.contains_key(&kill.subtask_id) {
                        continue;
                    }

                    if let Err(error) = k8s.delete_subagent_pod(&kill.subtask_id).await {
                        tracing::warn!(
                            subtask_id = %kill.subtask_id,
                            error = %error,
                            "Failed deleting Kubernetes pod after collapse kill"
                        );
                    }
                    kill_reasons.insert(kill.subtask_id.clone(), kill.reason.clone());
                    let elapsed_ms = active
                        .remove(&kill.subtask_id)
                        .map(|s| s.started_at.elapsed().as_millis() as u64)
                        .unwrap_or(0);

                    tracing::warn!(
                        subtask_id = %kill.subtask_id,
                        branch = %kill.branch,
                        reason = %kill.reason,
                        "Collapse controller killed Kubernetes branch"
                    );

                    if let Some(audit) = crate::audit::try_audit_log() {
                        audit
                            .log_with_correlation(
                                crate::audit::AuditCategory::Swarm,
                                "collapse_kill_branch",
                                crate::audit::AuditOutcome::Success,
                                Some("collapse-controller".to_string()),
                                Some(serde_json::json!({
                                    "swarm_id": swarm_id,
                                    "subtask_id": kill.subtask_id.clone(),
                                    "branch": kill.branch.clone(),
                                    "reason": kill.reason.clone(),
                                    "execution_mode": "kubernetes_pod",
                                })),
                                None,
                                None,
                                Some(swarm_id.to_string()),
                                None,
                            )
                            .await;
                    }

                    self.try_send_event(SwarmEvent::SubTaskUpdate {
                        id: kill.subtask_id.clone(),
                        name: subtask_names
                            .get(&kill.subtask_id)
                            .cloned()
                            .unwrap_or_else(|| kill.subtask_id.clone()),
                        status: SubTaskStatus::Cancelled,
                        agent_name: Some(format!("agent-{}", kill.subtask_id)),
                    });
                    self.try_send_event(SwarmEvent::AgentError {
                        subtask_id: kill.subtask_id.clone(),
                        error: format!("Cancelled by collapse controller: {}", kill.reason),
                    });

                    results.push(SubTaskResult {
                        subtask_id: kill.subtask_id.clone(),
                        subagent_id: format!("agent-{}", kill.subtask_id),
                        success: false,
                        result: format!(
                            "\n\n--- Collapse Controller ---\nBranch terminated: {}",
                            kill.reason
                        ),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: elapsed_ms,
                        error: Some(format!("Cancelled by collapse controller: {}", kill.reason)),
                        artifacts: Vec::new(),
                        retry_count: 0,
                    });
                }
            }
        }

        // Surface the promoted branch's result first (stable sort keeps the
        // relative order of everything else).
        if let Some(ref promoted) = promoted_subtask_id {
            results.sort_by_key(|result| {
                if &result.subtask_id == promoted {
                    0usize
                } else {
                    1usize
                }
            });
        }

        // Best-effort cleanup of any pods still marked active at stage end.
        if !active.is_empty() {
            let residual_ids: Vec<String> = active.keys().cloned().collect();
            for subtask_id in residual_ids {
                if let Err(error) = k8s.delete_subagent_pod(&subtask_id).await {
                    tracing::warn!(
                        subtask_id = %subtask_id,
                        error = %error,
                        "Failed deleting residual Kubernetes pod at stage end"
                    );
                }
            }
        }

        Ok(results)
    }
2213
2214 async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
2216 let mut aggregated = String::new();
2217
2218 for (i, result) in results.iter().enumerate() {
2219 if result.success {
2220 aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
2221 } else {
2222 aggregated.push_str(&format!(
2223 "=== Subtask {} (FAILED) ===\nError: {}\n\n",
2224 i + 1,
2225 result.error.as_deref().unwrap_or("Unknown error")
2226 ));
2227 }
2228 }
2229
2230 Ok(aggregated)
2231 }
2232
    /// Runs `task` as a single unit of work, with no task decomposition.
    ///
    /// Convenience wrapper that delegates to [`Self::execute`] with
    /// `DecompositionStrategy::None`.
    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
2237}
2238
2239pub struct SwarmExecutorBuilder {
2241 config: SwarmConfig,
2242}
2243
2244impl SwarmExecutorBuilder {
2245 pub fn new() -> Self {
2246 Self {
2247 config: SwarmConfig::default(),
2248 }
2249 }
2250
2251 pub fn max_subagents(mut self, max: usize) -> Self {
2252 self.config.max_subagents = max;
2253 self
2254 }
2255
2256 pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
2257 self.config.max_steps_per_subagent = max;
2258 self
2259 }
2260
2261 pub fn max_total_steps(mut self, max: usize) -> Self {
2262 self.config.max_total_steps = max;
2263 self
2264 }
2265
2266 pub fn timeout_secs(mut self, secs: u64) -> Self {
2267 self.config.subagent_timeout_secs = secs;
2268 self
2269 }
2270
2271 pub fn parallel_enabled(mut self, enabled: bool) -> Self {
2272 self.config.parallel_enabled = enabled;
2273 self
2274 }
2275
2276 pub fn build(self) -> SwarmExecutor {
2277 SwarmExecutor::new(self.config)
2278 }
2279}
2280
2281impl Default for SwarmExecutorBuilder {
2282 fn default() -> Self {
2283 Self::new()
2284 }
2285}
2286
2287const WORKTREE_CREATE_PARALLELISM: usize = 8;
2292
2293async fn precreate_worktrees(
2304 mgr: Arc<WorktreeManager>,
2305 subtask_ids: Vec<String>,
2306) -> HashMap<String, WorktreeInfo> {
2307 stream::iter(subtask_ids.into_iter().map(|sid| {
2308 let mgr = Arc::clone(&mgr);
2309 async move {
2310 let slug = sid.replace("-", "_");
2311 match mgr.create(&slug).await {
2312 Ok(wt) => {
2313 if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
2314 tracing::warn!(
2315 subtask_id = %sid,
2316 error = %e,
2317 "Failed to inject workspace stub into worktree"
2318 );
2319 }
2320 tracing::info!(
2321 subtask_id = %sid,
2322 worktree_path = %wt.path.display(),
2323 worktree_branch = %wt.branch,
2324 "Created worktree for sub-agent"
2325 );
2326 Some((sid, wt))
2327 }
2328 Err(e) => {
2329 tracing::warn!(
2330 subtask_id = %sid,
2331 error = %e,
2332 "Failed to create worktree, sub-agent will use shared directory"
2333 );
2334 None
2335 }
2336 }
2337 }
2338 }))
2339 .buffer_unordered(WORKTREE_CREATE_PARALLELISM)
2340 .filter_map(|x| async move { x })
2341 .collect()
2342 .await
2343}
2344
2345#[allow(clippy::too_many_arguments)]
2347fn resolve_tool_paths(
2354 tool_name: &str,
2355 args: &mut serde_json::Value,
2356 working_dir: &std::path::Path,
2357) {
2358 match tool_name {
2359 "read" | "write" | "list" | "grep" | "codesearch" => {
2360 if let Some(path) = args.get("path").and_then(|v| v.as_str()).map(String::from)
2361 && !std::path::Path::new(&path).is_absolute()
2362 {
2363 args["path"] = serde_json::json!(working_dir.join(&path).display().to_string());
2364 }
2365 }
2366 "edit" => {
2367 if let Some(path) = args
2368 .get("filePath")
2369 .and_then(|v| v.as_str())
2370 .map(String::from)
2371 && !std::path::Path::new(&path).is_absolute()
2372 {
2373 args["filePath"] = serde_json::json!(working_dir.join(&path).display().to_string());
2374 }
2375 }
2376 "glob" => {
2377 if let Some(pattern) = args
2378 .get("pattern")
2379 .and_then(|v| v.as_str())
2380 .map(String::from)
2381 && !std::path::Path::new(&pattern).is_absolute()
2382 && !pattern.starts_with("*")
2383 {
2384 args["pattern"] =
2385 serde_json::json!(working_dir.join(&pattern).display().to_string());
2386 }
2387 }
2388 "multiedit" => {
2389 if let Some(edits) = args.get_mut("edits").and_then(|v| v.as_array_mut()) {
2390 for edit in edits.iter_mut() {
2391 if let Some(file) = edit.get("file").and_then(|v| v.as_str()).map(String::from)
2392 && !std::path::Path::new(&file).is_absolute()
2393 {
2394 edit["file"] =
2395 serde_json::json!(working_dir.join(&file).display().to_string());
2396 }
2397 }
2398 }
2399 }
2400 "patch" => {
2401 if let Some(path) = args.get("file").and_then(|v| v.as_str()).map(String::from)
2402 && !std::path::Path::new(&path).is_absolute()
2403 {
2404 args["file"] = serde_json::json!(working_dir.join(&path).display().to_string());
2405 }
2406 }
2407 "bash" => {
2408 if args.get("cwd").and_then(|v| v.as_str()).is_none() {
2410 args["cwd"] = serde_json::json!(working_dir.display().to_string());
2411 }
2412 }
2413 _ => {}
2414 }
2415}
2416
/// Drives one sub-agent through an agentic LLM ↔ tool loop.
///
/// Each iteration sends the accumulated transcript to `provider`, executes
/// any tool calls the model requests through `registry`, and appends the
/// results back into the transcript. The loop ends when the model stops
/// requesting tools (`Completed`), the `max_steps` budget is exhausted
/// (`MaxStepsReached`), or an iteration does not complete within
/// `timeout_secs` (`TimedOut`); the deadline is refreshed after every
/// completed iteration, so `timeout_secs` bounds per-step inactivity, not
/// total runtime.
///
/// Optional plumbing: `event_tx` receives best-effort UI preview events,
/// `bus` receives full thinking/tool-output messages, and `working_dir`
/// (the sub-agent's worktree) is used to absolutize relative paths in tool
/// arguments.
///
/// Returns `(final_output, steps, total_tool_calls, exit_reason)` where
/// `final_output` is every assistant text part joined with newlines.
///
/// # Errors
/// Provider failures and the hard-coded 120-second cap on a single
/// completion call are propagated as `Err`; budget/idle exhaustion is not
/// an error and is reported via `AgentLoopExit` instead.
pub async fn run_agent_loop(
    provider: Arc<dyn Provider>,
    model: &str,
    system_prompt: &str,
    user_prompt: &str,
    tools: Vec<crate::provider::ToolDefinition>,
    registry: Arc<ToolRegistry>,
    max_steps: usize,
    timeout_secs: u64,
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    subtask_id: String,
    bus: Option<Arc<AgentBus>>,
    working_dir: Option<std::path::PathBuf>,
) -> Result<(String, usize, usize, AgentLoopExit)> {
    // Fixed sampling temperature for every completion in this loop.
    let temperature = 0.7;

    tracing::info!(
        model = %model,
        max_steps = max_steps,
        timeout_secs = timeout_secs,
        "Sub-agent starting agentic loop"
    );
    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");

    // Seed the transcript with the system prompt and the task description.
    let mut messages = vec![
        Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.to_string(),
            }],
        },
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: user_prompt.to_string(),
            }],
        },
    ];

    let mut steps = 0;
    let mut total_tool_calls = 0;
    let mut final_output = String::new();

    // Idle deadline; reset at the bottom of each completed iteration.
    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);

    let exit_reason = loop {
        if steps >= max_steps {
            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
            break AgentLoopExit::MaxStepsReached;
        }

        if Instant::now() > deadline {
            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
            break AgentLoopExit::TimedOut;
        }

        steps += 1;
        tracing::info!(step = steps, "Sub-agent step starting");

        // Keep the transcript within the context budget before each call
        // (truncation helper defined elsewhere in this file).
        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);

        let request = CompletionRequest {
            messages: messages.clone(),
            tools: tools.clone(),
            model: model.to_string(),
            temperature: Some(temperature),
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        // Hard 120s cap on a single LLM call; both an elapsed timeout and a
        // provider error abort the whole loop via the two `?`s.
        let step_start = Instant::now();
        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
        let step_duration = step_start.elapsed();

        tracing::info!(
            step = steps,
            duration_ms = step_duration.as_millis() as u64,
            finish_reason = ?response.finish_reason,
            prompt_tokens = response.usage.prompt_tokens,
            completion_tokens = response.usage.completion_tokens,
            "Sub-agent step completed LLM call"
        );

        // Split the response into text, thinking, and tool-call parts.
        let mut text_parts = Vec::new();
        let mut thinking_parts = Vec::new();
        let mut tool_calls = Vec::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } => {
                    text_parts.push(text.clone());
                }
                ContentPart::Thinking { text } if !text.is_empty() => {
                    thinking_parts.push(text.clone());
                }
                ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                    ..
                } => {
                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
                }
                _ => {}
            }
        }

        // Publish the model's reasoning on the bus, if one is attached.
        if !thinking_parts.is_empty()
            && let Some(ref bus) = bus
        {
            let thinking_text = thinking_parts.join("\n");
            let handle = bus.handle(&subtask_id);
            handle.send(
                format!("agent.{subtask_id}.thinking"),
                BusMessage::AgentThinking {
                    agent_id: subtask_id.clone(),
                    thinking: thinking_text,
                    step: steps,
                },
            );
        }

        if !text_parts.is_empty() {
            // Accumulate assistant text across steps, newline-separated.
            let step_output = text_parts.join("\n");
            if !final_output.is_empty() {
                final_output.push('\n');
            }
            final_output.push_str(&step_output);
            tracing::info!(
                step = steps,
                output_len = final_output.len(),
                "Sub-agent text output"
            );
            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");

            if let Some(ref tx) = event_tx {
                // 500-byte preview, backed off to a char boundary so the
                // slice cannot panic inside multi-byte UTF-8.
                let preview = if step_output.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !step_output.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &step_output[..end])
                } else {
                    step_output.clone()
                };
                // try_send: UI events are best-effort and must never block.
                let _ = tx.try_send(SwarmEvent::AgentMessage {
                    subtask_id: subtask_id.clone(),
                    entry: AgentMessageEntry {
                        role: "assistant".to_string(),
                        content: preview,
                        is_tool_call: false,
                    },
                });
            }
        }

        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        // The assistant turn enters the transcript before any tool results.
        messages.push(response.message.clone());

        // Done: the model either stopped for a non-tool reason or produced
        // no executable tool calls.
        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break AgentLoopExit::Completed;
        }

        // Execute the requested tools sequentially; results are collected
        // and appended to the transcript afterwards, in call order.
        let mut tool_results = Vec::new();

        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            if let Some(ref tx) = event_tx {
                let _ = tx.try_send(SwarmEvent::AgentToolCall {
                    subtask_id: subtask_id.clone(),
                    tool_name: tool_name.clone(),
                });
            }

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let mut tool_success = true;
            let result = if let Some(tool) = registry.get(&tool_name) {
                // Malformed JSON arguments degrade to `{}` instead of
                // failing the step.
                let mut args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
                        serde_json::json!({})
                    });

                // Re-anchor relative paths at the sub-agent's worktree.
                if let Some(ref wd) = working_dir {
                    resolve_tool_paths(&tool_name, &mut args, wd);
                }
                let agent_name = format!("agent-{subtask_id}");
                let provenance =
                    ExecutionProvenance::for_operation(&agent_name, ExecutionOrigin::Swarm);
                args = enrich_tool_input_with_runtime_context(
                    &args,
                    working_dir
                        .as_deref()
                        .unwrap_or_else(|| std::path::Path::new(".")),
                    Some(model),
                    &subtask_id,
                    &agent_name,
                    Some(&provenance),
                );

                // Any failure (tool-reported error or execution error) is
                // surfaced to the model as text rather than aborting the
                // loop.
                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tool_success = false;
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tool_success = false;
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                tool_success = false;
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            if let Some(ref tx) = event_tx {
                // Char-boundary-safe previews: 200 bytes of input, 500 of
                // output.
                let input_preview = if arguments.len() > 200 {
                    let mut end = 200;
                    while end > 0 && !arguments.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &arguments[..end])
                } else {
                    arguments.clone()
                };
                let output_preview = if result.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !result.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &result[..end])
                } else {
                    result.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
                    subtask_id: subtask_id.clone(),
                    detail: AgentToolCallDetail {
                        tool_name: tool_name.clone(),
                        input_preview,
                        output_preview,
                        success: tool_success,
                    },
                });
            }

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            // The bus gets the full, untruncated tool output.
            if let Some(ref bus) = bus {
                let handle = bus.handle(&subtask_id);
                handle.send(
                    format!("tools.{tool_name}"),
                    BusMessage::ToolOutputFull {
                        agent_id: subtask_id.clone(),
                        tool_name: tool_name.clone(),
                        output: result.clone(),
                        success: tool_success,
                        step: steps,
                    },
                );
            }

            // Oversized results are condensed before entering the
            // transcript (threshold and helpers defined elsewhere in this
            // file); the model sees the condensed form, while the bus
            // already received the full output above.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        // Append tool results as `Role::Tool` messages, preserving call
        // order.
        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }

        // A fully completed iteration refreshes the idle deadline.
        deadline = Instant::now() + Duration::from_secs(timeout_secs);
    };

    Ok((final_output, steps, total_tool_calls, exit_reason))
}