1use anyhow::Result;
8use rig::{
9 client::{CompletionClient, ProviderClient},
10 completion::{Prompt, ToolDefinition},
11 providers::{anthropic, gemini, openai},
12 tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::Mutex;
20
21use crate::agents::debug as agent_debug;
22use crate::agents::provider::resolve_api_key;
23use crate::providers::Provider;
24
/// Default wall-clock timeout applied to each spawned subagent task, in seconds.
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
27
/// Arguments accepted by the `parallel_analyze` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// Analysis task prompts; each one is run by its own subagent.
    pub tasks: Vec<String>,
}
36
/// Outcome of a single subagent task.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    /// The task prompt that was executed.
    pub task: String,
    /// The subagent's response text (empty when the task failed).
    pub result: String,
    /// Whether the task completed successfully.
    pub success: bool,
    /// Error description when `success` is false (prompt error or timeout).
    pub error: Option<String>,
}
49
/// Aggregated output of one `parallel_analyze` invocation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    /// Per-task results, in the same order as the input `tasks`.
    pub results: Vec<SubagentResult>,
    /// Count of tasks with `success == true`.
    pub successful: usize,
    /// Count of tasks with `success == false`.
    pub failed: usize,
    /// Wall-clock time for the whole batch, in milliseconds.
    pub execution_time_ms: u64,
}
62
/// Provider-specific client + model pair used to run subagent prompts.
///
/// `Clone` lets the same runner be handed to each spawned task.
#[derive(Clone)]
enum SubagentRunner {
    OpenAI {
        client: openai::Client,
        model: String,
    },
    Anthropic {
        client: anthropic::Client,
        model: String,
    },
    Gemini {
        client: gemini::Client,
        model: String,
    },
}
79
80impl SubagentRunner {
81 fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
82 match provider {
83 "openai" => {
84 let client = Self::resolve_openai_client(api_key)?;
85 Ok(Self::OpenAI {
86 client,
87 model: model.to_string(),
88 })
89 }
90 "anthropic" => {
91 let client = Self::resolve_anthropic_client(api_key)?;
92 Ok(Self::Anthropic {
93 client,
94 model: model.to_string(),
95 })
96 }
97 "google" | "gemini" => {
98 let client = Self::resolve_gemini_client(api_key)?;
99 Ok(Self::Gemini {
100 client,
101 model: model.to_string(),
102 })
103 }
104 _ => Err(anyhow::anyhow!(
105 "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
106 provider
107 )),
108 }
109 }
110
111 fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
116 let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
117 match resolved_key {
118 Some(key) => openai::Client::new(&key)
119 .map_err(|_| {
121 anyhow::anyhow!(
122 "Failed to create OpenAI client: authentication or configuration error"
123 )
124 }),
125 None => Ok(openai::Client::from_env()),
126 }
127 }
128
129 fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
134 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
135 match resolved_key {
136 Some(key) => anthropic::Client::new(&key)
137 .map_err(|_| {
139 anyhow::anyhow!(
140 "Failed to create Anthropic client: authentication or configuration error"
141 )
142 }),
143 None => Ok(anthropic::Client::from_env()),
144 }
145 }
146
147 fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
152 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
153 match resolved_key {
154 Some(key) => gemini::Client::new(&key)
155 .map_err(|_| {
157 anyhow::anyhow!(
158 "Failed to create Gemini client: authentication or configuration error"
159 )
160 }),
161 None => Ok(gemini::Client::from_env()),
162 }
163 }
164
165 async fn run_task(&self, task: &str) -> SubagentResult {
166 let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
167 task thoroughly and return a focused summary.\n\n\
168 Guidelines:\n\
169 - Use the available tools to gather necessary information\n\
170 - Focus only on what's asked\n\
171 - Return a clear, structured summary\n\
172 - Be concise but comprehensive";
173
174 let result = match self {
176 Self::OpenAI { client, model } => {
177 let builder = client.agent(model).preamble(preamble).max_tokens(4096);
178 let agent = crate::attach_core_tools!(builder).build();
179 agent.prompt(task).await
180 }
181 Self::Anthropic { client, model } => {
182 let builder = client.agent(model).preamble(preamble).max_tokens(4096);
183 let agent = crate::attach_core_tools!(builder).build();
184 agent.prompt(task).await
185 }
186 Self::Gemini { client, model } => {
187 let builder = client.agent(model).preamble(preamble).max_tokens(4096);
188 let agent = crate::attach_core_tools!(builder).build();
189 agent.prompt(task).await
190 }
191 };
192
193 match result {
194 Ok(response) => SubagentResult {
195 task: task.to_string(),
196 result: response,
197 success: true,
198 error: None,
199 },
200 Err(e) => SubagentResult {
201 task: task.to_string(),
202 result: String::new(),
203 success: false,
204 error: Some(e.to_string()),
205 },
206 }
207 }
208}
209
/// Tool that fans analysis prompts out to concurrently running subagents.
pub struct ParallelAnalyze {
    // Provider-specific runner; cloned into each spawned task.
    runner: SubagentRunner,
    // Model name, kept for debug logging.
    model: String,
    // Per-task timeout in seconds.
    timeout_secs: u64,
}
218
219impl ParallelAnalyze {
220 pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
222 Self::with_timeout(provider, model, DEFAULT_SUBAGENT_TIMEOUT_SECS, api_key)
223 }
224
225 pub fn with_timeout(
227 provider: &str,
228 model: &str,
229 timeout_secs: u64,
230 api_key: Option<&str>,
231 ) -> Result<Self> {
232 let runner = SubagentRunner::new(provider, model, api_key).map_err(|e| {
235 anyhow::anyhow!(
236 "Failed to create {} runner: {}. Check API key and network connectivity.",
237 provider,
238 e
239 )
240 })?;
241
242 Ok(Self {
243 runner,
244 model: model.to_string(),
245 timeout_secs,
246 })
247 }
248}
249
// Generates the `ParallelAnalyzeError` tool-error type (constructed below with
// a single String message) via the crate-wide macro.
crate::define_tool_error!(ParallelAnalyzeError);
252
253impl Tool for ParallelAnalyze {
254 const NAME: &'static str = "parallel_analyze";
255 type Error = ParallelAnalyzeError;
256 type Args = ParallelAnalyzeArgs;
257 type Output = ParallelAnalyzeResult;
258
259 async fn definition(&self, _prompt: String) -> ToolDefinition {
260 ToolDefinition {
261 name: Self::NAME.to_string(),
262 description: "Run multiple analysis tasks in parallel using independent subagents. \
263 Each subagent has its own context window, preventing overflow when \
264 analyzing large changesets. Use this when you have multiple independent \
265 analysis tasks that can run concurrently.\n\n\
266 Best for:\n\
267 - Analyzing different directories/modules separately\n\
268 - Processing many commits in batches\n\
269 - Running different types of analysis (security, performance, style) in parallel\n\n\
270 Each task should be a focused prompt. Results are aggregated and returned."
271 .to_string(),
272 parameters: json!({
273 "type": "object",
274 "properties": {
275 "tasks": {
276 "type": "array",
277 "items": { "type": "string" },
278 "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
279 "minItems": 1,
280 "maxItems": 10
281 }
282 },
283 "required": ["tasks"]
284 }),
285 }
286 }
287
288 #[allow(clippy::cognitive_complexity)]
289 async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
290 use std::time::Instant;
291
292 let start = Instant::now();
293 let tasks = args.tasks;
294 let num_tasks = tasks.len();
295
296 agent_debug::debug_context_management(
297 "ParallelAnalyze",
298 &format!(
299 "Spawning {} subagents (fast model: {})",
300 num_tasks, self.model
301 ),
302 );
303
304 let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
306 Arc::new(Mutex::new(vec![None; num_tasks]));
307
308 let mut handles = Vec::new();
310 let timeout = Duration::from_secs(self.timeout_secs);
311 for (index, task) in tasks.into_iter().enumerate() {
312 let runner = self.runner.clone();
313 let results = Arc::clone(&results);
314 let task_timeout = timeout;
315 let timeout_secs = self.timeout_secs;
316
317 let handle = tokio::spawn(async move {
318 let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
320 {
321 Ok(result) => result,
322 Err(_) => SubagentResult {
323 task: task.clone(),
324 result: String::new(),
325 success: false,
326 error: Some(format!("Task timed out after {} seconds", timeout_secs)),
327 },
328 };
329
330 let mut guard = results.lock().await;
332 guard[index] = Some(result);
333 });
334
335 handles.push(handle);
336 }
337
338 for handle in handles {
340 if let Err(e) = handle.await {
341 agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
342 }
343 }
344
345 #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
346 let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
347
348 let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
350 .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
351 .into_inner()
352 .into_iter()
353 .enumerate()
354 .map(|(i, opt)| {
355 opt.unwrap_or_else(|| SubagentResult {
356 task: format!("Task {}", i),
357 result: String::new(),
358 success: false,
359 error: Some("Task did not complete".to_string()),
360 })
361 })
362 .collect();
363
364 let successful = final_results.iter().filter(|r| r.success).count();
365 let failed = final_results.iter().filter(|r| !r.success).count();
366
367 agent_debug::debug_context_management(
368 "ParallelAnalyze",
369 &format!(
370 "{}/{} successful in {}ms",
371 successful, num_tasks, execution_time_ms
372 ),
373 );
374
375 Ok(ParallelAnalyzeResult {
376 results: final_results,
377 successful,
378 failed,
379 execution_time_ms,
380 })
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated JSON schema must expose the `tasks` parameter.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let rendered = serde_json::to_string_pretty(&schemars::schema_for!(ParallelAnalyzeArgs))
            .expect("schema should serialize");
        assert!(rendered.contains("tasks"));
    }
}
394}