use super::{
    CacheConfig, CacheStats, DecompositionStrategy, StageStats, SwarmCache, SwarmConfig,
    SwarmResult,
    orchestrator::Orchestrator,
    result_store::ResultStore,
    subtask::{SubTask, SubTaskResult, SubTaskStatus},
};
use crate::bus::{AgentBus, BusMessage};
use crate::tui::swarm_view::{AgentMessageEntry, AgentToolCallDetail, SubTaskInfo, SwarmEvent};

pub use super::SwarmMessage;

use crate::{
    agent::Agent,
    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
    rlm::RlmExecutor,
    swarm::{SwarmArtifact, SwarmStats},
    telemetry::SwarmTelemetryCollector,
    tool::ToolRegistry,
    worktree::{WorktreeInfo, WorktreeManager},
};
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{RwLock, mpsc};
use tokio::time::{Duration, timeout};

/// Assumed model context window (in tokens) used when budgeting sub-agent conversations.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens held back for the model's response when computing the prompt budget.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Start truncating once the conversation uses this fraction of the context window.
const TRUNCATION_THRESHOLD: f64 = 0.85;

/// Rough token estimate using a ~3.5 characters-per-token heuristic
/// (a conservative stand-in for a real tokenizer).
fn estimate_tokens(text: &str) -> usize {
    (text.len() as f64 / 3.5).ceil() as usize
}

/// Estimates the token footprint of a single message, including a small
/// per-message overhead for role markers and structure.
fn estimate_message_tokens(message: &Message) -> usize {
    // Per-message structural overhead (role, delimiters).
    let mut tokens = 4;

    for part in &message.content {
        tokens += match part {
            ContentPart::Text { text } => estimate_tokens(text),
            ContentPart::ToolCall {
                id,
                name,
                arguments,
            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
            ContentPart::ToolResult {
                tool_call_id,
                content,
            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
            // Flat estimate for binary attachments.
            ContentPart::Image { .. } | ContentPart::File { .. } => 2000,
            ContentPart::Thinking { text } => estimate_tokens(text),
        };
    }

    tokens
}

fn estimate_total_tokens(messages: &[Message]) -> usize {
    messages.iter().map(estimate_message_tokens).sum()
}
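
// Illustrative sanity checks for the token heuristics above. These are sketches
// that only exercise the module's own ~3.5 chars/token assumption; they are not
// calibrated against any real tokenizer.
#[cfg(test)]
mod token_estimate_tests {
    use super::*;

    #[test]
    fn character_count_drives_the_estimate() {
        assert_eq!(estimate_tokens(""), 0);
        // 35 chars / 3.5 chars-per-token == 10 tokens under the heuristic.
        assert_eq!(estimate_tokens(&"a".repeat(35)), 10);
    }

    #[test]
    fn message_overhead_is_included() {
        let message = Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "a".repeat(35),
            }],
        };
        // 4 tokens of structural overhead plus the text estimate.
        assert_eq!(estimate_message_tokens(&message), 14);
        assert_eq!(estimate_total_tokens(std::slice::from_ref(&message)), 14);
    }
}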

/// Truncates the conversation in place so it fits within `context_limit`.
///
/// First shrinks oversized tool results; if that is not enough, removes a
/// middle slice of the history and replaces it with a short summary, keeping
/// the first messages (system prompt and task) and the most recent exchange.
fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
    let target_tokens =
        ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;

    let current_tokens = estimate_total_tokens(messages);
    if current_tokens <= target_tokens {
        return;
    }

    tracing::warn!(
        current_tokens = current_tokens,
        target_tokens = target_tokens,
        context_limit = context_limit,
        "Context approaching limit, truncating conversation history"
    );

    // Cheapest fix first: shrink oversized tool results.
    truncate_large_tool_results(messages, 2000);

    let after_tool_truncation = estimate_total_tokens(messages);
    if after_tool_truncation <= target_tokens {
        tracing::info!(
            old_tokens = current_tokens,
            new_tokens = after_tool_truncation,
            "Truncated large tool results, context now within limits"
        );
        return;
    }

    if messages.len() <= 6 {
        tracing::warn!(
            tokens = after_tool_truncation,
            target = target_tokens,
            "Cannot truncate further - conversation too short"
        );
        return;
    }

    // Keep the head (system prompt + task) and the most recent exchange.
    let keep_start = 2;
    let keep_end = 4;
    let removable_count = messages.len() - keep_start - keep_end;

    if removable_count == 0 {
        return;
    }

    let removed_messages: Vec<_> = messages
        .drain(keep_start..keep_start + removable_count)
        .collect();
    let summary = summarize_removed_messages(&removed_messages);

    messages.insert(
        keep_start,
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: format!(
                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
                    removed_messages.len(),
                    summary
                ),
            }],
        },
    );

    let new_tokens = estimate_total_tokens(messages);
    tracing::info!(
        removed_messages = removed_messages.len(),
        old_tokens = current_tokens,
        new_tokens = new_tokens,
        "Truncated conversation history"
    );
}
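
// A minimal sketch of the truncation path above, using a deliberately small,
// hypothetical context limit so the middle of the history gets summarized.
#[cfg(test)]
mod history_truncation_tests {
    use super::*;

    #[test]
    fn long_histories_are_collapsed_to_a_summary() {
        let big_text = "x".repeat(4_000); // ~1_143 tokens under the heuristic
        let mut messages: Vec<Message> = (0..20)
            .map(|_| Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: big_text.clone(),
                }],
            })
            .collect();

        truncate_messages_to_fit(&mut messages, 16_000);

        // The middle messages are drained and one summary marker is inserted.
        assert!(messages.len() < 20);
        let summary_text = match &messages[2].content[0] {
            ContentPart::Text { text } => text.clone(),
            _ => panic!("expected a text summary marker"),
        };
        assert!(summary_text.starts_with("[Context truncated:"));
    }
}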

fn summarize_removed_messages(messages: &[Message]) -> String {
    let mut summary = String::new();
    let mut tool_calls: Vec<String> = Vec::new();

    for msg in messages {
        for part in &msg.content {
            if let ContentPart::ToolCall { name, .. } = part {
                if !tool_calls.contains(name) {
                    tool_calls.push(name.clone());
                }
            }
        }
    }

    if !tool_calls.is_empty() {
        summary.push_str(&format!(
            "Tools used in truncated history: {}",
            tool_calls.join(", ")
        ));
    }

    summary
}

/// Shrinks any `ToolResult` whose estimated size exceeds `max_tokens_per_result`.
fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
    // ~3 chars per token: slightly tighter than the estimation heuristic above.
    let char_limit = max_tokens_per_result * 3;
    let mut truncated_count = 0;
    let mut saved_tokens = 0usize;

    for message in messages.iter_mut() {
        for part in message.content.iter_mut() {
            if let ContentPart::ToolResult { content, .. } = part {
                let tokens = estimate_tokens(content);
                if tokens > max_tokens_per_result {
                    let old_len = content.len();
                    *content = truncate_single_result(content, char_limit);
                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
                    if content.len() < old_len {
                        truncated_count += 1;
                    }
                }
            }
        }
    }

    if truncated_count > 0 {
        tracing::info!(
            truncated_count = truncated_count,
            saved_tokens = saved_tokens,
            max_tokens_per_result = max_tokens_per_result,
            "Truncated large tool results"
        );
    }
}
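
// Sketch of the per-result truncation above: an oversized tool result is cut
// to the character budget and annotated with a truncation marker.
#[cfg(test)]
mod tool_result_truncation_tests {
    use super::*;

    #[test]
    fn oversized_tool_results_shrink() {
        let mut messages = vec![Message {
            role: Role::Tool,
            content: vec![ContentPart::ToolResult {
                tool_call_id: "call-1".to_string(),
                content: "x\n".repeat(10_000), // ~5_715 estimated tokens
            }],
        }];

        truncate_large_tool_results(&mut messages, 1_000);

        let ContentPart::ToolResult { content, .. } = &messages[0].content[0] else {
            panic!("tool result part was replaced unexpectedly");
        };
        // Cut to roughly the 3_000-char budget plus the truncation marker.
        assert!(content.len() < 3_200);
        assert!(content.contains("[OUTPUT TRUNCATED"));
    }
}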

/// Truncates `content` to at most `max_chars`, cutting on a UTF-8 boundary
/// (preferably a newline) and appending a marker noting how much was removed.
fn truncate_single_result(content: &str, max_chars: usize) -> String {
    if content.len() <= max_chars {
        return content.to_string();
    }

    // Walk back to a valid char boundary so slicing cannot panic.
    let safe_limit = {
        let mut limit = max_chars.min(content.len());
        while limit > 0 && !content.is_char_boundary(limit) {
            limit -= 1;
        }
        limit
    };

    // Prefer breaking at the last newline before the limit.
    let break_point = content[..safe_limit].rfind('\n').unwrap_or(safe_limit);

    let truncated = format!(
        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
        &content[..break_point],
        content.len(),
        break_point
    );

    tracing::debug!(
        original_len = content.len(),
        truncated_len = truncated.len(),
        "Truncated large result"
    );

    truncated
}
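
// Boundary-safety sketch for the helper above: multi-byte characters around
// the cut point must not cause a panic, and short inputs pass through.
#[cfg(test)]
mod single_result_truncation_tests {
    use super::*;

    #[test]
    fn short_content_passes_through() {
        assert_eq!(truncate_single_result("short", 100), "short");
    }

    #[test]
    fn cuts_on_char_boundaries_without_panicking() {
        // 'é' is two bytes in UTF-8, so naive byte slicing could split it.
        let content = "é".repeat(300);
        let truncated = truncate_single_result(&content, 101);
        assert!(truncated.contains("[OUTPUT TRUNCATED"));
    }
}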

/// Tool results larger than this are summarized with the RLM instead of cut.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Results between the simple and RLM thresholds are plainly truncated to this size.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

/// Why a sub-agent loop exited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentLoopExit {
    Completed,
    MaxStepsReached,
    TimedOut,
}

/// Routes a large tool result through the RLM for summarization, falling back
/// to plain truncation when the content is below the RLM threshold or analysis fails.
async fn process_large_result_with_rlm(
    content: &str,
    tool_name: &str,
    provider: Arc<dyn Provider>,
    model: &str,
) -> String {
    if content.len() <= SIMPLE_TRUNCATE_CHARS {
        return content.to_string();
    }

    if content.len() <= RLM_THRESHOLD_CHARS {
        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
    }

    tracing::info!(
        tool = %tool_name,
        content_len = content.len(),
        "Using RLM to process large tool result"
    );

    let query = format!(
        "Summarize the key information from this {} output. \
         Focus on: errors, warnings, important findings, and actionable items. \
         Be concise but thorough.",
        tool_name
    );

    let mut executor =
        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);

    match executor.analyze(&query).await {
        Ok(result) => {
            tracing::info!(
                tool = %tool_name,
                original_len = content.len(),
                summary_len = result.answer.len(),
                iterations = result.iterations,
                "RLM summarized large result"
            );

            format!(
                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
                tool_name,
                content.len(),
                result.answer.len(),
                result.answer
            )
        }
        Err(e) => {
            tracing::warn!(
                tool = %tool_name,
                error = %e,
                "RLM analysis failed, falling back to truncation"
            );
            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
        }
    }
}

/// Executes a swarm of parallel sub-agents against a decomposed task.
pub struct SwarmExecutor {
    config: SwarmConfig,
    /// Optional coordinator agent used for interactive runs.
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
    /// Channel for streaming progress events to the TUI.
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    /// Optional persistent cache of subtask results.
    cache: Option<Arc<tokio::sync::Mutex<SwarmCache>>>,
    /// Shared store through which sub-agents publish intermediate results.
    result_store: Arc<ResultStore>,
    /// Optional agent bus for cross-agent notifications.
    bus: Option<Arc<AgentBus>>,
}

impl SwarmExecutor {
    pub fn new(config: SwarmConfig) -> Self {
        Self {
            config,
            coordinator_agent: None,
            event_tx: None,
            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
            cache: None,
            result_store: ResultStore::new_arc(),
            bus: None,
        }
    }

    pub async fn with_cache(config: SwarmConfig, cache_config: CacheConfig) -> Result<Self> {
        let cache = SwarmCache::new(cache_config).await?;
        Ok(Self {
            config,
            coordinator_agent: None,
            event_tx: None,
            telemetry: Arc::new(tokio::sync::Mutex::new(SwarmTelemetryCollector::default())),
            cache: Some(Arc::new(tokio::sync::Mutex::new(cache))),
            result_store: ResultStore::new_arc(),
            bus: None,
        })
    }

    pub fn with_cache_instance(mut self, cache: Arc<tokio::sync::Mutex<SwarmCache>>) -> Self {
        self.cache = Some(cache);
        self
    }

    pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
        self.bus = Some(bus);
        self
    }

    pub fn bus(&self) -> Option<&Arc<AgentBus>> {
        self.bus.as_ref()
    }

    pub fn with_event_tx(mut self, tx: mpsc::Sender<SwarmEvent>) -> Self {
        self.event_tx = Some(tx);
        self
    }

    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
        tracing::debug!("Setting coordinator agent for swarm execution");
        self.coordinator_agent = Some(agent);
        self
    }

    pub fn with_telemetry(
        mut self,
        telemetry: Arc<tokio::sync::Mutex<SwarmTelemetryCollector>>,
    ) -> Self {
        self.telemetry = telemetry;
        self
    }

    pub fn telemetry_arc(&self) -> Arc<tokio::sync::Mutex<SwarmTelemetryCollector>> {
        Arc::clone(&self.telemetry)
    }

    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        tracing::debug!(
            has_coordinator = self.coordinator_agent.is_some(),
            "Getting coordinator agent"
        );
        self.coordinator_agent.as_ref()
    }

    pub fn result_store(&self) -> &Arc<ResultStore> {
        &self.result_store
    }

    pub async fn cache_stats(&self) -> Option<CacheStats> {
        if let Some(ref cache) = self.cache {
            let cache_guard = cache.lock().await;
            Some(cache_guard.stats().clone())
        } else {
            None
        }
    }

    pub async fn clear_cache(&self) -> Result<()> {
        if let Some(ref cache) = self.cache {
            let mut cache_guard = cache.lock().await;
            cache_guard.clear().await?;
        }
        Ok(())
    }

    /// Sends a progress event to the TUI channel and mirrors lifecycle events
    /// onto the agent bus when one is attached. Send failures are ignored.
    fn try_send_event(&self, event: SwarmEvent) {
        if let Some(ref bus) = self.bus {
            let handle = bus.handle("swarm-executor");
            match &event {
                SwarmEvent::Started { task, .. } => {
                    handle.send(
                        "broadcast",
                        BusMessage::AgentReady {
                            agent_id: "swarm-executor".to_string(),
                            capabilities: vec![format!("executing:{task}")],
                        },
                    );
                }
                SwarmEvent::Complete { success, .. } => {
                    let state = if *success {
                        crate::a2a::types::TaskState::Completed
                    } else {
                        crate::a2a::types::TaskState::Failed
                    };
                    handle.send_task_update("swarm", state, None);
                }
                _ => {}
            }
        }

        if let Some(ref tx) = self.event_tx {
            let _ = tx.try_send(event);
        }
    }

    /// Decomposes `task` with the given strategy and runs each stage of
    /// subtasks, collecting results, stats, and artifacts into a `SwarmResult`.
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        let subtasks = orchestrator.decompose(task, strategy).await?;

        if subtasks.is_empty() {
            self.try_send_event(SwarmEvent::Error("No subtasks generated".to_string()));
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        self.try_send_event(SwarmEvent::Started {
            task: task.to_string(),
            total_subtasks: subtasks.len(),
        });

        self.try_send_event(SwarmEvent::Decomposed {
            subtasks: subtasks
                .iter()
                .map(|s| SubTaskInfo {
                    id: s.id.clone(),
                    name: s.name.clone(),
                    status: SubTaskStatus::Pending,
                    stage: s.stage,
                    dependencies: s.dependencies.clone(),
                    agent_name: s.specialty.clone(),
                    current_tool: None,
                    steps: 0,
                    max_steps: self.config.max_steps_per_subagent,
                    tool_call_history: Vec::new(),
                    messages: Vec::new(),
                    output: None,
                    error: None,
                })
                .collect(),
        });

        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        let swarm_id = uuid::Uuid::new_v4().to_string();
        self.telemetry.lock().await.start_swarm(
            swarm_id.clone(),
            subtasks.len(),
            &format!("{:?}", strategy),
        );

        // Results of completed subtasks, keyed by subtask id, shared with later stages.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            let stage_results = self
                .execute_stage(
                    &orchestrator,
                    stage_subtasks,
                    completed_results.clone(),
                    &swarm_id,
                )
                .await?;

            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());

                    let tags = vec![
                        format!("stage:{stage}"),
                        format!("subtask:{}", result.subtask_id),
                    ];
                    let _ = self
                        .result_store
                        .publish(
                            &result.subtask_id,
                            &result.subagent_id,
                            &result.result,
                            tags,
                            None,
                        )
                        .await;
                }
            }

            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            let stage_completed = stage_results.iter().filter(|r| r.success).count();
            let stage_failed = stage_results.iter().filter(|r| !r.success).count();
            self.try_send_event(SwarmEvent::StageComplete {
                stage,
                completed: stage_completed,
                failed: stage_failed,
            });

            all_results.extend(stage_results);
        }

        let provider_name = orchestrator.provider().to_string();

        self.telemetry
            .lock()
            .await
            .record_swarm_latency("total_execution", start_time.elapsed());

        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        let success = all_results.iter().all(|r| r.success);

        let _telemetry_metrics = self.telemetry.lock().await.complete_swarm(success);

        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        let final_stats = orchestrator.stats().clone();
        self.try_send_event(SwarmEvent::Complete {
            success,
            stats: final_stats.clone(),
        });

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: final_stats,
            artifacts,
            error: None,
        })
    }

    /// Runs all subtasks of one stage concurrently, respecting the
    /// concurrency limit, optional cache, and optional worktree isolation.
    async fn execute_stage(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
        swarm_id: &str,
    ) -> Result<Vec<SubTaskResult>> {
        let mut handles: Vec<(
            String,
            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
        )> = Vec::new();
        let mut cached_results: Vec<SubTaskResult> = Vec::new();

        // Cap concurrent provider requests and stagger starts to avoid rate limits.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(
            self.config.max_concurrent_requests,
        ));
        let delay_ms = self.config.request_delay_ms;

        let model = orchestrator.model().to_string();
        let provider_name = orchestrator.provider().to_string();
        let providers = orchestrator.providers();
        let provider = providers
            .get(&provider_name)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;

        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");

        let base_tool_registry =
            ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());

        // Expose the standard tools minus the interactive `question` tool,
        // plus the swarm_share tool defined below.
        let mut tool_definitions: Vec<_> = base_tool_registry
            .definitions()
            .into_iter()
            .filter(|t| t.name != "question")
            .collect();

        let swarm_share_def = crate::provider::ToolDefinition {
            name: "swarm_share".to_string(),
            description: "Share results with other sub-agents in the swarm. Actions: publish \
                          (share a result), get (retrieve a result by key), query_tags (find \
                          results by tags), query_prefix (find results by key prefix), list \
                          (show all shared results)."
                .to_string(),
            parameters: serde_json::json!({
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": ["publish", "get", "query_tags", "query_prefix", "list"],
                        "description": "Action to perform"
                    },
                    "key": {
                        "type": "string",
                        "description": "Result key (for publish/get)"
                    },
                    "value": {
                        "description": "Result value to publish (any JSON value)"
                    },
                    "tags": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Tags for publish or query_tags"
                    },
                    "prefix": {
                        "type": "string",
                        "description": "Key prefix for query_prefix"
                    }
                },
                "required": ["action"]
            }),
        };
        tool_definitions.push(swarm_share_def);

        let result_store = Arc::clone(&self.result_store);

        // Optionally isolate each sub-agent in its own git worktree.
        let worktree_manager = if self.config.worktree_enabled {
            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| ".".to_string())
            });

            match WorktreeManager::new(&working_dir) {
                Ok(mgr) => {
                    tracing::info!(
                        working_dir = %working_dir,
                        "Worktree isolation enabled for parallel sub-agents"
                    );
                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Failed to create worktree manager, falling back to shared directory"
                    );
                    None
                }
            }
        } else {
            None
        };

        for (idx, subtask) in subtasks.into_iter().enumerate() {
            let model = model.clone();
            let _provider_name = provider_name.clone();
            let provider = Arc::clone(&provider);

            // Serve from the cache when an identical subtask has already run.
            if let Some(ref cache) = self.cache {
                let mut cache_guard = cache.lock().await;
                if let Some(cached_result) = cache_guard.get(&subtask).await {
                    tracing::info!(
                        subtask_id = %subtask.id,
                        task_name = %subtask.name,
                        "Cache hit for subtask, skipping execution"
                    );
                    self.try_send_event(SwarmEvent::SubTaskUpdate {
                        id: subtask.id.clone(),
                        name: subtask.name.clone(),
                        status: SubTaskStatus::Completed,
                        agent_name: Some("cached".to_string()),
                    });
                    cached_results.push(cached_result);
                    continue;
                }
            }

            // Inline the results of any dependencies as context for this subtask.
            let context = {
                let completed = completed_results.read().await;
                let mut dep_context = String::new();
                for dep_id in &subtask.dependencies {
                    if let Some(result) = completed.get(dep_id) {
                        dep_context.push_str(&format!(
                            "\n--- Result from dependency {} ---\n{}\n",
                            dep_id, result
                        ));
                    }
                }
                dep_context
            };

            let instruction = subtask.instruction.clone();
            let subtask_name = subtask.name.clone();
            let specialty = subtask.specialty.clone().unwrap_or_default();
            let subtask_id = subtask.id.clone();
            let subtask_id_for_handle = subtask_id.clone();
            let max_steps = self.config.max_steps_per_subagent;
            let timeout_secs = self.config.subagent_timeout_secs;

            let tools = tool_definitions.clone();
            let _base_registry = Arc::clone(&base_tool_registry);
            let agent_result_store = Arc::clone(&result_store);
            let sem = Arc::clone(&semaphore);
            // Stagger task starts to spread request bursts across time.
            let stagger_delay = delay_ms * idx as u64;
            let worktree_mgr = worktree_manager.clone();
            let event_tx = self.event_tx.clone();

            let subagent_id = format!("agent-{}", uuid::Uuid::new_v4());

            tracing::debug!(subagent_id = %subagent_id, swarm_id = %swarm_id, subtask = %subtask_id, specialty = %specialty, "Starting sub-agent");

            let handle = tokio::spawn(async move {
                if stagger_delay > 0 {
                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
                }
                let _permit = sem
                    .acquire()
                    .await
                    .map_err(|_| anyhow::anyhow!("Swarm execution cancelled"))?;

                let _agent_start = Instant::now();

                let start = Instant::now();

                // Give this sub-agent its own worktree when isolation is enabled.
                let worktree_info = if let Some(ref mgr) = worktree_mgr {
                    let task_slug = subtask_id.replace("-", "_");
                    match mgr.create(&task_slug) {
                        Ok(wt) => {
                            tracing::info!(
                                subtask_id = %subtask_id,
                                worktree_path = %wt.path.display(),
                                worktree_branch = %wt.branch,
                                "Created worktree for sub-agent"
                            );
                            Some(wt)
                        }
                        Err(e) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %e,
                                "Failed to create worktree, using shared directory"
                            );
                            None
                        }
                    }
                } else {
                    None
                };

                let working_dir = worktree_info
                    .as_ref()
                    .map(|wt| wt.path.display().to_string())
                    .unwrap_or_else(|| ".".to_string());

                let working_path = std::path::Path::new(&working_dir);
                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
                    .map(|(content, _)| {
                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
                    })
                    .unwrap_or_default();

                // Unique PRD filename so parallel agents don't clobber each other.
                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
                let system_prompt = format!(
                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.

WORKING DIRECTORY: {}
All file operations should be relative to this directory.

IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.

Available tools:
- read: Read file contents
- write: Write/create files
- edit: Edit existing files (search and replace)
- multiedit: Make multiple edits at once
- glob: Find files by pattern
- grep: Search file contents
- bash: Run shell commands (use cwd: \"{}\" parameter)
- webfetch: Fetch web pages
- prd: Generate structured PRD for complex tasks
- ralph: Run autonomous agent loop on a PRD
- swarm_share: Share results with other sub-agents running in parallel

SHARING RESULTS:
Use swarm_share to collaborate with other sub-agents:
- swarm_share({{action: 'publish', key: 'my-finding', value: '...', tags: ['research']}}) to share a result
- swarm_share({{action: 'get', key: 'some-key'}}) to retrieve a result from another agent
- swarm_share({{action: 'list'}}) to see all shared results
- swarm_share({{action: 'query_tags', tags: ['research']}}) to find results by tag

COMPLEX TASKS:
If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
1. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
2. Break down into user stories with acceptance criteria
3. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
4. Call ralph({{action: 'run', prd_path: '{}'}}) to execute

NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.

When done, provide a brief summary of what you accomplished.{agents_md_content}",
                    specialty,
                    subtask_id,
                    working_dir,
                    working_dir,
                    prd_filename,
                    prd_filename,
                    prd_filename
                );

                let user_prompt = if context.is_empty() {
                    format!("Complete this task:\n\n{}", instruction)
                } else {
                    format!(
                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
                        instruction, context
                    )
                };

                if let Some(ref tx) = event_tx {
                    let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                        id: subtask_id.clone(),
                        name: subtask_name.clone(),
                        status: SubTaskStatus::Running,
                        agent_name: Some(format!("agent-{}", subtask_id)),
                    });
                    let _ = tx.try_send(SwarmEvent::AgentStarted {
                        subtask_id: subtask_id.clone(),
                        agent_name: format!("agent-{}", subtask_id),
                        specialty: specialty.clone(),
                    });
                }

                // Per-agent registry: standard tools plus a swarm_share instance
                // bound to this subtask's id.
                let mut agent_registry =
                    ToolRegistry::with_provider(Arc::clone(&provider), model.clone());
                agent_registry.register(Arc::new(crate::tool::swarm_share::SwarmShareTool::new(
                    Arc::clone(&agent_result_store),
                    subtask_id.clone(),
                )));
                let registry = Arc::new(agent_registry);

                let result = run_agent_loop(
                    provider,
                    &model,
                    &system_prompt,
                    &user_prompt,
                    tools,
                    registry,
                    max_steps,
                    timeout_secs,
                    event_tx.clone(),
                    subtask_id.clone(),
                )
                .await;

                match result {
                    Ok((output, steps, tool_calls, exit_reason)) => {
                        let (success, status, error) = match exit_reason {
                            AgentLoopExit::Completed => (true, SubTaskStatus::Completed, None),
                            AgentLoopExit::MaxStepsReached => (
                                false,
                                SubTaskStatus::Failed,
                                Some(format!("Sub-agent hit max steps ({max_steps})")),
                            ),
                            AgentLoopExit::TimedOut => (
                                false,
                                SubTaskStatus::TimedOut,
                                Some(format!("Sub-agent timed out after {timeout_secs}s")),
                            ),
                        };

                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            if let Some(ref message) = error {
                                let _ = tx.try_send(SwarmEvent::AgentError {
                                    subtask_id: subtask_id.clone(),
                                    error: message.clone(),
                                });
                            }
                            let _ = tx.try_send(SwarmEvent::AgentOutput {
                                subtask_id: subtask_id.clone(),
                                output: output.clone(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success,
                                steps,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success,
                                result: output,
                                steps,
                                tool_calls,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error,
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                    Err(e) => {
                        if let Some(ref tx) = event_tx {
                            let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                                id: subtask_id.clone(),
                                name: subtask_name.clone(),
                                status: SubTaskStatus::Failed,
                                agent_name: Some(format!("agent-{}", subtask_id)),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentError {
                                subtask_id: subtask_id.clone(),
                                error: e.to_string(),
                            });
                            let _ = tx.try_send(SwarmEvent::AgentComplete {
                                subtask_id: subtask_id.clone(),
                                success: false,
                                steps: 0,
                            });
                        }
                        Ok((
                            SubTaskResult {
                                subtask_id: subtask_id.clone(),
                                subagent_id: format!("agent-{}", subtask_id),
                                success: false,
                                result: String::new(),
                                steps: 0,
                                tool_calls: 0,
                                execution_time_ms: start.elapsed().as_millis() as u64,
                                error: Some(e.to_string()),
                                artifacts: Vec::new(),
                            },
                            worktree_info,
                        ))
                    }
                }
            });

            handles.push((subtask_id_for_handle, handle));
        }

        // Collect results, merging worktree changes for successful subtasks.
        let mut results = cached_results;
        let auto_merge = self.config.worktree_auto_merge;

        for (subtask_id, handle) in handles {
            match handle.await {
                Ok(Ok((mut result, worktree_info))) => {
                    if let Some(wt) = worktree_info {
                        if result.success && auto_merge {
                            if let Some(ref mgr) = worktree_manager {
                                match mgr.merge(&wt) {
                                    Ok(merge_result) => {
                                        if merge_result.success {
                                            tracing::info!(
                                                subtask_id = %result.subtask_id,
                                                files_changed = merge_result.files_changed,
                                                "Merged worktree changes successfully"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Result ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else if merge_result.aborted {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                summary = %merge_result.summary,
                                                "Merge was aborted"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Aborted ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                conflicts = ?merge_result.conflicts,
                                                "Merge had conflicts"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Conflicts ---\n{}",
                                                merge_result.summary
                                            ));
                                        }

                                        if let Err(e) = mgr.cleanup(&wt) {
                                            tracing::warn!(
                                                error = %e,
                                                "Failed to cleanup worktree"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            subtask_id = %result.subtask_id,
                                            error = %e,
                                            "Failed to merge worktree"
                                        );
                                    }
                                }
                            }
                        } else if !result.success {
                            tracing::info!(
                                subtask_id = %result.subtask_id,
                                worktree_path = %wt.path.display(),
                                "Keeping worktree for debugging (task failed)"
                            );
                        }
                    }

                    if result.success {
                        if let Some(ref cache_arc) = self.cache {
                            let mut cache_guard: tokio::sync::MutexGuard<'_, SwarmCache> =
                                cache_arc.lock().await;
                            let cache_subtask = SubTask::new(&subtask_id, &result.result);
                            if let Err(e) = cache_guard.put(&cache_subtask, &result).await {
                                tracing::warn!(
                                    subtask_id = %result.subtask_id,
                                    error = %e,
                                    "Failed to cache subtask result"
                                );
                            }
                        }
                    }

                    results.push(result);
                }
                Ok(Err(e)) => {
                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: e.to_string(),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(e.to_string()),
                        artifacts: Vec::new(),
                    });
                }
                Err(e) => {
                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx.try_send(SwarmEvent::SubTaskUpdate {
                            id: subtask_id.clone(),
                            name: subtask_id.clone(),
                            status: SubTaskStatus::Failed,
                            agent_name: Some(format!("agent-{}", subtask_id)),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentError {
                            subtask_id: subtask_id.clone(),
                            error: format!("Task join error: {}", e),
                        });
                        let _ = tx.try_send(SwarmEvent::AgentComplete {
                            subtask_id: subtask_id.clone(),
                            success: false,
                            steps: 0,
                        });
                    }
                    results.push(SubTaskResult {
                        subtask_id: subtask_id.clone(),
                        subagent_id: format!("agent-{}", subtask_id),
                        success: false,
                        result: String::new(),
                        steps: 0,
                        tool_calls: 0,
                        execution_time_ms: 0,
                        error: Some(format!("Task join error: {}", e)),
                        artifacts: Vec::new(),
                    });
                }
            }
        }

        Ok(results)
    }

    /// Concatenates subtask outputs into one report, flagging failures inline.
    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
        let mut aggregated = String::new();

        for (i, result) in results.iter().enumerate() {
            if result.success {
                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
            } else {
                aggregated.push_str(&format!(
                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
                    i + 1,
                    result.error.as_deref().unwrap_or("Unknown error")
                ));
            }
        }

        Ok(aggregated)
    }

    /// Runs `task` without decomposition, as a single subtask.
    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
}
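
// A minimal usage sketch for `SwarmExecutor`, assuming a `SwarmConfig::default()`
// with working provider credentials configured elsewhere. Kept out of the test
// suite because it performs real provider calls; the task string is illustrative.
#[allow(dead_code)]
async fn example_execute_swarm() -> Result<()> {
    let executor = SwarmExecutor::new(SwarmConfig::default());
    let outcome = executor
        .execute("Summarize the repository layout", DecompositionStrategy::None)
        .await?;
    println!(
        "success={} subtasks={} speedup={:.1}x",
        outcome.success,
        outcome.subtask_results.len(),
        outcome.stats.speedup_factor
    );
    Ok(())
}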

/// Fluent builder for `SwarmExecutor` over a default `SwarmConfig`.
pub struct SwarmExecutorBuilder {
    config: SwarmConfig,
}

impl SwarmExecutorBuilder {
    pub fn new() -> Self {
        Self {
            config: SwarmConfig::default(),
        }
    }

    pub fn max_subagents(mut self, max: usize) -> Self {
        self.config.max_subagents = max;
        self
    }

    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
        self.config.max_steps_per_subagent = max;
        self
    }

    pub fn max_total_steps(mut self, max: usize) -> Self {
        self.config.max_total_steps = max;
        self
    }

    pub fn timeout_secs(mut self, secs: u64) -> Self {
        self.config.subagent_timeout_secs = secs;
        self
    }

    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
        self.config.parallel_enabled = enabled;
        self
    }

    pub fn build(self) -> SwarmExecutor {
        SwarmExecutor::new(self.config)
    }
}

impl Default for SwarmExecutorBuilder {
    fn default() -> Self {
        Self::new()
    }
}
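
// Sketch of the builder above: setters should land in the built executor's
// config. Private field access works because tests live in a child module.
#[cfg(test)]
mod builder_tests {
    use super::*;

    #[test]
    fn builder_applies_limits() {
        let executor = SwarmExecutorBuilder::new()
            .max_subagents(4)
            .max_steps_per_subagent(12)
            .timeout_secs(300)
            .build();
        assert_eq!(executor.config.max_subagents, 4);
        assert_eq!(executor.config.max_steps_per_subagent, 12);
        assert_eq!(executor.config.subagent_timeout_secs, 300);
    }
}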

/// Core agentic loop for a sub-agent: repeatedly calls the model, executes any
/// requested tools, and feeds results back until the model stops calling tools,
/// the step budget is exhausted, or the idle deadline passes.
#[allow(clippy::too_many_arguments)]
pub async fn run_agent_loop(
    provider: Arc<dyn Provider>,
    model: &str,
    system_prompt: &str,
    user_prompt: &str,
    tools: Vec<crate::provider::ToolDefinition>,
    registry: Arc<ToolRegistry>,
    max_steps: usize,
    timeout_secs: u64,
    event_tx: Option<mpsc::Sender<SwarmEvent>>,
    subtask_id: String,
) -> Result<(String, usize, usize, AgentLoopExit)> {
    let temperature = 0.7;

    tracing::info!(
        model = %model,
        max_steps = max_steps,
        timeout_secs = timeout_secs,
        "Sub-agent starting agentic loop"
    );
    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");

    let mut messages = vec![
        Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.to_string(),
            }],
        },
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: user_prompt.to_string(),
            }],
        },
    ];

    let mut steps = 0;
    let mut total_tool_calls = 0;
    let mut final_output = String::new();

    // Rolling deadline: reset after each productive step (see end of loop).
    let mut deadline = Instant::now() + Duration::from_secs(timeout_secs);

    let exit_reason = loop {
        if steps >= max_steps {
            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
            break AgentLoopExit::MaxStepsReached;
        }

        if Instant::now() > deadline {
            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
            break AgentLoopExit::TimedOut;
        }

        steps += 1;
        tracing::info!(step = steps, "Sub-agent step starting");

        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);

        let request = CompletionRequest {
            messages: messages.clone(),
            tools: tools.clone(),
            model: model.to_string(),
            temperature: Some(temperature),
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let step_start = Instant::now();
        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
        let step_duration = step_start.elapsed();

        tracing::info!(
            step = steps,
            duration_ms = step_duration.as_millis() as u64,
            finish_reason = ?response.finish_reason,
            prompt_tokens = response.usage.prompt_tokens,
            completion_tokens = response.usage.completion_tokens,
            "Sub-agent step completed LLM call"
        );

        let mut text_parts = Vec::new();
        let mut tool_calls = Vec::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } => {
                    text_parts.push(text.clone());
                }
                ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                } => {
                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
                }
                _ => {}
            }
        }

        if !text_parts.is_empty() {
            let step_output = text_parts.join("\n");
            if !final_output.is_empty() {
                final_output.push('\n');
            }
            final_output.push_str(&step_output);
            tracing::info!(
                step = steps,
                output_len = final_output.len(),
                "Sub-agent text output"
            );
            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");

            if let Some(ref tx) = event_tx {
                // Preview at most 500 bytes, backing up to a char boundary.
                let preview = if step_output.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !step_output.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &step_output[..end])
                } else {
                    step_output.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentMessage {
                    subtask_id: subtask_id.clone(),
                    entry: AgentMessageEntry {
                        role: "assistant".to_string(),
                        content: preview,
                        is_tool_call: false,
                    },
                });
            }
        }

        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        messages.push(response.message.clone());

        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break AgentLoopExit::Completed;
        }

        let mut tool_results = Vec::new();

        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            if let Some(ref tx) = event_tx {
                let _ = tx.try_send(SwarmEvent::AgentToolCall {
                    subtask_id: subtask_id.clone(),
                    tool_name: tool_name.clone(),
                });
            }

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let mut tool_success = true;
            let result = if let Some(tool) = registry.get(&tool_name) {
                let args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|e| {
                        tracing::warn!(tool = %tool_name, error = %e, raw = %arguments, "Failed to parse tool arguments");
                        serde_json::json!({})
                    });

                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tool_success = false;
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tool_success = false;
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                tool_success = false;
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            if let Some(ref tx) = event_tx {
                let input_preview = if arguments.len() > 200 {
                    let mut end = 200;
                    while end > 0 && !arguments.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &arguments[..end])
                } else {
                    arguments.clone()
                };
                let output_preview = if result.len() > 500 {
                    let mut end = 500;
                    while end > 0 && !result.is_char_boundary(end) {
                        end -= 1;
                    }
                    format!("{}...", &result[..end])
                } else {
                    result.clone()
                };
                let _ = tx.try_send(SwarmEvent::AgentToolCallDetail {
                    subtask_id: subtask_id.clone(),
                    detail: AgentToolCallDetail {
                        tool_name: tool_name.clone(),
                        input_preview,
                        output_preview,
                        success: tool_success,
                    },
                });
            }

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            // Very large outputs are summarized via the RLM; others are truncated.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }

        // The step made progress, so extend the idle deadline.
        deadline = Instant::now() + Duration::from_secs(timeout_secs);
    };

    Ok((final_output, steps, total_tool_calls, exit_reason))
}