Skip to main content

git_iris/agents/tools/
parallel_analyze.rs

1//! Parallel Analysis Tool
2//!
3//! Enables Iris to spawn multiple independent subagents that analyze different
4//! portions of a codebase concurrently. This prevents context overflow when
5//! dealing with large changesets by distributing work across separate context windows.
6
7use anyhow::Result;
8use rig::{
9    client::{CompletionClient, ProviderClient},
10    completion::{Prompt, ToolDefinition},
11    providers::{anthropic, gemini, openai},
12    tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Mutex;
21
22use crate::agents::debug as agent_debug;
23use crate::agents::provider::{
24    CompletionProfile, apply_completion_params, provider_from_name, resolve_api_key,
25};
26use crate::providers::Provider;
27
/// Per-subagent time budget, in seconds, used when the caller does not supply
/// one (two minutes).
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 2 * 60;
30
// Input payload for the `parallel_analyze` tool (it is the `Tool::Args` type).
// NOTE(review): this type derives `JsonSchema`; schemars normally copies `///`
// doc comments into the generated schema as descriptions, so rewording the doc
// comments below presumably changes the derived schema — confirm before editing.
/// Arguments for parallel analysis
#[derive(Debug, Deserialize, JsonSchema)]
pub struct ParallelAnalyzeArgs {
    /// List of analysis tasks to run in parallel.
    /// Each task should be a focused prompt describing what to analyze.
    /// Example: `["Analyze security changes in auth/", "Review performance in db/"]`
    pub tasks: Vec<String>,
}
39
40/// Result from a single subagent analysis
41#[derive(Debug, Serialize, Deserialize, Clone)]
42pub struct SubagentResult {
43    /// The original task prompt
44    pub task: String,
45    /// The analysis result
46    pub result: String,
47    /// Whether the analysis succeeded
48    pub success: bool,
49    /// Error message if failed
50    pub error: Option<String>,
51}
52
53/// Aggregated results from all parallel analyses
54#[derive(Debug, Serialize, Deserialize)]
55pub struct ParallelAnalyzeResult {
56    /// Results from each subagent
57    pub results: Vec<SubagentResult>,
58    /// Number of successful analyses
59    pub successful: usize,
60    /// Number of failed analyses
61    pub failed: usize,
62    /// Total execution time in milliseconds
63    pub execution_time_ms: u64,
64}
65
/// Provider-specific subagent runner
///
/// One variant per supported provider. Every variant carries the same three
/// pieces of state: the provider client, the model name handed to
/// `client.agent(..)`, and the extra string parameters forwarded to
/// `apply_completion_params` when a task is run. The type is `Clone` because
/// each spawned task receives its own copy of the runner.
#[derive(Clone)]
enum SubagentRunner {
    /// Runner backed by the `rig` OpenAI client.
    OpenAI {
        client: openai::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    /// Runner backed by the `rig` Anthropic client.
    Anthropic {
        client: anthropic::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
    /// Runner backed by the `rig` Gemini client (selected by the provider
    /// names `"google"` or `"gemini"`).
    Gemini {
        client: gemini::Client,
        model: String,
        additional_params: HashMap<String, String>,
    },
}
85
86impl SubagentRunner {
87    fn new(
88        provider: &str,
89        model: &str,
90        api_key: Option<&str>,
91        additional_params: HashMap<String, String>,
92    ) -> Result<Self> {
93        match provider {
94            "openai" => {
95                let client = Self::resolve_openai_client(api_key)?;
96                Ok(Self::OpenAI {
97                    client,
98                    model: model.to_string(),
99                    additional_params,
100                })
101            }
102            "anthropic" => {
103                let client = Self::resolve_anthropic_client(api_key)?;
104                Ok(Self::Anthropic {
105                    client,
106                    model: model.to_string(),
107                    additional_params,
108                })
109            }
110            "google" | "gemini" => {
111                let client = Self::resolve_gemini_client(api_key)?;
112                Ok(Self::Gemini {
113                    client,
114                    model: model.to_string(),
115                    additional_params,
116                })
117            }
118            _ => Err(anyhow::anyhow!(
119                "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
120                provider
121            )),
122        }
123    }
124
125    /// Create `OpenAI` client using shared resolution logic
126    ///
127    /// Uses `resolve_api_key` from provider module to maintain consistent
128    /// resolution order: config → env var → client default
129    fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
130        let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
131        match resolved_key {
132            Some(key) => openai::Client::new(&key)
133                // Sanitize error to avoid exposing key material
134                .map_err(|_| {
135                    anyhow::anyhow!(
136                        "Failed to create OpenAI client: authentication or configuration error"
137                    )
138                }),
139            None => openai::Client::from_env()
140                .map_err(|_| anyhow::anyhow!("Failed to create OpenAI client from environment")),
141        }
142    }
143
144    /// Create `Anthropic` client using shared resolution logic
145    ///
146    /// Uses `resolve_api_key` from provider module to maintain consistent
147    /// resolution order: config → env var → client default
148    fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
149        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
150        match resolved_key {
151            Some(key) => anthropic::Client::new(&key)
152                // Sanitize error to avoid exposing key material
153                .map_err(|_| {
154                    anyhow::anyhow!(
155                        "Failed to create Anthropic client: authentication or configuration error"
156                    )
157                }),
158            None => anthropic::Client::from_env()
159                .map_err(|_| anyhow::anyhow!("Failed to create Anthropic client from environment")),
160        }
161    }
162
163    /// Create `Gemini` client using shared resolution logic
164    ///
165    /// Uses `resolve_api_key` from provider module to maintain consistent
166    /// resolution order: config → env var → client default
167    fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
168        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
169        match resolved_key {
170            Some(key) => gemini::Client::new(&key)
171                // Sanitize error to avoid exposing key material
172                .map_err(|_| {
173                    anyhow::anyhow!(
174                        "Failed to create Gemini client: authentication or configuration error"
175                    )
176                }),
177            None => gemini::Client::from_env()
178                .map_err(|_| anyhow::anyhow!("Failed to create Gemini client from environment")),
179        }
180    }
181
182    async fn run_task(&self, task: &str) -> SubagentResult {
183        let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
184            task thoroughly and return a focused summary.\n\n\
185            Guidelines:\n\
186            - Use the available tools to gather necessary information\n\
187            - Focus only on what's asked\n\
188            - Return a clear, structured summary\n\
189            - Be concise but comprehensive";
190
191        // Use shared tool registry for consistent tool attachment
192        let result = match self {
193            Self::OpenAI {
194                client,
195                model,
196                additional_params,
197            } => {
198                let builder = client.agent(model).preamble(preamble);
199                let builder = apply_completion_params(
200                    builder,
201                    Provider::OpenAI,
202                    model,
203                    4096,
204                    Some(additional_params),
205                    CompletionProfile::Subagent,
206                );
207                let agent = crate::attach_core_tools!(builder).build();
208                agent.prompt(task).await
209            }
210            Self::Anthropic {
211                client,
212                model,
213                additional_params,
214            } => {
215                let builder = client.agent(model).preamble(preamble);
216                let builder = apply_completion_params(
217                    builder,
218                    Provider::Anthropic,
219                    model,
220                    4096,
221                    Some(additional_params),
222                    CompletionProfile::Subagent,
223                );
224                let agent = crate::attach_core_tools!(builder).build();
225                agent.prompt(task).await
226            }
227            Self::Gemini {
228                client,
229                model,
230                additional_params,
231            } => {
232                let builder = client.agent(model).preamble(preamble);
233                let builder = apply_completion_params(
234                    builder,
235                    Provider::Google,
236                    model,
237                    4096,
238                    Some(additional_params),
239                    CompletionProfile::Subagent,
240                );
241                let agent = crate::attach_core_tools!(builder).build();
242                agent.prompt(task).await
243            }
244        };
245
246        match result {
247            Ok(response) => SubagentResult {
248                task: task.to_string(),
249                result: response,
250                success: true,
251                error: None,
252            },
253            Err(e) => SubagentResult {
254                task: task.to_string(),
255                result: String::new(),
256                success: false,
257                error: Some(e.to_string()),
258            },
259        }
260    }
261}
262
/// Parallel analysis tool
/// Spawns multiple subagents to analyze different aspects concurrently
pub struct ParallelAnalyze {
    /// Provider-specific runner; cloned once for every spawned task
    runner: SubagentRunner,
    /// Model name; within this file it is only read for the debug log in `call`
    model: String,
    /// Timeout in seconds for each subagent task
    timeout_secs: u64,
}
271
272impl ParallelAnalyze {
273    /// Create a new parallel analyzer with default timeout
274    ///
275    /// # Errors
276    ///
277    /// Returns an error when the requested provider runner cannot be created.
278    pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
279        Self::with_timeout(
280            provider,
281            model,
282            DEFAULT_SUBAGENT_TIMEOUT_SECS,
283            api_key,
284            None,
285        )
286    }
287
288    /// Create a new parallel analyzer with custom timeout
289    ///
290    /// # Errors
291    ///
292    /// Returns an error when the requested provider runner cannot be created.
293    pub fn with_timeout(
294        provider: &str,
295        model: &str,
296        timeout_secs: u64,
297        api_key: Option<&str>,
298        additional_params: Option<HashMap<String, String>>,
299    ) -> Result<Self> {
300        let provider_name = provider_from_name(provider)?;
301        // Create runner for the requested provider - no silent fallback
302        // If the user configures Anthropic, they should get Anthropic or a clear error
303        let runner = SubagentRunner::new(
304            provider_name.name(),
305            model,
306            api_key,
307            additional_params.unwrap_or_default(),
308        )
309        .map_err(|e| {
310            anyhow::anyhow!(
311                "Failed to create {} runner: {}. Check API key and network connectivity.",
312                provider,
313                e
314            )
315        })?;
316
317        Ok(Self {
318            runner,
319            model: model.to_string(),
320            timeout_secs,
321        })
322    }
323}
324
// Use standard tool error macro for consistency.
// `ParallelAnalyzeError` is the `Tool::Error` type of the `ParallelAnalyze` tool.
crate::define_tool_error!(ParallelAnalyzeError);
327
328impl Tool for ParallelAnalyze {
329    const NAME: &'static str = "parallel_analyze";
330    type Error = ParallelAnalyzeError;
331    type Args = ParallelAnalyzeArgs;
332    type Output = ParallelAnalyzeResult;
333
334    async fn definition(&self, _prompt: String) -> ToolDefinition {
335        ToolDefinition {
336            name: Self::NAME.to_string(),
337            description: "Run multiple analysis tasks in parallel using independent subagents. \
338                         Each subagent has its own context window, preventing overflow when \
339                         analyzing large changesets. Use this when you have multiple independent \
340                         analysis tasks that can run concurrently.\n\n\
341                         Best for:\n\
342                         - Analyzing different directories/modules separately\n\
343                         - Processing many commits in batches\n\
344                         - Running different types of analysis (security, performance, style) in parallel\n\n\
345                         Each task should be a focused prompt. Results are aggregated and returned."
346                .to_string(),
347            parameters: json!({
348                "type": "object",
349                "properties": {
350                    "tasks": {
351                        "type": "array",
352                        "items": { "type": "string" },
353                        "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
354                        "minItems": 1,
355                        "maxItems": 10
356                    }
357                },
358                "required": ["tasks"]
359            }),
360        }
361    }
362
363    #[allow(clippy::cognitive_complexity)]
364    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
365        use std::time::Instant;
366
367        let start = Instant::now();
368        let tasks = args.tasks;
369        let num_tasks = tasks.len();
370
371        agent_debug::debug_context_management(
372            "ParallelAnalyze",
373            &format!(
374                "Spawning {} subagents (fast model: {})",
375                num_tasks, self.model
376            ),
377        );
378
379        // Pre-allocate results vector to preserve task ordering
380        let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
381            Arc::new(Mutex::new(vec![None; num_tasks]));
382
383        // Spawn all tasks as parallel tokio tasks, tracking index for ordering
384        let mut handles = Vec::new();
385        let timeout = Duration::from_secs(self.timeout_secs);
386        for (index, task) in tasks.into_iter().enumerate() {
387            let runner = self.runner.clone();
388            let results = Arc::clone(&results);
389            let task_timeout = timeout;
390            let timeout_secs = self.timeout_secs;
391
392            let handle = tokio::spawn(async move {
393                // Wrap task execution in timeout to prevent hanging
394                let result = match tokio::time::timeout(task_timeout, runner.run_task(&task)).await
395                {
396                    Ok(result) => result,
397                    Err(_) => SubagentResult {
398                        task: task.clone(),
399                        result: String::new(),
400                        success: false,
401                        error: Some(format!("Task timed out after {} seconds", timeout_secs)),
402                    },
403                };
404
405                // Store result at original index to preserve ordering
406                let mut guard = results.lock().await;
407                guard[index] = Some(result);
408            });
409
410            handles.push(handle);
411        }
412
413        // Wait for all tasks to complete
414        for handle in handles {
415            if let Err(e) = handle.await {
416                agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
417            }
418        }
419
420        #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
421        let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
422
423        // Extract results, preserving original task order
424        let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
425            .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
426            .into_inner()
427            .into_iter()
428            .enumerate()
429            .map(|(i, opt)| {
430                opt.unwrap_or_else(|| SubagentResult {
431                    task: format!("Task {}", i),
432                    result: String::new(),
433                    success: false,
434                    error: Some("Task did not complete".to_string()),
435                })
436            })
437            .collect();
438
439        let successful = final_results.iter().filter(|r| r.success).count();
440        let failed = final_results.iter().filter(|r| !r.success).count();
441
442        agent_debug::debug_context_management(
443            "ParallelAnalyze",
444            &format!(
445                "{}/{} successful in {}ms",
446                successful, num_tasks, execution_time_ms
447            ),
448        );
449
450        Ok(ParallelAnalyzeResult {
451            results: final_results,
452            successful,
453            failed,
454            execution_time_ms,
455        })
456    }
457}
458
#[cfg(test)]
mod tests {
    use super::*;

    /// The schema derived for `ParallelAnalyzeArgs` must mention the `tasks`
    /// property once rendered as JSON.
    #[test]
    fn test_parallel_analyze_args_schema() {
        let rendered = serde_json::to_string_pretty(&schemars::schema_for!(ParallelAnalyzeArgs))
            .expect("schema should serialize");
        assert!(rendered.contains("tasks"));
    }
}