use super::{
    DecompositionStrategy, StageStats, SwarmConfig, SwarmResult,
    orchestrator::Orchestrator,
    subtask::{SubTask, SubTaskResult},
};

pub use super::{Actor, ActorStatus, Handler, SwarmMessage};

use crate::{
    agent::Agent,
    provider::{CompletionRequest, ContentPart, FinishReason, Message, Provider, Role},
    rlm::RlmExecutor,
    swarm::{SwarmArtifact, SwarmStats},
    tool::ToolRegistry,
    worktree::{WorktreeInfo, WorktreeManager},
};
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio::time::{Duration, timeout};

/// Default context window size (in tokens) assumed for sub-agent conversations.
const DEFAULT_CONTEXT_LIMIT: usize = 256_000;

/// Tokens reserved for the model's response when deciding whether to truncate.
const RESPONSE_RESERVE_TOKENS: usize = 8_192;

/// Fraction of the context limit at which history truncation kicks in.
const TRUNCATION_THRESHOLD: f64 = 0.85;

/// Rough token estimate using a heuristic of ~3.5 characters per token.
fn estimate_tokens(text: &str) -> usize {
    (text.len() as f64 / 3.5).ceil() as usize
}

/// Estimate the token cost of a single message, including per-part overhead.
fn estimate_message_tokens(message: &Message) -> usize {
    let mut tokens = 4;
    for part in &message.content {
        tokens += match part {
            ContentPart::Text { text } => estimate_tokens(text),
            ContentPart::ToolCall {
                id,
                name,
                arguments,
            } => estimate_tokens(id) + estimate_tokens(name) + estimate_tokens(arguments) + 10,
            ContentPart::ToolResult {
                tool_call_id,
                content,
            } => estimate_tokens(tool_call_id) + estimate_tokens(content) + 6,
            ContentPart::Image { .. } | ContentPart::File { .. } => 2000,
        };
    }

    tokens
}

/// Estimate the total token count of a conversation.
fn estimate_total_tokens(messages: &[Message]) -> usize {
    messages.iter().map(estimate_message_tokens).sum()
}

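/// Keep `messages` within `context_limit` tokens (leaving room for the
/// model's response).
///
/// First shrinks oversized tool results in place; if the conversation is
/// still too large and long enough, drops the middle messages (keeping the
/// first 2 and the last 4) and replaces them with a short summary note.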
fn truncate_messages_to_fit(messages: &mut Vec<Message>, context_limit: usize) {
    // saturating_sub guards against underflow for very small context limits.
    let target_tokens = (((context_limit as f64) * TRUNCATION_THRESHOLD) as usize)
        .saturating_sub(RESPONSE_RESERVE_TOKENS);

    let current_tokens = estimate_total_tokens(messages);
    if current_tokens <= target_tokens {
        return;
    }

    tracing::warn!(
        current_tokens = current_tokens,
        target_tokens = target_tokens,
        context_limit = context_limit,
        "Context approaching limit, truncating conversation history"
    );

    // First pass: shrink oversized tool results in place.
    truncate_large_tool_results(messages, 2000);
    let after_tool_truncation = estimate_total_tokens(messages);
    if after_tool_truncation <= target_tokens {
        tracing::info!(
            old_tokens = current_tokens,
            new_tokens = after_tool_truncation,
            "Truncated large tool results, context now within limits"
        );
        return;
    }

    if messages.len() <= 6 {
        tracing::warn!(
            tokens = after_tool_truncation,
            target = target_tokens,
            "Cannot truncate further - conversation too short"
        );
        return;
    }

    // Second pass: drop the middle of the conversation, keeping the initial
    // prompts and the most recent exchanges.
    let keep_start = 2;
    let keep_end = 4;
    let removable_count = messages.len() - keep_start - keep_end;

    if removable_count == 0 {
        return;
    }

    let removed_messages: Vec<_> = messages
        .drain(keep_start..keep_start + removable_count)
        .collect();
    let summary = summarize_removed_messages(&removed_messages);

    messages.insert(
        keep_start,
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: format!(
                    "[Context truncated: {} earlier messages removed to fit context window]\n{}",
                    removed_messages.len(),
                    summary
                ),
            }],
        },
    );

    let new_tokens = estimate_total_tokens(messages);
    tracing::info!(
        removed_messages = removed_messages.len(),
        old_tokens = current_tokens,
        new_tokens = new_tokens,
        "Truncated conversation history"
    );
}

/// Summarize which tools were used in the removed messages.
fn summarize_removed_messages(messages: &[Message]) -> String {
    let mut summary = String::new();
    let mut tool_calls: Vec<String> = Vec::new();

    for msg in messages {
        for part in &msg.content {
            if let ContentPart::ToolCall { name, .. } = part {
                if !tool_calls.contains(name) {
                    tool_calls.push(name.clone());
                }
            }
        }
    }

    if !tool_calls.is_empty() {
        summary.push_str(&format!(
            "Tools used in truncated history: {}",
            tool_calls.join(", ")
        ));
    }

    summary
}

/// Truncate any tool result exceeding `max_tokens_per_result`, logging how
/// many results were shrunk and roughly how many tokens were saved.
fn truncate_large_tool_results(messages: &mut [Message], max_tokens_per_result: usize) {
    let char_limit = max_tokens_per_result * 3;
    let mut truncated_count = 0;
    let mut saved_tokens = 0usize;

    for message in messages.iter_mut() {
        for part in message.content.iter_mut() {
            if let ContentPart::ToolResult { content, .. } = part {
                let tokens = estimate_tokens(content);
                if tokens > max_tokens_per_result {
                    let old_len = content.len();
                    *content = truncate_single_result(content, char_limit);
                    saved_tokens += tokens.saturating_sub(estimate_tokens(content));
                    if content.len() < old_len {
                        truncated_count += 1;
                    }
                }
            }
        }
    }

    if truncated_count > 0 {
        tracing::info!(
            truncated_count = truncated_count,
            saved_tokens = saved_tokens,
            max_tokens_per_result = max_tokens_per_result,
            "Truncated large tool results"
        );
    }
}

/// Truncate `content` to at most `max_chars`, cutting at the last newline
/// before the limit and appending a marker describing what was removed.
fn truncate_single_result(content: &str, max_chars: usize) -> String {
    if content.len() <= max_chars {
        return content.to_string();
    }

    let break_point = content[..max_chars.min(content.len())]
        .rfind('\n')
        .unwrap_or(max_chars.min(content.len()));

    let truncated = format!(
        "{}...\n\n[OUTPUT TRUNCATED: {} → {} chars to fit context limit]",
        &content[..break_point],
        content.len(),
        break_point
    );

    tracing::debug!(
        original_len = content.len(),
        truncated_len = truncated.len(),
        "Truncated large result"
    );

    truncated
}

/// Tool results larger than this (in chars) are summarized with the RLM.
const RLM_THRESHOLD_CHARS: usize = 50_000;

/// Below the RLM threshold, results are simply truncated to this many chars.
const SIMPLE_TRUNCATE_CHARS: usize = 6000;

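/// Shrink a large tool result before it is returned to the sub-agent.
///
/// Results up to [`SIMPLE_TRUNCATE_CHARS`] pass through unchanged, results up
/// to [`RLM_THRESHOLD_CHARS`] are truncated with [`truncate_single_result`],
/// and anything larger is summarized via [`RlmExecutor`], falling back to
/// plain truncation if the analysis fails.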
async fn process_large_result_with_rlm(
    content: &str,
    tool_name: &str,
    provider: Arc<dyn Provider>,
    model: &str,
) -> String {
    if content.len() <= SIMPLE_TRUNCATE_CHARS {
        return content.to_string();
    }

    if content.len() <= RLM_THRESHOLD_CHARS {
        return truncate_single_result(content, SIMPLE_TRUNCATE_CHARS);
    }

    tracing::info!(
        tool = %tool_name,
        content_len = content.len(),
        "Using RLM to process large tool result"
    );

    let query = format!(
        "Summarize the key information from this {} output. \
         Focus on: errors, warnings, important findings, and actionable items. \
         Be concise but thorough.",
        tool_name
    );

    let mut executor =
        RlmExecutor::new(content.to_string(), provider, model.to_string()).with_max_iterations(3);

    match executor.analyze(&query).await {
        Ok(result) => {
            tracing::info!(
                tool = %tool_name,
                original_len = content.len(),
                summary_len = result.answer.len(),
                iterations = result.iterations,
                "RLM summarized large result"
            );

            format!(
                "[RLM Summary of {} output ({} chars → {} chars)]\n\n{}",
                tool_name,
                content.len(),
                result.answer.len(),
                result.answer
            )
        }
        Err(e) => {
            tracing::warn!(
                tool = %tool_name,
                error = %e,
                "RLM analysis failed, falling back to truncation"
            );
            truncate_single_result(content, SIMPLE_TRUNCATE_CHARS)
        }
    }
}

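/// Executes a task as a swarm: an [`Orchestrator`] decomposes the task into
/// staged sub-tasks, each sub-task runs in its own agent loop (optionally in
/// an isolated git worktree), and the results are aggregated into a single
/// [`SwarmResult`].
///
/// Minimal usage sketch (illustrative; assumes an async context):
///
/// ```ignore
/// let executor = SwarmExecutor::new(SwarmConfig::default());
/// let outcome = executor.execute_single("summarize the repository layout").await?;
/// println!("{}", outcome.result);
/// ```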
pub struct SwarmExecutor {
    config: SwarmConfig,
    coordinator_agent: Option<Arc<tokio::sync::Mutex<Agent>>>,
}

impl SwarmExecutor {
    pub fn new(config: SwarmConfig) -> Self {
        Self {
            config,
            coordinator_agent: None,
        }
    }

    pub fn with_coordinator_agent(mut self, agent: Arc<tokio::sync::Mutex<Agent>>) -> Self {
        tracing::debug!("Setting coordinator agent for swarm execution");
        self.coordinator_agent = Some(agent);
        self
    }

    pub fn coordinator_agent(&self) -> Option<&Arc<tokio::sync::Mutex<Agent>>> {
        self.coordinator_agent.as_ref()
    }

    pub async fn execute(
        &self,
        task: &str,
        strategy: DecompositionStrategy,
    ) -> Result<SwarmResult> {
        let start_time = Instant::now();

        let mut orchestrator = Orchestrator::new(self.config.clone()).await?;

        tracing::info!(provider_name = %orchestrator.provider(), "Starting swarm execution for task");

        let subtasks = orchestrator.decompose(task, strategy).await?;

        if subtasks.is_empty() {
            return Ok(SwarmResult {
                success: false,
                result: String::new(),
                subtask_results: Vec::new(),
                stats: SwarmStats::default(),
                artifacts: Vec::new(),
                error: Some("No subtasks generated".to_string()),
            });
        }

        tracing::info!(provider_name = %orchestrator.provider(), "Task decomposed into {} subtasks", subtasks.len());

        let max_stage = subtasks.iter().map(|s| s.stage).max().unwrap_or(0);
        let mut all_results: Vec<SubTaskResult> = Vec::new();
        let artifacts: Vec<SwarmArtifact> = Vec::new();

        // Results of completed subtasks, keyed by subtask id, shared with later
        // stages so dependent subtasks can see their dependencies' output.
        let completed_results: Arc<RwLock<HashMap<String, String>>> =
            Arc::new(RwLock::new(HashMap::new()));

        for stage in 0..=max_stage {
            let stage_start = Instant::now();

            let stage_subtasks: Vec<SubTask> = orchestrator
                .subtasks_for_stage(stage)
                .into_iter()
                .cloned()
                .collect();

            tracing::debug!(
                "Stage {} has {} subtasks (max_stage={})",
                stage,
                stage_subtasks.len(),
                max_stage
            );

            if stage_subtasks.is_empty() {
                continue;
            }

            tracing::info!(
                provider_name = %orchestrator.provider(),
                "Executing stage {} with {} subtasks",
                stage,
                stage_subtasks.len()
            );

            let stage_results = self
                .execute_stage(&orchestrator, stage_subtasks, completed_results.clone())
                .await?;

            {
                let mut completed = completed_results.write().await;
                for result in &stage_results {
                    completed.insert(result.subtask_id.clone(), result.result.clone());
                }
            }

            let stage_time = stage_start.elapsed().as_millis() as u64;
            let max_steps = stage_results.iter().map(|r| r.steps).max().unwrap_or(0);
            let total_steps: usize = stage_results.iter().map(|r| r.steps).sum();

            orchestrator.stats_mut().stages.push(StageStats {
                stage,
                subagent_count: stage_results.len(),
                max_steps,
                total_steps,
                execution_time_ms: stage_time,
            });

            for result in &stage_results {
                orchestrator.complete_subtask(&result.subtask_id, result.clone());
            }

            all_results.extend(stage_results);
        }

        let provider_name = orchestrator.provider().to_string();

        let stats = orchestrator.stats_mut();
        stats.execution_time_ms = start_time.elapsed().as_millis() as u64;
        stats.sequential_time_estimate_ms = all_results.iter().map(|r| r.execution_time_ms).sum();
        stats.calculate_critical_path();
        stats.calculate_speedup();

        let success = all_results.iter().all(|r| r.success);
        let result = self.aggregate_results(&all_results).await?;

        tracing::info!(
            provider_name = %provider_name,
            "Swarm execution complete: {} subtasks, {:.1}x speedup",
            all_results.len(),
            stats.speedup_factor
        );

        Ok(SwarmResult {
            success,
            result,
            subtask_results: all_results,
            stats: orchestrator.stats().clone(),
            artifacts,
            error: None,
        })
    }

    async fn execute_stage(
        &self,
        orchestrator: &Orchestrator,
        subtasks: Vec<SubTask>,
        completed_results: Arc<RwLock<HashMap<String, String>>>,
    ) -> Result<Vec<SubTaskResult>> {
        let mut handles: Vec<
            tokio::task::JoinHandle<Result<(SubTaskResult, Option<WorktreeInfo>), anyhow::Error>>,
        > = Vec::new();

        // Limit concurrent provider requests; sub-agent starts are also staggered below.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(
            self.config.max_concurrent_requests,
        ));
        let delay_ms = self.config.request_delay_ms;

        let model = orchestrator.model().to_string();
        let provider_name = orchestrator.provider().to_string();
        let providers = orchestrator.providers();
        let provider = providers
            .get(&provider_name)
            .ok_or_else(|| anyhow::anyhow!("Provider {} not found", provider_name))?;

        tracing::info!(provider_name = %provider_name, "Selected provider for subtask execution");

        let tool_registry = ToolRegistry::with_provider_arc(Arc::clone(&provider), model.clone());
        // Sub-agents get every registered tool except `question`.
        let tool_definitions: Vec<_> = tool_registry
            .definitions()
            .into_iter()
            .filter(|t| t.name != "question")
            .collect();

        let worktree_manager = if self.config.worktree_enabled {
            let working_dir = self.config.working_dir.clone().unwrap_or_else(|| {
                std::env::current_dir()
                    .map(|p| p.to_string_lossy().to_string())
                    .unwrap_or_else(|_| ".".to_string())
            });

            match WorktreeManager::new(&working_dir) {
                Ok(mgr) => {
                    tracing::info!(
                        working_dir = %working_dir,
                        "Worktree isolation enabled for parallel sub-agents"
                    );
                    Some(Arc::new(mgr) as Arc<WorktreeManager>)
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "Failed to create worktree manager, falling back to shared directory"
                    );
                    None
                }
            }
        } else {
            None
        };

        for (idx, subtask) in subtasks.into_iter().enumerate() {
            let model = model.clone();
            let _provider_name = provider_name.clone();
            let provider = Arc::clone(&provider);

            // Collect results from this subtask's dependencies as context.
            let context = {
                let completed = completed_results.read().await;
                let mut dep_context = String::new();
                for dep_id in &subtask.dependencies {
                    if let Some(result) = completed.get(dep_id) {
                        dep_context.push_str(&format!(
                            "\n--- Result from dependency {} ---\n{}\n",
                            dep_id, result
                        ));
                    }
                }
                dep_context
            };

            let instruction = subtask.instruction.clone();
            let specialty = subtask.specialty.clone().unwrap_or_default();
            let subtask_id = subtask.id.clone();
            let max_steps = self.config.max_steps_per_subagent;
            let timeout_secs = self.config.subagent_timeout_secs;

            let tools = tool_definitions.clone();
            let registry = Arc::clone(&tool_registry);
            let sem = Arc::clone(&semaphore);
            let stagger_delay = delay_ms * idx as u64;
            let worktree_mgr = worktree_manager.clone();

            let handle = tokio::spawn(async move {
                if stagger_delay > 0 {
                    tokio::time::sleep(Duration::from_millis(stagger_delay)).await;
                }
                let _permit = sem.acquire().await.expect("semaphore closed");

                let start = Instant::now();

                // Give each sub-agent its own git worktree when isolation is enabled.
                let worktree_info = if let Some(ref mgr) = worktree_mgr {
                    let task_slug = subtask_id.replace("-", "_");
                    match mgr.create(&task_slug) {
                        Ok(wt) => {
                            tracing::info!(
                                subtask_id = %subtask_id,
                                worktree_path = %wt.path.display(),
                                worktree_branch = %wt.branch,
                                "Created worktree for sub-agent"
                            );
                            Some(wt)
                        }
                        Err(e) => {
                            tracing::warn!(
                                subtask_id = %subtask_id,
                                error = %e,
                                "Failed to create worktree, using shared directory"
                            );
                            None
                        }
                    }
                } else {
                    None
                };

                let working_dir = worktree_info
                    .as_ref()
                    .map(|wt| wt.path.display().to_string())
                    .unwrap_or_else(|| ".".to_string());

                let working_path = std::path::Path::new(&working_dir);
                let agents_md_content = crate::agent::builtin::load_agents_md(working_path)
                    .map(|(content, _)| {
                        format!("\n\nPROJECT INSTRUCTIONS (from AGENTS.md):\n{content}")
                    })
                    .unwrap_or_default();

                let prd_filename = format!("prd_{}.json", subtask_id.replace("-", "_"));
                let system_prompt = format!(
                    "You are a {} specialist sub-agent (ID: {}). You have access to tools to complete your task.

WORKING DIRECTORY: {}
All file operations should be relative to this directory.

IMPORTANT: You MUST use tools to make changes. Do not just describe what to do - actually do it using the tools available.

Available tools:
- read: Read file contents
- write: Write/create files
- edit: Edit existing files (search and replace)
- multiedit: Make multiple edits at once
- glob: Find files by pattern
- grep: Search file contents
- bash: Run shell commands (use cwd: \"{}\" parameter)
- webfetch: Fetch web pages
- prd: Generate structured PRD for complex tasks
- ralph: Run autonomous agent loop on a PRD

COMPLEX TASKS:
If your task is complex and involves multiple implementation steps, use the prd + ralph workflow:
1. Call prd({{action: 'analyze', task_description: '...'}}) to understand what's needed
2. Break down into user stories with acceptance criteria
3. Call prd({{action: 'save', prd_path: '{}', project: '...', feature: '...', stories: [...]}})
4. Call ralph({{action: 'run', prd_path: '{}'}}) to execute

NOTE: Use your unique PRD file '{}' so parallel agents don't conflict.

When done, provide a brief summary of what you accomplished.{agents_md_content}",
                    specialty,
                    subtask_id,
                    working_dir,
                    working_dir,
                    prd_filename,
                    prd_filename,
                    prd_filename
                );

                let user_prompt = if context.is_empty() {
                    format!("Complete this task:\n\n{}", instruction)
                } else {
                    format!(
                        "Complete this task:\n\n{}\n\nContext from prior work:\n{}",
                        instruction, context
                    )
                };

                let result = run_agent_loop(
                    provider,
                    &model,
                    &system_prompt,
                    &user_prompt,
                    tools,
                    registry,
                    max_steps,
                    timeout_secs,
                )
                .await;

                match result {
                    Ok((output, steps, tool_calls)) => Ok((
                        SubTaskResult {
                            subtask_id: subtask_id.clone(),
                            subagent_id: format!("agent-{}", subtask_id),
                            success: true,
                            result: output,
                            steps,
                            tool_calls,
                            execution_time_ms: start.elapsed().as_millis() as u64,
                            error: None,
                            artifacts: Vec::new(),
                        },
                        worktree_info,
                    )),
                    Err(e) => Ok((
                        SubTaskResult {
                            subtask_id: subtask_id.clone(),
                            subagent_id: format!("agent-{}", subtask_id),
                            success: false,
                            result: String::new(),
                            steps: 0,
                            tool_calls: 0,
                            execution_time_ms: start.elapsed().as_millis() as u64,
                            error: Some(e.to_string()),
                            artifacts: Vec::new(),
                        },
                        worktree_info,
                    )),
                }
            });

            handles.push(handle);
        }

        let mut results = Vec::new();
        let auto_merge = self.config.worktree_auto_merge;

        for handle in handles {
            match handle.await {
                Ok(Ok((mut result, worktree_info))) => {
                    if let Some(wt) = worktree_info {
                        if result.success && auto_merge {
                            if let Some(ref mgr) = worktree_manager {
                                match mgr.merge(&wt) {
                                    Ok(merge_result) => {
                                        if merge_result.success {
                                            tracing::info!(
                                                subtask_id = %result.subtask_id,
                                                files_changed = merge_result.files_changed,
                                                "Merged worktree changes successfully"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Result ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else if merge_result.aborted {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                summary = %merge_result.summary,
                                                "Merge was aborted"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Aborted ---\n{}",
                                                merge_result.summary
                                            ));
                                        } else {
                                            tracing::warn!(
                                                subtask_id = %result.subtask_id,
                                                conflicts = ?merge_result.conflicts,
                                                "Merge had conflicts"
                                            );
                                            result.result.push_str(&format!(
                                                "\n\n--- Merge Conflicts ---\n{}",
                                                merge_result.summary
                                            ));
                                        }

                                        if let Err(e) = mgr.cleanup(&wt) {
                                            tracing::warn!(
                                                error = %e,
                                                "Failed to cleanup worktree"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            subtask_id = %result.subtask_id,
                                            error = %e,
                                            "Failed to merge worktree"
                                        );
                                    }
                                }
                            }
                        } else if !result.success {
                            tracing::info!(
                                subtask_id = %result.subtask_id,
                                worktree_path = %wt.path.display(),
                                "Keeping worktree for debugging (task failed)"
                            );
                        }
                    }

                    results.push(result);
                }
                Ok(Err(e)) => {
                    tracing::error!(provider_name = %provider_name, "Subtask error: {}", e);
                }
                Err(e) => {
                    tracing::error!(provider_name = %provider_name, "Task join error: {}", e);
                }
            }
        }

        Ok(results)
    }

    async fn aggregate_results(&self, results: &[SubTaskResult]) -> Result<String> {
        let mut aggregated = String::new();

        for (i, result) in results.iter().enumerate() {
            if result.success {
                aggregated.push_str(&format!("=== Subtask {} ===\n{}\n\n", i + 1, result.result));
            } else {
                aggregated.push_str(&format!(
                    "=== Subtask {} (FAILED) ===\nError: {}\n\n",
                    i + 1,
                    result.error.as_deref().unwrap_or("Unknown error")
                ));
            }
        }

        Ok(aggregated)
    }

    pub async fn execute_single(&self, task: &str) -> Result<SwarmResult> {
        self.execute(task, DecompositionStrategy::None).await
    }
}

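/// Fluent builder over [`SwarmConfig`] for constructing a [`SwarmExecutor`].
///
/// Illustrative usage (the values shown are examples, not recommended defaults):
///
/// ```ignore
/// let executor = SwarmExecutorBuilder::new()
///     .max_subagents(4)
///     .max_steps_per_subagent(25)
///     .timeout_secs(600)
///     .parallel_enabled(true)
///     .build();
/// ```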
pub struct SwarmExecutorBuilder {
    config: SwarmConfig,
}

impl SwarmExecutorBuilder {
    pub fn new() -> Self {
        Self {
            config: SwarmConfig::default(),
        }
    }

    pub fn max_subagents(mut self, max: usize) -> Self {
        self.config.max_subagents = max;
        self
    }

    pub fn max_steps_per_subagent(mut self, max: usize) -> Self {
        self.config.max_steps_per_subagent = max;
        self
    }

    pub fn max_total_steps(mut self, max: usize) -> Self {
        self.config.max_total_steps = max;
        self
    }

    pub fn timeout_secs(mut self, secs: u64) -> Self {
        self.config.subagent_timeout_secs = secs;
        self
    }

    pub fn parallel_enabled(mut self, enabled: bool) -> Self {
        self.config.parallel_enabled = enabled;
        self
    }

    pub fn build(self) -> SwarmExecutor {
        SwarmExecutor::new(self.config)
    }
}

impl Default for SwarmExecutorBuilder {
    fn default() -> Self {
        Self::new()
    }
}

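/// Run the bounded agentic loop for a single sub-agent: call the provider,
/// execute any requested tool calls, feed the results back into the
/// conversation, and stop on a final response, the step limit, or the
/// timeout deadline.
///
/// Returns `(final_text_output, steps_taken, total_tool_calls)`.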
#[allow(clippy::too_many_arguments)]
pub async fn run_agent_loop(
    provider: Arc<dyn Provider>,
    model: &str,
    system_prompt: &str,
    user_prompt: &str,
    tools: Vec<crate::provider::ToolDefinition>,
    registry: Arc<ToolRegistry>,
    max_steps: usize,
    timeout_secs: u64,
) -> Result<(String, usize, usize)> {
    let temperature = 0.7;

    tracing::info!(
        model = %model,
        max_steps = max_steps,
        timeout_secs = timeout_secs,
        "Sub-agent starting agentic loop"
    );
    tracing::debug!(system_prompt = %system_prompt, "Sub-agent system prompt");
    tracing::debug!(user_prompt = %user_prompt, "Sub-agent user prompt");

    let mut messages = vec![
        Message {
            role: Role::System,
            content: vec![ContentPart::Text {
                text: system_prompt.to_string(),
            }],
        },
        Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: user_prompt.to_string(),
            }],
        },
    ];

    let mut steps = 0;
    let mut total_tool_calls = 0;
    let mut final_output = String::new();

    let deadline = Instant::now() + Duration::from_secs(timeout_secs);

    loop {
        if steps >= max_steps {
            tracing::warn!(max_steps = max_steps, "Sub-agent reached max steps limit");
            break;
        }

        if Instant::now() > deadline {
            tracing::warn!(timeout_secs = timeout_secs, "Sub-agent timed out");
            break;
        }

        steps += 1;
        tracing::info!(step = steps, "Sub-agent step starting");

        // Keep the conversation within the assumed context window before each call.
        truncate_messages_to_fit(&mut messages, DEFAULT_CONTEXT_LIMIT);

        let request = CompletionRequest {
            messages: messages.clone(),
            tools: tools.clone(),
            model: model.to_string(),
            temperature: Some(temperature),
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let step_start = Instant::now();
        let response = timeout(Duration::from_secs(120), provider.complete(request)).await??;
        let step_duration = step_start.elapsed();

        tracing::info!(
            step = steps,
            duration_ms = step_duration.as_millis() as u64,
            finish_reason = ?response.finish_reason,
            prompt_tokens = response.usage.prompt_tokens,
            completion_tokens = response.usage.completion_tokens,
            "Sub-agent step completed LLM call"
        );

        let mut text_parts = Vec::new();
        let mut tool_calls = Vec::new();

        for part in &response.message.content {
            match part {
                ContentPart::Text { text } => {
                    text_parts.push(text.clone());
                }
                ContentPart::ToolCall {
                    id,
                    name,
                    arguments,
                } => {
                    tool_calls.push((id.clone(), name.clone(), arguments.clone()));
                }
                _ => {}
            }
        }

        if !text_parts.is_empty() {
            final_output = text_parts.join("\n");
            tracing::info!(
                step = steps,
                output_len = final_output.len(),
                "Sub-agent text output"
            );
            tracing::debug!(step = steps, output = %final_output, "Sub-agent full output");
        }

        if !tool_calls.is_empty() {
            tracing::info!(
                step = steps,
                num_tool_calls = tool_calls.len(),
                tools = ?tool_calls.iter().map(|(_, name, _)| name.as_str()).collect::<Vec<_>>(),
                "Sub-agent requesting tool calls"
            );
        }

        messages.push(response.message.clone());

        if response.finish_reason != FinishReason::ToolCalls || tool_calls.is_empty() {
            tracing::info!(
                steps = steps,
                total_tool_calls = total_tool_calls,
                "Sub-agent finished"
            );
            break;
        }

        let mut tool_results = Vec::new();

        for (call_id, tool_name, arguments) in tool_calls {
            total_tool_calls += 1;

            tracing::info!(
                step = steps,
                tool_call_id = %call_id,
                tool = %tool_name,
                "Executing tool"
            );
            tracing::debug!(
                tool = %tool_name,
                arguments = %arguments,
                "Tool call arguments"
            );

            let tool_start = Instant::now();
            let result = if let Some(tool) = registry.get(&tool_name) {
                let args: serde_json::Value =
                    serde_json::from_str(&arguments).unwrap_or_else(|_| serde_json::json!({}));

                match tool.execute(args).await {
                    Ok(r) => {
                        if r.success {
                            tracing::info!(
                                tool = %tool_name,
                                duration_ms = tool_start.elapsed().as_millis() as u64,
                                success = true,
                                "Tool execution completed"
                            );
                            r.output
                        } else {
                            tracing::warn!(
                                tool = %tool_name,
                                error = %r.output,
                                "Tool returned error"
                            );
                            format!("Tool error: {}", r.output)
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            tool = %tool_name,
                            error = %e,
                            "Tool execution failed"
                        );
                        format!("Tool execution failed: {}", e)
                    }
                }
            } else {
                tracing::error!(tool = %tool_name, "Unknown tool requested");
                format!("Unknown tool: {}", tool_name)
            };

            tracing::debug!(
                tool = %tool_name,
                result_len = result.len(),
                "Tool result"
            );

            // Shrink oversized tool output before it goes back into the conversation.
            let result = if result.len() > RLM_THRESHOLD_CHARS {
                process_large_result_with_rlm(&result, &tool_name, Arc::clone(&provider), model)
                    .await
            } else {
                truncate_single_result(&result, SIMPLE_TRUNCATE_CHARS)
            };

            tool_results.push((call_id, tool_name, result));
        }

        for (call_id, _tool_name, result) in tool_results {
            messages.push(Message {
                role: Role::Tool,
                content: vec![ContentPart::ToolResult {
                    tool_call_id: call_id,
                    content: result,
                }],
            });
        }
    }

    Ok((final_output, steps, total_tool_calls))
}
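
// Illustrative unit tests for the pure helper functions above. These are a
// sketch added for documentation purposes: the expected values follow directly
// from the heuristics in `estimate_tokens` (~3.5 chars per token) and
// `truncate_single_result`, and they exercise no provider or I/O code.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{ContentPart, Message, Role};

    #[test]
    fn estimate_tokens_uses_chars_per_token_heuristic() {
        assert_eq!(estimate_tokens(""), 0);
        // 35 characters / 3.5 chars per token = 10 tokens.
        assert_eq!(estimate_tokens(&"a".repeat(35)), 10);
    }

    #[test]
    fn estimate_message_tokens_adds_per_message_overhead() {
        let msg = Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "a".repeat(35),
            }],
        };
        // 4 tokens of per-message overhead + 10 tokens for the text part.
        assert_eq!(estimate_message_tokens(&msg), 14);
    }

    #[test]
    fn truncate_single_result_leaves_short_content_alone() {
        assert_eq!(truncate_single_result("short", 100), "short");
    }

    #[test]
    fn truncate_single_result_appends_marker_for_long_content() {
        let long = "x".repeat(10_000);
        let truncated = truncate_single_result(&long, 1_000);
        assert!(truncated.len() < long.len());
        assert!(truncated.contains("[OUTPUT TRUNCATED"));
    }
}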