1use super::{
7 DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
8 orchestrator::Orchestrator,
9 subtask::{SubTask, SubTaskResult},
10};
11
12pub use super::{Actor, ActorStatus, Handler, SwarmMessage};
14use crate::{
15 agent::Agent,
16 provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
17 rlm::RlmExecutor,
18 swarm::{SwarmArtifact, SwarmStats},
19 tool::ToolRegistry,
20 worktree::{WorktreeInfo, WorktreeManager},
21};
22use anyhow::Result;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::Instant;
26use tokio::sync::RwLock;
27use tokio::time::{Duration, timeout};
28
/// Default model context window budget, in estimated tokens, used when
/// truncating sub-agent conversation history.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens held back from the context budget so the model has room to respond.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context limit we aim to stay under before truncating
/// (leaves slack for estimation error in `estimate_tokens`).
const TRUNCATION_THRESHOLD: f64 = 0.85;
37
/// Rough token estimate for `text`, assuming ~3.5 bytes per token, rounded up.
fn estimate_tokens(text: &str) -> usize {
    let bytes = text.len() as f64;
    (bytes / 3.5).ceil() as usize
}
45
46fn estimate_message_tokens(message: &Message) -> usize {
48 let mut tokens = 4; for part in &message.content {
51 tokens += match part {
52 ContentPart::Text { text } => estimate_tokens(text),
53 ContentPart::ToolCall {
54 id,
55 name,
56 arguments,
57 } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
58 ContentPart::ToolResult {
59 tool_call_id,
60 content,
61 } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
62 ContentPart::Image { .. } | ContentPart::File { .. } => 2000, };
64 }
65
66 tokens
67}
68
69fn estimate_total_tokens(messages: &[Message]) -> usize {
71 messages.iter().map(estimate_message_tokens).sum()
72}
73
74fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
82 let target_tokens =
83 ((context_limit as f64) * TRUNCATION_THRESHOLD) as usize - RESPONSE_RESERVE_TOKENS;
84
85 let current_tokens = estimate_total_tokens(messages);
86 if current_tokens <= target_tokens {
87 return;
88 }
89
90 tracing::warn!(
91 current_tokens = current_tokens,
92 target_tokens = target_tokens,
93 context_limit = context_limit,
94 "Context approaching limit, truncating conversation history"
95 );
96
97 truncate_large_tool_results(messages, 2000); let after_tool_truncation = estimate_total_tokens(messages);
101 if after_tool_truncation <= target_tokens {
102 tracing::info!(
103 old_tokens = current_tokens,
104 new_tokens = after_tool_truncation,
105 "Truncated large tool results, context now within limits"
106 );
107 return;
108 }
109
110 if messages.len() <= 6 {
112 tracing::warn!(
113 tokens = after_tool_truncation,
114 target = target_tokens,
115 "Cannot truncate further - conversation too short"
116 );
117 return;
118 }
119
120 let keep_start = 2;
123 let keep_end = 4;
124 let removable_count = messages.len() - keep_start - keep_end;
125
126 if removable_count == 0 {
127 return;
128 }
129
130 let removed_messages: Vec<_> = messages
132 .drain(keep_start..keep_start + removable_count)
133 .collect();
134 let summary = summarize_removed_messages(&removed_messages);
135
136 messages.insert(
138 keep_start,
139 Message {
140 role: Role::User,
141 content: vec![ContentPart::Text {
142 text: format!(
143 "[Context truncated: {} earlier messages removed to fit context window]\n{}",
144 removed_messages.len(),
145 summary
146 ),
147 }],
148 },
149 );
150
151 let new_tokens = estimate_total_tokens(messages);
152 tracing::info!(
153 removed_messages = removed_messages.len(),
154 old_tokens = current_tokens,
155 new_tokens = new_tokens,
156 "Truncated conversation history"
157 );
158}
159
160fn summarize_removed_messages(messages: &[Message]) -> String {
162 let mut summary = String::new();
163 let mut tool_calls: Vec<String> = Vec::new();
164
165 for msg in messages {
166 for part in &msg.content {
167 if let ContentPart::ToolCall { name, .. } = part {
168 if !tool_calls.contains(name) {
169 tool_calls.push(name.clone());
170 }
171 }
172 }
173 }
174
175 if !tool_calls.is_empty() {
176 summary.push_str(&format!(
177 "Tools used in truncated history: {}",
178 tool_calls.join(", ")
179 ));
180 }
181
182 summary
183}
184
185fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
187 let char_limit = max_tokens_per_result * 3; let mut truncated_count = 0;
189 let mut saved_tokens = 0usize;
190
191 for message in messages.iter_mut() {
192 for part in message.content.iter_mut() {
193 if let ContentPart::ToolResult { content, .. } = part {
194 let tokens = estimate_tokens(content);
195 if tokens > max_tokens_per_result {
196 let old_len = content.len();
197 *content = truncate_single_result(content, char_limit);
198 saved_tokens += tokens.saturating_sub(estimate_tokens(content));
199 if content.len() < old_len {
200 truncated_count += 1;
201 }
202 }
203 }
204 }
205 }
206
207 if truncated_count > 0 {
208 tracing::info!(
209 truncated_count = truncated_count,
210 saved_tokens = saved_tokens,
211 max_tokens_per_result = max_tokens_per_result,
212 "Truncated large tool results"
213 );
214 }
215}
216
217fn truncate_single_result(content: &str, max_chars: usize) -> String {
219 if content.len() <= max_chars {
220 return content.to_string();
221 }
222
223 let break_point = content[..max_chars.min(content.len())]
225 .rfind('\n')
226 .unwrap_or(max_chars.min(content.len()));
227
228 let truncated = format!(
229 "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
230 &content[..break_point],
231 content.len(),
232 break_point
233 );
234
235 tracing::debug!(
236 original_len = content.len(),
237 truncated_len = truncated.len(),
238 "Truncated large result"
239 );
240
241 truncated
242}
243
/// Tool results larger than this (in bytes) are summarized via RLM instead
/// of being plainly truncated.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Character budget used for plain truncation of mid-sized tool results.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;
249
250async fn process_large_result_with_rlm(
252 content: &str,
253 tool_name: &str,
254 provider: Arc<dyn Provider>,
255 model: &str,
256) -> String {
257 if content.len() <= SIMPLE_TRUNCATE_CHARS {
258 return content.to_string();
259 }
260
261 if content.len() <= RLM_THRESHOLD_CHARS {
263 return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
264 }
265
266 tracing::info!(
268 tool = %tool_name,
269 content_len = content.len(),
270 "Using RLM to process large tool result"
271 );
272
273 let query = format!(
274 "Summarize the key information from this {} output. \
275 Focus on: errors, warnings, important findings, and actionable items. \
276 Be concise but thorough.",
277 tool_name
278 );
279
280 let mut executor =
281 RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);
282
283 match executor.analyze(&query).await {
284 Ok(result) => {
285 tracing::info!(
286 tool = %tool_name,
287 original_len = content.len(),
288 summary_len = result.answer.len(),
289 iterations = result.iterations,
290 "RLM summarized large result"
291 );
292
293 format!(
294 "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
295 tool_name,
296 content.len(),
297 result.answer.len(),
298 result.answer
299 )
300 }
301 Err(e) => {
302 tracing::warn!(
303 tool = %tool_name,
304 error = %e,
305 "RLM analysis failed, falling back to truncation"
306 );
307 truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
308 }
309 }
310}
311
/// Executes a decomposed task as a swarm of concurrent sub-agents.
pub struct SwarmExecutor {
    /// Runtime limits and feature switches for the swarm run.
    config: SwarmConfig,
    /// Optional shared coordinator agent. Stored for callers via
    /// `coordinator_agent()`; not consulted by the execution loop in this file.
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
}
318
impl SwarmExecutor {
    /// Create an executor with the given swarm configuration.
    pub fn new(config: SwarmConfig) -> Self {
        Self {
            config,
            coordinator_agent: None,
        }
    }

    /// Attach a shared coordinator agent (builder-style).
    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
        tracing::debug!("Setting coordinator agent for swarm execution");
        self.coordinator_agent = Some(agent);
        self
    }

    /// The coordinator agent, if one was attached.
    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        self.coordinator_agent.as_ref()
    }

    /// Decompose `task` with `strategy` and run the resulting subtasks stage
    /// by stage, executing each stage's subtasks concurrently.
    ///
    /// Dependency results are shared with later stages via `completed_results`.
    /// Returns a [`SwarmResult`] whose `success` is true only if every subtask
    /// succeeded; a decomposition that yields zero subtasks produces a failed
    /// result (not an `Err`).
    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        let subtasks = orchestrator.decompose(task, strategy).await?;

        // No subtasks means there is nothing to run; report failure directly.
        if subtasks.is_empty() {
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        // Results keyed by subtask id, readable by later stages for
        // dependency context.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        // Stages run sequentially; subtasks within a stage run concurrently.
        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            let stage_results = self
                .execute_stage(&orchestrator, stage_subtasks, completed_results.clone())
                .await?;

            // Publish this stage's outputs for dependency lookups in later
            // stages (scoped so the write lock is released promptly).
            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                }
            }

            // Record per-stage timing/step statistics.
            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            all_results.extend(stage_results);
        }

        let provider_name = orchestrator.provider().to_string();

        // Finalize run-level statistics (wall-clock vs. estimated sequential
        // time yields the reported speedup factor).
        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        let success = all_results.iter().all(|r| r.success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: orchestrator.stats().clone(),
            artifacts,
            error: None,
        })
    }

    /// Run one stage's subtasks concurrently, one tokio task per subtask.
    ///
    /// Concurrency is bounded by a semaphore sized to
    /// `max_concurrent_requests`, and task starts are staggered by
    /// `request_delay_ms * index` to avoid bursty provider calls. When
    /// worktree isolation is enabled, each sub-agent runs in its own git
    /// worktree; successful results are merged back (and the worktree cleaned
    /// up) when auto-merge is on, while failed tasks keep their worktree for
    /// debugging. Subtask errors and join failures are logged and skipped, so
    /// the returned vector may be shorter than the input.
    async fn execute_stage(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
    ) -> Result<Vec<SubTaskResult>> {
        let mut handles: Vec<
            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
        > = Vec::new();

        // Bounds in-flight provider requests across all spawned sub-agents.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(
            self.config.max_concurrent_requests,
        ));
        let delay_ms = self.config.request_delay_ms;

        let model = orchestrator.model().to_string();
        let provider_name = orchestrator.provider().to_string();
        let providers = orchestrator.providers();
        let provider = providers
            .get(&provider_name)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;

        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");

        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        // Sub-agents cannot ask the user questions, so drop that tool.
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|t| t.name != "question")
            .collect();

        // Optional git-worktree isolation so parallel agents don't clobber
        // each other's files; falls back to the shared directory on error.
        let worktree_manager = if self.config.worktree_enabled {
            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| ".".to_string())
            });

            match WorktreeManager::new(&working_dir) {
                Ok(mgr) => {
                    tracing::info!(
                        working_dir = %working_dir,
                        "Worktree isolation enabled for parallel sub-agents"
                    );
                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Failed to create worktree manager, falling back to shared directory"
                    );
                    None
                }
            }
        } else {
            None
        };

        for (idx, subtask) in subtasks.into_iter().enumerate() {
            let model = model.clone();
            let _provider_name = provider_name.clone();
            let provider = Arc::clone(&provider);

            // Gather results from this subtask's declared dependencies to
            // pass along as context (read before spawning, so the snapshot is
            // consistent for this stage).
            let context = {
                let completed = completed_results.read().await;
                let mut dep_context = String::new();
                for dep_id in &subtask.dependencies {
                    if let Some(result) = completed.get(dep_id) {
                        dep_context.push_str(&format!(
                            "\n--- Result from dependency {} ---\n{}\n",
                            dep_id, result
                        ));
                    }
                }
                dep_context
            };

            let instruction = subtask.instruction.clone();
            let specialty = subtask.specialty.clone().unwrap_or_default();
            let subtask_id = subtask.id.clone();
            let max_steps = self.config.max_steps_per_subagent;
            let timeout_secs = self.config.subagent_timeout_secs;

            let tools = tool_definitions.clone();
            let registry = Arc::clone(&tool_registry);
            let sem = Arc::clone(&semaphore);
            // Stagger start times so agents don't all hit the provider at once.
            let stagger_delay = delay_ms * idx as u64;
            let worktree_mgr = worktree_manager.clone();

            let handle = tokio::spawn(async move {
                if stagger_delay > 0 {
                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
                }
                // Held for the task's lifetime; limits concurrent requests.
                let _permit = sem.acquire().await.expect("semaphore closed");

                let start = Instant::now();

                // Create an isolated worktree for this sub-agent if enabled;
                // failure degrades to the shared directory.
                let worktree_info = if let Some(ref mgr) = worktree_mgr {
                    let task_slug = subtask_id.replace("-", "_");
                    match mgr.create(&task_slug) {
                        Ok(wt) => {
                            tracing::info!(
                                subtask_id = %subtask_id,
                                worktree_path = %wt.path.display(),
                                worktree_branch = %wt.branch,
                                "Created worktree for sub-agent"
                            );
                            Some(wt)
                        }
                        Err(e) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %e,
                                "Failed to create worktree, using shared directory"
                            );
                            None
                        }
                    }
                } else {
                    None
                };

                let working_dir = worktree_info
                    .as_ref()
                    .map(|wt| wt.path.display().to_string())
                    .unwrap_or_else(|| ".".to_string());

                // Pull in project-level instructions (AGENTS.md) if present.
                let working_path = std::path::Path::new(&working_dir);
                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
                    .map(|(content, _)| {
                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
                    })
                    .unwrap_or_default();

                // Unique per-subtask PRD filename so parallel agents don't
                // write over each other's PRDs.
                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
                let system_prompt = format!(
                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.

WORKING DIRECTORY: {}
All file operations should be relative to this directory.

IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.

Available tools:
- read: Read file contents
- write: Write/create files
- edit: Edit existing files (search and replace)
- multiedit: Make multiple edits at once
- glob: Find files by pattern
- grep: Search file contents
- bash: Run shell commands (use cwd: \"{}\" parameter)
- webfetch: Fetch web pages
- prd: Generate structured PRD for complex tasks
- ralph: Run autonomous agent loop on a PRD

COMPLEX TASKS:
If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
1. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
2. Break down into user stories with acceptance criteria
3. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
4. Call ralph({{action: 'run', prd_path: '{}'}}) to execute

NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.

When done, provide a brief summary of what you accomplished.{agents_md_content}",
                    specialty,
                    subtask_id,
                    working_dir,
                    working_dir,
                    prd_filename,
                    prd_filename,
                    prd_filename
                );

                let user_prompt = if context.is_empty() {
                    format!("Complete this task:\n\n{}", instruction)
                } else {
                    format!(
                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
                        instruction, context
                    )
                };

                let result = run_agent_loop(
                    provider,
                    &model,
                    &system_prompt,
                    &user_prompt,
                    tools,
                    registry,
                    max_steps,
                    timeout_secs,
                )
                .await;

                // Agent-loop failure becomes a failed SubTaskResult rather
                // than an Err, so one bad subtask doesn't abort the stage.
                match result {
                    Ok((output, steps, tool_calls)) => Ok((
                        SubTaskResult {
                            subtask_id: subtask_id.clone(),
                            subagent_id: format!("agent-{}", subtask_id),
                            success: true,
                            result: output,
                            steps,
                            tool_calls,
                            execution_time_ms: start.elapsed().as_millis() as u64,
                            error: None,
                            artifacts: Vec::new(),
                        },
                        worktree_info,
                    )),
                    Err(e) => Ok((
                        SubTaskResult {
                            subtask_id: subtask_id.clone(),
                            subagent_id: format!("agent-{}", subtask_id),
                            success: false,
                            result: String::new(),
                            steps: 0,
                            tool_calls: 0,
                            execution_time_ms: start.elapsed().as_millis() as u64,
                            error: Some(e.to_string()),
                            artifacts: Vec::new(),
                        },
                        worktree_info,
                    )),
                }
            });

            handles.push(handle);
        }

        let mut results = Vec::new();
        let auto_merge = self.config.worktree_auto_merge;

        // Collect results; merge/cleanup worktrees for successful tasks when
        // auto-merge is enabled, keep worktrees of failed tasks for debugging.
        for handle in handles {
            match handle.await {
                Ok(Ok((mut result, worktree_info))) => {
                    if let Some(wt) = worktree_info {
                        if result.success && auto_merge {
                            if let Some(ref mgr) = worktree_manager {
                                match mgr.merge(&wt) {
                                    Ok(merge_result) => {
                                        if merge_result.success {
                                            tracing::info!(
                                                subtask_id = %result.subtask_id,
                                                files_changed = merge_result.files_changed,
                                                "Merged worktree changes successfully"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Result ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                conflicts = ?merge_result.conflicts,
                                                "Merge had conflicts"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Conflicts ---\n{}",
                                                merge_result.summary
                                            ));
                                        }

                                        // Cleanup failure is non-fatal.
                                        if let Err(e) = mgr.cleanup(&wt) {
                                            tracing::warn!(
                                                error = %e,
                                                "Failed to cleanup worktree"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            subtask_id = %result.subtask_id,
                                            error = %e,
                                            "Failed to merge worktree"
                                        );
                                    }
                                }
                            }
                        } else if !result.success {
                            tracing::info!(
                                subtask_id = %result.subtask_id,
                                worktree_path = %wt.path.display(),
                                "Keeping worktree for debugging (task failed)"
                            );
                        }
                    }

                    results.push(result);
                }
                // Errors here drop the subtask from `results` (logged only).
                Ok(Err(e)) => {
                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
                }
                Err(e) => {
                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
                }
            }
        }

        Ok(results)
    }

    /// Concatenate all subtask outputs into one report, marking failures
    /// with their error text.
    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
        let mut aggregated = String::new();

        for (i, result) in results.iter().enumerate() {
            if result.success {
                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
            } else {
                aggregated.push_str(&format!(
                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
                    i + 1,
                    result.error.as_deref().unwrap_or("Unknown error")
                ));
            }
        }

        Ok(aggregated)
    }

    /// Convenience wrapper: run `task` without decomposition.
    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
}
815
/// Fluent builder for [`SwarmExecutor`], starting from `SwarmConfig::default()`.
pub struct SwarmExecutorBuilder {
    /// Configuration being accumulated; handed to the executor by `build`.
    config: SwarmConfig,
}
820
821impl SwarmExecutorBuilder {
822 pub fn new() -> Self {
823 Self {
824 config: SwarmConfig::default(),
825 }
826 }
827
828 pub fn max_subagents(mut self, max: usize) -> Self {
829 self.config.max_subagents = max;
830 self
831 }
832
833 pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
834 self.config.max_steps_per_subagent = max;
835 self
836 }
837
838 pub fn max_total_steps(mut self, max: usize) -> Self {
839 self.config.max_total_steps = max;
840 self
841 }
842
843 pub fn timeout_secs(mut self, secs: u64) -> Self {
844 self.config.subagent_timeout_secs = secs;
845 self
846 }
847
848 pub fn parallel_enabled(mut self, enabled: bool) -> Self {
849 self.config.parallel_enabled = enabled;
850 self
851 }
852
853 pub fn build(self) -> SwarmExecutor {
854 SwarmExecutor::new(self.config)
855 }
856}
857
858impl Default for SwarmExecutorBuilder {
859 fn default() -> Self {
860 Self::new()
861 }
862}
863
/// Drive a sub-agent's tool-calling loop against `provider` until the model
/// stops requesting tools, `max_steps` is reached, or the overall
/// `timeout_secs` deadline passes.
///
/// Each iteration: truncate history to the context budget, request a
/// completion (with a 120-second per-call timeout), execute any requested
/// tools through `registry`, and append the size-limited tool results as
/// `Role::Tool` messages for the next turn.
///
/// Returns `(final_text_output, steps_taken, total_tool_calls)`, where the
/// final output is the most recent non-empty text the model produced.
///
/// # Errors
/// Propagates provider/completion failures and per-call timeouts. Individual
/// tool failures are fed back to the model as text, not returned as `Err`.
#[allow(clippy::too_many_arguments)]
pub async fn run_agent_loop(
    provider: Arc<dyn Provider>,
    model: &str,
    system_prompt: &str,
    user_prompt: &str,
    tools: Vec<crate::provider::ToolDefinition>,
    registry: Arc<ToolRegistry>,
    max_steps: usize,
    timeout_secs: u64,
) -> Result<(String, usize, usize)> {
    let temperature = 0.7;

    tracing::info!(
        model = %model,
        max_steps = max_steps,
        timeout_secs = timeout_secs,
        "Sub-agent starting agentic loop"
    );
    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");

    // Seed the conversation with the system and user prompts.
    let mut messages = vec![
        Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.to_string(),
            }],
        },
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: user_prompt.to_string(),
            }],
        },
    ];

    let mut steps = 0;
    let mut total_tool_calls = 0;
    let mut final_output = String::new();

    // Overall wall-clock deadline for the whole loop.
    let deadline = Instant::now() + Duration::from_secs(timeout_secs);

    loop {
        if steps >= max_steps {
            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
            break;
        }

        if Instant::now() > deadline {
            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
            break;
        }

        steps += 1;
        tracing::info!(step = steps, "Sub-agent step starting");

        // Keep the conversation inside the context budget before each call.
        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);

        let request = CompletionRequest {
            messages: messages.clone(),
            tools: tools.clone(),
            model: model.to_string(),
            temperature: Some(temperature),
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let step_start = Instant::now();
        // Per-call timeout (the overall deadline is checked above).
        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
        let step_duration = step_start.elapsed();

        tracing::info!(
            step = steps,
            duration_ms = step_duration.as_millis() as u64,
            finish_reason = ?response.finish_reason,
            prompt_tokens = response.usage.prompt_tokens,
            completion_tokens = response.usage.completion_tokens,
            "Sub-agent step completed LLM call"
        );

        // Split the response into its text and tool-call parts.
        let mut text_parts = Vec::new();
        let mut tool_calls = Vec::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } => {
                    text_parts.push(text.clone());
                }
                ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                } => {
                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
                }
                _ => {}
            }
        }

        // The latest non-empty text becomes the candidate final output.
        if !text_parts.is_empty() {
            final_output = text_parts.join("\n");
            tracing::info!(
                step = steps,
                output_len = final_output.len(),
                "Sub-agent text output"
            );
            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
        }

        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        // The assistant message must precede its tool results in history.
        messages.push(response.message.clone());

        // No further tool use requested: the agent is done.
        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break;
        }

        let mut tool_results = Vec::new();

        // Execute each requested tool; failures are reported back to the
        // model as text rather than aborting the loop.
        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let result = if let Some(tool) = registry.get(&tool_name) {
                // Malformed JSON arguments degrade to an empty object.
                let args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|_| serde_json::json!({}));

                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            // Very large outputs go through RLM summarization; everything
            // else is simply truncated to the character budget.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        // Feed all tool results back as Role::Tool messages for the next turn.
        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }
    }

    Ok((final_output, steps, total_tool_calls))
}