1use anyhow::Result;
8use rig::{
9 client::{CompletionClient, ProviderClient},
10 completion::{Prompt, ToolDefinition},
11 providers::{anthropic, gemini, openai},
12 tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Mutex;
21
22use crate::agents::debug as agent_debug;
23use crate::agents::provider::{
24 CompletionProfile, apply_completion_params, provider_from_name, resolve_api_key,
25};
26use crate::providers::Provider;
27
/// Per-subagent timeout, in seconds, applied when [`ParallelAnalyze::new`] is
/// used instead of [`ParallelAnalyze::with_timeout`].
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
30
/// Arguments accepted by the `parallel_analyze` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// Analysis task prompts; each one is handed to its own subagent.
    pub tasks: Vec<String>,
}
39
/// Outcome of a single subagent task.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The task prompt this result belongs to.
    pub task: String,
    /// The subagent's response text; empty when the task failed.
    pub result: String,
    /// `true` when the subagent returned a response, `false` on error or timeout.
    pub success: bool,
    /// Failure description when `success` is `false`; `None` on success.
    pub error: Option<String>,
}
52
/// Aggregated output of one `parallel_analyze` invocation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// One entry per submitted task, in the same order as the input list.
    pub results: Vec<SubagentResult>,
    /// Number of entries in `results` whose `success` flag is set.
    pub successful: usize,
    /// Number of entries in `results` whose `success` flag is not set.
    pub failed: usize,
    /// Wall-clock time for the whole batch, in milliseconds.
    pub execution_time_ms: u64,
}
65
/// Provider-specific handle used to run one subagent prompt.
///
/// Each variant carries the provider client, the model name, and extra
/// completion parameters that are forwarded to `apply_completion_params`.
/// The type is `Clone` so every spawned task can own its own copy.
#[derive(Clone)]
enum SubagentRunner {
    /// Runs subagents through the OpenAI client.
    OpenAI {
        client: openai::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    /// Runs subagents through the Anthropic client.
    Anthropic {
        client: anthropic::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    /// Runs subagents through the Gemini client (selected by "google" or "gemini").
    Gemini {
        client: gemini::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
}
85
86impl SubagentRunner {
87 fn new(
88 provider: &str,
89 model: &str,
90 api_key: Option<&str>,
91 additional_params: HashMap<String, String>,
92 ) -> Result<Self> {
93 match provider {
94 "openai" => {
95 let client = Self::resolve_openai_client(api_key)?;
96 Ok(Self::OpenAI {
97 client,
98 model: model.to_string(),
99 additional_params,
100 })
101 }
102 "anthropic" => {
103 let client = Self::resolve_anthropic_client(api_key)?;
104 Ok(Self::Anthropic {
105 client,
106 model: model.to_string(),
107 additional_params,
108 })
109 }
110 "google" | "gemini" => {
111 let client = Self::resolve_gemini_client(api_key)?;
112 Ok(Self::Gemini {
113 client,
114 model: model.to_string(),
115 additional_params,
116 })
117 }
118 _ => Err(anyhow::anyhow!(
119 "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
120 provider
121 )),
122 }
123 }
124
125 fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
130 let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
131 match resolved_key {
132 Some(key) => openai::Client::new(&key)
133 .map_err(|_| {
135 anyhow::anyhow!(
136 "Failed to create OpenAI client: authentication or configuration error"
137 )
138 }),
139 None => openai::Client::from_env()
140 .map_err(|_| anyhow::anyhow!("Failed to create OpenAI client from environment")),
141 }
142 }
143
144 fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
149 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
150 match resolved_key {
151 Some(key) => anthropic::Client::new(&key)
152 .map_err(|_| {
154 anyhow::anyhow!(
155 "Failed to create Anthropic client: authentication or configuration error"
156 )
157 }),
158 None => anthropic::Client::from_env()
159 .map_err(|_| anyhow::anyhow!("Failed to create Anthropic client from environment")),
160 }
161 }
162
163 fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
168 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
169 match resolved_key {
170 Some(key) => gemini::Client::new(&key)
171 .map_err(|_| {
173 anyhow::anyhow!(
174 "Failed to create Gemini client: authentication or configuration error"
175 )
176 }),
177 None => gemini::Client::from_env()
178 .map_err(|_| anyhow::anyhow!("Failed to create Gemini client from environment")),
179 }
180 }
181
182 async fn run_task(&self, task: &str) -> SubagentResult {
183 let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
184 task thoroughly and return a focused summary.\n\n\
185 Guidelines:\n\
186 - Use the available tools to gather necessary information\n\
187 - Focus only on what's asked\n\
188 - Return a clear, structured summary\n\
189 - Be concise but comprehensive";
190
191 let result = match self {
193 Self::OpenAI {
194 client,
195 model,
196 additional_params,
197 } => {
198 let builder = client.agent(model).preamble(preamble);
199 let builder = apply_completion_params(
200 builder,
201 Provider::OpenAI,
202 model,
203 4096,
204 Some(additional_params),
205 CompletionProfile::Subagent,
206 );
207 let agent = crate::attach_core_tools!(builder).build();
208 agent.prompt(task).await
209 }
210 Self::Anthropic {
211 client,
212 model,
213 additional_params,
214 } => {
215 let builder = client.agent(model).preamble(preamble);
216 let builder = apply_completion_params(
217 builder,
218 Provider::Anthropic,
219 model,
220 4096,
221 Some(additional_params),
222 CompletionProfile::Subagent,
223 );
224 let agent = crate::attach_core_tools!(builder).build();
225 agent.prompt(task).await
226 }
227 Self::Gemini {
228 client,
229 model,
230 additional_params,
231 } => {
232 let builder = client.agent(model).preamble(preamble);
233 let builder = apply_completion_params(
234 builder,
235 Provider::Google,
236 model,
237 4096,
238 Some(additional_params),
239 CompletionProfile::Subagent,
240 );
241 let agent = crate::attach_core_tools!(builder).build();
242 agent.prompt(task).await
243 }
244 };
245
246 match result {
247 Ok(response) => SubagentResult {
248 task: task.to_string(),
249 result: response,
250 success: true,
251 error: None,
252 },
253 Err(e) => SubagentResult {
254 task: task.to_string(),
255 result: String::new(),
256 success: false,
257 error: Some(e.to_string()),
258 },
259 }
260 }
261}
262
/// Tool that fans a list of analysis prompts out to independent subagents.
pub struct ParallelAnalyze {
    /// Provider-specific runner; cloned once per spawned task.
    runner: SubagentRunner,
    /// Model name, kept for debug logging.
    model: String,
    /// Timeout applied to each individual subagent task, in seconds.
    timeout_secs: u64,
}
271
272impl ParallelAnalyze {
273 pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
279 Self::with_timeout(
280 provider,
281 model,
282 DEFAULT_SUBAGENT_TIMEOUT_SECS,
283 api_key,
284 None,
285 )
286 }
287
288 pub fn with_timeout(
294 provider: &str,
295 model: &str,
296 timeout_secs: u64,
297 api_key: Option<&str>,
298 additional_params: Option<HashMap<String, String>>,
299 ) -> Result<Self> {
300 let provider_name = provider_from_name(provider)?;
301 let runner = SubagentRunner::new(
304 provider_name.name(),
305 model,
306 api_key,
307 additional_params.unwrap_or_default(),
308 )
309 .map_err(|e| {
310 anyhow::anyhow!(
311 "Failed to create {} runner: {}. Check API key and network connectivity.",
312 provider,
313 e
314 )
315 })?;
316
317 Ok(Self {
318 runner,
319 model: model.to_string(),
320 timeout_secs,
321 })
322 }
323}
324
// Declares `ParallelAnalyzeError`, the error type this tool reports through
// `Tool::Error`, via the crate-level `define_tool_error!` macro.
crate::define_tool_error!(ParallelAnalyzeError);
327
328impl Tool for ParallelAnalyze {
329 const NAME: &'static str = "parallel_analyze";
330 type Error = ParallelAnalyzeError;
331 type Args = ParallelAnalyzeArgs;
332 type Output = ParallelAnalyzeResult;
333
334 async fn definition(&self, _prompt: String) -> ToolDefinition {
335 ToolDefinition {
336 name: Self::NAME.to_string(),
337 description: "Run multiple analysis tasks in parallel using independent subagents. \
338 Each subagent has its own context window, preventing overflow when \
339 analyzing large changesets. Use this when you have multiple independent \
340 analysis tasks that can run concurrently.\n\n\
341 Best for:\n\
342 - Analyzing different directories/modules separately\n\
343 - Processing many commits in batches\n\
344 - Running different types of analysis (security, performance, style) in parallel\n\n\
345 Each task should be a focused prompt. Results are aggregated and returned."
346 .to_string(),
347 parameters: json!({
348 "type": "object",
349 "properties": {
350 "tasks": {
351 "type": "array",
352 "items": { "type": "string" },
353 "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
354 "minItems": 1,
355 "maxItems": 10
356 }
357 },
358 "required": ["tasks"]
359 }),
360 }
361 }
362
363 #[allow(clippy::cognitive_complexity)]
364 async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
365 use std::time::Instant;
366
367 let start = Instant::now();
368 let tasks = args.tasks;
369 let num_tasks = tasks.len();
370
371 agent_debug::debug_context_management(
372 "ParallelAnalyze",
373 &format!(
374 "Spawning {} subagents (fast model: {})",
375 num_tasks, self.model
376 ),
377 );
378
379 let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
381 Arc::new(Mutex::new(vec![None; num_tasks]));
382
383 let mut handles = Vec::new();
385 let timeout = Duration::from_secs(self.timeout_secs);
386 for (index, task) in tasks.into_iter().enumerate() {
387 let runner = self.runner.clone();
388 let results = Arc::clone(&results);
389 let task_timeout = timeout;
390 let timeout_secs = self.timeout_secs;
391
392 let handle = tokio::spawn(async move {
393 let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
395 {
396 Ok(result) => result,
397 Err(_) => SubagentResult {
398 task: task.clone(),
399 result: String::new(),
400 success: false,
401 error: Some(format!("Task timed out after {} seconds", timeout_secs)),
402 },
403 };
404
405 let mut guard = results.lock().await;
407 guard[index] = Some(result);
408 });
409
410 handles.push(handle);
411 }
412
413 for handle in handles {
415 if let Err(e) = handle.await {
416 agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
417 }
418 }
419
420 #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
421 let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
422
423 let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
425 .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
426 .into_inner()
427 .into_iter()
428 .enumerate()
429 .map(|(i, opt)| {
430 opt.unwrap_or_else(|| SubagentResult {
431 task: format!("Task {}", i),
432 result: String::new(),
433 success: false,
434 error: Some("Task did not complete".to_string()),
435 })
436 })
437 .collect();
438
439 let successful = final_results.iter().filter(|r| r.success).count();
440 let failed = final_results.iter().filter(|r| !r.success).count();
441
442 agent_debug::debug_context_management(
443 "ParallelAnalyze",
444 &format!(
445 "{}/{} successful in {}ms",
446 successful, num_tasks, execution_time_ms
447 ),
448 );
449
450 Ok(ParallelAnalyzeResult {
451 results: final_results,
452 successful,
453 failed,
454 execution_time_ms,
455 })
456 }
457}
458
#[cfg(test)]
mod tests {
    use super::*;

    /// The JSON schema derived for the tool arguments must mention the
    /// `tasks` field.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let args_schema = schemars::schema_for!(ParallelAnalyzeArgs);
        let rendered =
            serde_json::to_string_pretty(&args_schema).expect("schema should serialize");
        let mentions_tasks = rendered.contains("tasks");
        assert!(mentions_tasks);
    }
}