Skip to main content

git_iris/agents/tools/
parallel_analyze.rs

1//! Parallel Analysis Tool
2//!
3//! Enables Iris to spawn multiple independent subagents that analyze different
4//! portions of a codebase concurrently. This prevents context overflow when
5//! dealing with large changesets by distributing work across separate context windows.
6
7use anyhow::Result;
8use rig::{
9    client::{CompletionClient, ProviderClient},
10    completion::{Prompt, ToolDefinition},
11    providers::{anthropic, gemini, openai},
12    tool::Tool,
13};
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16use serde_json::json;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Mutex;
21
22use crate::agents::debug as agent_debug;
23use crate::agents::provider::{
24    CompletionProfile, anthropic_agent_builder, apply_completion_params, provider_from_name,
25    resolve_api_key,
26};
27use crate::providers::Provider;
28
/// Per-subagent wall-clock timeout in seconds (two minutes), applied by
/// `ParallelAnalyze::new`.
const DEFAULT_SUBAGENT_TIMEOUT_SECS: u64 = 120;
/// Per-subagent turn budget applied by `ParallelAnalyze::new` and
/// `ParallelAnalyze::with_timeout`.
const DEFAULT_SUBAGENT_MAX_TURNS: usize = 20;
32
33/// Arguments for parallel analysis
34#[derive(Debug, Deserialize, JsonSchema)]
35pub struct ParallelAnalyzeArgs {
36    /// List of analysis tasks to run in parallel.
37    /// Each task should be a focused prompt describing what to analyze.
38    /// Example: `["Analyze security changes in auth/", "Review performance in db/"]`
39    pub tasks: Vec<String>,
40    /// Optional turn budget for each subagent. Defaults to the configured value.
41    #[serde(default)]
42    pub max_turns: Option<usize>,
43}
44
45/// Result from a single subagent analysis
46#[derive(Debug, Serialize, Deserialize, Clone)]
47pub struct SubagentResult {
48    /// The original task prompt
49    pub task: String,
50    /// The analysis result
51    pub result: String,
52    /// Whether the analysis succeeded
53    pub success: bool,
54    /// Error message if failed
55    pub error: Option<String>,
56}
57
58/// Aggregated results from all parallel analyses
59#[derive(Debug, Serialize, Deserialize)]
60pub struct ParallelAnalyzeResult {
61    /// Results from each subagent
62    pub results: Vec<SubagentResult>,
63    /// Number of successful analyses
64    pub successful: usize,
65    /// Number of failed analyses
66    pub failed: usize,
67    /// Total execution time in milliseconds
68    pub execution_time_ms: u64,
69}
70
71/// Provider-specific subagent runner
72#[derive(Clone)]
73enum SubagentRunner {
74    OpenAI {
75        client: openai::Client,
76        model: String,
77        additional_params: HashMap<String, String>,
78    },
79    Anthropic {
80        client: anthropic::Client,
81        model: String,
82        additional_params: HashMap<String, String>,
83    },
84    Gemini {
85        client: gemini::Client,
86        model: String,
87        additional_params: HashMap<String, String>,
88    },
89}
90
91impl SubagentRunner {
92    fn new(
93        provider: &str,
94        model: &str,
95        api_key: Option<&str>,
96        additional_params: HashMap<String, String>,
97    ) -> Result<Self> {
98        match provider {
99            "openai" => {
100                let client = Self::resolve_openai_client(api_key)?;
101                Ok(Self::OpenAI {
102                    client,
103                    model: model.to_string(),
104                    additional_params,
105                })
106            }
107            "anthropic" => {
108                let client = Self::resolve_anthropic_client(api_key)?;
109                Ok(Self::Anthropic {
110                    client,
111                    model: model.to_string(),
112                    additional_params,
113                })
114            }
115            "google" | "gemini" => {
116                let client = Self::resolve_gemini_client(api_key)?;
117                Ok(Self::Gemini {
118                    client,
119                    model: model.to_string(),
120                    additional_params,
121                })
122            }
123            _ => Err(anyhow::anyhow!(
124                "Unsupported provider for parallel analysis: {}. Supported: openai, anthropic, google",
125                provider
126            )),
127        }
128    }
129
130    /// Create `OpenAI` client using shared resolution logic
131    ///
132    /// Uses `resolve_api_key` from provider module to maintain consistent
133    /// resolution order: config → env var → client default
134    fn resolve_openai_client(api_key: Option<&str>) -> Result<openai::Client> {
135        let (resolved_key, _source) = resolve_api_key(api_key, Provider::OpenAI);
136        match resolved_key {
137            Some(key) => openai::Client::new(&key)
138                // Sanitize error to avoid exposing key material
139                .map_err(|_| {
140                    anyhow::anyhow!(
141                        "Failed to create OpenAI client: authentication or configuration error"
142                    )
143                }),
144            None => openai::Client::from_env()
145                .map_err(|_| anyhow::anyhow!("Failed to create OpenAI client from environment")),
146        }
147    }
148
149    /// Create `Anthropic` client using shared resolution logic
150    ///
151    /// Uses `resolve_api_key` from provider module to maintain consistent
152    /// resolution order: config → env var → client default
153    fn resolve_anthropic_client(api_key: Option<&str>) -> Result<anthropic::Client> {
154        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Anthropic);
155        match resolved_key {
156            Some(key) => anthropic::Client::new(&key)
157                // Sanitize error to avoid exposing key material
158                .map_err(|_| {
159                    anyhow::anyhow!(
160                        "Failed to create Anthropic client: authentication or configuration error"
161                    )
162                }),
163            None => anthropic::Client::from_env()
164                .map_err(|_| anyhow::anyhow!("Failed to create Anthropic client from environment")),
165        }
166    }
167
168    /// Create `Gemini` client using shared resolution logic
169    ///
170    /// Uses `resolve_api_key` from provider module to maintain consistent
171    /// resolution order: config → env var → client default
172    fn resolve_gemini_client(api_key: Option<&str>) -> Result<gemini::Client> {
173        let (resolved_key, _source) = resolve_api_key(api_key, Provider::Google);
174        match resolved_key {
175            Some(key) => gemini::Client::new(&key)
176                // Sanitize error to avoid exposing key material
177                .map_err(|_| {
178                    anyhow::anyhow!(
179                        "Failed to create Gemini client: authentication or configuration error"
180                    )
181                }),
182            None => gemini::Client::from_env()
183                .map_err(|_| anyhow::anyhow!("Failed to create Gemini client from environment")),
184        }
185    }
186
187    async fn run_task(&self, task: &str, max_turns: usize) -> SubagentResult {
188        let preamble = "You are a specialized analysis sub-agent. Complete the assigned \
189            task thoroughly and return a focused summary.\n\n\
190            Guidelines:\n\
191            - Use the available tools to gather necessary information\n\
192            - Focus only on what's asked\n\
193            - Return a clear, structured summary\n\
194            - Be concise but comprehensive";
195
196        // Use shared tool registry for consistent tool attachment
197        let result = match self {
198            Self::OpenAI {
199                client,
200                model,
201                additional_params,
202            } => {
203                let builder = client.agent(model).preamble(preamble);
204                let builder = apply_completion_params(
205                    builder,
206                    Provider::OpenAI,
207                    model,
208                    4096,
209                    Some(additional_params),
210                    CompletionProfile::Subagent,
211                );
212                let agent = crate::attach_core_tools!(builder).build();
213                agent.prompt(task).max_turns(max_turns).await
214            }
215            Self::Anthropic {
216                client,
217                model,
218                additional_params,
219            } => {
220                let builder = anthropic_agent_builder(client, model).preamble(preamble);
221                let builder = apply_completion_params(
222                    builder,
223                    Provider::Anthropic,
224                    model,
225                    4096,
226                    Some(additional_params),
227                    CompletionProfile::Subagent,
228                );
229                let agent = crate::attach_core_tools!(builder).build();
230                agent.prompt(task).max_turns(max_turns).await
231            }
232            Self::Gemini {
233                client,
234                model,
235                additional_params,
236            } => {
237                let builder = client.agent(model).preamble(preamble);
238                let builder = apply_completion_params(
239                    builder,
240                    Provider::Google,
241                    model,
242                    4096,
243                    Some(additional_params),
244                    CompletionProfile::Subagent,
245                );
246                let agent = crate::attach_core_tools!(builder).build();
247                agent.prompt(task).max_turns(max_turns).await
248            }
249        };
250
251        match result {
252            Ok(response) => SubagentResult {
253                task: task.to_string(),
254                result: response,
255                success: true,
256                error: None,
257            },
258            Err(e) => SubagentResult {
259                task: task.to_string(),
260                result: format!("Subagent failed: {e}"),
261                success: false,
262                error: Some(e.to_string()),
263            },
264        }
265    }
266}
267
268/// Parallel analysis tool
269/// Spawns multiple subagents to analyze different aspects concurrently
270pub struct ParallelAnalyze {
271    runner: SubagentRunner,
272    model: String,
273    /// Timeout in seconds for each subagent task
274    timeout_secs: u64,
275    /// Default turn budget for each subagent task
276    max_turns: usize,
277}
278
279impl ParallelAnalyze {
280    /// Create a new parallel analyzer with default timeout
281    ///
282    /// # Errors
283    ///
284    /// Returns an error when the requested provider runner cannot be created.
285    pub fn new(provider: &str, model: &str, api_key: Option<&str>) -> Result<Self> {
286        Self::with_timeout(
287            provider,
288            model,
289            DEFAULT_SUBAGENT_TIMEOUT_SECS,
290            api_key,
291            None,
292        )
293    }
294
295    /// Create a new parallel analyzer with custom timeout
296    ///
297    /// # Errors
298    ///
299    /// Returns an error when the requested provider runner cannot be created.
300    pub fn with_timeout(
301        provider: &str,
302        model: &str,
303        timeout_secs: u64,
304        api_key: Option<&str>,
305        additional_params: Option<HashMap<String, String>>,
306    ) -> Result<Self> {
307        Self::with_limits(
308            provider,
309            model,
310            timeout_secs,
311            DEFAULT_SUBAGENT_MAX_TURNS,
312            api_key,
313            additional_params,
314        )
315    }
316
317    /// Create a new parallel analyzer with custom timeout and turn budget.
318    /// The turn budget is clamped to 1..=100.
319    ///
320    /// # Errors
321    ///
322    /// Returns an error when the requested provider runner cannot be created.
323    pub fn with_limits(
324        provider: &str,
325        model: &str,
326        timeout_secs: u64,
327        max_turns: usize,
328        api_key: Option<&str>,
329        additional_params: Option<HashMap<String, String>>,
330    ) -> Result<Self> {
331        let provider_name = provider_from_name(provider)?;
332        // Create runner for the requested provider - no silent fallback
333        // If the user configures Anthropic, they should get Anthropic or a clear error
334        let runner = SubagentRunner::new(
335            provider_name.name(),
336            model,
337            api_key,
338            additional_params.unwrap_or_default(),
339        )
340        .map_err(|e| {
341            anyhow::anyhow!(
342                "Failed to create {} runner: {}. Check API key and network connectivity.",
343                provider,
344                e
345            )
346        })?;
347
348        Ok(Self {
349            runner,
350            model: model.to_string(),
351            timeout_secs,
352            max_turns: max_turns.clamp(1, 100),
353        })
354    }
355}
356
357// Use standard tool error macro for consistency
358crate::define_tool_error!(ParallelAnalyzeError);
359
360impl Tool for ParallelAnalyze {
361    const NAME: &'static str = "parallel_analyze";
362    type Error = ParallelAnalyzeError;
363    type Args = ParallelAnalyzeArgs;
364    type Output = ParallelAnalyzeResult;
365
366    async fn definition(&self, _prompt: String) -> ToolDefinition {
367        ToolDefinition {
368            name: Self::NAME.to_string(),
369            description: "Run multiple analysis tasks in parallel using independent subagents. \
370                         Each subagent has its own context window, preventing overflow when \
371                         analyzing large changesets. Use this when you have multiple independent \
372                         analysis tasks that can run concurrently.\n\n\
373                         Best for:\n\
374                         - Analyzing different directories/modules separately\n\
375                         - Processing many commits in batches\n\
376                         - Running different types of analysis (security, performance, style) in parallel\n\n\
377                         Each task should be a focused prompt. Results are aggregated and returned."
378                .to_string(),
379            parameters: json!({
380                "type": "object",
381                "properties": {
382                    "tasks": {
383                        "type": "array",
384                        "items": { "type": "string" },
385                        "description": "List of analysis task prompts to run in parallel. Each task runs in its own subagent with independent context.",
386                        "minItems": 1,
387                        "maxItems": 10
388                    },
389                    "max_turns": {
390                        "type": "integer",
391                        "description": "Optional per-subagent turn budget. Increase for broad repository searches; lower it to cap cost or runaway tool loops.",
392                        "minimum": 1,
393                        "maximum": 100
394                    }
395                },
396                "required": ["tasks"]
397            }),
398        }
399    }
400
401    #[allow(clippy::cognitive_complexity)]
402    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
403        use std::time::Instant;
404
405        let start = Instant::now();
406        let max_turns = args.max_turns.unwrap_or(self.max_turns).clamp(1, 100);
407        let tasks = args.tasks;
408        let num_tasks = tasks.len();
409
410        agent_debug::debug_context_management(
411            "ParallelAnalyze",
412            &format!(
413                "Spawning {} subagents (fast model: {}, max turns: {})",
414                num_tasks, self.model, max_turns
415            ),
416        );
417
418        let initial_results = tasks
419            .iter()
420            .map(|task| {
421                Some(SubagentResult {
422                    task: task.clone(),
423                    result: "Subagent task did not complete".to_string(),
424                    success: false,
425                    error: Some("Task did not complete".to_string()),
426                })
427            })
428            .collect();
429        let results: Arc<Mutex<Vec<Option<SubagentResult>>>> =
430            Arc::new(Mutex::new(initial_results));
431
432        // Spawn all tasks as parallel tokio tasks, tracking index for ordering
433        let mut handles = Vec::new();
434        let timeout = Duration::from_secs(self.timeout_secs);
435        for (index, task) in tasks.into_iter().enumerate() {
436            let runner = self.runner.clone();
437            let results = Arc::clone(&results);
438            let task_timeout = timeout;
439            let timeout_secs = self.timeout_secs;
440            let task_max_turns = max_turns;
441
442            let handle = tokio::spawn(async move {
443                // Wrap task execution in timeout to prevent hanging
444                let result = match tokio::time::timeout(
445                    task_timeout,
446                    runner.run_task(&task, task_max_turns),
447                )
448                .await
449                {
450                    Ok(result) => result,
451                    Err(_) => SubagentResult {
452                        task: task.clone(),
453                        result: format!("Subagent timed out after {timeout_secs} seconds"),
454                        success: false,
455                        error: Some(format!("Task timed out after {} seconds", timeout_secs)),
456                    },
457                };
458
459                // Store result at original index to preserve ordering
460                let mut guard = results.lock().await;
461                guard[index] = Some(result);
462            });
463
464            handles.push(handle);
465        }
466
467        // Wait for all tasks to complete
468        for handle in handles {
469            if let Err(e) = handle.await {
470                agent_debug::debug_warning(&format!("Subagent task panicked: {}", e));
471            }
472        }
473
474        #[allow(clippy::cast_possible_truncation, clippy::as_conversions)]
475        let execution_time_ms = start.elapsed().as_millis().min(u128::from(u64::MAX)) as u64;
476
477        // Extract results, preserving original task order
478        let final_results: Vec<SubagentResult> = Arc::try_unwrap(results)
479            .map_err(|_| ParallelAnalyzeError("Failed to unwrap results".to_string()))?
480            .into_inner()
481            .into_iter()
482            .enumerate()
483            .map(|(i, opt)| {
484                opt.unwrap_or_else(|| SubagentResult {
485                    task: format!("Task {}", i),
486                    result: "Subagent task did not complete".to_string(),
487                    success: false,
488                    error: Some("Task did not complete".to_string()),
489                })
490            })
491            .collect();
492
493        let successful = final_results.iter().filter(|r| r.success).count();
494        let failed = final_results.iter().filter(|r| !r.success).count();
495
496        agent_debug::debug_context_management(
497            "ParallelAnalyze",
498            &format!(
499                "{}/{} successful in {}ms",
500                successful, num_tasks, execution_time_ms
501            ),
502        );
503
504        Ok(ParallelAnalyzeResult {
505            results: final_results,
506            successful,
507            failed,
508            execution_time_ms,
509        })
510    }
511}
512
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_analyze_args_schema() {
        let schema = schemars::schema_for!(ParallelAnalyzeArgs);
        let json = serde_json::to_string_pretty(&schema).expect("schema should serialize");
        assert!(json.contains("tasks"));
        assert!(json.contains("max_turns"));
    }

    /// `max_turns` is `#[serde(default)]`, so omitting it must yield `None`.
    #[test]
    fn test_parallel_analyze_args_max_turns_is_optional() {
        let args: ParallelAnalyzeArgs = serde_json::from_str(r#"{"tasks": ["Review auth/"]}"#)
            .expect("args without max_turns should deserialize");
        assert_eq!(args.tasks, vec!["Review auth/".to_string()]);
        assert_eq!(args.max_turns, None);
    }

    #[test]
    fn test_parallel_analyze_args_accepts_max_turns() {
        let args: ParallelAnalyzeArgs =
            serde_json::from_str(r#"{"tasks": ["a", "b"], "max_turns": 5}"#)
                .expect("args with max_turns should deserialize");
        assert_eq!(args.tasks.len(), 2);
        assert_eq!(args.max_turns, Some(5));
    }

    #[test]
    fn test_subagent_result_roundtrip() {
        let original = SubagentResult {
            task: "t".to_string(),
            result: "r".to_string(),
            success: false,
            error: Some("e".to_string()),
        };
        let json = serde_json::to_string(&original).expect("result should serialize");
        let back: SubagentResult = serde_json::from_str(&json).expect("result should deserialize");
        assert_eq!(back.task, "t");
        assert_eq!(back.result, "r");
        assert!(!back.success);
        assert_eq!(back.error.as_deref(), Some("e"));
    }
}