use super::{
    CacheConfig, CacheStats, DecompositionStrategy, StageStats, SwarmCache, SwarmConfig,
    SwarmResult,
    orchestrator::Orchestrator,
    result_store::ResultStore,
    subtask::{SubTask, SubTaskResult, SubTaskStatus},
};
use crate::bus::{AgentBus, BusMessage};
use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};

pub use super::SwarmMessage;

use crate::{
    agent::Agent,
    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
    rlm::RlmExecutor,
    swarm::{SwarmArtifact, SwarmStats},
    telemetry::SwarmTelemetryCollector,
    tool::ToolRegistry,
    worktree::{WorktreeInfo, WorktreeManager},
};
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{RwLock, mpsc};
use tokio::time::{Duration, timeout};

/// Default context window, in tokens, assumed when truncating history.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens held back from the budget so the model has room to respond.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Start truncating once history exceeds this fraction of the context window.
const TRUNCATION_THRESHOLD: f64 = 0.85;

/// Rough token estimate: ~3.5 characters per token, rounded up.
fn estimate_tokens(text: &str) -> usize {
    (text.len() as f64 / 3.5).ceil() as usize
}
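
// A minimal sketch pinning down the heuristic above: token counts scale with
// character length at roughly 3.5 chars per token, rounded up. The constants
// are this module's own; the test only checks the arithmetic.
#[cfg(test)]
mod estimate_tokens_sketch {
    use super::estimate_tokens;

    #[test]
    fn scales_with_length() {
        assert_eq!(estimate_tokens(""), 0);
        // 7 chars / 3.5 = 2 tokens exactly.
        assert_eq!(estimate_tokens("abcdefg"), 2);
        // 8 chars / 3.5 = 2.29, rounded up to 3.
        assert_eq!(estimate_tokens("abcdefgh"), 3);
    }
}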

/// Estimate the token cost of a single message, including per-part overhead.
fn estimate_message_tokens(message: &Message) -> usize {
    // Flat overhead per message for role markers and formatting.
    let mut tokens = 4;

    for part in &message.content {
        tokens += match part {
            ContentPart::Text { text } => estimate_tokens(text),
            ContentPart::ToolCall {
                id,
                name,
                arguments,
            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
            ContentPart::ToolResult {
                tool_call_id,
                content,
            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
            // Flat estimate for binary attachments; thinking text counts like text.
            ContentPart::Image { .. } | ContentPart::File { .. } => 2000,
            ContentPart::Thinking { text } => estimate_tokens(text),
        };
    }

    tokens
}
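
// Hedged sketch: a text-only message costs the 4-token message overhead plus
// the estimated cost of its text part, per the function above.
#[cfg(test)]
mod estimate_message_tokens_sketch {
    use super::*;

    #[test]
    fn counts_overhead_plus_text() {
        let msg = Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "abcdefg".to_string(),
            }],
        };
        // 4 (overhead) + 2 (7 chars / 3.5, rounded up).
        assert_eq!(estimate_message_tokens(&msg), 6);
    }
}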

/// Estimate total tokens across an entire message history.
fn estimate_total_tokens(messages: &[Message]) -> usize {
    messages.iter().map(estimate_message_tokens).sum()
}
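
// Sketch: the total is just the per-message estimates summed.
#[cfg(test)]
mod estimate_total_tokens_sketch {
    use super::*;

    #[test]
    fn sums_per_message_estimates() {
        let msg = Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "abcdefg".to_string(),
            }],
        };
        // Two copies of a 6-token message (see the sketch above).
        let messages = vec![msg.clone(), msg];
        assert_eq!(estimate_total_tokens(&messages), 12);
    }
}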

/// Truncate `messages` in place so the history fits within `context_limit`.
///
/// First shrinks oversized tool results; if that is not enough, drops a run
/// of middle messages (keeping the system/task prefix and the most recent
/// turns) and replaces them with a one-message summary.
fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
    let target_tokens =
        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;

    let current_tokens = estimate_total_tokens(messages);
    if current_tokens <= target_tokens {
        return;
    }

    tracing::warn!(
        current_tokens = current_tokens,
        target_tokens = target_tokens,
        context_limit = context_limit,
        "Context approaching limit, truncating conversation history"
    );

    // Cheapest fix first: shrink oversized tool results.
    truncate_large_tool_results(messages, 2000);

    let after_tool_truncation = estimate_total_tokens(messages);
    if after_tool_truncation <= target_tokens {
        tracing::info!(
            old_tokens = current_tokens,
            new_tokens = after_tool_truncation,
            "Truncated large tool results, context now within limits"
        );
        return;
    }

    if messages.len() <= 6 {
        tracing::warn!(
            tokens = after_tool_truncation,
            target = target_tokens,
            "Cannot truncate further - conversation too short"
        );
        return;
    }

    // Keep the system prompt and initial task, plus the most recent turns.
    let keep_start = 2;
    let keep_end = 4;
    let removable_count = messages.len() - keep_start - keep_end;

    if removable_count == 0 {
        return;
    }

    let removed_messages: Vec<_> = messages
        .drain(keep_start..keep_start + removable_count)
        .collect();
    let summary = summarize_removed_messages(&removed_messages);

    messages.insert(
        keep_start,
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: format!(
                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
                    removed_messages.len(),
                    summary
                ),
            }],
        },
    );

    let new_tokens = estimate_total_tokens(messages);
    tracing::info!(
        removed_messages = removed_messages.len(),
        old_tokens = current_tokens,
        new_tokens = new_tokens,
        "Truncated conversation history"
    );
}
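
// Sketch of the no-op path: a short history well under the target budget is
// returned untouched, so the truncation pass is safe to run every step.
#[cfg(test)]
mod truncate_messages_sketch {
    use super::*;

    #[test]
    fn short_history_is_untouched() {
        let mut messages = vec![Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "hello".to_string(),
            }],
        }];
        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);
        assert_eq!(messages.len(), 1);
    }
}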

/// Build a short summary of dropped messages, listing distinct tool names.
fn summarize_removed_messages(messages: &[Message]) -> String {
    let mut summary = String::new();
    let mut tool_calls: Vec<String> = Vec::new();

    for msg in messages {
        for part in &msg.content {
            if let ContentPart::ToolCall { name, .. } = part {
                if !tool_calls.contains(name) {
                    tool_calls.push(name.clone());
                }
            }
        }
    }

    if !tool_calls.is_empty() {
        summary.push_str(&format!(
            "Tools used in truncated history: {}",
            tool_calls.join(", ")
        ));
    }

    summary
}
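
// Sketch: tool names appear once each, in first-seen order. The summary only
// inspects message content, so the role used here is irrelevant to it.
#[cfg(test)]
mod summarize_removed_sketch {
    use super::*;

    #[test]
    fn dedupes_tool_names() {
        let call = |name: &str| Message {
            role: Role::User,
            content: vec![ContentPart::ToolCall {
                id: "id".to_string(),
                name: name.to_string(),
                arguments: "{}".to_string(),
            }],
        };
        let messages = vec![call("read"), call("read"), call("grep")];
        let summary = summarize_removed_messages(&messages);
        assert!(summary.contains("read, grep"));
    }
}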

/// Shrink any tool result larger than `max_tokens_per_result`, in place.
fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
    // Conservative chars-per-token factor for converting the budget to chars.
    let char_limit = max_tokens_per_result * 3;
    let mut truncated_count = 0;
    let mut saved_tokens = 0usize;

    for message in messages.iter_mut() {
        for part in message.content.iter_mut() {
            if let ContentPart::ToolResult { content, .. } = part {
                let tokens = estimate_tokens(content);
                if tokens > max_tokens_per_result {
                    let old_len = content.len();
                    *content = truncate_single_result(content, char_limit);
                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
                    if content.len() < old_len {
                        truncated_count += 1;
                    }
                }
            }
        }
    }

    if truncated_count > 0 {
        tracing::info!(
            truncated_count = truncated_count,
            saved_tokens = saved_tokens,
            max_tokens_per_result = max_tokens_per_result,
            "Truncated large tool results"
        );
    }
}
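
// Sketch: only tool results over the token budget are rewritten; the
// replacement carries the truncation marker added by truncate_single_result.
#[cfg(test)]
mod truncate_large_tool_results_sketch {
    use super::*;

    #[test]
    fn shrinks_only_oversized_results() {
        let mut messages = vec![Message {
            role: Role::Tool,
            content: vec![ContentPart::ToolResult {
                tool_call_id: "c1".to_string(),
                content: "y".repeat(10_000),
            }],
        }];
        truncate_large_tool_results(&mut messages, 100);
        if let ContentPart::ToolResult { content, .. } = &messages[0].content[0] {
            assert!(content.len() < 10_000);
            assert!(content.contains("[OUTPUT TRUNCATED"));
        } else {
            panic!("expected a tool result");
        }
    }
}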

/// Truncate a single result to roughly `max_chars`, cutting on a char
/// boundary (preferring the last newline) and appending an explicit marker.
fn truncate_single_result(content: &str, max_chars: usize) -> String {
    if content.len() <= max_chars {
        return content.to_string();
    }

    // Back up until the cut lands on a valid UTF-8 character boundary.
    let safe_limit = {
        let mut limit = max_chars.min(content.len());
        while limit > 0 && !content.is_char_boundary(limit) {
            limit -= 1;
        }
        limit
    };

    // Prefer breaking at a line boundary so the kept prefix reads cleanly.
    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);

    let truncated = format!(
        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
        &content[..break_point],
        content.len(),
        break_point
    );

    tracing::debug!(
        original_len = content.len(),
        truncated_len = truncated.len(),
        "Truncated large result"
    );

    truncated
}
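
// Sketch: oversized output keeps a prefix plus the marker; short output
// passes through unchanged.
#[cfg(test)]
mod truncate_single_result_sketch {
    use super::truncate_single_result;

    #[test]
    fn truncates_and_annotates() {
        let content = "x".repeat(100);
        let truncated = truncate_single_result(&content, 10);
        assert!(truncated.starts_with("xxxxxxxxxx..."));
        assert!(truncated.contains("[OUTPUT TRUNCATED: 100 → 10 chars"));
        // Short content passes through unchanged.
        assert_eq!(truncate_single_result("short", 10), "short");
    }
}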

/// Results above this size are summarized with the RLM instead of truncated.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Results between this size and the RLM threshold are simply truncated.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// Why a sub-agent's agentic loop exited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    Completed,
    MaxStepsReached,
    TimedOut,
}

/// Compress a large tool result: pass small outputs through, truncate
/// mid-sized ones, and summarize very large ones with the RLM executor.
async fn process_large_result_with_rlm(
    content: &str,
    tool_name: &str,
    provider: Arc<dyn Provider>,
    model: &str,
) -> String {
    if content.len() <= SIMPLE_TRUNCATE_CHARS {
        return content.to_string();
    }

    if content.len() <= RLM_THRESHOLD_CHARS {
        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
    }

    tracing::info!(
        tool = %tool_name,
        content_len = content.len(),
        "Using RLM to process large tool result"
    );

    let query = format!(
        "Summarize the key information from this {} output. \
         Focus on: errors, warnings, important findings, and actionable items. \
         Be concise but thorough.",
        tool_name
    );

    let mut executor =
        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);

    match executor.analyze(&query).await {
        Ok(result) => {
            tracing::info!(
                tool = %tool_name,
                original_len = content.len(),
                summary_len = result.answer.len(),
                iterations = result.iterations,
                "RLM summarized large result"
            );

            format!(
                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
                tool_name,
                content.len(),
                result.answer.len(),
                result.answer
            )
        }
        Err(e) => {
            tracing::warn!(
                tool = %tool_name,
                error = %e,
                "RLM analysis failed, falling back to truncation"
            );
            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
        }
    }
}

/// Executes a swarm: decomposes a task into subtasks, runs sub-agents stage
/// by stage, and aggregates their results.
pub struct SwarmExecutor {
    config: SwarmConfig,
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Channel for streaming events to an attached UI.
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    /// Optional cache of completed subtask results.
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Store through which sub-agents share intermediate results.
    result_store: Arc<ResultStore>,
    /// Optional inter-agent message bus.
    bus: Option<Arc<AgentBus>>,
}

impl SwarmExecutor {
    /// Create an executor with the given config and no optional components.
    pub fn new(config: SwarmConfig) -> Self {
        Self {
            config,
            coordinator_agent: None,
            event_tx: None,
            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
            cache: None,
            result_store: ResultStore::new_arc(),
            bus: None,
        }
    }

    /// Create an executor with a freshly initialized result cache.
    pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
        let cache = SwarmCache::new(cache_config).await?;
        Ok(Self {
            config,
            coordinator_agent: None,
            event_tx: None,
            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
            cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
            result_store: ResultStore::new_arc(),
            bus: None,
        })
    }

    /// Attach an existing shared cache instance.
    pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
        self.cache = Some(cache);
        self
    }

    /// Attach an inter-agent message bus.
    pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
        self.bus = Some(bus);
        self
    }

    /// The attached message bus, if any.
    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
        self.bus.as_ref()
    }

    /// Attach a channel for streaming `SwarmEvent`s to a UI.
    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
        self.event_tx = Some(tx);
        self
    }

    /// Attach a coordinator agent.
    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
        tracing::debug!("Setting coordinator agent for swarm execution");
        self.coordinator_agent = Some(agent);
        self
    }

    /// Attach a shared telemetry collector.
    pub fn with_telemetry(
        mut self,
        telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    ) -> Self {
        self.telemetry = telemetry;
        self
    }

    /// A clone of the telemetry collector handle.
    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
        Arc::clone(&self.telemetry)
    }

    /// The attached coordinator agent, if any.
    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        self.coordinator_agent.as_ref()
    }

    /// The shared result store.
    pub fn result_store(&self) -> &Arc<ResultStore> {
        &self.result_store
    }

    /// Current cache statistics, if a cache is attached.
    pub async fn cache_stats(&self) -> Option<CacheStats> {
        if let Some(ref cache) = self.cache {
            let cache_guard = cache.lock().await;
            Some(cache_guard.stats().clone())
        } else {
            None
        }
    }

    /// Clear the cache, if one is attached.
    pub async fn clear_cache(&self) -> Result<()> {
        if let Some(ref cache) = self.cache {
            let mut cache_guard = cache.lock().await;
            cache_guard.clear().await?;
        }
        Ok(())
    }

    /// Forward an event to the UI channel, mirroring lifecycle events onto
    /// the agent bus so other agents can react.
    fn try_send_event(&self, event: SwarmEvent) {
        if let Some(ref bus) = self.bus {
            let handle = bus.handle("swarm-executor");
            match &event {
                SwarmEvent::Started { task, .. } => {
                    handle.send(
                        "broadcast",
                        BusMessage::AgentReady {
                            agent_id: "swarm-executor".to_string(),
                            capabilities: vec![format!("executing:{task}")],
                        },
                    );
                }
                SwarmEvent::Complete { success, .. } => {
                    let state = if *success {
                        crate::a2a::types::TaskState::Completed
                    } else {
                        crate::a2a::types::TaskState::Failed
                    };
                    handle.send_task_update("swarm", state, None);
                }
                _ => {}
            }
        }

        if let Some(ref tx) = self.event_tx {
            let _ = tx.try_send(event);
        }
    }

    /// Decompose `task` using `strategy`, execute each stage of subtasks, and
    /// aggregate the results.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        let subtasks = orchestrator.decompose(task, strategy).await?;

        if subtasks.is_empty() {
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        let swarm_id = uuid::Uuid::new_v4().to_string();
        self.telemetry.lock().await.start_swarm(
            swarm_id.clone(),
            subtasks.len(),
            &format!("{:?}", strategy),
        );

        // Results of completed subtasks, keyed by subtask id, for dependents.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            // Record results for dependency context and publish to the store.
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());

                    let tags = vec![
                        format!("stage:{stage}"),
                        format!("subtask:{}", result.subtask_id),
                    ];
                    let _ = self
                        .result_store
                        .publish(
                            &result.subtask_id,
                            &result.subagent_id,
                            &result.result,
                            tags,
                            None,
                        )
                        .await;
                }
            }

            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        let provider_name = orchestrator.provider().to_string();

        self.telemetry
            .lock()
            .await
            .record_swarm_latency("total_execution", start_time.elapsed());

        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        let success = all_results.iter().all(|r| r.success);

        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }

    /// Run one stage's subtasks concurrently (bounded by a semaphore) and
    /// collect their results.
    async fn execute_stage(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
        swarm_id: &str,
    ) -> Result<Vec<SubTaskResult>> {
        let mut handles: Vec<(
            String,
            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
        )> = Vec::new();
        let mut cached_results: Vec<SubTaskResult> = Vec::new();

        // Bound concurrent provider requests; starts are also staggered below.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(
            self.config.max_concurrent_requests,
        ));
        let delay_ms = self.config.request_delay_ms;

        let model = orchestrator.model().to_string();
        let provider_name = orchestrator.provider().to_string();
        let providers = orchestrator.providers();
        let provider = providers
            .get(&provider_name)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;

        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");

        let base_tool_registry =
            ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());

        // Sub-agents get the full tool set minus `question`, which requires
        // user interaction.
        let mut tool_definitions: Vec<_> = base_tool_registry
            .definitions()
            .into_iter()
            .filter(|t| t.name != "question")
            .collect();

        let swarm_share_def = crate::provider::ToolDefinition {
            name: "swarm_share".to_string(),
            description: "Share results with other sub-agents in the swarm. Actions: publish \
                          (share a result), get (retrieve a result by key), query_tags (find \
                          results by tags), query_prefix (find results by key prefix), list \
                          (show all shared results)."
                .to_string(),
            parameters: serde_json::json!({
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": ["publish", "get", "query_tags", "query_prefix", "list"],
                        "description": "Action to perform"
                    },
                    "key": {
                        "type": "string",
                        "description": "Result key (for publish/get)"
                    },
                    "value": {
                        "description": "Result value to publish (any JSON value)"
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Tags for publish or query_tags"
                    },
                    "prefix": {
                        "type": "string",
                        "description": "Key prefix for query_prefix"
                    }
                },
                "required": ["action"]
            }),
        };
        tool_definitions.push(swarm_share_def);

        let result_store = Arc::clone(&self.result_store);

        // Optionally isolate each sub-agent in its own git worktree.
        let worktree_manager = if self.config.worktree_enabled {
            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| ".".to_string())
            });

            match WorktreeManager::new(&working_dir) {
                Ok(mgr) => {
                    tracing::info!(
                        working_dir = %working_dir,
                        "Worktree isolation enabled for parallel sub-agents"
                    );
                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Failed to create worktree manager, falling back to shared directory"
                    );
                    None
                }
            }
        } else {
            None
        };

        for (idx, subtask) in subtasks.into_iter().enumerate() {
            let model = model.clone();
            let _provider_name = provider_name.clone();
            let provider = Arc::clone(&provider);

            // Serve from the cache before spending any provider calls.
            if let Some(ref cache) = self.cache {
                let mut cache_guard = cache.lock().await;
                if let Some(cached_result) = cache_guard.get(&subtask).await {
                    tracing::info!(
                        subtask_id = %subtask.id,
                        task_name = %subtask.name,
                        "Cache hit for subtask, skipping execution"
                    );
                    self.try_send_event(SwarmEvent::SubTaskUpdate {
                        id: subtask.id.clone(),
                        name: subtask.name.clone(),
                        status: SubTaskStatus::Completed,
                        agent_name: Some("cached".to_string()),
                    });
                    cached_results.push(cached_result);
                    continue;
                }
            }

            // Gather results from this subtask's dependencies as prompt context.
            let context = {
                let completed = completed_results.read().await;
                let mut dep_context = String::new();
                for dep_id in &subtask.dependencies {
                    if let Some(result) = completed.get(dep_id) {
                        dep_context.push_str(&format!(
                            "\n--- Result from dependency {} ---\n{}\n",
                            dep_id, result
                        ));
                    }
                }
                dep_context
            };

            let instruction = subtask.instruction.clone();
            let subtask_name = subtask.name.clone();
            let specialty = subtask.specialty.clone().unwrap_or_default();
            let subtask_id = subtask.id.clone();
            let subtask_id_for_handle = subtask_id.clone();
            let max_steps = self.config.max_steps_per_subagent;
            let timeout_secs = self.config.subagent_timeout_secs;

            let tools = tool_definitions.clone();
            let _base_registry = Arc::clone(&base_tool_registry);
            let agent_result_store = Arc::clone(&result_store);
            let sem = Arc::clone(&semaphore);
            // Stagger start times so parallel agents don't burst the provider.
            let stagger_delay = delay_ms * idx as u64;
            let worktree_mgr = worktree_manager.clone();
            let event_tx = self.event_tx.clone();

            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());

            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");

            let handle = tokio::spawn(async move {
                if stagger_delay > 0 {
                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
                }
                let _permit = sem
                    .acquire()
                    .await
                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;

                let _agent_start = Instant::now();

                let start = Instant::now();

                // Give this sub-agent an isolated git worktree when enabled.
                let worktree_info = if let Some(ref mgr) = worktree_mgr {
                    let task_slug = subtask_id.replace("-", "_");
                    match mgr.create(&task_slug) {
                        Ok(wt) => {
                            tracing::info!(
                                subtask_id = %subtask_id,
                                worktree_path = %wt.path.display(),
                                worktree_branch = %wt.branch,
                                "Created worktree for sub-agent"
                            );
                            Some(wt)
                        }
                        Err(e) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %e,
                                "Failed to create worktree, using shared directory"
                            );
                            None
                        }
                    }
                } else {
                    None
                };

                let working_dir = worktree_info
                    .as_ref()
                    .map(|wt| wt.path.display().to_string())
                    .unwrap_or_else(|| ".".to_string());

                let working_path = std::path::Path::new(&working_dir);
                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
                    .map(|(content, _)| {
                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
                    })
                    .unwrap_or_default();

                // Unique PRD filename so parallel agents don't clobber each other.
                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
                let system_prompt = format!(
                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.

WORKING DIRECTORY: {}
All file operations should be relative to this directory.

IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.

Available tools:
- read: Read file contents
- write: Write/create files
- edit: Edit existing files (search and replace)
- multiedit: Make multiple edits at once
- glob: Find files by pattern
- grep: Search file contents
- bash: Run shell commands (use cwd: \"{}\" parameter)
- webfetch: Fetch web pages
- prd: Generate structured PRD for complex tasks
- ralph: Run autonomous agent loop on a PRD
- swarm_share: Share results with other sub-agents running in parallel

SHARING RESULTS:
Use swarm_share to collaborate with other sub-agents:
- swarm_share({{action: 'publish', key: 'my-finding', value: '...', tags: ['research']}}) to share a result
- swarm_share({{action: 'get', key: 'some-key'}}) to retrieve a result from another agent
- swarm_share({{action: 'list'}}) to see all shared results
- swarm_share({{action: 'query_tags', tags: ['research']}}) to find results by tag

COMPLEX TASKS:
If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
1. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
2. Break down into user stories with acceptance criteria
3. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
4. Call ralph({{action: 'run', prd_path: '{}'}}) to execute

NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.

When done, provide a brief summary of what you accomplished.{agents_md_content}",
                    specialty,
                    subtask_id,
                    working_dir,
                    working_dir,
                    prd_filename,
                    prd_filename,
                    prd_filename
                );

                let user_prompt = if context.is_empty() {
                    format!("Complete this task:\n\n{}", instruction)
                } else {
                    format!(
                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
                        instruction, context
                    )
                };

                if let Some(ref tx) = event_tx {
                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                        id: subtask_id.clone(),
                        name: subtask_name.clone(),
                        status: SubTaskStatus::Running,
                        agent_name: Some(format!("agent-{}", subtask_id)),
                    });
                    let _ = tx.try_send(SwarmEvent::AgentStarted {
                        subtask_id: subtask_id.clone(),
                        agent_name: format!("agent-{}", subtask_id),
                        specialty: specialty.clone(),
                    });
                }

                // Each sub-agent gets its own registry plus a swarm_share tool
                // scoped to this subtask.
                let mut agent_registry =
                    ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
                agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
                    Arc::clone(&agent_result_store),
                    subtask_id.clone(),
                )));
                let registry = Arc::new(agent_registry);

                let result = run_agent_loop(
                    provider,
                    &model,
                    &system_prompt,
                    &user_prompt,
                    tools,
                    registry,
                    max_steps,
                    timeout_secs,
                    event_tx.clone(),
                    subtask_id.clone(),
                )
                .await;

                match result {
                    Ok((output, steps, tool_calls, exit_reason)) => {
                        let (success, status, error) = match exit_reason {
                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
                            AgentLoopExit::MaxStepsReached => (
                                false,
                                SubTaskStatus::Failed,
                                Some(format!("Sub-agent hit max steps ({max_steps})")),
                            ),
                            AgentLoopExit::TimedOut => (
                                false,
                                SubTaskStatus::TimedOut,
                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
                            ),
                        };

                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            if let Some(ref message) = error {
                                let _ = tx.try_send(SwarmEvent::AgentError {
                                    subtask_id: subtask_id.clone(),
                                    error: message.clone(),
                                });
                            }
                            let _ = tx.try_send(SwarmEvent::AgentOutput {
                                subtask_id: subtask_id.clone(),
                                output: output.clone(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success,
                                steps,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success,
                                result: output,
                                steps,
                                tool_calls,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error,
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                    Err(e) => {
                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status: SubTaskStatus::Failed,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentError {
                                subtask_id: subtask_id.clone(),
                                error: e.to_string(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success: false,
                                steps: 0,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success: false,
                                result: String::new(),
                                steps: 0,
                                tool_calls: 0,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error: Some(e.to_string()),
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                }
            });

            handles.push((subtask_id_for_handle, handle));
        }

        let mut results = cached_results;
        let auto_merge = self.config.worktree_auto_merge;

        for (subtask_id, handle) in handles {
            match handle.await {
                Ok(Ok((mut result, worktree_info))) => {
                    // Merge or keep the worktree depending on outcome and config.
                    if let Some(wt) = worktree_info {
                        if result.success && auto_merge {
                            if let Some(ref mgr) = worktree_manager {
                                match mgr.merge(&wt) {
                                    Ok(merge_result) => {
                                        if merge_result.success {
                                            tracing::info!(
                                                subtask_id = %result.subtask_id,
                                                files_changed = merge_result.files_changed,
                                                "Merged worktree changes successfully"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Result ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else if merge_result.aborted {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                summary = %merge_result.summary,
                                                "Merge was aborted"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Aborted ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                conflicts = ?merge_result.conflicts,
                                                "Merge had conflicts"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Conflicts ---\n{}",
                                                merge_result.summary
                                            ));
                                        }

                                        if let Err(e) = mgr.cleanup(&wt) {
                                            tracing::warn!(
                                                error = %e,
                                                "Failed to cleanup worktree"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            subtask_id = %result.subtask_id,
                                            error = %e,
                                            "Failed to merge worktree"
                                        );
                                    }
                                }
                            }
                        } else if !result.success {
                            tracing::info!(
                                subtask_id = %result.subtask_id,
                                worktree_path = %wt.path.display(),
                                "Keeping worktree for debugging (task failed)"
                            );
                        }
                    }

                    // Cache successful results for future runs.
                    if result.success {
                        if let Some(ref cache_arc) = self.cache {
                            let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
                                cache_arc.lock().await;
                            // Rebuild a minimal subtask to serve as the cache key.
                            let cache_subtask = SubTask::new(&subtask_id, &result.result);
                            if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
                                tracing::warn!(
                                    subtask_id = %result.subtask_id,
                                    error = %e,
                                    "Failed to cache subtask result"
                                );
                            }
                        }
                    }

                    results.push(result);
                }
                Ok(Err(e)) => {
                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: e.to_string(),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(e.to_string()),
                        artifacts: Vec::new(),
                    });
                }
                Err(e) => {
                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: format!("Task join error: {}", e),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(format!("Task join error: {}", e)),
                        artifacts: Vec::new(),
                    });
                }
            }
        }

        Ok(results)
    }

    /// Concatenate subtask outputs into a single labeled report.
    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
        let mut aggregated = String::new();

        for (i, result) in results.iter().enumerate() {
            if result.success {
                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
            } else {
                aggregated.push_str(&format!(
                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
                    i + 1,
                    result.error.as_deref().unwrap_or("Unknown error")
                ));
            }
        }

        Ok(aggregated)
    }

    /// Execute a task as a single subtask, without decomposition.
    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
}

/// Fluent builder for `SwarmExecutor`, starting from `SwarmConfig::default()`.
pub struct SwarmExecutorBuilder {
    config: SwarmConfig,
}

impl SwarmExecutorBuilder {
    pub fn new() -> Self {
        Self {
            config: SwarmConfig::default(),
        }
    }

    pub fn max_subagents(mut self, max: usize) -> Self {
        self.config.max_subagents = max;
        self
    }

    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
        self.config.max_steps_per_subagent = max;
        self
    }

    pub fn max_total_steps(mut self, max: usize) -> Self {
        self.config.max_total_steps = max;
        self
    }

    pub fn timeout_secs(mut self, secs: u64) -> Self {
        self.config.subagent_timeout_secs = secs;
        self
    }

    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
        self.config.parallel_enabled = enabled;
        self
    }

    pub fn build(self) -> SwarmExecutor {
        SwarmExecutor::new(self.config)
    }
}

impl Default for SwarmExecutorBuilder {
    fn default() -> Self {
        Self::new()
    }
}
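
// Hedged usage sketch: the builder overrides only what the caller sets and
// `build` hands off to `SwarmExecutor::new`, so the executor starts with no
// coordinator, bus, or cache attached.
#[cfg(test)]
mod builder_sketch {
    use super::SwarmExecutorBuilder;

    #[test]
    fn builds_a_bare_executor() {
        let executor = SwarmExecutorBuilder::new()
            .max_subagents(4)
            .max_steps_per_subagent(20)
            .timeout_secs(300)
            .build();
        assert!(executor.coordinator_agent().is_none());
        assert!(executor.bus().is_none());
    }
}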

/// Run the tool-using agentic loop for a single sub-agent.
///
/// Returns the accumulated text output, the number of steps taken, the number
/// of tool calls executed, and why the loop exited.
#[allow(clippy::too_many_arguments)]
pub async fn run_agent_loop(
    provider: Arc<dyn Provider>,
    model: &str,
    system_prompt: &str,
    user_prompt: &str,
    tools: Vec<crate::provider::ToolDefinition>,
    registry: Arc<ToolRegistry>,
    max_steps: usize,
    timeout_secs: u64,
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    subtask_id: String,
) -> Result<(String, usize, usize, AgentLoopExit)> {
    let temperature = 0.7;

    tracing::info!(
        model = %model,
        max_steps = max_steps,
        timeout_secs = timeout_secs,
        "Sub-agent starting agentic loop"
    );
    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");

    let mut messages = vec![
        Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.to_string(),
            }],
        },
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: user_prompt.to_string(),
            }],
        },
    ];

    let mut steps = 0;
    let mut total_tool_calls = 0;
    let mut final_output = String::new();

    // Inactivity deadline; it is pushed forward after each productive step.
    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);

    let exit_reason = loop {
        if steps >= max_steps {
            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
            break AgentLoopExit::MaxStepsReached;
        }

        if Instant::now() > deadline {
            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
            break AgentLoopExit::TimedOut;
        }

        steps += 1;
        tracing::info!(step = steps, "Sub-agent step starting");

        // Keep the request within the model's context window.
        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);

        let request = CompletionRequest {
            messages: messages.clone(),
            tools: tools.clone(),
            model: model.to_string(),
            temperature: Some(temperature),
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let step_start = Instant::now();
        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
        let step_duration = step_start.elapsed();

        tracing::info!(
            step = steps,
            duration_ms = step_duration.as_millis() as u64,
            finish_reason = ?response.finish_reason,
            prompt_tokens = response.usage.prompt_tokens,
            completion_tokens = response.usage.completion_tokens,
            "Sub-agent step completed LLM call"
        );

        // Split the response into text output and requested tool calls.
        let mut text_parts = Vec::new();
        let mut tool_calls = Vec::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } => {
                    text_parts.push(text.clone());
                }
                ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                } => {
                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
                }
                _ => {}
            }
        }

        if !text_parts.is_empty() {
            let step_output = text_parts.join("\n");
            if !final_output.is_empty() {
                final_output.push('\n');
            }
            final_output.push_str(&step_output);
            tracing::info!(
                step = steps,
                output_len = final_output.len(),
                "Sub-agent text output"
            );
            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");

            if let Some(ref tx) = event_tx {
                // Preview only; trim to a char boundary at 500 bytes.
                let preview = if step_output.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !step_output.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &step_output[..end])
                } else {
                    step_output.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentMessage {
                    subtask_id: subtask_id.clone(),
                    entry: AgentMessageEntry {
                        role: "assistant".to_string(),
                        content: preview,
                        is_tool_call: false,
                    },
                });
            }
        }

        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        messages.push(response.message.clone());

        // No tool calls requested means the agent is done.
        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break AgentLoopExit::Completed;
        }

        let mut tool_results = Vec::new();

        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            if let Some(ref tx) = event_tx {
                let _ = tx.try_send(SwarmEvent::AgentToolCall {
                    subtask_id: subtask_id.clone(),
                    tool_name: tool_name.clone(),
                });
            }

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let mut tool_success = true;
            let result = if let Some(tool) = registry.get(&tool_name) {
                // Fall back to empty arguments if the model emitted invalid JSON.
                let args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
                        serde_json::json!({})
                    });

                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tool_success = false;
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tool_success = false;
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                tool_success = false;
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            if let Some(ref tx) = event_tx {
                let input_preview = if arguments.len() > 200 {
                    let mut end = 200;
                    while end > 0 && !arguments.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &arguments[..end])
                } else {
                    arguments.clone()
                };
                let output_preview = if result.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !result.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &result[..end])
                } else {
                    result.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
                    subtask_id: subtask_id.clone(),
                    detail: AgentToolCallDetail {
                        tool_name: tool_name.clone(),
                        input_preview,
                        output_preview,
                        success: tool_success,
                    },
                });
            }

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            // Compress oversized tool output before it enters the history.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }

        // Tools ran this step, so push the inactivity deadline forward.
        deadline = Instant::now() + Duration::from_secs(timeout_secs);
    };

    Ok((final_output, steps, total_tool_calls, exit_reason))
}