1use anyhow::Result;
8use rig::{
9 client::{CompletionClient, ProviderClient},
10 completion::{Prompt, ToolDefinition},
11 providers::{anthropic, gemini, openai},
12 tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::Mutex;
20
21use crate::agents::debug as agent_debug;
22use crate::agents::provider::resolve_api_key;
23use crate::providers::Provider;
24
/// Default per-task timeout (seconds) applied to each spawned subagent
/// when the caller does not specify one via `ParallelAnalyze::with_timeout`.
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
27
/// Arguments accepted by the `parallel_analyze` tool.
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    // Analysis task prompts; each entry runs in its own subagent.
    // The tool schema advertises 1..=10 items, but that bound is not
    // enforced here in code.
    pub tasks: Vec<String>,
}
36
/// Outcome of one subagent task (success or failure).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SubagentResult {
    // The original task prompt this result corresponds to.
    pub task: String,
    // The subagent's response text; empty string on failure.
    pub result: String,
    // True when the subagent completed without error or timeout.
    pub success: bool,
    // Human-readable error description when `success` is false.
    pub error: Option<String>,
}
49
/// Aggregated output of a `parallel_analyze` invocation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ParallelAnalyzeResult {
    // Per-task results, in the same order as the input tasks.
    pub results: Vec<SubagentResult>,
    // Number of tasks that completed successfully.
    pub successful: usize,
    // Number of tasks that failed (error, timeout, or panic).
    pub failed: usize,
    // Wall-clock time for the whole batch, in milliseconds.
    pub execution_time_ms: u64,
}
62
/// Provider-specific client + model pair used to execute subagent prompts.
/// A copy is cloned into every spawned task.
// NOTE(review): assumes the rig provider clients are cheap to clone
// (handle-like) -- confirm before spawning large batches.
#[derive(Clone)]
enum SubagentRunner {
    OpenAI {
        client: openai::Client,
        model: String,
    },
    Anthropic {
        client: anthropic::Client,
        model: String,
    },
    Gemini {
        client: gemini::Client,
        model: String,
    },
}
79
80impl SubagentRunner {
81 fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
82 match provider {
83 "openai" => {
84 let client = Self::resolve_openai_client(api_key)?;
85 Ok(Self::OpenAI {
86 client,
87 model: model.to_string(),
88 })
89 }
90 "anthropic" => {
91 let client = Self::resolve_anthropic_client(api_key)?;
92 Ok(Self::Anthropic {
93 client,
94 model: model.to_string(),
95 })
96 }
97 "google" | "gemini" => {
98 let client = Self::resolve_gemini_client(api_key)?;
99 Ok(Self::Gemini {
100 client,
101 model: model.to_string(),
102 })
103 }
104 _ => Err(anyhow::anyhow!(
105 "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
106 provider
107 )),
108 }
109 }
110
111 fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
116 let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
117 match resolved_key {
118 Some(key) => openai::Client::new(&key)
119 .map_err(|_| anyhow::anyhow!("Failed to create OpenAI client: authentication or configuration error")),
121 None => Ok(openai::Client::from_env()),
122 }
123 }
124
125 fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
130 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
131 match resolved_key {
132 Some(key) => anthropic::Client::new(&key)
133 .map_err(|_| anyhow::anyhow!("Failed to create Anthropic client: authentication or configuration error")),
135 None => Ok(anthropic::Client::from_env()),
136 }
137 }
138
139 fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
144 let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
145 match resolved_key {
146 Some(key) => gemini::Client::new(&key)
147 .map_err(|_| anyhow::anyhow!("Failed to create Gemini client: authentication or configuration error")),
149 None => Ok(gemini::Client::from_env()),
150 }
151 }
152
153 async fn run_task(&self, task: &str) -> SubagentResult {
154 let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
155 task thoroughly and return a focused summary.\n\n\
156 Guidelines:\n\
157 - Use the available tools to gather necessary information\n\
158 - Focus only on what's asked\n\
159 - Return a clear, structured summary\n\
160 - Be concise but comprehensive";
161
162 let result = match self {
164 Self::OpenAI { client, model } => {
165 let builder = client.agent(model).preamble(preamble).max_tokens(4096);
166 let agent = crate::attach_core_tools!(builder).build();
167 agent.prompt(task).await
168 }
169 Self::Anthropic { client, model } => {
170 let builder = client.agent(model).preamble(preamble).max_tokens(4096);
171 let agent = crate::attach_core_tools!(builder).build();
172 agent.prompt(task).await
173 }
174 Self::Gemini { client, model } => {
175 let builder = client.agent(model).preamble(preamble).max_tokens(4096);
176 let agent = crate::attach_core_tools!(builder).build();
177 agent.prompt(task).await
178 }
179 };
180
181 match result {
182 Ok(response) => SubagentResult {
183 task: task.to_string(),
184 result: response,
185 success: true,
186 error: None,
187 },
188 Err(e) => SubagentResult {
189 task: task.to_string(),
190 result: String::new(),
191 success: false,
192 error: Some(e.to_string()),
193 },
194 }
195 }
196}
197
/// Tool that fans out independent analysis prompts to parallel subagents.
pub struct ParallelAnalyze {
    // Provider-backed runner cloned into each spawned subagent task.
    runner: SubagentRunner,
    // Model identifier, retained for debug logging in `call`.
    model: String,
    // Per-task timeout applied to every subagent, in seconds.
    timeout_secs: u64,
}
206
207impl ParallelAnalyze {
208 pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
210 Self::with_timeout(provider, model, DEFAULT_SUBAGENT_TIMEOUT_SECS, api_key)
211 }
212
213 pub fn with_timeout(
215 provider: &str,
216 model: &str,
217 timeout_secs: u64,
218 api_key: Option<&str>,
219 ) -> Result<Self> {
220 let runner = SubagentRunner::new(provider, model, api_key).map_err(|e| {
223 anyhow::anyhow!(
224 "Failed to create {} runner: {}. Check API key and network connectivity.",
225 provider,
226 e
227 )
228 })?;
229
230 Ok(Self {
231 runner,
232 model: model.to_string(),
233 timeout_secs,
234 })
235 }
236}
237
// Generates the tool's error type -- a String-wrapping newtype, as seen in
// the `ParallelAnalyzeError(..to_string())` construction below.
crate::define_tool_error!(ParallelAnalyzeError);
240
241impl Tool for ParallelAnalyze {
242 const NAME: &'static str = "parallel_analyze";
243 type Error = ParallelAnalyzeError;
244 type Args = ParallelAnalyzeArgs;
245 type Output = ParallelAnalyzeResult;
246
247 async fn definition(&self, _prompt: String) -> ToolDefinition {
248 ToolDefinition {
249 name: Self::NAME.to_string(),
250 description: "Run multiple analysis tasks in parallel using independent subagents. \
251 Each subagent has its own context window, preventing overflow when \
252 analyzing large changesets. Use this when you have multiple independent \
253 analysis tasks that can run concurrently.\n\n\
254 Best for:\n\
255 - Analyzing different directories/modules separately\n\
256 - Processing many commits in batches\n\
257 - Running different types of analysis (security, performance, style) in parallel\n\n\
258 Each task should be a focused prompt. Results are aggregated and returned."
259 .to_string(),
260 parameters: json!({
261 "type": "object",
262 "properties": {
263 "tasks": {
264 "type": "array",
265 "items": { "type": "string" },
266 "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
267 "minItems": 1,
268 "maxItems": 10
269 }
270 },
271 "required": ["tasks"]
272 }),
273 }
274 }
275
276 #[allow(clippy::cognitive_complexity)]
277 async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
278 use std::time::Instant;
279
280 let start = Instant::now();
281 let tasks = args.tasks;
282 let num_tasks = tasks.len();
283
284 agent_debug::debug_context_management(
285 "ParallelAnalyze",
286 &format!(
287 "Spawning {} subagents (fast model: {})",
288 num_tasks, self.model
289 ),
290 );
291
292 let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
294 Arc::new(Mutex::new(vec![None; num_tasks]));
295
296 let mut handles = Vec::new();
298 let timeout = Duration::from_secs(self.timeout_secs);
299 for (index, task) in tasks.into_iter().enumerate() {
300 let runner = self.runner.clone();
301 let results = Arc::clone(&results);
302 let task_timeout = timeout;
303 let timeout_secs = self.timeout_secs;
304
305 let handle = tokio::spawn(async move {
306 let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
308 {
309 Ok(result) => result,
310 Err(_) => SubagentResult {
311 task: task.clone(),
312 result: String::new(),
313 success: false,
314 error: Some(format!("Task timed out after {} seconds", timeout_secs)),
315 },
316 };
317
318 let mut guard = results.lock().await;
320 guard[index] = Some(result);
321 });
322
323 handles.push(handle);
324 }
325
326 for handle in handles {
328 if let Err(e) = handle.await {
329 agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
330 }
331 }
332
333 #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
334 let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
335
336 let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
338 .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
339 .into_inner()
340 .into_iter()
341 .enumerate()
342 .map(|(i, opt)| {
343 opt.unwrap_or_else(|| SubagentResult {
344 task: format!("Task {}", i),
345 result: String::new(),
346 success: false,
347 error: Some("Task did not complete".to_string()),
348 })
349 })
350 .collect();
351
352 let successful = final_results.iter().filter(|r| r.success).count();
353 let failed = final_results.iter().filter(|r| !r.success).count();
354
355 agent_debug::debug_context_management(
356 "ParallelAnalyze",
357 &format!(
358 "{}/{} successful in {}ms",
359 successful, num_tasks, execution_time_ms
360 ),
361 );
362
363 Ok(ParallelAnalyzeResult {
364 results: final_results,
365 successful,
366 failed,
367 execution_time_ms,
368 })
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;

    /// The derived JSON schema must expose the `tasks` parameter so
    /// providers can present it to the model.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let schema = schemars::schema_for!(ParallelAnalyzeArgs);
        let rendered = serde_json::to_string_pretty(&schema).expect("schema should serialize");
        assert!(rendered.contains("tasks"));
    }
}
382}