1use anyhow::Result;
8use rig::{
9 client::{CompletionClient, ProviderClient},
10 completion::{Prompt, ToolDefinition},
11 providers::{anthropic, gemini, openai},
12 tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Mutex;
21
22use crate::agents::debug as agent_debug;
23use crate::agents::provider::{
24 CompletionProfile, anthropic_agent_builder, apply_completion_params, provider_from_name,
25 resolve_api_key,
26};
27use crate::providers::Provider;
28
/// Default wall-clock budget, in seconds, granted to each subagent (two minutes).
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 2 * 60;
/// Default per-subagent turn budget handed to the agent's `max_turns`.
const DEFAULT_SUBAGENT_MAX_TURNS: usize = 20;
32
/// Arguments accepted by the `parallel_analyze` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// Analysis prompts; each one is run by its own independent subagent.
    pub tasks: Vec<String>,
    /// Optional per-subagent turn budget. When absent, the tool's configured
    /// default is used; either way the value is clamped to `1..=100`.
    #[serde(default)]
    pub max_turns: Option<usize>,
}
44
/// Outcome of a single subagent task.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The prompt the subagent was given.
    pub task: String,
    /// The subagent's response on success, or a human-readable failure message.
    pub result: String,
    /// `true` only when the subagent completed and returned a response.
    pub success: bool,
    /// Error detail (failure, timeout, or non-completion); `None` on success.
    pub error: Option<String>,
}
57
/// Aggregated output of a `parallel_analyze` call.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// One entry per input task, in the same order as the input `tasks`.
    pub results: Vec<SubagentResult>,
    /// Number of entries whose `success` is `true`.
    pub successful: usize,
    /// Number of entries whose `success` is `false`.
    pub failed: usize,
    /// Wall-clock time for the whole call, in milliseconds.
    pub execution_time_ms: u64,
}
70
/// Provider-specific client plus the model settings used to run subagents.
///
/// Cloned once per spawned task, so each subagent owns its own copy.
#[derive(Clone)]
enum SubagentRunner {
    OpenAI {
        client: openai::Client,
        model: String,
        // Extra completion parameters forwarded to `apply_completion_params`.
        additional_params: HashMap<String, String>,
    },
    Anthropic {
        client: anthropic::Client,
        model: String,
        // Extra completion parameters forwarded to `apply_completion_params`.
        additional_params: HashMap<String, String>,
    },
    Gemini {
        client: gemini::Client,
        model: String,
        // Extra completion parameters forwarded to `apply_completion_params`.
        additional_params: HashMap<String, String>,
    },
}
90
91impl SubagentRunner {
92 fn new(
93 provider: &str,
94 model: &str,
95 api_key: Option<&str>,
96 additional_params: HashMap<String, String>,
97 ) -> Result<Self> {
98 match provider {
99 "openai" => {
100 let client = Self::resolve_openai_client(api_key)?;
101 Ok(Self::OpenAI {
102 client,
103 model: model.to_string(),
104 additional_params,
105 })
106 }
107 "anthropic" => {
108 let client = Self::resolve_anthropic_client(api_key)?;
109 Ok(Self::Anthropic {
110 client,
111 model: model.to_string(),
112 additional_params,
113 })
114 }
115 "google" | "gemini" => {
116 let client = Self::resolve_gemini_client(api_key)?;
117 Ok(Self::Gemini {
118 client,
119 model: model.to_string(),
120 additional_params,
121 })
122 }
123 _ => Err(anyhow::anyhow!(
124 "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
125 provider
126 )),
127 }
128 }
129
130 fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
135 let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
136 match resolved_key {
137 Some(key) => openai::Client::new(&key)
138 .map_err(|_| {
140 anyhow::anyhow!(
141 "Failed to create OpenAI client: authentication or configuration error"
142 )
143 }),
144 None => openai::Client::from_env()
145 .map_err(|_| anyhow::anyhow!("Failed to create OpenAI client from environment")),
146 }
147 }
148
149 fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
154 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
155 match resolved_key {
156 Some(key) => anthropic::Client::new(&key)
157 .map_err(|_| {
159 anyhow::anyhow!(
160 "Failed to create Anthropic client: authentication or configuration error"
161 )
162 }),
163 None => anthropic::Client::from_env()
164 .map_err(|_| anyhow::anyhow!("Failed to create Anthropic client from environment")),
165 }
166 }
167
168 fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
173 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
174 match resolved_key {
175 Some(key) => gemini::Client::new(&key)
176 .map_err(|_| {
178 anyhow::anyhow!(
179 "Failed to create Gemini client: authentication or configuration error"
180 )
181 }),
182 None => gemini::Client::from_env()
183 .map_err(|_| anyhow::anyhow!("Failed to create Gemini client from environment")),
184 }
185 }
186
187 async fn run_task(&self, task: &str, max_turns: usize) -> SubagentResult {
188 let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
189 task thoroughly and return a focused summary.\n\n\
190 Guidelines:\n\
191 - Use the available tools to gather necessary information\n\
192 - Focus only on what's asked\n\
193 - Return a clear, structured summary\n\
194 - Be concise but comprehensive";
195
196 let result = match self {
198 Self::OpenAI {
199 client,
200 model,
201 additional_params,
202 } => {
203 let builder = client.agent(model).preamble(preamble);
204 let builder = apply_completion_params(
205 builder,
206 Provider::OpenAI,
207 model,
208 4096,
209 Some(additional_params),
210 CompletionProfile::Subagent,
211 );
212 let agent = crate::attach_core_tools!(builder).build();
213 agent.prompt(task).max_turns(max_turns).await
214 }
215 Self::Anthropic {
216 client,
217 model,
218 additional_params,
219 } => {
220 let builder = anthropic_agent_builder(client, model).preamble(preamble);
221 let builder = apply_completion_params(
222 builder,
223 Provider::Anthropic,
224 model,
225 4096,
226 Some(additional_params),
227 CompletionProfile::Subagent,
228 );
229 let agent = crate::attach_core_tools!(builder).build();
230 agent.prompt(task).max_turns(max_turns).await
231 }
232 Self::Gemini {
233 client,
234 model,
235 additional_params,
236 } => {
237 let builder = client.agent(model).preamble(preamble);
238 let builder = apply_completion_params(
239 builder,
240 Provider::Google,
241 model,
242 4096,
243 Some(additional_params),
244 CompletionProfile::Subagent,
245 );
246 let agent = crate::attach_core_tools!(builder).build();
247 agent.prompt(task).max_turns(max_turns).await
248 }
249 };
250
251 match result {
252 Ok(response) => SubagentResult {
253 task: task.to_string(),
254 result: response,
255 success: true,
256 error: None,
257 },
258 Err(e) => SubagentResult {
259 task: task.to_string(),
260 result: format!("Subagent failed: {e}"),
261 success: false,
262 error: Some(e.to_string()),
263 },
264 }
265 }
266}
267
/// Tool that fans a list of analysis prompts out to independent subagents
/// and aggregates their results.
pub struct ParallelAnalyze {
    // Provider client + model settings; cloned into each spawned subagent task.
    runner: SubagentRunner,
    // Model name; in this file it is only read for debug output.
    model: String,
    // Per-subagent timeout, in seconds.
    timeout_secs: u64,
    // Default per-subagent turn budget, already clamped to 1..=100.
    max_turns: usize,
}
278
279impl ParallelAnalyze {
280 pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
286 Self::with_timeout(
287 provider,
288 model,
289 DEFAULT_SUBAGENT_TIMEOUT_SECS,
290 api_key,
291 None,
292 )
293 }
294
295 pub fn with_timeout(
301 provider: &str,
302 model: &str,
303 timeout_secs: u64,
304 api_key: Option<&str>,
305 additional_params: Option<HashMap<String, String>>,
306 ) -> Result<Self> {
307 Self::with_limits(
308 provider,
309 model,
310 timeout_secs,
311 DEFAULT_SUBAGENT_MAX_TURNS,
312 api_key,
313 additional_params,
314 )
315 }
316
317 pub fn with_limits(
324 provider: &str,
325 model: &str,
326 timeout_secs: u64,
327 max_turns: usize,
328 api_key: Option<&str>,
329 additional_params: Option<HashMap<String, String>>,
330 ) -> Result<Self> {
331 let provider_name = provider_from_name(provider)?;
332 let runner = SubagentRunner::new(
335 provider_name.name(),
336 model,
337 api_key,
338 additional_params.unwrap_or_default(),
339 )
340 .map_err(|e| {
341 anyhow::anyhow!(
342 "Failed to create {} runner: {}. Check API key and network connectivity.",
343 provider,
344 e
345 )
346 })?;
347
348 Ok(Self {
349 runner,
350 model: model.to_string(),
351 timeout_secs,
352 max_turns: max_turns.clamp(1, 100),
353 })
354 }
355}
356
// Error type for the `parallel_analyze` tool, generated by the crate's
// `define_tool_error!` macro.
crate::define_tool_error!(ParallelAnalyzeError);
359
360impl Tool for ParallelAnalyze {
361 const NAME: &'static str = "parallel_analyze";
362 type Error = ParallelAnalyzeError;
363 type Args = ParallelAnalyzeArgs;
364 type Output = ParallelAnalyzeResult;
365
366 async fn definition(&self, _prompt: String) -> ToolDefinition {
367 ToolDefinition {
368 name: Self::NAME.to_string(),
369 description: "Run multiple analysis tasks in parallel using independent subagents. \
370 Each subagent has its own context window, preventing overflow when \
371 analyzing large changesets. Use this when you have multiple independent \
372 analysis tasks that can run concurrently.\n\n\
373 Best for:\n\
374 - Analyzing different directories/modules separately\n\
375 - Processing many commits in batches\n\
376 - Running different types of analysis (security, performance, style) in parallel\n\n\
377 Each task should be a focused prompt. Results are aggregated and returned."
378 .to_string(),
379 parameters: json!({
380 "type": "object",
381 "properties": {
382 "tasks": {
383 "type": "array",
384 "items": { "type": "string" },
385 "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
386 "minItems": 1,
387 "maxItems": 10
388 },
389 "max_turns": {
390 "type": "integer",
391 "description": "Optional per-subagent turn budget. Increase for broad repository searches; lower it to cap cost or runaway tool loops.",
392 "minimum": 1,
393 "maximum": 100
394 }
395 },
396 "required": ["tasks"]
397 }),
398 }
399 }
400
401 #[allow(clippy::cognitive_complexity)]
402 async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
403 use std::time::Instant;
404
405 let start = Instant::now();
406 let max_turns = args.max_turns.unwrap_or(self.max_turns).clamp(1, 100);
407 let tasks = args.tasks;
408 let num_tasks = tasks.len();
409
410 agent_debug::debug_context_management(
411 "ParallelAnalyze",
412 &format!(
413 "Spawning {} subagents (fast model: {}, max turns: {})",
414 num_tasks, self.model, max_turns
415 ),
416 );
417
418 let initial_results = tasks
419 .iter()
420 .map(|task| {
421 Some(SubagentResult {
422 task: task.clone(),
423 result: "Subagent task did not complete".to_string(),
424 success: false,
425 error: Some("Task did not complete".to_string()),
426 })
427 })
428 .collect();
429 let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
430 Arc::new(Mutex::new(initial_results));
431
432 let mut handles = Vec::new();
434 let timeout = Duration::from_secs(self.timeout_secs);
435 for (index, task) in tasks.into_iter().enumerate() {
436 let runner = self.runner.clone();
437 let results = Arc::clone(&results);
438 let task_timeout = timeout;
439 let timeout_secs = self.timeout_secs;
440 let task_max_turns = max_turns;
441
442 let handle = tokio::spawn(async move {
443 let result = match tokio::time::timeout(
445 task_timeout,
446 runner.run_task(&task, task_max_turns),
447 )
448 .await
449 {
450 Ok(result) => result,
451 Err(_) => SubagentResult {
452 task: task.clone(),
453 result: format!("Subagent timed out after {timeout_secs} seconds"),
454 success: false,
455 error: Some(format!("Task timed out after {} seconds", timeout_secs)),
456 },
457 };
458
459 let mut guard = results.lock().await;
461 guard[index] = Some(result);
462 });
463
464 handles.push(handle);
465 }
466
467 for handle in handles {
469 if let Err(e) = handle.await {
470 agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
471 }
472 }
473
474 #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
475 let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
476
477 let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
479 .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
480 .into_inner()
481 .into_iter()
482 .enumerate()
483 .map(|(i, opt)| {
484 opt.unwrap_or_else(|| SubagentResult {
485 task: format!("Task {}", i),
486 result: "Subagent task did not complete".to_string(),
487 success: false,
488 error: Some("Task did not complete".to_string()),
489 })
490 })
491 .collect();
492
493 let successful = final_results.iter().filter(|r| r.success).count();
494 let failed = final_results.iter().filter(|r| !r.success).count();
495
496 agent_debug::debug_context_management(
497 "ParallelAnalyze",
498 &format!(
499 "{}/{} successful in {}ms",
500 successful, num_tasks, execution_time_ms
501 ),
502 );
503
504 Ok(ParallelAnalyzeResult {
505 results: final_results,
506 successful,
507 failed,
508 execution_time_ms,
509 })
510 }
511}
512
#[cfg(test)]
mod tests {
    use super::*;

    /// The derived schema must expose both argument fields.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let schema = schemars::schema_for!(ParallelAnalyzeArgs);
        let json = serde_json::to_string_pretty(&schema).expect("schema should serialize");
        assert!(json.contains("tasks"));
        assert!(json.contains("max_turns"));
    }

    /// `max_turns` is `#[serde(default)]`, so omitting it must still deserialize.
    #[test]
    fn test_parallel_analyze_args_max_turns_is_optional() {
        let args: ParallelAnalyzeArgs =
            serde_json::from_value(serde_json::json!({ "tasks": ["a", "b"] }))
                .expect("args without max_turns should deserialize");
        assert_eq!(args.tasks, vec!["a".to_string(), "b".to_string()]);
        assert!(args.max_turns.is_none());
    }

    /// An explicit `max_turns` must be carried through unchanged (clamping happens later).
    #[test]
    fn test_parallel_analyze_args_accepts_max_turns() {
        let args: ParallelAnalyzeArgs =
            serde_json::from_value(serde_json::json!({ "tasks": ["a"], "max_turns": 5 }))
                .expect("args with max_turns should deserialize");
        assert_eq!(args.tasks, vec!["a".to_string()]);
        assert_eq!(args.max_turns, Some(5));
    }
}