1use anyhow::Result;
8use rig::{
9 client::{CompletionClient, ProviderClient},
10 completion::{Prompt, ToolDefinition},
11 providers::{anthropic, gemini, openai},
12 tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Mutex;
21
22use crate::agents::debug as agent_debug;
23use crate::agents::provider::{
24 CompletionProfile, apply_completion_params, provider_from_name, resolve_api_key,
25};
26use crate::providers::Provider;
27
/// Default per-subagent time limit, in seconds.
///
/// `ParallelAnalyze::new` passes this value to `ParallelAnalyze::with_timeout`
/// when the caller does not choose a timeout explicitly.
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
30
/// Arguments accepted by the `parallel_analyze` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// Analysis task prompts; each entry is handed to its own subagent.
    pub tasks: Vec<String>,
}
39
/// Outcome of a single subagent task.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The task prompt this result belongs to.
    pub task: String,
    /// The subagent's response text; left empty when the task failed.
    pub result: String,
    /// `true` when the subagent returned a response, `false` otherwise.
    pub success: bool,
    /// Failure description (provider error, timeout, or incomplete task);
    /// `None` on success.
    pub error: Option<String>,
}
52
/// Aggregated output of one `parallel_analyze` invocation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// One entry per submitted task, in submission order.
    pub results: Vec<SubagentResult>,
    /// Number of entries in `results` whose `success` flag is set.
    pub successful: usize,
    /// Number of entries in `results` whose `success` flag is not set.
    pub failed: usize,
    /// Wall-clock time for the whole batch, in milliseconds.
    pub execution_time_ms: u64,
}
65
/// Provider-specific state needed to run one subagent prompt.
///
/// Each variant carries the provider client, the model name, and the extra
/// completion parameters that are forwarded to `apply_completion_params`.
/// The type is `Clone` because every spawned task takes its own copy.
#[derive(Clone)]
enum SubagentRunner {
    /// Runs subagents through the OpenAI client.
    OpenAI {
        client: openai::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    /// Runs subagents through the Anthropic client.
    Anthropic {
        client: anthropic::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    /// Runs subagents through the Gemini client.
    Gemini {
        client: gemini::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
}
85
86impl SubagentRunner {
87 fn new(
88 provider: &str,
89 model: &str,
90 api_key: Option<&str>,
91 additional_params: HashMap<String, String>,
92 ) -> Result<Self> {
93 match provider {
94 "openai" => {
95 let client = Self::resolve_openai_client(api_key)?;
96 Ok(Self::OpenAI {
97 client,
98 model: model.to_string(),
99 additional_params,
100 })
101 }
102 "anthropic" => {
103 let client = Self::resolve_anthropic_client(api_key)?;
104 Ok(Self::Anthropic {
105 client,
106 model: model.to_string(),
107 additional_params,
108 })
109 }
110 "google" | "gemini" => {
111 let client = Self::resolve_gemini_client(api_key)?;
112 Ok(Self::Gemini {
113 client,
114 model: model.to_string(),
115 additional_params,
116 })
117 }
118 _ => Err(anyhow::anyhow!(
119 "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
120 provider
121 )),
122 }
123 }
124
125 fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
130 let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
131 match resolved_key {
132 Some(key) => openai::Client::new(&key)
133 .map_err(|_| {
135 anyhow::anyhow!(
136 "Failed to create OpenAI client: authentication or configuration error"
137 )
138 }),
139 None => Ok(openai::Client::from_env()),
140 }
141 }
142
143 fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
148 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
149 match resolved_key {
150 Some(key) => anthropic::Client::new(&key)
151 .map_err(|_| {
153 anyhow::anyhow!(
154 "Failed to create Anthropic client: authentication or configuration error"
155 )
156 }),
157 None => Ok(anthropic::Client::from_env()),
158 }
159 }
160
161 fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
166 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
167 match resolved_key {
168 Some(key) => gemini::Client::new(&key)
169 .map_err(|_| {
171 anyhow::anyhow!(
172 "Failed to create Gemini client: authentication or configuration error"
173 )
174 }),
175 None => Ok(gemini::Client::from_env()),
176 }
177 }
178
179 async fn run_task(&self, task: &str) -> SubagentResult {
180 let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
181 task thoroughly and return a focused summary.\n\n\
182 Guidelines:\n\
183 - Use the available tools to gather necessary information\n\
184 - Focus only on what's asked\n\
185 - Return a clear, structured summary\n\
186 - Be concise but comprehensive";
187
188 let result = match self {
190 Self::OpenAI {
191 client,
192 model,
193 additional_params,
194 } => {
195 let builder = client.agent(model).preamble(preamble);
196 let builder = apply_completion_params(
197 builder,
198 Provider::OpenAI,
199 model,
200 4096,
201 Some(additional_params),
202 CompletionProfile::Subagent,
203 );
204 let agent = crate::attach_core_tools!(builder).build();
205 agent.prompt(task).await
206 }
207 Self::Anthropic {
208 client,
209 model,
210 additional_params,
211 } => {
212 let builder = client.agent(model).preamble(preamble);
213 let builder = apply_completion_params(
214 builder,
215 Provider::Anthropic,
216 model,
217 4096,
218 Some(additional_params),
219 CompletionProfile::Subagent,
220 );
221 let agent = crate::attach_core_tools!(builder).build();
222 agent.prompt(task).await
223 }
224 Self::Gemini {
225 client,
226 model,
227 additional_params,
228 } => {
229 let builder = client.agent(model).preamble(preamble);
230 let builder = apply_completion_params(
231 builder,
232 Provider::Google,
233 model,
234 4096,
235 Some(additional_params),
236 CompletionProfile::Subagent,
237 );
238 let agent = crate::attach_core_tools!(builder).build();
239 agent.prompt(task).await
240 }
241 };
242
243 match result {
244 Ok(response) => SubagentResult {
245 task: task.to_string(),
246 result: response,
247 success: true,
248 error: None,
249 },
250 Err(e) => SubagentResult {
251 task: task.to_string(),
252 result: String::new(),
253 success: false,
254 error: Some(e.to_string()),
255 },
256 }
257 }
258}
259
/// Tool that fans a list of analysis prompts out to independent subagents.
pub struct ParallelAnalyze {
    /// Provider client and model configuration cloned into every subagent task.
    runner: SubagentRunner,
    /// Model name; in this file it is only read for the debug message in `call`.
    model: String,
    /// Time limit applied to each individual subagent task, in seconds.
    timeout_secs: u64,
}
268
269impl ParallelAnalyze {
270 pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
276 Self::with_timeout(
277 provider,
278 model,
279 DEFAULT_SUBAGENT_TIMEOUT_SECS,
280 api_key,
281 None,
282 )
283 }
284
285 pub fn with_timeout(
291 provider: &str,
292 model: &str,
293 timeout_secs: u64,
294 api_key: Option<&str>,
295 additional_params: Option<HashMap<String, String>>,
296 ) -> Result<Self> {
297 let provider_name = provider_from_name(provider)?;
298 let runner = SubagentRunner::new(
301 provider_name.name(),
302 model,
303 api_key,
304 additional_params.unwrap_or_default(),
305 )
306 .map_err(|e| {
307 anyhow::anyhow!(
308 "Failed to create {} runner: {}. Check API key and network connectivity.",
309 provider,
310 e
311 )
312 })?;
313
314 Ok(Self {
315 runner,
316 model: model.to_string(),
317 timeout_secs,
318 })
319 }
320}
321
// Declares `ParallelAnalyzeError`, the `Tool::Error` type for `ParallelAnalyze`.
// The macro is defined elsewhere in the crate; this file constructs the error
// tuple-struct style from a `String` message.
crate::define_tool_error!(ParallelAnalyzeError);
324
325impl Tool for ParallelAnalyze {
326 const NAME: &'static str = "parallel_analyze";
327 type Error = ParallelAnalyzeError;
328 type Args = ParallelAnalyzeArgs;
329 type Output = ParallelAnalyzeResult;
330
331 async fn definition(&self, _prompt: String) -> ToolDefinition {
332 ToolDefinition {
333 name: Self::NAME.to_string(),
334 description: "Run multiple analysis tasks in parallel using independent subagents. \
335 Each subagent has its own context window, preventing overflow when \
336 analyzing large changesets. Use this when you have multiple independent \
337 analysis tasks that can run concurrently.\n\n\
338 Best for:\n\
339 - Analyzing different directories/modules separately\n\
340 - Processing many commits in batches\n\
341 - Running different types of analysis (security, performance, style) in parallel\n\n\
342 Each task should be a focused prompt. Results are aggregated and returned."
343 .to_string(),
344 parameters: json!({
345 "type": "object",
346 "properties": {
347 "tasks": {
348 "type": "array",
349 "items": { "type": "string" },
350 "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
351 "minItems": 1,
352 "maxItems": 10
353 }
354 },
355 "required": ["tasks"]
356 }),
357 }
358 }
359
360 #[allow(clippy::cognitive_complexity)]
361 async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
362 use std::time::Instant;
363
364 let start = Instant::now();
365 let tasks = args.tasks;
366 let num_tasks = tasks.len();
367
368 agent_debug::debug_context_management(
369 "ParallelAnalyze",
370 &format!(
371 "Spawning {} subagents (fast model: {})",
372 num_tasks, self.model
373 ),
374 );
375
376 let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
378 Arc::new(Mutex::new(vec![None; num_tasks]));
379
380 let mut handles = Vec::new();
382 let timeout = Duration::from_secs(self.timeout_secs);
383 for (index, task) in tasks.into_iter().enumerate() {
384 let runner = self.runner.clone();
385 let results = Arc::clone(&results);
386 let task_timeout = timeout;
387 let timeout_secs = self.timeout_secs;
388
389 let handle = tokio::spawn(async move {
390 let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
392 {
393 Ok(result) => result,
394 Err(_) => SubagentResult {
395 task: task.clone(),
396 result: String::new(),
397 success: false,
398 error: Some(format!("Task timed out after {} seconds", timeout_secs)),
399 },
400 };
401
402 let mut guard = results.lock().await;
404 guard[index] = Some(result);
405 });
406
407 handles.push(handle);
408 }
409
410 for handle in handles {
412 if let Err(e) = handle.await {
413 agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
414 }
415 }
416
417 #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
418 let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
419
420 let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
422 .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
423 .into_inner()
424 .into_iter()
425 .enumerate()
426 .map(|(i, opt)| {
427 opt.unwrap_or_else(|| SubagentResult {
428 task: format!("Task {}", i),
429 result: String::new(),
430 success: false,
431 error: Some("Task did not complete".to_string()),
432 })
433 })
434 .collect();
435
436 let successful = final_results.iter().filter(|r| r.success).count();
437 let failed = final_results.iter().filter(|r| !r.success).count();
438
439 agent_debug::debug_context_management(
440 "ParallelAnalyze",
441 &format!(
442 "{}/{} successful in {}ms",
443 successful, num_tasks, execution_time_ms
444 ),
445 );
446
447 Ok(ParallelAnalyzeResult {
448 results: final_results,
449 successful,
450 failed,
451 execution_time_ms,
452 })
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    /// The derived JSON schema for the tool arguments must mention the
    /// `tasks` field.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let rendered = serde_json::to_string_pretty(&schemars::schema_for!(ParallelAnalyzeArgs))
            .expect("schema should serialize");

        assert!(rendered.contains("tasks"));
    }
}
466}